#!/usr/bin/env python # -*- coding: utf-8 -*- import logging import matplotlib import numpy as np import smtplib import os from obspy import Stream COLORS_DICT = {'FAIL': (195, 29, 14, 255), 'NO DATA': (255, 255, 125, 255), 'WARN': (250, 192, 63, 255), 'OK': (185, 245, 145, 255), 'undefined': (240, 240, 240, 255), 'disc': (126, 127, 131, 255), } def get_bg_color(check_key, status, dt_thresh=None, hex=False): message = status.message if check_key == 'last active': bg_color = get_time_delay_color(message, dt_thresh) elif check_key == 'temp': bg_color = get_temp_color(message) elif check_key == 'mass': bg_color = get_mass_color(message) else: if status.is_warn: bg_color = get_warn_color(status.count) elif status.is_error: if status.connection_error: bg_color = get_color('disc') else: bg_color = get_color('FAIL') else: bg_color = get_color(message) if not bg_color: bg_color = get_color('undefined') if hex: bg_color = '#{:02x}{:02x}{:02x}'.format(*bg_color[:3]) return bg_color def get_color(key): # some old GUI default colors # colors_dict = {'FAIL': (255, 85, 50, 255), # 'NO DATA': (255, 255, 125, 255), # 'WARN': (255, 255, 80, 255), # 'OK': (173, 255, 133, 255), # 'undefined': (230, 230, 230, 255), # 'disc': (255, 160, 40, 255),} if not key in COLORS_DICT.keys(): key = 'undefined' return COLORS_DICT.get(key) def get_color_mpl(key): color_tup = get_color(key) return np.array([color/255. for color in color_tup]) def get_time_delay_color(dt, dt_thresh): """ Set color of time delay after thresholds specified in self.dt_thresh """ if isinstance(dt, type(dt_thresh[0])): if dt < dt_thresh[0]: return get_color('OK') elif dt_thresh[0] <= dt < dt_thresh[1]: return get_color('WARN') return get_color('FAIL') def get_warn_color(count, n_colors=20): if count >= n_colors: count = -1 gradient = np.linspace((240, 245, 110, 255), (250, 192, 63, 255), n_colors, dtype=int) return tuple(gradient[count]) def get_mass_color(message): # can change this to something else if wanted. This way it always returns get_color (without warn count) if isinstance(message, (float, int)): return get_color('OK') return get_color(message) def get_temp_color(temp, vmin=-10, vmax=60, cmap='coolwarm'): """ Get an rgba temperature value back from specified cmap, linearly interpolated between vmin and vmax. """ if type(temp) in [str]: if temp in COLORS_DICT.keys(): return get_color(temp) return get_color('undefined') cmap = matplotlib.cm.get_cmap(cmap) val = (temp - vmin) / (vmax - vmin) rgba = [int(255 * c) for c in cmap(val)] return rgba def get_font_color(bg_color, hex=False): if hex: bg_color = matplotlib.colors.to_rgb(bg_color) bg_color_hsv = matplotlib.colors.rgb_to_hsv(bg_color) bg_color_hsl = hsv_to_hsl(bg_color_hsv) font_color = (255, 255, 255, 255) if bg_color_hsl[2] < 0.6 else (0, 0, 0, 255) if hex: font_color = '#{:02x}{:02x}{:02x}'.format(*font_color[:3]) return font_color def hsv_to_hsl(hsv): hue, saturation, value = hsv lightness = value * (1 - saturation / 2) saturation = 0 if lightness in (0, 1) else (value - lightness) / min(lightness, 1 - lightness) return hue, saturation, lightness def modify_stream_for_plot(input_stream, parameters): """ copy (if necessary) and modify stream for plotting """ # make a copy st = Stream() channels_dict = parameters.get('CHANNELS') # iterate over all channels and put them to new stream in order for index, ch_tup in enumerate(channels_dict.items()): # unpack tuple from items channel, channel_dict = ch_tup # get correct channel from stream st_sel = input_stream.select(channel=channel) # in case there are != 1 there is ambiguity if not len(st_sel) == 1: continue # make a copy to not modify original stream! tr = st_sel[0].copy() # multiply with conversion factor for unit unit_factor = channel_dict.get('unit') if unit_factor: tr.data = tr.data * float(unit_factor) # apply transformations if provided transform = channel_dict.get('transform') if transform: tr.data = transform_trace(tr.data, transform) # modify trace id to maintain plotting order name = channel_dict.get('name') tr.id = f'{index + 1}: {name} - {tr.id}' st.append(tr) return st def transform_trace(data, transf): """ Transform trace with arithmetic operations in order, specified in transf @param data: numpy array @param transf: list of lists with arithmetic operations (e.g. [['*', '20'], ] -> multiply data by 20 """ # This looks a little bit hardcoded, however it is safer than using e.g. "eval" for operator_str, val in transf: if operator_str == '+': data = data + val elif operator_str == '-': data = data - val elif operator_str == '*': data = data * val elif operator_str == '/': data = data / val else: raise IOError(f'Unknown arithmetic operator string: {operator_str}') return data def set_axis_ylabels(fig, parameters): """ Adds channel names to y-axis if defined in parameters. """ names = [channel.get('name') for channel in parameters.get('CHANNELS').values()] if not names: # or not len(st.traces): return if not len(names) == len(fig.axes): logging.info('Mismatch in axis and label lengths. Not adding plot labels') return for channel_name, ax in zip(names, fig.axes): if channel_name: ax.set_ylabel(channel_name) def set_axis_color(fig, color='0.8', shade_color='0.95'): """ Set all axes (frame) of figure to specific color. Shade every second axis. """ for i, ax in enumerate(fig.axes): for key in ['bottom', 'top', 'right', 'left']: ax.spines[key].set_color(color) if i % 2: ax.set_facecolor(shade_color) def set_axis_yticks(fig, parameters): """ Adds channel names to y-axis if defined in parameters. """ ticks = [channel.get('ticks') for channel in parameters.get('CHANNELS').values()] if not ticks: return if not len(ticks) == len(fig.axes): logging.info('Mismatch in axis tick and label lengths. Not changing plot ticks.') return for ytick_tripple, ax in zip(ticks, fig.axes): if not ytick_tripple: continue ymin, ymax, step = ytick_tripple yticks = list(np.arange(ymin, ymax + step, step)) ax.set_yticks(yticks) ax.set_ylim(ymin - 0.33 * step, ymax + 0.33 * step) def plot_axis_thresholds(fig, parameters): """ Adds channel thresholds (warn, fail) to y-axis if defined in parameters. """ logging.info('Plotting trace thresholds') keys_colors = {'warn': dict(color=0.8 * get_color_mpl('WARN'), linestyle=(0, (5, 10)), alpha=0.5, linewidth=0.7), 'fail': dict(color=0.8 * get_color_mpl('FAIL'), linestyle='solid', alpha=0.5, linewidth=0.7)} for key, kwargs in keys_colors.items(): channel_threshold_list = [channel.get(key) for channel in parameters.get('CHANNELS').values()] if not channel_threshold_list: continue plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs) def plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs): for channel_thresholds, ax in zip(channel_threshold_list, fig.axes): if channel_thresholds in ['pb_SOH2', 'pb_SOH3']: annotate_voltage_states(ax, parameters, channel_thresholds) channel_thresholds = get_warn_states_pbox(channel_thresholds, parameters) if not channel_thresholds: continue if not isinstance(channel_thresholds, (list, tuple)): channel_thresholds = [channel_thresholds] for warn_thresh in channel_thresholds: if isinstance(warn_thresh, str): warn_thresh = parameters.get('THRESHOLDS').get(warn_thresh) if isinstance(warn_thresh, (float, int)): ax.axhline(warn_thresh, **kwargs) def get_warn_states_pbox(soh_key: str, parameters: dict) -> list: pb_dict = parameters.get('POWBOX').get(soh_key) if not pb_dict: return [] return [key for key in pb_dict.keys() if key > 1] def annotate_voltage_states(ax, parameters, pb_key, color='0.75'): for voltage, voltage_dict in parameters.get('POWBOX').get(pb_key).items(): if float(voltage) < 1: continue out_string = '' for key, val in voltage_dict.items(): if val != 'OK': if out_string: out_string += ' | ' out_string += f'{key}: {val}' ax.annotate(out_string, (ax.get_xlim()[-1], voltage), color=color, fontsize='xx-small', horizontalalignment='right') def get_credential(source, param): """ Retrieve a credential from a Docker secret or environment variable. """ if source == 'DOCKER': try: with open('/run/secrets/'+param.lower(), 'r') as f: return f.read().strip() except FileNotFoundError as e: logging.error(f'Could not read from Docker secret at /run/secrets/{param.lower()}') logging.error(e) elif source == 'ENV': try: return os.environ.get(param.upper()) except Exception as e: logging.error(f'Could not read from environment variable {param.upper()}') logging.error(e) # return source if no credential was found return source def connect_to_mail_server(mail_params): """ Connect to mail server and return server object. """ # get server from parameters server = mail_params.get('mailserver') # get auth_type from parameters auth_type = mail_params.get('auth_type') # set up connection to mail server if auth_type == 'None': s = smtplib.SMTP(server) else: # user and password from parameters, docker secret or environment variable user = get_credential(mail_params.get('user'), 'mail_user') password = get_credential(mail_params.get('password'), 'mail_password') # create secure connection to server if auth_type == 'SSL': s = smtplib.SMTP_SSL(server, mail_params.get('port')) elif auth_type == 'TLS': s = smtplib.SMTP(server, mail_params.get('port')) s.starttls() else: logging.error('Unknown authentication type. Mails can not be sent') return s.login(user, password) return s