Merge branch 'release0.2'

This commit is contained in:
Marcel Paffrath 2023-06-01 10:09:58 +02:00
commit e1a3b498e5
11 changed files with 838 additions and 233 deletions

2
.gitignore vendored
View File

@ -211,3 +211,5 @@ flycheck_*.el
/network-security.data
/__simulate_fail.json
/mailing_list.yaml

View File

@ -1,6 +1,6 @@
# survBot
version: 0.1
version: 0.2
survBot is a small program used to track station quality channels of DSEBRA stations via PowBox output over SOH channels
by analysing contents of a Seiscomp3 datapath.
@ -40,8 +40,16 @@ The GUI can be loaded via
python survBotGui.py
```
## Version Changes
- surveillance of mass, clock and gaps
- individual mailing lists for different stations
- html mail with recent status information
- updated web page design
- restructured parameter file
- recognize if PBox is disconnected
## Staff
Original author: M.Paffrath (marcel.paffrath@rub.de)
November 2022
June 2023

View File

@ -0,0 +1,3 @@
# survBot is a small program used to track station quality channels of DSEBRA stations via PowBox output
# over SOH channels by analysing contents of a Seiscomp3 datapath.
__version__ = "0.2"

9
mailing_list.yaml Normal file
View File

@ -0,0 +1,9 @@
# specify mail addresses and station network ids for which information mails shall be sent, e.g.:
# "mail.address@provider.com, mail.address2@provider2.com":
# - 1Y.GR01
# - 1Y.GR02
# "mail.address3@provder.com":
# - 1Y.GR03
#"kasper.fischer@rub.de":
# - 1Y.GR01

View File

@ -1,18 +1,17 @@
# Parameters file for Surveillance Bot
datapath: "/data/SDS/" # SC3 Datapath
networks: ["1Y", "HA"] # select networks, list or str
networks: ["1Y", "HA", "MK"] # select networks, list or str
stations: "*" # select stations, list or str
locations: "*" # select locations, list or str
channels: ["EX1", "EX2", "EX3", "VEI"] # Specify SOH channels, currently supported EX[1-3] and VEI
stations_blacklist: ["TEST", "EREA"] # exclude these stations
stations_blacklist: ["TEST", "EREA", "DOMV"] # exclude these stations
networks_blacklist: [] # exclude these networks
interval: 60 # Perform checks every x seconds
n_track: 300 # wait n_track * intervals before performing an action (i.e. send mail/end highlight status)
timespan: 7 # Check data of the recent x days
n_track: 360 # wait n_track * intervals before performing an action (i.e. send mail/end highlight status)
timespan: 3 # Check data of the recent x days
verbosity: 0 # verbosity flag
track_changes: True # tracks all changes since GUI startup by text highlighting (GUI only)
warn_count: False # show number of warnings and errors in table
min_sample: 3 # minimum samples for raising Warn/FAIL
warn_count: False # show number of warnings and errors in table
min_sample: 5 # minimum samples for raising Warn/FAIL
dt_thresh: [300, 1800] # threshold (s) for timing delay colourisation (yellow/red)
html_figures: True # Create html figure directory and links
reread_parameters: True # reread parameters file (change parameters on runtime, not for itself/GUI refresh/datapath)
@ -41,8 +40,78 @@ THRESHOLDS:
pb_thresh: 0.2 # Threshold for PowBox Voltage check +/- (V)
max_temp: 50 # max temperature for temperature warning
low_volt: 12 # min voltage for low voltage warning
high_volt: 14.8 # max voltage for over voltage warning
high_volt: 14.8 # max voltage for over voltage warning
unclassified: 5 # min voltage samples not classified for warning
max_vm_warn: 1.5 # threshold for mass offset (warn), fail)
max_vm_fail: 2.5 # threshold for mass offset (warn), fail)
clockquality_warn: 90 # warn level - clock quality ranges from 0 % to 100 % with 100 % being the best level
clockquality_fail: 70 # fail level
min_gap: 0.1 # minimum for gap declaration, should be > 0 [s]
# ---------------------------------- Specification of input channels ---------------------------------------------------
# Currently supported: EX[1-3], VEI, VM[1-3], LCQ
#
# For each channel a factor 'unit' for unit conversion (e.g. to SI) can be provided, as well as a 'name'
# and 'ticks' [ymin, ymax, ystep] for plotting.
# 'warn' and 'fail' plot horizontal lines in corresponding colors (can be str in TRESHOLDS, int/float or iterable)
#
# 'transform' can be provided for plotting to perform arithmetic operations in given order, e.g.:
# transform: - ["*", 20]
# - ["-", 20]
# --> PBox EX1 V to deg C: 20 * x -20
CHANNELS:
EX1:
unit: 1e-6
name: "PowBox Temperature (°C)"
ticks: [-10, 50, 10]
transform:
- ["*", 20]
- ["-", 20]
warn: "max_temp"
EX2:
unit: 1e-6
name: "PowBox 230V/12V (V)"
ticks: [0, 5, 1]
warn: [2, 3, 4, 4.5, 5]
EX3:
unit: 1e-6
name: "PowBox Router/Charger (V)"
ticks: [0, 5, 1]
warn: [2, 2.5, 3, 4, 5]
VEI:
unit: 1e-3
name: "Datalogger (V)"
ticks: [9, 15, 1]
warn: ["low_volt", "high_volt"]
fail: 10.5
VM1:
unit: 1e-6
name: "Mass position W (V)"
ticks: [-2.5, 2.5, 1]
warn: [-1.5, 1.5]
fail: [-2.5, 2.5]
VM2:
unit: 1e-6
name: "Mass position V (V)"
ticks: [-2.5, 2.5, 1]
warn: [-1.5, 1.5]
fail: [-2.5, 2.5]
VM3:
unit: 1e-6
name: "Mass position U (V)"
ticks: [-2.5, 2.5, 1]
warn: [-1.5, 1.5]
fail: [-2.5, 2.5]
LCQ:
name: "Clock quality (%)"
ticks: [0, 100, 20]
warn: "clockquality_warn"
fail: "clockquality_fail"
# specify data channels (can be additional to the above). From these channels only headers will be read
data_channels: ["HHZ", "HHN", "HHE"]
# ---------------------------------------- OPTIONAL PARAMETERS ---------------------------------------------------------
@ -53,6 +122,15 @@ add_links:
slmon: {"URL": "../slmon/{nw}_{st}.html", "text": "show"}
24h-plot: {"URL": "../scheli/{nw}/{st}.png", "text": "plot"}
# add station-independent links below html table (list items separated with -)
add_global_links:
# for example: - {"text": "our homepage", "URL": "https://www.rub.de"}
- {"text": "show recent events on map",
"URL": "https://fdsnws.geophysik.ruhr-uni-bochum.de/map/?lat=39.5&lon=21&zoom=7&baselayer=mapnik"}
# html logo at page bottom (path relative to html directory)
html_logo: "figures/Logo_RUB_BLAU_rgb.png"
# E-mail notifications
EMAIL:
mailserver: "localhost"
@ -60,25 +138,5 @@ EMAIL:
sender: "webmaster@geophysik.ruhr-uni-bochum.de" # mail sender
stations_blacklist: ['GR33'] # do not send emails for specific stations
networks_blacklist: [] # do not send emails for specific network
# names for plotting of the above defined parameter "channels" in the same order
channel_names: ["Temperature (°C)", "230V/12V Status (V)", "Router/Charger State (V)", "Logger Voltage (V)"]
# specify y-ticks (and ylims) giving, (ymin, ymax, step) for each of the above channels (0: default)
CHANNEL_TICKS:
- [-10, 50, 10]
- [1, 5, 1]
- [1, 5, 1]
- [9, 15, 1]
# Factor for channel to SI-units (for plotting)
CHANNEL_UNITS:
EX1: 1e-6
EX2: 1e-6
EX3: 1e-6
VEI: 1e-3
# Transform channel for plotting, perform arithmetic operations in given order, e.g.: PBox EX1 V to deg C: 20 * x -20
CHANNEL_TRANSFORM:
EX1:
- ["*", 20]
- ["-", 20]
# specify recipients for single stations in a yaml: key = email-address, val = station list (e.g. [1Y.GR01, 1Y.GR02])
external_mail_list: "mailing_list.yaml"

View File

@ -2,26 +2,36 @@ body {
background-color: #ffffff;
place-items: center;
text-align: center;
padding-bottom: 30px;
font-family: "Helvetica", "sans-serif";
}
table {
position: relative
}
td {
border-radius: 4px;
border-radius: 2px;
padding: 0px;
white-space: nowrap;
}
th {
background-color: #999;
border-radius: 4px;
color: #fff;
border-radius: 2px;
padding: 3px 1px;
position: sticky;
top: 0;
}
a:link, a:visited {
background-color: #ccc;
background-color: #e8e8e8;
color: #000;
text-decoration: none;
display: block;
border-radius: 4px;
border: 1px solid #bbb;
border: 1px solid #ccc;
}
a:hover {
@ -37,3 +47,12 @@ a:hover {
50% { background-color: #ff3200;}
100% { background-color: #ffcc00;}
}
.footer {
position: fixed;
left: 0;
bottom: 0;
width: 100%;
height: 50px;
text-align: center;
}

View File

@ -2,26 +2,36 @@ body {
background-color: #ffffff;
place-items: center;
text-align: center;
padding-bottom: 30px;
font-family: "Helvetica", "sans-serif";
}
table {
position: relative
}
td {
border-radius: 4px;
border-radius: 3px;
padding: 10px 2px;
white-space: nowrap;
}
th {
background-color: #999;
border-radius: 4px;
color: #fff;
border-radius: 3px;
padding: 10px, 2px;
position: sticky;
top: 0;
}
a:link {
background-color: #ccc;
a:link, a:visited {
background-color: #e8e8e8;
color: #000;
text-decoration: none;
display: block;
border-radius: 4px;
border: 1px solid #bbb;
border-radius: 6px;
border: 1px solid #ccc;
}
a:hover {
@ -37,7 +47,16 @@ a:hover {
animation: blinkingBackground 2s infinite;
}
@keyframes blinkingBackground{
0% { background-color: #ffee00;}
0% { background-color: #ffcc00;}
50% { background-color: #ff3200;}
100% { background-color: #ffee00;}
100% { background-color: #ffcc00;}
}
.footer {
position: fixed;
left: 0;
bottom: 0;
width: 100%;
height: 50px;
text-align: center;
}

View File

@ -5,9 +5,12 @@ __version__ = '0.1'
__author__ = 'Marcel Paffrath'
import os
import io
import copy
import traceback
import yaml
import argparse
import json
import time
from datetime import timedelta
@ -17,13 +20,14 @@ import matplotlib.pyplot as plt
from obspy import read, UTCDateTime, Stream
from obspy.clients.filesystem.sds import Client
from write_utils import write_html_text, write_html_row, write_html_footer, write_html_header, get_print_title_str, \
init_html_table, finish_html_table
from utils import get_bg_color, modify_stream_for_plot, trace_ylabels, trace_yticks
from write_utils import get_html_text, get_html_link, get_html_row, html_footer, get_html_header, get_print_title_str, \
init_html_table, finish_html_table, get_mail_html_header, add_html_image
from utils import get_bg_color, get_font_color, modify_stream_for_plot, set_axis_yticks, set_axis_color, plot_axis_thresholds
try:
import smtplib
from email.mime.text import MIMEText
from email.message import EmailMessage
from email.utils import make_msgid
mail_functionality = True
except ImportError:
@ -36,23 +40,36 @@ CLR = "\x1B[0K"
deg_str = '\N{DEGREE SIGN}C'
def read_yaml(file_path):
with open(file_path, "r") as f:
return yaml.safe_load(f)
def read_yaml(file_path, n_read=3):
for index in range(n_read):
try:
with open(file_path, "r") as f:
params = yaml.safe_load(f)
except Exception as e:
print(f'Could not read parameters file: {e}.\nWill try again {n_read - index - 1} time(s).')
time.sleep(10)
continue
return params
def nsl_from_id(nwst_id):
nwst_id = get_full_seed_id(nwst_id)
network, station, location = nwst_id.split('.')
return dict(network=network, station=station, location=location)
def get_full_seed_id(nwst_id):
seed_id = '{}.{}.{}'.format(*nwst_id.split('.'), '')
return seed_id
def get_nwst_id(trace):
stats = trace.stats
return f'{stats.network}.{stats.station}.' # {stats.location}'
def fancy_timestr(dt, thresh=600, modif='+'):
if dt > timedelta(seconds=thresh):
if isinstance(dt, timedelta) and dt > timedelta(seconds=thresh):
value = f'{modif} ' + str(dt) + f' {modif}'
else:
value = str(dt)
@ -61,7 +78,8 @@ def fancy_timestr(dt, thresh=600, modif='+'):
class SurveillanceBot(object):
def __init__(self, parameter_path, outpath_html=None):
self.keys = ['last active', '230V', '12V', 'router', 'charger', 'voltage', 'temp', 'other']
self.keys = ['last active', '230V', '12V', 'router', 'charger', 'voltage', 'mass', 'clock', 'gaps', 'temp',
'other']
self.parameter_path = parameter_path
self.update_parameters()
self.starttime = UTCDateTime()
@ -69,22 +87,32 @@ class SurveillanceBot(object):
self.current_day = self.starttime.julday
self.outpath_html = outpath_html
self.filenames = []
self.filenames_read = []
self.filenames_wf_data = []
self.filenames_read_last_modif = {}
self.station_list = []
self.analysis_print_list = []
self.analysis_results = {}
self.status_track = {}
self.dataStream = Stream()
self.data = {}
self.gaps = []
self.print_count = 0
self.status_message = ''
self.html_fig_dir = 'figures'
self.active_figures = {}
self.cl = Client(self.parameters.get('datapath')) # TODO: Check if this has to be loaded again on update
self.get_stations()
def update_parameters(self):
self.parameters = read_yaml(self.parameter_path)
# add channels to list in parameters dictionary, also add data channels
channels = list(self.parameters.get('CHANNELS').keys())
for channel in self.parameters.get('data_channels'):
if not channel in channels:
channels.append(channel)
self.parameters['channels'] = channels
self.reread_parameters = self.parameters.get('reread_parameters')
self.dt_thresh = [int(val) for val in self.parameters.get('dt_thresh')]
self.verbosity = self.parameters.get('verbosity')
@ -92,9 +120,16 @@ class SurveillanceBot(object):
self.networks_blacklist = self.parameters.get('networks_blacklist')
self.refresh_period = self.parameters.get('interval')
self.transform_parameters()
add_links = self.parameters.get('add_links')
self.add_links = add_links if add_links else {}
add_global_links = self.parameters.get('add_global_links')
# in case user forgets "-" in parameters file
if isinstance(add_global_links, dict):
add_global_links = [add_global_links]
self.add_global_links = add_global_links if add_global_links else []
def transform_parameters(self):
for key in ['networks', 'stations', 'locations', 'channels']:
parameter = self.parameters.get(key)
@ -120,25 +155,31 @@ class SurveillanceBot(object):
def get_filenames(self):
self.filenames = []
self.filenames_wf_data = []
time_now = UTCDateTime()
t1 = time_now - self.parameters.get('timespan') * 24 * 3600
networks = self.parameters.get('networks')
stations = self.parameters.get('stations')
locations = self.parameters.get('locations')
channels = self.parameters.get('channels')
channels_wf_data = self.parameters.get('data_channels')
for network in networks:
for station in stations:
for location in locations:
for channel in channels:
self.filenames += list(self.cl._get_filenames(network, station, location, channel,
starttime=t1, endtime=time_now))
fnames = list(self.cl._get_filenames(network, station, location, channel,
starttime=t1, endtime=time_now))
self.filenames += fnames
def read_data(self, re_read_at_hour=1, daily_overlap=2):
# keep track of filenames with wf data (only read headers later)
if channel in channels_wf_data:
self.filenames_wf_data += fnames
def read_data(self, re_read_at_hour=1):
'''
read data method reads new data into self.stream
:param re_read_at_hour: update archive at specified hour each day (hours up to 24)
:param daily_overlap: re-read data of previous day until specified hour (hours up to 24)
'''
self.data = {}
@ -146,28 +187,31 @@ class SurveillanceBot(object):
curr_time = UTCDateTime()
current_day = curr_time.julday
current_hour = curr_time.hour
yesterday = (curr_time - 24. * 3600.).julday
if re_read_at_hour is not False and current_day != self.current_day and current_hour == re_read_at_hour:
self.filenames_read = []
self.filenames_read_last_modif = {}
self.dataStream = Stream()
self.current_day = current_day
# add all data to current stream
for filename in self.filenames:
if filename in self.filenames_read:
# if file already read and last modification time is the same as of last read operation: continue
if self.filenames_read_last_modif.get(filename) == os.path.getmtime(filename):
if self.verbosity > 0:
print('Continue on file', filename)
continue
try:
st_new = read(filename, dtype=float)
# add file to read filenames to prevent re-reading in case it is not the current day (or end of
# previous day)
if not filename.endswith(f'{current_day:03}') and not (
filename.endswith(f'{yesterday:03}') and current_hour <= daily_overlap):
self.filenames_read.append(filename)
# read only header of wf_data
if filename in self.filenames_wf_data:
st_new = read(filename, headonly=True)
else:
st_new = read(filename, dtype=float)
self.filenames_read_last_modif[filename] = os.path.getmtime(filename)
except Exception as e:
print(f'Could not read file {filename}:', e)
continue
self.dataStream += st_new
self.dataStream.merge(fill_value=np.nan)
self.gaps = self.dataStream.get_gaps(min_gap=self.parameters['THRESHOLDS'].get('min_gap'))
self.dataStream.merge()
# organise data in dictionary with key for each station
for trace in self.dataStream:
@ -242,7 +286,7 @@ class SurveillanceBot(object):
def get_station_delay(self, nwst_id):
""" try to get station delay from SDS archive using client"""
locations = ['', '0', '00']
channels = ['HHZ', 'HHE', 'HHN', 'VEI', 'EX1', 'EX2', 'EX3']
channels = self.parameters.get('channels') + self.parameters.get('data_channels')
network, station = nwst_id.split('.')[:2]
times = []
@ -329,20 +373,25 @@ class SurveillanceBot(object):
for nwst_id in self.station_list:
self.write_html_figure(nwst_id)
def write_html_figure(self, nwst_id):
def write_html_figure(self, nwst_id, save_bytes=False):
""" Write figure for html for specified station """
self.check_fig_dir()
fig = plt.figure(figsize=(16, 9))
fnout = self.get_fig_path_abs(nwst_id)
st = self.data.get(nwst_id)
fnames_out = [self.get_fig_path_abs(nwst_id), io.BytesIO()]
st = self.data.get(get_full_seed_id(nwst_id))
if st:
# TODO: this section might fail, adding try-except block for analysis and to prevent program from crashing
# TODO: this section failed once, adding try-except block for analysis and to prevent program from crashing
try:
endtime = UTCDateTime()
starttime = endtime - self.parameters.get('timespan') * 24 * 3600
st = modify_stream_for_plot(st, parameters=self.parameters)
st.plot(fig=fig, show=False, draw=False, block=False, equal_scale=False, method='full')
trace_ylabels(fig, self.parameters, self.verbosity)
trace_yticks(fig, self.parameters, self.verbosity)
st.plot(fig=fig, show=False, draw=False, block=False, equal_scale=False, method='full',
starttime=starttime, endtime=endtime)
# set_axis_ylabels(fig, self.parameters, self.verbosity)
set_axis_yticks(fig, self.parameters, self.verbosity)
set_axis_color(fig)
plot_axis_thresholds(fig, self.parameters, self.verbosity)
except Exception as e:
print(f'Could not generate plot for {nwst_id}:')
print(traceback.format_exc())
@ -352,88 +401,132 @@ class SurveillanceBot(object):
f'Refreshed hourly or on FAIL status.')
for ax in fig.axes:
ax.grid(True, alpha=0.1)
fig.savefig(fnout, dpi=150., bbox_inches='tight')
for fnout in fnames_out:
fig.savefig(fnout, dpi=150., bbox_inches='tight')
# if needed save figure as virtual object (e.g. for mailing)
if save_bytes:
fnames_out[-1].seek(0)
self.active_figures[nwst_id] = fnames_out[-1]
plt.close(fig)
def write_html_table(self, default_color='#e6e6e6', default_header_color='#999', hide_keys_mobile=('other')):
def get_html_class(self, hide_keys_mobile=None, status=None, check_key=None):
""" helper function for html class if a certain condition is fulfilled """
html_class = None
if status and status.is_active:
html_class = 'blink-bg'
if hide_keys_mobile and check_key in hide_keys_mobile:
html_class = 'hidden-mobile'
return html_class
def get_html_class(status=None, check_key=None):
""" helper function for html class if a certain condition is fulfilled """
html_class = None
if status and status.is_active:
html_class = 'blink-bg'
if check_key in hide_keys_mobile:
html_class = 'hidden-mobile'
return html_class
def make_html_table_header(self, default_header_color, hide_keys_mobile=None, add_links=True):
# First write header items
header = self.keys.copy()
# add columns for additional links
if add_links:
for key in self.add_links:
header.insert(-1, key)
header_items = [dict(text='Station', color=default_header_color)]
for check_key in header:
html_class = self.get_html_class(hide_keys_mobile, check_key=check_key)
item = dict(text=check_key, color=default_header_color, html_class=html_class)
header_items.append(item)
return header, header_items
def get_html_row_items(self, status_dict, nwst_id, header, default_color, hide_keys_mobile=None,
hyperlinks=True):
''' create a html table row for the different keys '''
fig_name = self.get_fig_path_rel(nwst_id)
nwst_id_str = nwst_id.rstrip('.')
col_items = [dict(text=nwst_id_str, color=default_color, hyperlink=fig_name if hyperlinks else None,
bold=True, tooltip=f'Show plot of {nwst_id_str}', font_color='#000000')]
for check_key in header:
if check_key in self.keys:
status = status_dict.get(check_key)
message, detailed_message = status.get_status_str()
# get background color
dt_thresh = [timedelta(seconds=sec) for sec in self.dt_thresh]
bg_color = get_bg_color(check_key, status, dt_thresh, hex=True)
if not bg_color:
bg_color = default_color
font_color = get_font_color(bg_color, hex=True)
# add degree sign for temp
if check_key == 'temp':
if not type(message) in [str]:
message = str(message) + deg_str
html_class = self.get_html_class(hide_keys_mobile, status=status, check_key=check_key)
item = dict(text=str(message), tooltip=str(detailed_message), color=bg_color,
html_class=html_class, font_color=font_color)
elif check_key in self.add_links:
value = self.add_links.get(check_key).get('URL')
link_text = self.add_links.get(check_key).get('text')
if not value:
continue
nw, st = nwst_id.split('.')[:2]
hyperlink_dict = dict(nw=nw, st=st, nwst_id=nwst_id)
link = value.format(**hyperlink_dict)
item = dict(text=link_text, tooltip=link, hyperlink=link if hyperlinks else None, color=default_color)
else:
item = dict(text='', tooltip='')
col_items.append(item)
return col_items
def write_html_table(self, default_color='#e6e6e6', default_header_color='#999999', hide_keys_mobile=('other',)):
self.check_html_dir()
fnout = pjoin(self.outpath_html, 'survBot_out.html')
if not fnout:
return
try:
with open(fnout, 'w') as outfile:
write_html_header(outfile, self.refresh_period)
# write_html_table_title(outfile, self.parameters)
init_html_table(outfile)
outfile.write(get_html_header(self.refresh_period))
# First write header items
header = self.keys.copy()
# add columns for additional links
for key in self.add_links:
header.insert(-1, key)
header_items = [dict(text='Station', color=default_header_color)]
for check_key in header:
html_class = get_html_class(check_key=check_key)
item = dict(text=check_key, color=default_header_color, html_class=html_class)
header_items.append(item)
write_html_row(outfile, header_items, html_key='th')
# write_html_table_title(self.parameters)
outfile.write(init_html_table())
# Write all cells
# write html header row
header, header_items = self.make_html_table_header(default_header_color, hide_keys_mobile)
html_row = get_html_row(header_items, html_key='th')
outfile.write(html_row)
# Write all cells (row after row)
for nwst_id in self.station_list:
fig_name = self.get_fig_path_rel(nwst_id)
nwst_id_str = nwst_id.rstrip('.')
col_items = [dict(text=nwst_id_str, color=default_color, hyperlink=fig_name,
bold=True, tooltip=f'Show plot of {nwst_id_str}')]
for check_key in header:
if check_key in self.keys:
status_dict = self.analysis_results.get(nwst_id)
status = status_dict.get(check_key)
message, detailed_message = status.get_status_str()
# get list with column-wise items to write as a html row
status_dict = self.analysis_results.get(nwst_id)
col_items = self.get_html_row_items(status_dict, nwst_id, header, default_color, hide_keys_mobile)
outfile.write(get_html_row(col_items))
# get background color
dt_thresh = [timedelta(seconds=sec) for sec in self.dt_thresh]
bg_color = get_bg_color(check_key, status, dt_thresh, hex=True)
if not bg_color:
bg_color = default_color
outfile.write(finish_html_table())
# add degree sign for temp
if check_key == 'temp':
if not type(message) in [str]:
message = str(message) + deg_str
# add optional links below html table
for dct in self.add_global_links:
link_str = get_html_link(dct.get('text'), dct.get('URL'))
outfile.write(get_html_text(link_str))
html_class = get_html_class(status=status, check_key=check_key)
item = dict(text=str(message), tooltip=str(detailed_message), color=bg_color,
html_class=html_class)
elif check_key in self.add_links:
value = self.add_links.get(check_key).get('URL')
link_text = self.add_links.get(check_key).get('text')
if not value:
continue
nw, st = nwst_id.split('.')[:2]
hyperlink_dict = dict(nw=nw, st=st, nwst_id=nwst_id)
link = value.format(**hyperlink_dict)
item = dict(text=link_text, tooltip=link, hyperlink=link, color=default_color)
col_items.append(item)
# add status message
outfile.write(get_html_text(self.status_message))
write_html_row(outfile, col_items)
# write footer with optional logo
logo_file = self.parameters.get('html_logo')
if not os.path.isfile(pjoin(self.outpath_html, logo_file)):
print(f'Specified file {logo_file} not found.')
logo_file = None
outfile.write(html_footer(footer_logo=logo_file))
finish_html_table(outfile)
write_html_text(outfile, self.status_message)
write_html_footer(outfile)
except Exception as e:
print(f'Could not write HTML table to {fnout}:')
print(traceback.format_exc())
if self.verbosity:
print(f'Wrote html table to {fnout}')
def update_status_message(self):
timespan = timedelta(seconds=int(self.parameters.get('timespan') * 24 * 3600))
self.status_message = f'Program starttime (UTC) {self.starttime.strftime("%Y-%m-%d %H:%M:%S")} | ' \
@ -455,19 +548,22 @@ class SurveillanceBot(object):
class StationQC(object):
def __init__(self, parent, stream, nsl, parameters, keys, starttime, verbosity, print_func, status_track={}):
def __init__(self, parent, stream, nsl, parameters, keys, starttime, verbosity, print_func, status_track=None):
"""
Station Quality Check class.
:param nsl: dictionary containing network, station and location (key: str)
:param parameters: parameters dictionary from parameters.yaml file
"""
if status_track is None:
status_track = {}
self.parent = parent
self.stream = stream
self.nsl = nsl
self.network = nsl.get('network')
self.station = nsl.get('station')
self.location = nsl.get('location')
self.parameters = parameters
# make a copy of parameters object to prevent accidental changes
self.parameters = copy.deepcopy(parameters)
self.program_starttime = starttime
self.verbosity = verbosity
self.last_active = False
@ -528,29 +624,35 @@ class StationQC(object):
# current_status_message = '' if current_status_message in [None, 'OK', '-'] else current_status_message + ' | '
# self.status_dict[key] = current_status_message + status_message
def error(self, key, detailed_message, last_occurrence=None, count=1):
def error(self, key, detailed_message, last_occurrence=None, count=1, disc=False):
send_mail = False
new_error = StatusError(count=count, show_count=self.parameters.get('warn_count'))
if disc:
new_error.set_disconnected()
current_status = self.status_dict.get(key)
if current_status.is_error:
current_status.count += count
else:
current_status = new_error
# if error is new and not on program-startup set active and refresh plot (using parent class)
if self.search_previous_errors(key, n_errors=1) is True:
self.parent.write_html_figure(self.nwst_id)
if self.status_track.get(key) and not self.status_track.get(key)[-1]:
self.parent.write_html_figure(self.nwst_id, save_bytes=True)
if self.verbosity:
self.print(f'{UTCDateTime()}: {detailed_message}', flush=False)
# do not send error mail if this is the first run (e.g. program startup) or state was already error (unchanged)
if self.search_previous_errors(key) is True:
self.send_mail(key, status_type='FAIL', additional_message=detailed_message)
# set status to "inactive" after sending info mail
send_mail = True
# set status to "inactive" when info mail is sent
current_status.is_active = False
elif self.search_previous_errors(key) == 'active':
current_status.is_active = True
# first update status, then send mail
self._update_status(key, current_status, detailed_message, last_occurrence)
if send_mail:
self.send_mail(key, status_type='FAIL', additional_message=detailed_message)
def search_previous_errors(self, key, n_errors=None):
"""
@ -561,8 +663,16 @@ class StationQC(object):
In all other cases return False.
This also prevents sending status (e.g. mail) in case of program startup
"""
if n_errors is not None:
n_errors = self.parameters.get('n_track') + 1
if n_errors is None:
n_errors = self.parameters.get('n_track')
# +1 to check whether n_errors + 1 was no error (error is new)
n_errors += 1
# simulate an error specified in json file (dictionary: {nwst_id: key} )
if self._simulated_error_check(key) is True:
print(f'Simulating Error on {self.nwst_id}, {key}')
return True
previous_errors = self.status_track.get(key)
# only if error list is filled n_track times
@ -570,11 +680,19 @@ class StationQC(object):
# if first entry was no error but all others are, return True (-> new Fail n_track times)
if not previous_errors[0] and all(previous_errors[1:]):
return True
# in case previous_errors exists, last item is error but not all items are error, error still active
elif previous_errors and previous_errors[-1] and not all(previous_errors):
# in case previous_errors exist, last item is error but not all items are error, error still active
if previous_errors and previous_errors[-1] and not all(previous_errors):
return 'active'
return False
def _simulated_error_check(self, key, fname='simulate_fail.json'):
if not os.path.isfile(fname):
return
with open(fname) as fid:
d = json.load(fid)
if d.get(self.nwst_id) == key:
return True
def send_mail(self, key, status_type, additional_message=''):
""" Send info mail using parameters specified in parameters file """
if not mail_functionality:
@ -602,23 +720,93 @@ class StationQC(object):
sender = mail_params.get('sender')
addresses = mail_params.get('addresses')
add_addresses = self.get_additional_mail_recipients(mail_params)
if add_addresses:
# create copy of addresses ( [:] ) to prevent changing original, general list with addresses
addresses = addresses[:] + list(add_addresses)
server = mail_params.get('mailserver')
if not sender or not addresses:
if self.verbosity:
print('Mail sender or addresses not correctly defined. Return')
print('Mail sender or addresses not (correctly) defined. Return')
return
dt = self.get_dt_for_action()
text = f'{key}: Status {status_type} longer than {dt}: ' + additional_message
msg = MIMEText(text)
msg = EmailMessage()
msg['Subject'] = f'new message on station {self.nwst_id}'
msg['From'] = sender
msg['To'] = ', '.join(addresses)
msg.set_content(text)
# html mail version
html_str = self.add_html_mail_body(text)
msg.add_alternative(html_str, subtype='html')
# send message via SMTP server
s = smtplib.SMTP(server)
s.sendmail(sender, addresses, msg.as_string())
s.send_message(msg)
s.quit()
def add_html_mail_body(self, text, default_color='#e6e6e6'):
parent = self.parent
header, header_items = parent.make_html_table_header('#999999', add_links=False)
col_items = parent.get_html_row_items(self.status_dict, self.nwst_id, header, default_color, hyperlinks=False)
# set general status text
html_str = get_html_text(text)
# init html header and table
html_str += get_mail_html_header()
html_str += init_html_table()
# add table header and row of current station
html_str += get_html_row(header_items, html_key='th')
html_str += get_html_row(col_items)
html_str += finish_html_table()
if self.nwst_id in self.parent.active_figures.keys():
fid = self.parent.active_figures.pop(self.nwst_id)
html_str += add_html_image(img_data=fid.read())
html_str += html_footer()
return html_str
def get_additional_mail_recipients(self, mail_params):
""" return additional recipients from external mail list if this station (self.nwst_id) is specified """
eml_filename = mail_params.get('external_mail_list')
if eml_filename:
# try to open file
try:
with open(eml_filename, 'r') as fid:
address_dict = yaml.safe_load(fid)
for address, nwst_ids in address_dict.items():
if self.nwst_id in nwst_ids:
yield address
# file not existing
except FileNotFoundError as e:
if self.verbosity:
print(e)
# no dictionary
except AttributeError as e:
if self.verbosity:
print(f'Could not read dictionary from file {eml_filename}: {e}')
# other exceptions
except Exception as e:
if self.verbosity:
print(f'Could not open file {eml_filename}: {e}')
# no file specified
else:
if self.verbosity:
print('No external mail list set.')
return []
def get_dt_for_action(self):
n_track = self.parameters.get('n_track')
interval = self.parameters.get('interval')
@ -664,10 +852,23 @@ class StationQC(object):
return max(endtimes)
def check_for_inactive_message(self, key, dt_active):
""" send mail if station is inactive longer than dt_action and no FAIL status is present """
# check if any error is present in status_dict and not disconnected (in that case an email is sent already)
if self.check_for_any_error_no_dcn():
return
dt_action = self.get_dt_for_action()
interval = self.parameters.get('interval')
if dt_action <= dt_active < dt_action + timedelta(seconds=interval):
self.send_mail(key, status_type='Inactive')
detailed_message = f'\n{self.nwst_id}\n\n'
for key, status in self.status_dict.items():
detailed_message += f'{key}: {status.message}\n'
self.send_mail(key, status_type='Inactive', additional_message=detailed_message)
def check_for_any_error_no_dcn(self):
return any([status.is_error and not status.connection_error for status in self.status_dict.values()])
def start(self):
self.analyse_channels()
@ -680,11 +881,23 @@ class StationQC(object):
self.print(150 * '#')
self.print('This is StationQT. Calculating quality for station'
' {network}.{station}.{location}'.format(**self.nsl))
self.activity_check()
self.voltage_analysis()
self.pb_temp_analysis()
self.pb_power_analysis()
self.pb_rout_charge_analysis()
self.mass_analysis()
self.clock_quality_analysis()
self.gaps_analysis()
# activity check should be done last for useful status output (e.g. email)
self.activity_check()
self._simulate_error()
def _simulate_error(self):
for key in self.keys:
if self._simulated_error_check(key):
self.error(key, 'SIMULATED ERROR')
def return_print_analysis(self):
items = [self.nwst_id]
@ -702,6 +915,15 @@ class StationQC(object):
def return_analysis(self):
return self.status_dict
def get_unit_factor(self, channel):
""" Get channel multiplier for unit from parameters. If none is specified return 1 """
channel_params = self.parameters.get('CHANNELS').get(channel)
if channel_params:
multiplier = channel_params.get('unit')
if multiplier:
return float(multiplier)
return 1
def get_last_occurrence_timestring(self, trace, indices):
""" returns a nicely formatted string of the timedelta since program starttime and occurrence and abs time"""
last_occur = self.get_last_occurrence(trace, indices)
@ -713,13 +935,60 @@ class StationQC(object):
def get_last_occurrence(self, trace, indices):
return self.get_time(trace, indices[-1])
def clock_quality_analysis(self, channel='LCQ', n_sample_average=10):
""" Analyse clock quality """
key = 'clock'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace:
return
clock_quality = trace.data
clock_quality_warn_level = self.parameters.get('THRESHOLDS').get('clockquality_warn')
clock_quality_fail_level = self.parameters.get('THRESHOLDS').get('clockquality_fail')
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing Clock Quality check', flush=False)
clockQuality_warn = np.where(clock_quality < clock_quality_warn_level)[0]
clockQuality_fail = np.where(clock_quality < clock_quality_fail_level)[0]
if len(clockQuality_warn) == 0 and len(clockQuality_fail) == 0:
self.status_ok(key, detailed_message=f'ClockQuality={(clock_quality[-1])}')
return
last_val_average = np.nanmean(clock_quality[-n_sample_average:])
# keep OK status if there are only minor warnings (lower warn level)
warn_message = f'Trace {trace.get_id()}:'
if len(clockQuality_warn) > 0:
# try calculate number of warn peaks from gaps between indices
n_qc_warn = self.calc_occurrences(clockQuality_warn)
detailed_message = warn_message + f' {n_qc_warn}x Clock quality less then {clock_quality_warn_level}%' \
+ self.get_last_occurrence_timestring(trace, clockQuality_warn)
self.status_ok(key, detailed_message=detailed_message)
# set WARN status for severe warnings in the past
if len(clockQuality_fail) > 0:
# try calculate number of fail peaks from gaps between indices
n_qc_fail = self.calc_occurrences(clockQuality_fail)
detailed_message = warn_message + f' {n_qc_fail}x Clock quality less then {clock_quality_fail_level}%' \
+ self.get_last_occurrence_timestring(trace, clockQuality_fail)
self.warn(key, detailed_message=detailed_message, count=n_qc_fail,
last_occurrence=self.get_last_occurrence(trace, clockQuality_fail))
# set FAIL state if last value is less than fail level
if last_val_average < clock_quality_fail_level:
self.error(key, detailed_message=f'ClockQuality={(clock_quality[-1])}')
def voltage_analysis(self, channel='VEI'):
""" Analyse voltage channel for over/undervoltage """
key = 'voltage'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
voltage = trace.data * 1e-3
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
low_volt = self.parameters.get('THRESHOLDS').get('low_volt')
high_volt = self.parameters.get('THRESHOLDS').get('high_volt')
@ -756,14 +1025,15 @@ class StationQC(object):
key = 'temp'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
voltage = trace.data * 1e-6
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
thresholds = self.parameters.get('THRESHOLDS')
temp = 20. * voltage - 20
# average temp
timespan = min([self.parameters.get('timespan') * 24 * 3600, int(len(temp) / trace.stats.sampling_rate)])
nsamp_av = int(trace.stats.sampling_rate) * timespan
av_temp_str = str(round(np.mean(temp[-nsamp_av:]), 1)) + deg_str
av_temp_str = str(round(np.nanmean(temp[-nsamp_av:]), 1)) + deg_str
# dt of average
dt_t_str = str(timedelta(seconds=int(timespan))).replace(', 0:00:00', '')
# current temp
@ -771,7 +1041,7 @@ class StationQC(object):
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox temperature check (EX1)', flush=False)
self.print(f'Average temperature at {np.mean(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Average temperature at {np.nanmean(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Peak temperature at {max(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Min temperature at {min(temp)}\N{DEGREE SIGN}', flush=False)
max_temp = thresholds.get('max_temp')
@ -787,6 +1057,52 @@ class StationQC(object):
status_message=cur_temp,
detailed_message=f'Average temperature of last {dt_t_str}: {av_temp_str}')
def mass_analysis(self, channels=('VM1', 'VM2', 'VM3'), n_samp_mean=10):
""" Analyse datalogger mass channels. """
key = 'mass'
# build stream with all channels
st = Stream()
for channel in channels:
st += self.stream.select(channel=channel).copy()
st.merge()
# return if there are no three components
if not len(st) == 3:
return
# correct for channel unit
for trace in st:
trace.data = trace.data * self.get_unit_factor(trace.stats.channel)
# calculate average of absolute maximum of mass offset of last n_samp_mean
last_values = np.array([trace.data[-n_samp_mean:] for trace in st])
last_val_mean = np.nanmean(last_values, axis=1)
common_highest_val = np.nanmax(abs(last_val_mean))
common_highest_val = round(common_highest_val, 1)
# get thresholds for WARN (max_vm_warn) and FAIL (max_vm_fail)
thresholds = self.parameters.get('THRESHOLDS')
max_vm_warn = thresholds.get('max_vm_warn')
max_vm_fail = thresholds.get('max_vm_fail')
if not max_vm_warn or not max_vm_fail:
return
# change status depending on common_highest_val
if common_highest_val < max_vm_warn:
self.status_ok(key, detailed_message=f'{common_highest_val}V')
elif max_vm_warn <= common_highest_val < max_vm_fail:
self.warn(key=key,
detailed_message=f'Warning raised for mass centering. Highest val (abs) {common_highest_val}V', )
else:
self.error(key=key,
detailed_message=f'Fail status for mass centering. Highest val (abs) {common_highest_val}V',)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing mass position check', flush=False)
self.print(f'Average mass position at {common_highest_val}', flush=False)
def pb_power_analysis(self, channel='EX2', pb_dict_key='pb_SOH2'):
""" Analyse EX2 channel of PowBox """
keys = ['230V', '12V']
@ -795,7 +1111,7 @@ class StationQC(object):
if not trace:
return
voltage = trace.data * 1e-6
voltage = trace.data * self.get_unit_factor(channel)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox 12V/230V check (EX2)', flush=False)
@ -818,7 +1134,7 @@ class StationQC(object):
if not trace:
return
voltage = trace.data * 1e-6
voltage = trace.data * self.get_unit_factor(channel)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox Router/Charger check (EX3)', flush=False)
@ -845,7 +1161,7 @@ class StationQC(object):
if message == 'OK':
self.status_ok(key)
continue
if volt_lvl > 1:
if volt_lvl != 1:
n_occurrences = self.calc_occurrences(ind_array)
self.warn(key=key,
detailed_message=f'Trace {trace.get_id()}: '
@ -854,8 +1170,30 @@ class StationQC(object):
count=n_occurrences,
last_occurrence=self.get_last_occurrence(trace, ind_array))
# if last_val == current voltage (which is not 1) -> FAIL or last_val < 1: PBox no data
if volt_lvl == last_val or (volt_lvl == -1 and last_val < 1):
if volt_lvl == last_val:
self.error(key, detailed_message=f'Last PowBox voltage state {last_val}V: {message}')
elif volt_lvl == -1 and last_val < 1:
self.error(key, detailed_message=f'PowBox under 1V - connection error', disc=True)
def gaps_analysis(self, key='gaps'):
""" return gaps of a given nwst_id """
gaps = []
for gap_list in self.parent.gaps:
nw_gap, st_gap = gap_list[:2]
if nw_gap == self.network and st_gap == self.station:
gaps.append(gap_list)
if not gaps:
self.status_ok(key=key)
return
detailed_message = ''
for gap_list in gaps:
text = '{}.{}.{}.{}: last sample - {}, next sample - {}, delta {}, samples {}\n'.format(*gap_list)
detailed_message += text
self.warn(key=key, detailed_message=detailed_message, count=len(gaps))
def calc_occurrences(self, ind_array):
# try calculate number of voltage peaks/plateaus from gaps between indices
@ -969,19 +1307,23 @@ class StationQC(object):
class Status(object):
""" Basic Status class. All status classes are derived from this class."""
def __init__(self, message=None, detailed_messages=None, count: int = 0, last_occurrence=None, show_count=True):
if message is None:
message = '-'
if detailed_messages is None:
detailed_messages = []
self.show_count = show_count
self.message = message
self.messages = [message]
self.detailed_messages = detailed_messages
self.count = count
self.last_occurrence = last_occurrence
self.is_warn = None
self.is_error = None
self.connection_error = None
self.is_other = False
self.is_active = False
@ -1028,6 +1370,15 @@ class StatusError(Status):
super(StatusError, self).__init__(message=message, count=count, last_occurrence=last_occurence,
detailed_messages=detailed_messages, show_count=show_count)
self.set_error()
self.default_message = message
def set_disconnected(self, message='DCN'):
self.connection_error = True
self.message = message
def set_connected(self):
self.connection_error = False
self.message = self.default_message
class StatusOther(Status):
@ -1061,7 +1412,9 @@ class StatusOther(Status):
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Call survBot')
parser.add_argument('-html', dest='html_path', default=None, help='filepath for HTML output')
parser.add_argument('-parfile', dest='parfile', default='parameters.yaml',
help='parameter file (default: parameters.yaml)')
args = parser.parse_args()
survBot = SurveillanceBot(parameter_path='parameters.yaml', outpath_html=args.html_path)
survBot = SurveillanceBot(parameter_path=args.parfile, outpath_html=args.html_path)
survBot.start()

View File

@ -34,7 +34,7 @@ from obspy import UTCDateTime
from survBot import SurveillanceBot
from write_utils import *
from utils import get_bg_color, modify_stream_for_plot, trace_ylabels, trace_yticks
from utils import get_bg_color, modify_stream_for_plot, set_axis_yticks, set_axis_color, plot_axis_thresholds
try:
from rest_api.utils import get_station_iccid
@ -316,8 +316,10 @@ class MainWindow(QtWidgets.QMainWindow):
self.plot_widget.setWindowTitle(nwst_id)
st = modify_stream_for_plot(st, parameters=self.parameters)
st.plot(equal_scale=False, method='full', block=False, fig=self.plot_widget.canvas.fig)
trace_ylabels(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
trace_yticks(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
# set_axis_ylabels(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
set_axis_yticks(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
set_axis_color(fig=self.plot_widget.canvas.fig)
plot_axis_thresholds(fig=self.plot_widget.canvas.fig, parameters=self.parameters)
self.plot_widget.show()
def notification(self, text):

187
utils.py
View File

@ -2,6 +2,9 @@
# -*- coding: utf-8 -*-
import matplotlib
import numpy as np
from obspy import Stream
def get_bg_color(check_key, status, dt_thresh=None, hex=False):
@ -10,11 +13,16 @@ def get_bg_color(check_key, status, dt_thresh=None, hex=False):
bg_color = get_time_delay_color(message, dt_thresh)
elif check_key == 'temp':
bg_color = get_temp_color(message)
elif check_key == 'mass':
bg_color = get_mass_color(message)
else:
if status.is_warn:
bg_color = get_color('WARNX')(status.count)
bg_color = get_warn_color(status.count)
elif status.is_error:
bg_color = get_color('FAIL')
if status.connection_error:
bg_color = get_color('disc')
else:
bg_color = get_color('FAIL')
else:
bg_color = get_color(message)
if not bg_color:
@ -26,25 +34,51 @@ def get_bg_color(check_key, status, dt_thresh=None, hex=False):
def get_color(key):
# some GUI default colors
colors_dict = {'FAIL': (255, 50, 0, 255),
# some old GUI default colors
# colors_dict = {'FAIL': (255, 85, 50, 255),
# 'NO DATA': (255, 255, 125, 255),
# 'WARN': (255, 255, 80, 255),
# 'OK': (173, 255, 133, 255),
# 'undefined': (230, 230, 230, 255),
# 'disc': (255, 160, 40, 255),}
colors_dict = {'FAIL': (195, 29, 14, 255),
'NO DATA': (255, 255, 125, 255),
'WARN': (255, 255, 80, 255),
'WARNX': lambda x: (min([255, 200 + x ** 2]), 255, 80, 255),
'OK': (125, 255, 125, 255),
'undefined': (230, 230, 230, 255)}
'WARN': (250, 192, 63, 255),
'OK': (185, 245, 145, 255),
'undefined': (240, 240, 240, 255),
'disc': (126, 127, 131, 255), }
return colors_dict.get(key)
def get_color_mpl(key):
color_tup = get_color(key)
return np.array([color/255. for color in color_tup])
def get_time_delay_color(dt, dt_thresh):
""" Set color of time delay after thresholds specified in self.dt_thresh """
if dt < dt_thresh[0]:
return get_color('OK')
elif dt_thresh[0] <= dt < dt_thresh[1]:
return get_color('WARN')
if isinstance(dt, type(dt_thresh[0])):
if dt < dt_thresh[0]:
return get_color('OK')
elif dt_thresh[0] <= dt < dt_thresh[1]:
return get_color('WARN')
return get_color('FAIL')
def get_warn_color(count, n_colors=20):
if count >= n_colors:
count = -1
gradient = np.linspace((240, 245, 110, 255), (250, 192, 63, 255), n_colors, dtype=int)
return tuple(gradient[count])
def get_mass_color(message):
# can change this to something else if wanted. This way it always returns get_color (without warn count)
if isinstance(message, (float, int)):
return get_color('OK')
return get_color(message)
def get_temp_color(temp, vmin=-10, vmax=60, cmap='coolwarm'):
""" Get an rgba temperature value back from specified cmap, linearly interpolated between vmin and vmax. """
if type(temp) in [str]:
@ -55,29 +89,61 @@ def get_temp_color(temp, vmin=-10, vmax=60, cmap='coolwarm'):
return rgba
def modify_stream_for_plot(st, parameters):
def get_font_color(bg_color, hex=False):
if hex:
bg_color = matplotlib.colors.to_rgb(bg_color)
bg_color_hsv = matplotlib.colors.rgb_to_hsv(bg_color)
bg_color_hsl = hsv_to_hsl(bg_color_hsv)
font_color = (255, 255, 255, 255) if bg_color_hsl[2] < 0.6 else (0, 0, 0, 255)
if hex:
font_color = '#{:02x}{:02x}{:02x}'.format(*font_color[:3])
return font_color
def hsv_to_hsl(hsv):
hue, saturation, value = hsv
lightness = value * (1 - saturation / 2)
saturation = 0 if lightness in (0, 1) else (value - lightness) / min(lightness, 1 - lightness)
return hue, saturation, lightness
def modify_stream_for_plot(input_stream, parameters):
""" copy (if necessary) and modify stream for plotting """
ch_units = parameters.get('CHANNEL_UNITS')
ch_transf = parameters.get('CHANNEL_TRANSFORM')
# if either of both are defined make copy
if ch_units or ch_transf:
st = st.copy()
# make a copy
st = Stream()
# modify trace for plotting by multiplying unit factor (e.g. 1e-3 mV to V)
if ch_units:
for tr in st:
channel = tr.stats.channel
unit_factor = ch_units.get(channel)
if unit_factor:
tr.data = tr.data * float(unit_factor)
# modify trace for plotting by other arithmetic expressions
if ch_transf:
for tr in st:
channel = tr.stats.channel
transf = ch_transf.get(channel)
if transf:
tr.data = transform_trace(tr.data, transf)
channels_dict = parameters.get('CHANNELS')
# iterate over all channels and put them to new stream in order
for index, ch_tup in enumerate(channels_dict.items()):
# unpack tuple from items
channel, channel_dict = ch_tup
# get correct channel from stream
st_sel = input_stream.select(channel=channel)
# in case there are != 1 there is ambiguity
if not len(st_sel) == 1:
continue
# make a copy to not modify original stream!
tr = st_sel[0].copy()
# multiply with conversion factor for unit
unit_factor = channel_dict.get('unit')
if unit_factor:
tr.data = tr.data * float(unit_factor)
# apply transformations if provided
transform = channel_dict.get('transform')
if transform:
tr.data = transform_trace(tr.data, transform)
# modify trace id to maintain plotting order
name = channel_dict.get('name')
tr.id = f'{index + 1}: {name} - {tr.id}'
st.append(tr)
return st
@ -104,13 +170,11 @@ def transform_trace(data, transf):
return data
def trace_ylabels(fig, parameters, verbosity=0):
def set_axis_ylabels(fig, parameters, verbosity=0):
"""
Adds channel names to y-axis if defined in parameters.
Can get mixed up if channel order in stream and channel names defined in parameters.yaml differ, but it is
difficult to assess the correct order from Obspy plotting routing.
"""
names = parameters.get('channel_names')
names = [channel.get('name') for channel in parameters.get('CHANNELS').values()]
if not names: # or not len(st.traces):
return
if not len(names) == len(fig.axes):
@ -122,13 +186,20 @@ def trace_ylabels(fig, parameters, verbosity=0):
ax.set_ylabel(channel_name)
def trace_yticks(fig, parameters, verbosity=0):
def set_axis_color(fig, color='0.8'):
"""
Set all axes of figure to specific color
"""
for ax in fig.axes:
for key in ['bottom', 'top', 'right', 'left']:
ax.spines[key].set_color(color)
def set_axis_yticks(fig, parameters, verbosity=0):
"""
Adds channel names to y-axis if defined in parameters.
Can get mixed up if channel order in stream and channel names defined in parameters.yaml differ, but it is
difficult to assess the correct order from Obspy plotting routing.
"""
ticks = parameters.get('CHANNEL_TICKS')
ticks = [channel.get('ticks') for channel in parameters.get('CHANNELS').values()]
if not ticks:
return
if not len(ticks) == len(fig.axes):
@ -140,6 +211,38 @@ def trace_yticks(fig, parameters, verbosity=0):
continue
ymin, ymax, step = ytick_tripple
yticks = list(range(ymin, ymax + step, step))
yticks = list(np.arange(ymin, ymax + step, step))
ax.set_yticks(yticks)
ax.set_ylim(ymin - step, ymax + step)
ax.set_ylim(ymin - 0.33 * step, ymax + 0.33 * step)
def plot_axis_thresholds(fig, parameters, verbosity=0):
"""
Adds channel thresholds (warn, fail) to y-axis if defined in parameters.
"""
if verbosity > 0:
print('Plotting trace thresholds')
keys_colors = {'warn': dict(color=0.8 * get_color_mpl('WARN'), linestyle=(0, (5, 10)), alpha=0.5, linewidth=0.7),
'fail': dict(color=0.8 * get_color_mpl('FAIL'), linestyle='solid', alpha=0.5, linewidth=0.7)}
for key, kwargs in keys_colors.items():
channel_threshold_list = [channel.get(key) for channel in parameters.get('CHANNELS').values()]
if not channel_threshold_list:
continue
plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs)
def plot_threshold_lines(fig, channel_threshold_list, parameters, **kwargs):
for channel_thresholds, ax in zip(channel_threshold_list, fig.axes):
if not channel_thresholds:
continue
if not isinstance(channel_thresholds, (list, tuple)):
channel_thresholds = [channel_thresholds]
for warn_thresh in channel_thresholds:
if isinstance(warn_thresh, str):
warn_thresh = parameters.get('THRESHOLDS').get(warn_thresh)
if type(warn_thresh in (float, int)):
ax.axhline(warn_thresh, **kwargs)

View File

@ -1,16 +1,21 @@
from base64 import b64encode
from datetime import timedelta
def write_html_table_title(fobj, parameters):
def _convert_to_textstring(lst):
return '\n'.join(lst)
def get_html_table_title(parameters):
title = get_print_title_str(parameters)
fobj.write(f'<h3>{title}</h3>\n')
return f'<h3>{title}</h3>\n'
def write_html_text(fobj, text):
fobj.write(f'<p>{text}</p>\n')
def get_html_text(text):
return f'<p>{text}</p>\n'
def write_html_header(fobj, refresh_rate=10):
def get_html_header(refresh_rate=10):
header = ['<!DOCTYPE html>',
'<html>',
'<head>',
@ -21,28 +26,52 @@ def write_html_header(fobj, refresh_rate=10):
'<meta charset="utf-8">',
'<meta name="viewport" content="width=device-width, initial-scale=1">',
'<body>']
for item in header:
fobj.write(item + '\n')
header = _convert_to_textstring(header)
return header
def init_html_table(fobj):
fobj.write('<table style="width:100%">\n')
def get_mail_html_header():
header = ['<html>',
'<head>',
'</head>',
'<body>']
header = _convert_to_textstring(header)
return header
def finish_html_table(fobj):
fobj.write('</table>\n')
def init_html_table():
return '<table style="width:100%">\n'
def write_html_footer(fobj):
footer = ['</body>',
'</html>']
for item in footer:
fobj.write(item + '\n')
def finish_html_table():
return '</table>\n'
def write_html_row(fobj, items, html_key='td'):
def html_footer(footer_logo=None):
footer = ['</body>']
if footer_logo:
logo_items = [f'<div class="footer">',
f' <img style="float: right; padding: 10px;" src="{footer_logo}" height=30px>',
f'</div>']
footer += logo_items
footer.append('</html>\n')
footer = _convert_to_textstring(footer)
return footer
def add_html_image(img_data, img_format='png'):
return f"""<br>\n<img width="100%" src="data:image/{img_format};base64, {b64encode(img_data).decode('ascii')}">"""
def get_html_link(text, link):
return f'<a href="{link}"> {text} </a>'
def get_html_row(items, html_key='td'):
row_string = ''
default_space = ' '
fobj.write(default_space + '<tr>\n')
row_string += default_space + '<tr>\n'
for item in items:
text = item.get('text')
if item.get('bold'):
@ -50,16 +79,16 @@ def write_html_row(fobj, items, html_key='td'):
if item.get('italic'):
text = '<i>' + text + '</i>'
tooltip = item.get('tooltip')
color = item.get('color')
# check for black background of headers (shouldnt happen anymore)
color = '#e6e6e6' if color == '#000000' else color
font_color = item.get('font_color')
hyperlink = item.get('hyperlink')
image_str = f'<a href="{hyperlink}">' if hyperlink else ''
color = 'transparent' if hyperlink else item.get('color')
text_str = get_html_link(text, hyperlink) if hyperlink else text
html_class = item.get('html_class')
class_str = f' class="{html_class}"' if html_class else ''
fobj.write(2 * default_space + f'<{html_key}{class_str} bgcolor="{color}" title="{tooltip}"> {image_str}'
+ text + f'</{html_key}>\n')
fobj.write(default_space + '</tr>\n')
row_string += 2 * default_space + f'<{html_key}{class_str} bgcolor="{color}" title="{tooltip}"' \
+ f'style="color:{font_color}"> {text_str}</{html_key}>\n'
row_string += default_space + '</tr>\n'
return row_string
def get_print_title_str(parameters):