survBot/utils.py

208 lines
6.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import matplotlib
import numpy as np
from obspy import Stream
def get_bg_color(check_key, status, dt_thresh=None, hex=False):
    """
    Select the background color for one status entry.

    @param check_key: name of the check; 'last active', 'temp' and 'mass' are handled by
        get_time_delay_color, get_temp_color and get_mass_color respectively
    @param status: object providing the attributes message, is_warn, is_error and count
    @param dt_thresh: thresholds handed on to get_time_delay_color (only used for 'last active')
    @param hex: if True, return a '#rrggbb' string built from the first three color components
    @return: color as returned by the helper functions, or a hex string if hex is set
    """
    message = status.message

    if check_key == 'last active':
        chosen = get_time_delay_color(message, dt_thresh)
    elif check_key == 'temp':
        chosen = get_temp_color(message)
    elif check_key == 'mass':
        chosen = get_mass_color(message)
    elif status.is_warn:
        # generic check: warnings get a color depending on the warning count
        chosen = get_warn_color(status.count)
    elif status.is_error:
        chosen = get_color('FAIL')
    else:
        chosen = get_color(message)

    # helpers may return nothing for unknown messages, use the neutral color then
    chosen = chosen or get_color('undefined')

    return '#{:02x}{:02x}{:02x}'.format(*chosen[:3]) if hex else chosen
def get_color(key):
    """
    Look up one of the GUI default colors.

    @param key: one of 'FAIL', 'NO DATA', 'WARN', 'OK' or 'undefined'
    @return: tuple of four integers for a known key, None for any other key
    """
    # some GUI default colors
    gui_colors = dict((
        ('FAIL', (255, 50, 0, 255)),
        ('NO DATA', (255, 255, 125, 255)),
        ('WARN', (255, 255, 80, 255)),
        ('OK', (125, 255, 125, 255)),
        ('undefined', (230, 230, 230, 255)),
    ))
    try:
        return gui_colors[key]
    except KeyError:
        return None
def get_color_mpl(key):
    """
    Return the color stored for key in get_color as a numpy array with every component divided by 255.
    """
    return np.array([component / 255. for component in get_color(key)])
def get_time_delay_color(dt, dt_thresh):
    """
    Set color of time delay after the thresholds specified in dt_thresh.

    @param dt: time delay, has to be comparable with the entries of dt_thresh
    @param dt_thresh: sequence, dt_thresh[0] is the lower limit for 'WARN', dt_thresh[1] the lower limit for 'FAIL'
    @return: get_color('OK'), get_color('WARN') or get_color('FAIL')
    """
    if dt < dt_thresh[0]:
        level = 'OK'
    elif dt_thresh[0] <= dt < dt_thresh[1]:
        level = 'WARN'
    else:
        level = 'FAIL'
    return get_color(level)
2022-12-20 16:54:27 +01:00
def get_warn_color(count):
    """
    Return the warning color for a given number of warnings.

    The first component grows as 200 + count ** 2 and is capped at 255, the remaining components are fixed.
    """
    first_component = min(255, 200 + count ** 2)
    return (first_component, 255, 80, 255)
def get_mass_color(message):
    """
    Return the color for a mass check: numeric messages are always shown as 'OK',
    everything else is looked up in get_color.
    """
    # can change this to something else if wanted. This way it always returns get_color (without warn count)
    color_key = 'OK' if isinstance(message, (float, int)) else message
    return get_color(color_key)
def get_temp_color(temp, vmin=-10, vmax=60, cmap='coolwarm'):
    """
    Get an rgba temperature value back from specified cmap, linearly interpolated between vmin and vmax.

    @param temp: temperature value; a string or None is treated as "no value available"
    @param vmin: temperature mapped to the lower end of the colormap
    @param vmax: temperature mapped to the upper end of the colormap
    @param cmap: colormap (name) handed to matplotlib
    @return: list of four integers taken from the colormap, or get_color('undefined') if temp is a string or None
    """
    if temp is None or isinstance(temp, str):
        return get_color('undefined')

    # matplotlib.cm.get_cmap was deprecated in matplotlib 3.7 and removed in 3.9,
    # prefer the colormap registry and only fall back to the old call on old versions
    registry = getattr(matplotlib, 'colormaps', None)
    if registry is not None and hasattr(registry, 'get_cmap'):
        colormap = registry.get_cmap(cmap)
    else:
        from matplotlib import cm as mpl_cm
        colormap = mpl_cm.get_cmap(cmap)

    val = (temp - vmin) / (vmax - vmin)
    rgba = [int(255 * c) for c in colormap(val)]
    return rgba
def modify_stream_for_plot(input_stream, parameters):
    """
    Build a new stream for plotting from copies of the traces in input_stream.

    Traces are added in the order of the channels in parameters['CHANNELS']. Channels that do not match
    exactly one trace of input_stream are left out.

    @param input_stream: stream to take the traces from (its traces are copied, not modified)
    @param parameters: dictionary with a 'CHANNELS' entry mapping channel codes to channel settings
        ('unit', 'transform' and 'name' are evaluated here)
    @return: new Stream with the modified trace copies
    """
    plot_stream = Stream()
    channel_settings = parameters.get('CHANNELS')

    # position counts all configured channels (starting at 1), including skipped ones
    for position, (channel, settings) in enumerate(channel_settings.items(), start=1):
        candidates = input_stream.select(channel=channel)
        # anything else than exactly one matching trace is ambiguous
        if len(candidates) != 1:
            continue

        # work on a copy to leave the original stream untouched
        trace = candidates[0].copy()

        # unit conversion factor
        factor = settings.get('unit')
        if factor:
            trace.data = trace.data * float(factor)

        # optional arithmetic transformations
        operations = settings.get('transform')
        if operations:
            trace.data = transform_trace(trace.data, operations)

        # the trace id is prefixed with the position to maintain plotting order
        label = settings.get('name')
        trace.id = f'trace {position}: {label} - {trace.id}'
        plot_stream.append(trace)

    return plot_stream
def transform_trace(data, transf):
    """
    Transform trace with arithmetic operations in order, specified in transf
    @param data: numpy array
    @param transf: list of [operator, value] pairs with the operators '+', '-', '*' and '/'
        (e.g. [['*', '20'], ] -> multiply data by 20). Values given as strings are converted to float.
    @return: the transformed data
    @raise IOError: if an operator string is not one of '+', '-', '*', '/'
    """
    import operator

    # A fixed dispatch table looks a little bit hardcoded, however it is safer than using e.g. "eval"
    operations = {'+': operator.add,
                  '-': operator.sub,
                  '*': operator.mul,
                  '/': operator.truediv}

    for operator_str, val in transf:
        operation = operations.get(operator_str) if isinstance(operator_str, str) else None
        if operation is None:
            raise IOError(f'Unknown arithmethic operator string: {operator_str}')
        # values may arrive as strings (see example above), arithmetic needs a number
        if isinstance(val, str):
            val = float(val)
        data = operation(data, val)
    return data
2022-11-22 14:20:22 +01:00
def trace_ylabels(fig, parameters, verbosity=0):
    """
    Label the y-axes of fig with the channel names defined in parameters.

    Nothing is changed if the number of configured channels differs from the number of axes.

    @param fig: figure whose axes are labelled
    @param parameters: dictionary with a 'CHANNELS' entry; the 'name' of each channel is used as label
    @param verbosity: if set, a message is printed when the numbers of channels and axes differ
    """
    channel_settings = parameters.get('CHANNELS').values()
    labels = [settings.get('name') for settings in channel_settings]
    if not labels:
        return

    if len(labels) != len(fig.axes):
        if verbosity:
            print('Mismatch in axis and label lengths. Not adding plot labels')
        return

    for label, ax in zip(labels, fig.axes):
        # channels without a name keep their default label
        if not label:
            continue
        ax.set_ylabel(label)
def trace_yticks(fig, parameters, verbosity=0):
    """
    Sets y-axis ticks and limits for each channel if defined in parameters.

    Nothing is changed if the number of configured channels differs from the number of axes.

    @param fig: figure whose axes are modified
    @param parameters: dictionary with a 'CHANNELS' entry; the 'ticks' of each channel are expected
        to be a triple (ymin, ymax, step)
    @param verbosity: if set, a message is printed when the numbers of channels and axes differ
    """
    ticks = [channel.get('ticks') for channel in parameters.get('CHANNELS').values()]
    if not ticks:
        return
    if not len(ticks) == len(fig.axes):
        if verbosity:
            print('Mismatch in axis tick and label lengths. Not changing plot ticks.')
        return
    for ytick_triple, ax in zip(ticks, fig.axes):
        # channels without tick definition keep their default ticks
        if not ytick_triple:
            continue
        ymin, ymax, step = ytick_triple
        # Stop half a step behind ymax: with a full step as margin, floating point round-off
        # in np.arange can produce an additional tick beyond ymax for non-integer steps.
        yticks = list(np.arange(ymin, ymax + 0.5 * step, step))
        ax.set_yticks(yticks)
        # leave a third of a step as margin on both sides
        ax.set_ylim(ymin - 0.33 * step, ymax + 0.33 * step)
def trace_thresholds(fig, parameters, verbosity=0):
    """
    Draw the channel thresholds ('warn' and 'fail') defined in parameters into the axes of fig.

    @param fig: figure whose axes receive the threshold lines
    @param parameters: dictionary with a 'CHANNELS' entry; the 'warn' and 'fail' entries of each channel are used
    @param verbosity: a message is printed for values greater than 0
    """
    if verbosity > 0:
        print('Plotting trace thresholds')

    # line style per threshold kind, colors are darkened versions of the GUI colors
    line_styles = (
        ('warn', dict(color=0.8 * get_color_mpl('WARN'), linestyle=(0, (5, 10)), alpha=0.5, linewidth=0.7)),
        ('fail', dict(color=0.8 * get_color_mpl('FAIL'), linestyle='solid', alpha=0.5, linewidth=0.7)),
    )

    for threshold_key, line_kwargs in line_styles:
        thresholds = [settings.get(threshold_key) for settings in parameters.get('CHANNELS').values()]
        if thresholds:
            plot_threshold_lines(fig, thresholds, parameters, **line_kwargs)
def plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs):
    """
    Draw horizontal threshold lines into the axes of fig.

    @param fig: figure; channel_threshold_list is matched to fig.axes by position
    @param channel_threshold_list: one entry per axis, each either a single threshold or a list/tuple of
        thresholds. A threshold is a number or a string naming an entry of parameters['THRESHOLDS'].
    @param parameters: dictionary, its 'THRESHOLDS' entry is used to resolve thresholds given as strings
    @param kwargs: passed on to ax.axhline
    """
    for channel_thresholds, ax in zip(channel_threshold_list, fig.axes):
        # NOTE(review): a single threshold of 0 is skipped by this truthiness test as well - confirm intended
        if not channel_thresholds:
            continue
        if not isinstance(channel_thresholds, (list, tuple)):
            channel_thresholds = [channel_thresholds]
        for threshold in channel_thresholds:
            if isinstance(threshold, str):
                # named threshold; a missing THRESHOLDS section or unknown name resolves to None
                threshold = (parameters.get('THRESHOLDS') or {}).get(threshold)
            # only draw numeric values (unresolved names are None and have to be skipped)
            if isinstance(threshold, (float, int)):
                ax.axhline(threshold, **kwargs)