10 Commits

Author SHA1 Message Date
a89ea1b06d [refactor] PEP8 naming convention 2022-12-22 16:00:37 +01:00
24d15d9d55 [fix] mutable default argument 2022-12-22 16:00:07 +01:00
bc70dc0816 [minor] soft-coded unit factor for channel analysis 2022-12-22 15:56:32 +01:00
03616a2b7b [update] refined and enabled clock quality check 2022-12-22 15:36:55 +01:00
bf000fe042 [minor] visual tweaks 2022-12-21 16:03:10 +01:00
c90b430fa8 [bugfix] corrected error check (plot on new FAIL status)
[minor] message output
2022-12-21 15:48:18 +01:00
7d5f9cf516 [update] re-worked channel definition in parameters.yaml, each channel now has its own dictionary with optional plotting flags
[WIP] clock quality work in progress, currently disabled
2022-12-21 12:51:01 +01:00
b17ee1288c [minor] try re-read yaml in case it failed 2022-12-21 11:57:37 +01:00
bf82148449 Merge remote-tracking branch 'origin/develop' into develop
# Conflicts:
#	parameters.yaml
#	survBot.py
2022-12-20 17:02:16 +01:00
174a8e0823 [new] mass channel surveillance added 2022-12-20 16:54:27 +01:00
4 changed files with 292 additions and 93 deletions

View File

@@ -3,7 +3,6 @@ datapath: "/data/SDS/" # SC3 Datapath
networks: ["1Y", "HA"] # select networks, list or str
stations: "*" # select stations, list or str
locations: "*" # select locations, list or str
channels: ["EX1", "EX2", "EX3", "VEI", "LCQ"] # Specify SOH channels, currently supported EX[1-3], VEI and LCQ
stations_blacklist: ["TEST", "EREA"] # exclude these stations
networks_blacklist: [] # exclude these networks
interval: 60 # Perform checks every x seconds
@@ -43,9 +42,73 @@ THRESHOLDS:
low_volt: 12 # min voltage for low voltage warning
high_volt: 14.8 # max voltage for over voltage warning
unclassified: 5 # min voltage samples not classified for warning
max_vm_warn: 1.5 # threshold for mass offset (warn), fail)
max_vm_fail: 2.5 # threshold for mass offset (warn), fail)
clockquality_warn: 90 # clock quality ranges from 0 % to 100 % with 100 % being the best level
clockquality_fail: 70
# ---------------------------------- Specification of input channels ---------------------------------------------------
# Currently supported: EX[1-3], VEI, VM[1-3], LCQ
#
# For each channel a factor 'unit' for unit conversion (e.g. to SI) can be provided, as well as a 'name'
# and 'ticks' [ymin, ymax, ystep] for plotting.
# 'warn' and 'fail' plot horizontal lines in corresponding colors (can be str in TRESHOLDS, int/float or iterable)
#
# 'transform' can be provided for plotting to perform arithmetic operations in given order, e.g.:
# transform: - ["*", 20]
# - ["-", 20]
# --> PBox EX1 V to deg C: 20 * x -20
CHANNELS:
EX1:
unit: 1e-6
name: "PowBox Temperature (°C)"
ticks: [-10, 50, 10]
transform:
- ["*", 20]
- ["-", 20]
warn: "max_temp"
EX2:
unit: 1e-6
name: "PowBox 230V/12V (V)"
ticks: [1, 5, 1]
warn: [2, 3, 4, 4.5, 5]
EX3:
unit: 1e-6
name: "PowBox Router/Charger (V)"
ticks: [1, 5, 1]
warn: [2, 2.5, 3, 4, 5]
VEI:
unit: 1e-3
name: "Datalogger (V)"
ticks: [9, 15, 1]
warn: ["low_volt", "high_volt"]
fail: 10.5
VM1:
unit: 1e-6
name: "Mass position W (V)"
ticks: [-2.5, 2.5, 1]
warn: [-1.5, 1.5]
fail: [-2.5, 2.5]
VM2:
unit: 1e-6
name: "Mass position V (V)"
ticks: [-2.5, 2.5, 1]
warn: [-1.5, 1.5]
fail: [-2.5, 2.5]
VM3:
unit: 1e-6
name: "Mass position U (V)"
ticks: [-2.5, 2.5, 1]
warn: [-1.5, 1.5]
fail: [-2.5, 2.5]
LCQ:
name: "Clock quality (%)"
ticks: [0, 100, 20]
warn: "clockquality_warn"
fail: "clockquality_fail"
# ---------------------------------------- OPTIONAL PARAMETERS ---------------------------------------------------------
# add links to html table with specified key as column and value as relative link, interpretable string parameters:
@@ -62,26 +125,3 @@ EMAIL:
sender: "webmaster@geophysik.ruhr-uni-bochum.de" # mail sender
stations_blacklist: ['GR33'] # do not send emails for specific stations
networks_blacklist: [] # do not send emails for specific network
# names for plotting of the above defined parameter "channels" in the same order
channel_names: ["Clock Quality (%)", "Temperature (°C)", "230V/12V Status (V)", "Router/Charger State (V)", "Logger Voltage (V)"] # names for plotting (optional)
# specify y-ticks (and ylims) giving, (ymin, ymax, step) for each of the above channels (0: default)
CHANNEL_TICKS:
- [0, 100, 20]
- [-10, 50, 10]
- [1, 5, 1]
- [1, 5, 1]
- [9, 15, 1]
# Factor for channel to SI-units (for plotting)
CHANNEL_UNITS:
EX1: 1e-6
EX2: 1e-6
EX3: 1e-6
VEI: 1e-3
# Transform channel for plotting, perform arithmetic operations in given order, e.g.: PBox EX1 V to deg C: 20 * x -20
CHANNEL_TRANSFORM:
EX1:
- ["*", 20]
- ["-", 20]

View File

@@ -19,7 +19,7 @@ from obspy.clients.filesystem.sds import Client
from write_utils import write_html_text, write_html_row, write_html_footer, write_html_header, get_print_title_str, \
init_html_table, finish_html_table
from utils import get_bg_color, modify_stream_for_plot, trace_ylabels, trace_yticks
from utils import get_bg_color, modify_stream_for_plot, set_axis_yticks, set_axis_color, plot_axis_thresholds
try:
import smtplib
@@ -36,9 +36,16 @@ CLR = "\x1B[0K"
deg_str = '\N{DEGREE SIGN}C'
def read_yaml(file_path):
with open(file_path, "r") as f:
return yaml.safe_load(f)
def read_yaml(file_path, n_read=3):
for index in range(n_read):
try:
with open(file_path, "r") as f:
params = yaml.safe_load(f)
except Exception as e:
print(f'Could not read parameters file: {e}.\nWill try again {n_read - index - 1} time(s).')
time.sleep(10)
continue
return params
def nsl_from_id(nwst_id):
@@ -61,7 +68,7 @@ def fancy_timestr(dt, thresh=600, modif='+'):
class SurveillanceBot(object):
def __init__(self, parameter_path, outpath_html=None):
self.keys = ['last active', '230V', '12V', 'router', 'charger', 'voltage', 'clock', 'temp', 'other']
self.keys = ['last active', '230V', '12V', 'router', 'charger', 'voltage', 'mass', 'clock', 'temp', 'other']
self.parameter_path = parameter_path
self.update_parameters()
self.starttime = UTCDateTime()
@@ -85,6 +92,8 @@ class SurveillanceBot(object):
def update_parameters(self):
self.parameters = read_yaml(self.parameter_path)
# add channels to list in parameters dicitonary
self.parameters['channels'] = list(self.parameters.get('CHANNELS').keys())
self.reread_parameters = self.parameters.get('reread_parameters')
self.dt_thresh = [int(val) for val in self.parameters.get('dt_thresh')]
self.verbosity = self.parameters.get('verbosity')
@@ -242,7 +251,7 @@ class SurveillanceBot(object):
def get_station_delay(self, nwst_id):
""" try to get station delay from SDS archive using client"""
locations = ['', '0', '00']
channels = ['HHZ', 'HHE', 'HHN', 'VEI', 'EX1', 'EX2', 'EX3']
channels = ['HHZ', 'HHE', 'HHN'] + self.parameters.get('channels')
network, station = nwst_id.split('.')[:2]
times = []
@@ -341,8 +350,10 @@ class SurveillanceBot(object):
try:
st = modify_stream_for_plot(st, parameters=self.parameters)
st.plot(fig=fig, show=False, draw=False, block=False, equal_scale=False, method='full')
trace_ylabels(fig, self.parameters, self.verbosity)
trace_yticks(fig, self.parameters, self.verbosity)
# set_axis_ylabels(fig, self.parameters, self.verbosity)
set_axis_yticks(fig, self.parameters, self.verbosity)
set_axis_color(fig)
plot_axis_thresholds(fig, self.parameters, self.verbosity)
except Exception as e:
print(f'Could not generate plot for {nwst_id}:')
print(traceback.format_exc())
@@ -434,6 +445,9 @@ class SurveillanceBot(object):
print(f'Could not write HTML table to {fnout}:')
print(traceback.format_exc())
if self.verbosity:
print(f'Wrote html table to {fnout}')
def update_status_message(self):
timespan = timedelta(seconds=int(self.parameters.get('timespan') * 24 * 3600))
self.status_message = f'Program starttime (UTC) {self.starttime.strftime("%Y-%m-%d %H:%M:%S")} | ' \
@@ -455,12 +469,14 @@ class SurveillanceBot(object):
class StationQC(object):
def __init__(self, parent, stream, nsl, parameters, keys, starttime, verbosity, print_func, status_track={}):
def __init__(self, parent, stream, nsl, parameters, keys, starttime, verbosity, print_func, status_track=None):
"""
Station Quality Check class.
:param nsl: dictionary containing network, station and location (key: str)
:param parameters: parameters dictionary from parameters.yaml file
"""
if status_track is None:
status_track = {}
self.parent = parent
self.stream = stream
self.nsl = nsl
@@ -561,8 +577,11 @@ class StationQC(object):
In all other cases return False.
This also prevents sending status (e.g. mail) in case of program startup
"""
if n_errors is not None:
n_errors = self.parameters.get('n_track') + 1
if n_errors is None:
n_errors = self.parameters.get('n_track')
# +1 to check whether n_errors +1 was no error (error is new)
n_errors += 1
previous_errors = self.status_track.get(key)
# only if error list is filled n_track times
@@ -685,6 +704,7 @@ class StationQC(object):
self.pb_temp_analysis()
self.pb_power_analysis()
self.pb_rout_charge_analysis()
self.mass_analysis()
self.clock_quality_analysis()
def return_print_analysis(self):
@@ -703,6 +723,15 @@ class StationQC(object):
def return_analysis(self):
return self.status_dict
def get_unit_factor(self, channel):
""" Get channel multiplier for unit from parameters. If none is specified return 1 """
channel_params = self.parameters.get('CHANNELS').get(channel)
if channel_params:
multiplier = channel_params.get('unit')
if multiplier:
return float(multiplier)
return 1
def get_last_occurrence_timestring(self, trace, indices):
""" returns a nicely formatted string of the timedelta since program starttime and occurrence and abs time"""
last_occur = self.get_last_occurrence(trace, indices)
@@ -714,51 +743,60 @@ class StationQC(object):
def get_last_occurrence(self, trace, indices):
return self.get_time(trace, indices[-1])
def clock_quality_analysis(self, channel='LCQ'):
def clock_quality_analysis(self, channel='LCQ', n_sample_average=10):
""" Analyse clock quality """
key = 'clock'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
clockQuality = trace.data
clockQuality_warn_level = self.parameters.get('THRESHOLDS').get('clockquality_warn')
clockQuality_fail_level = self.parameters.get('THRESHOLDS').get('clockquality_fail')
if not trace:
return
clock_quality = trace.data
clock_quality_warn_level = self.parameters.get('THRESHOLDS').get('clockquality_warn')
clock_quality_fail_level = self.parameters.get('THRESHOLDS').get('clockquality_fail')
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing Clock Quality check', flush=False)
clockQuality_warn = np.where(clockQuality < clockQuality_warn_level)[0]
clockQuality_fail = np.where(clockQuality < clockQuality_fail_level)[0]
clockQuality_warn = np.where(clock_quality < clock_quality_warn_level)[0]
clockQuality_fail = np.where(clock_quality < clock_quality_fail_level)[0]
if len(clockQuality_warn) == 0 and len(clockQuality_fail) == 0:
self.status_ok(key, detailed_message=f'ClockQuality={(clockQuality[-1])}')
self.status_ok(key, detailed_message=f'ClockQuality={(clock_quality[-1])}')
return
last_val_average = np.nanmean(clock_quality[-n_sample_average:])
# keep OK status if there are only minor warnings (lower warn level)
warn_message = f'Trace {trace.get_id()}:'
if len(clockQuality_warn) > 0:
# try calculate number of warn peaks from gaps between indices
n_qc_warn = self.calc_occurrences(clockQuality_warn)
detailed_message = warn_message + f' {n_qc_warn}x Qlock Quality less then {clockQuality_warn_level}' \
detailed_message = warn_message + f' {n_qc_warn}x Clock quality less then {clock_quality_warn_level}%' \
+ self.get_last_occurrence_timestring(trace, clockQuality_warn)
self.warn(key, detailed_message=detailed_message, count=n_qc_warn,
last_occurrence=self.get_last_occurrence(trace, clockQuality_warn))
self.status_ok(key, detailed_message=detailed_message)
# set WARN status for severe warnings in the past
if len(clockQuality_fail) > 0:
# try calculate number of fail peaks from gaps between indices
n_qc_fail = self.calc_occurrences(clockQuality_fail)
detailed_message = warn_message + f' {n_qc_fail}x Qlock Quality less then {clockQuality_fail_level}V ' \
detailed_message = warn_message + f' {n_qc_fail}x Clock quality less then {clock_quality_fail_level}%' \
+ self.get_last_occurrence_timestring(trace, clockQuality_fail)
self.error(key, detailed_message=detailed_message, count=n_qc_fail,
self.warn(key, detailed_message=detailed_message, count=n_qc_fail,
last_occurrence=self.get_last_occurrence(trace, clockQuality_fail))
# set FAIL state if last value is less than fail level
if last_val_average < clock_quality_fail_level:
self.error(key, detailed_message=f'ClockQuality={(clock_quality[-1])}')
def voltage_analysis(self, channel='VEI'):
""" Analyse voltage channel for over/undervoltage """
key = 'voltage'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
voltage = trace.data * 1e-3
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
low_volt = self.parameters.get('THRESHOLDS').get('low_volt')
high_volt = self.parameters.get('THRESHOLDS').get('high_volt')
@@ -795,14 +833,15 @@ class StationQC(object):
key = 'temp'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
voltage = trace.data * 1e-6
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
thresholds = self.parameters.get('THRESHOLDS')
temp = 20. * voltage - 20
# average temp
timespan = min([self.parameters.get('timespan') * 24 * 3600, int(len(temp) / trace.stats.sampling_rate)])
nsamp_av = int(trace.stats.sampling_rate) * timespan
av_temp_str = str(round(np.mean(temp[-nsamp_av:]), 1)) + deg_str
av_temp_str = str(round(np.nanmean(temp[-nsamp_av:]), 1)) + deg_str
# dt of average
dt_t_str = str(timedelta(seconds=int(timespan))).replace(', 0:00:00', '')
# current temp
@@ -810,7 +849,7 @@ class StationQC(object):
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox temperature check (EX1)', flush=False)
self.print(f'Average temperature at {np.mean(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Average temperature at {np.nanmean(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Peak temperature at {max(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Min temperature at {min(temp)}\N{DEGREE SIGN}', flush=False)
max_temp = thresholds.get('max_temp')
@@ -826,6 +865,52 @@ class StationQC(object):
status_message=cur_temp,
detailed_message=f'Average temperature of last {dt_t_str}: {av_temp_str}')
def mass_analysis(self, channels=('VM1', 'VM2', 'VM3'), n_samp_mean=10):
""" Analyse datalogger mass channels. """
key = 'mass'
# build stream with all channels
st = Stream()
for channel in channels:
st += self.stream.select(channel=channel).copy()
st.merge()
# return if there are no three components
if not len(st) == 3:
return
# correct for channel unit
for trace in st:
trace.data = trace.data * self.get_unit_factor(trace.stats.channel)
# calculate average of absolute maximum of mass offset of last n_samp_mean
last_values = np.array([trace.data[-n_samp_mean:] for trace in st])
last_val_mean = np.nanmean(last_values, axis=1)
common_highest_val = np.nanmax(abs(last_val_mean))
common_highest_val = round(common_highest_val, 1)
# get thresholds for WARN (max_vm_warn) and FAIL (max_vm_fail)
thresholds = self.parameters.get('THRESHOLDS')
max_vm_warn = thresholds.get('max_vm_warn')
max_vm_fail = thresholds.get('max_vm_fail')
if not max_vm_warn or not max_vm_fail:
return
# change status depending on common_highest_val
if common_highest_val < max_vm_warn:
self.status_ok(key, detailed_message=f'{common_highest_val}V')
elif max_vm_warn <= common_highest_val < max_vm_fail:
self.warn(key=key,
detailed_message=f'Warning raised for mass centering. Highest val (abs) {common_highest_val}V', )
else:
self.error(key=key,
detailed_message=f'Fail status for mass centering. Highest val (abs) {common_highest_val}V',)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing mass position check', flush=False)
self.print(f'Average mass position at {common_highest_val}', flush=False)
def pb_power_analysis(self, channel='EX2', pb_dict_key='pb_SOH2'):
""" Analyse EX2 channel of PowBox """
keys = ['230V', '12V']
@@ -834,7 +919,7 @@ class StationQC(object):
if not trace:
return
voltage = trace.data * 1e-6
voltage = trace.data * self.get_unit_factor(channel)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox 12V/230V check (EX2)', flush=False)
@@ -857,7 +942,7 @@ class StationQC(object):
if not trace:
return
voltage = trace.data * 1e-6
voltage = trace.data * self.get_unit_factor(channel)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox Router/Charger check (EX3)', flush=False)

View File

@@ -34,7 +34,7 @@ from obspy import UTCDateTime
from survBot import SurveillanceBot
from write_utils import *
from utils import get_bg_color, modify_stream_for_plot, trace_ylabels, trace_yticks
from utils import get_bg_color, modify_stream_for_plot, set_axis_yticks, set_axis_color, plot_axis_thresholds
try:
from rest_api.utils import get_station_iccid
@@ -316,8 +316,10 @@ class MainWindow(QtWidgets.QMainWindow):
self.plot_widget.setWindowTitle(nwst_id)
st = modify_stream_for_plot(st, parameters=self.parameters)
st.plot(equal_scale=False, method='full', block=False, fig=self.plot_widget.canvas.fig)
trace_ylabels(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
trace_yticks(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
# set_axis_ylabels(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
set_axis_yticks(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
set_axis_color(fig=self.plot_widget.canvas.fig)
plot_axis_thresholds(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
self.plot_widget.show()
def notification(self, text):

136
utils.py
View File

@@ -2,6 +2,9 @@
# -*- coding: utf-8 -*-
import matplotlib
import numpy as np
from obspy import Stream
def get_bg_color(check_key, status, dt_thresh=None, hex=False):
@@ -10,9 +13,11 @@ def get_bg_color(check_key, status, dt_thresh=None, hex=False):
bg_color = get_time_delay_color(message, dt_thresh)
elif check_key == 'temp':
bg_color = get_temp_color(message)
elif check_key == 'mass':
bg_color = get_mass_color(message)
else:
if status.is_warn:
bg_color = get_color('WARNX')(status.count)
bg_color = get_warn_color(status.count)
elif status.is_error:
bg_color = get_color('FAIL')
else:
@@ -30,12 +35,16 @@ def get_color(key):
colors_dict = {'FAIL': (255, 50, 0, 255),
'NO DATA': (255, 255, 125, 255),
'WARN': (255, 255, 80, 255),
'WARNX': lambda x: (min([255, 200 + x ** 2]), 255, 80, 255),
'OK': (125, 255, 125, 255),
'undefined': (230, 230, 230, 255)}
return colors_dict.get(key)
def get_color_mpl(key):
color_tup = get_color(key)
return np.array([color/255. for color in color_tup])
def get_time_delay_color(dt, dt_thresh):
""" Set color of time delay after thresholds specified in self.dt_thresh """
if dt < dt_thresh[0]:
@@ -45,6 +54,18 @@ def get_time_delay_color(dt, dt_thresh):
return get_color('FAIL')
def get_warn_color(count):
color = (min([255, 200 + count ** 2]), 255, 80, 255)
return color
def get_mass_color(message):
# can change this to something else if wanted. This way it always returns get_color (without warn count)
if isinstance(message, (float, int)):
return get_color('OK')
return get_color(message)
def get_temp_color(temp, vmin=-10, vmax=60, cmap='coolwarm'):
""" Get an rgba temperature value back from specified cmap, linearly interpolated between vmin and vmax. """
if type(temp) in [str]:
@@ -55,29 +76,43 @@ def get_temp_color(temp, vmin=-10, vmax=60, cmap='coolwarm'):
return rgba
def modify_stream_for_plot(st, parameters):
def modify_stream_for_plot(input_stream, parameters):
""" copy (if necessary) and modify stream for plotting """
ch_units = parameters.get('CHANNEL_UNITS')
ch_transf = parameters.get('CHANNEL_TRANSFORM')
# if either of both are defined make copy
if ch_units or ch_transf:
st = st.copy()
# make a copy
st = Stream()
# modify trace for plotting by multiplying unit factor (e.g. 1e-3 mV to V)
if ch_units:
for tr in st:
channel = tr.stats.channel
unit_factor = ch_units.get(channel)
if unit_factor:
tr.data = tr.data * float(unit_factor)
# modify trace for plotting by other arithmetic expressions
if ch_transf:
for tr in st:
channel = tr.stats.channel
transf = ch_transf.get(channel)
if transf:
tr.data = transform_trace(tr.data, transf)
channels_dict = parameters.get('CHANNELS')
# iterate over all channels and put them to new stream in order
for index, ch_tup in enumerate(channels_dict.items()):
# unpack tuple from items
channel, channel_dict = ch_tup
# get correct channel from stream
st_sel = input_stream.select(channel=channel)
# in case there are != 1 there is ambiguity
if not len(st_sel) == 1:
continue
# make a copy to not modify original stream!
tr = st_sel[0].copy()
# multiply with conversion factor for unit
unit_factor = channel_dict.get('unit')
if unit_factor:
tr.data = tr.data * float(unit_factor)
# apply transformations if provided
transform = channel_dict.get('transform')
if transform:
tr.data = transform_trace(tr.data, transform)
# modify trace id to maintain plotting order
name = channel_dict.get('name')
tr.id = f'{index + 1}: {name} - {tr.id}'
st.append(tr)
return st
@@ -104,13 +139,11 @@ def transform_trace(data, transf):
return data
def trace_ylabels(fig, parameters, verbosity=0):
def set_axis_ylabels(fig, parameters, verbosity=0):
"""
Adds channel names to y-axis if defined in parameters.
Can get mixed up if channel order in stream and channel names defined in parameters.yaml differ, but it is
difficult to assess the correct order from Obspy plotting routing.
"""
names = parameters.get('channel_names')
names = [channel.get('name') for channel in parameters.get('CHANNELS').values()]
if not names: # or not len(st.traces):
return
if not len(names) == len(fig.axes):
@@ -122,13 +155,20 @@ def trace_ylabels(fig, parameters, verbosity=0):
ax.set_ylabel(channel_name)
def trace_yticks(fig, parameters, verbosity=0):
def set_axis_color(fig, color='grey'):
"""
Set all axes of figure to specific color
"""
for ax in fig.axes:
for key in ['bottom', 'top', 'right', 'left']:
ax.spines[key].set_color(color)
def set_axis_yticks(fig, parameters, verbosity=0):
"""
Adds channel names to y-axis if defined in parameters.
Can get mixed up if channel order in stream and channel names defined in parameters.yaml differ, but it is
difficult to assess the correct order from Obspy plotting routing.
"""
ticks = parameters.get('CHANNEL_TICKS')
ticks = [channel.get('ticks') for channel in parameters.get('CHANNELS').values()]
if not ticks:
return
if not len(ticks) == len(fig.axes):
@@ -140,6 +180,38 @@ def trace_yticks(fig, parameters, verbosity=0):
continue
ymin, ymax, step = ytick_tripple
yticks = list(range(ymin, ymax + step, step))
yticks = list(np.arange(ymin, ymax + step, step))
ax.set_yticks(yticks)
ax.set_ylim(ymin - step, ymax + step)
ax.set_ylim(ymin - 0.33 * step, ymax + 0.33 * step)
def plot_axis_thresholds(fig, parameters, verbosity=0):
"""
Adds channel thresholds (warn, fail) to y-axis if defined in parameters.
"""
if verbosity > 0:
print('Plotting trace thresholds')
keys_colors = {'warn': dict(color=0.8 * get_color_mpl('WARN'), linestyle=(0, (5, 10)), alpha=0.5, linewidth=0.7),
'fail': dict(color=0.8 * get_color_mpl('FAIL'), linestyle='solid', alpha=0.5, linewidth=0.7)}
for key, kwargs in keys_colors.items():
channel_threshold_list = [channel.get(key) for channel in parameters.get('CHANNELS').values()]
if not channel_threshold_list:
continue
plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs)
def plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs):
for channel_thresholds, ax in zip(channel_threshold_list, fig.axes):
if not channel_thresholds:
continue
if not isinstance(channel_thresholds, (list, tuple)):
channel_thresholds = [channel_thresholds]
for warn_thresh in channel_thresholds:
if isinstance(warn_thresh, str):
warn_thresh = parameters.get('THRESHOLDS').get(warn_thresh)
if type(warn_thresh in (float, int)):
ax.axhline(warn_thresh, **kwargs)