survBot/survBot.py

1421 lines
58 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__version__ = '0.1'
__author__ = 'Marcel Paffrath'
import os
import io
import copy
import traceback
import yaml
import argparse
import json
import time
from datetime import timedelta
import numpy as np
import matplotlib.pyplot as plt
from obspy import read, UTCDateTime, Stream
from obspy.clients.filesystem.sds import Client
from write_utils import get_html_text, get_html_link, get_html_row, html_footer, get_html_header, get_print_title_str, \
init_html_table, finish_html_table, get_mail_html_header, add_html_image
from utils import get_bg_color, get_font_color, modify_stream_for_plot, set_axis_yticks, set_axis_color, plot_axis_thresholds
try:
import smtplib
from email.message import EmailMessage
from email.utils import make_msgid
mail_functionality = True
except ImportError:
print('Could not import smtplib or mail. Disabled sending mails.')
mail_functionality = False
pjoin = os.path.join
UP = "\x1B[{length}A"
CLR = "\x1B[0K"
deg_str = '\N{DEGREE SIGN}C'
def read_yaml(file_path, n_read=3):
for index in range(n_read):
try:
with open(file_path, "r") as f:
params = yaml.safe_load(f)
except Exception as e:
print(f'Could not read parameters file: {e}.\nWill try again {n_read - index - 1} time(s).')
time.sleep(10)
continue
return params
def nsl_from_id(nwst_id):
nwst_id = get_full_seed_id(nwst_id)
network, station, location = nwst_id.split('.')
return dict(network=network, station=station, location=location)
def get_full_seed_id(nwst_id):
seed_id = '{}.{}.{}'.format(*nwst_id.split('.'), '')
return seed_id
def get_nwst_id(trace):
stats = trace.stats
return f'{stats.network}.{stats.station}.' # {stats.location}'
def fancy_timestr(dt, thresh=600, modif='+'):
if isinstance(dt, timedelta) and dt > timedelta(seconds=thresh):
value = f'{modif} ' + str(dt) + f' {modif}'
else:
value = str(dt)
return value
class SurveillanceBot(object):
def __init__(self, parameter_path, outpath_html=None):
self.keys = ['last active', '230V', '12V', 'router', 'charger', 'voltage', 'mass', 'clock', 'gaps', 'temp',
'other']
self.parameter_path = parameter_path
self.update_parameters()
self.starttime = UTCDateTime()
self.plot_hour = self.starttime.hour
self.current_day = self.starttime.julday
self.outpath_html = outpath_html
self.filenames = []
self.filenames_wf_data = []
self.filenames_read_last_modif = {}
self.station_list = []
self.analysis_print_list = []
self.analysis_results = {}
self.status_track = {}
self.dataStream = Stream()
self.data = {}
self.gaps = []
self.print_count = 0
self.status_message = ''
self.html_fig_dir = 'figures'
self.active_figures = {}
self.cl = Client(self.parameters.get('datapath')) # TODO: Check if this has to be loaded again on update
self.get_stations()
def update_parameters(self):
self.parameters = read_yaml(self.parameter_path)
# add channels to list in parameters dictionary, also add data channels
channels = list(self.parameters.get('CHANNELS').keys())
for channel in self.parameters.get('data_channels'):
if not channel in channels:
channels.append(channel)
self.parameters['channels'] = channels
self.reread_parameters = self.parameters.get('reread_parameters')
self.dt_thresh = [int(val) for val in self.parameters.get('dt_thresh')]
self.verbosity = self.parameters.get('verbosity')
self.stations_blacklist = self.parameters.get('stations_blacklist')
self.networks_blacklist = self.parameters.get('networks_blacklist')
self.refresh_period = self.parameters.get('interval')
self.transform_parameters()
add_links = self.parameters.get('add_links')
self.add_links = add_links if add_links else {}
add_global_links = self.parameters.get('add_global_links')
# in case user forgets "-" in parameters file
if isinstance(add_global_links, dict):
add_global_links = [add_global_links]
self.add_global_links = add_global_links if add_global_links else []
def transform_parameters(self):
for key in ['networks', 'stations', 'locations', 'channels']:
parameter = self.parameters.get(key)
if type(parameter) == str:
self.parameters[key] = list(self.parameters[key])
elif type(parameter) not in [list]:
raise TypeError(f'Bad input type for {key}: {type(key)}')
def get_stations(self):
networks = self.parameters.get('networks')
stations = self.parameters.get('stations')
self.station_list = []
nwst_list = self.cl.get_all_stations()
for nw, st in nwst_list:
if self.stations_blacklist and st in self.stations_blacklist:
continue
if self.networks_blacklist and nw in self.networks_blacklist:
continue
if (networks == ['*'] or nw in networks) and (stations == ['*'] or st in stations):
nwst_id = f'{nw}.{st}.'
self.station_list.append(nwst_id)
def get_filenames(self):
self.filenames = []
self.filenames_wf_data = []
time_now = UTCDateTime()
t1 = time_now - self.parameters.get('timespan') * 24 * 3600
networks = self.parameters.get('networks')
stations = self.parameters.get('stations')
locations = self.parameters.get('locations')
channels = self.parameters.get('channels')
channels_wf_data = self.parameters.get('data_channels')
for network in networks:
for station in stations:
for location in locations:
for channel in channels:
fnames = list(self.cl._get_filenames(network, station, location, channel,
starttime=t1, endtime=time_now))
self.filenames += fnames
# keep track of filenames with wf data (only read headers later)
if channel in channels_wf_data:
self.filenames_wf_data += fnames
def read_data(self, re_read_at_hour=1):
'''
read data method reads new data into self.stream
:param re_read_at_hour: update archive at specified hour each day (hours up to 24)
'''
self.data = {}
# re-read all data every new day
curr_time = UTCDateTime()
current_day = curr_time.julday
current_hour = curr_time.hour
if re_read_at_hour is not False and current_day != self.current_day and current_hour == re_read_at_hour:
self.filenames_read_last_modif = {}
self.dataStream = Stream()
self.current_day = current_day
# add all data to current stream
for filename in self.filenames:
# if file already read and last modification time is the same as of last read operation: continue
if self.filenames_read_last_modif.get(filename) == os.path.getmtime(filename):
if self.verbosity > 0:
print('Continue on file', filename)
continue
try:
# read only header of wf_data
if filename in self.filenames_wf_data:
st_new = read(filename, headonly=True)
else:
st_new = read(filename, dtype=float)
self.filenames_read_last_modif[filename] = os.path.getmtime(filename)
except Exception as e:
print(f'Could not read file {filename}:', e)
continue
self.dataStream += st_new
self.gaps = self.dataStream.get_gaps(min_gap=self.parameters['THRESHOLDS'].get('min_gap'))
self.dataStream.merge()
# organise data in dictionary with key for each station
for trace in self.dataStream:
nwst_id = get_nwst_id(trace)
if not nwst_id in self.data.keys():
self.data[nwst_id] = Stream()
self.data[nwst_id].append(trace)
def execute_qc(self):
if self.reread_parameters:
self.update_parameters()
self.get_filenames()
self.read_data()
qc_starttime = UTCDateTime()
self.analysis_print_list = []
self.analysis_results = {}
for nwst_id in sorted(self.station_list):
stream = self.data.get(nwst_id)
if stream:
nsl = nsl_from_id(nwst_id)
station_qc = StationQC(self, stream, nsl, self.parameters, self.keys, qc_starttime,
self.verbosity, print_func=self.print,
status_track=self.status_track.get(nwst_id))
analysis_print_result = station_qc.return_print_analysis()
station_dict = station_qc.return_analysis()
else:
analysis_print_result = self.get_no_data_station(nwst_id, to_print=True)
station_dict = self.get_no_data_station(nwst_id)
self.analysis_print_list.append(analysis_print_result)
self.analysis_results[nwst_id] = station_dict
self.track_status()
self.update_status_message()
return 'ok'
def track_status(self):
"""
tracks error status of the last n_track + 1 errors.
"""
n_track = self.parameters.get('n_track')
if not n_track or n_track < 1:
return
for nwst_id, analysis_dict in self.analysis_results.items():
if not nwst_id in self.status_track.keys():
self.status_track[nwst_id] = {}
for key, status in analysis_dict.items():
if not key in self.status_track[nwst_id].keys():
self.status_track[nwst_id][key] = []
track_lst = self.status_track[nwst_id][key]
# pop list until length is n_track + 1
while len(track_lst) > n_track:
track_lst.pop(0)
track_lst.append(status.is_error)
def get_no_data_station(self, nwst_id, no_data='-', to_print=False):
delay = self.get_station_delay(nwst_id)
if not to_print:
status_dict = {}
for key in self.keys:
if key == 'last active':
status_dict[key] = Status(message=timedelta(seconds=int(delay)), detailed_messages=['No data'])
else:
status_dict[key] = Status(message=no_data, detailed_messages=['No data'])
return status_dict
else:
items = [nwst_id.rstrip('.')] + [fancy_timestr(timedelta(seconds=int(delay)))]
for _ in range(len(self.keys) - 1):
items.append(no_data)
return items
def get_station_delay(self, nwst_id):
""" try to get station delay from SDS archive using client"""
locations = ['', '0', '00']
channels = self.parameters.get('channels') + self.parameters.get('data_channels')
network, station = nwst_id.split('.')[:2]
times = []
for channel in channels:
for location in locations:
t = self.cl.get_latency(network, station, location, channel)
if t:
times.append(t)
if len(times) > 0:
return min(times)
def print_analysis(self):
self.print(200 * '+')
title_str = get_print_title_str(self.parameters)
self.print(title_str)
if self.refresh_period > 0:
self.print(f'Refreshing every {self.refresh_period}s.')
items = ['Station'] + self.keys
self.console_print(items, sep='---')
for items in self.analysis_print_list:
self.console_print(items)
def start(self):
'''
Perform qc periodically.
:param refresh_period: Update every x seconds
:return:
'''
first_exec = True
status = 'ok'
while status == 'ok' and self.refresh_period > 0:
status = self.execute_qc()
if self.outpath_html:
self.write_html_table()
if self.parameters.get('html_figures'):
self.write_html_figures(check_plot_time=not (first_exec))
else:
self.print_analysis()
time.sleep(self.refresh_period)
if not self.outpath_html:
self.clear_prints()
first_exec = False
def console_print(self, itemlist, str_len=21, sep='|', seplen=3):
assert len(sep) <= seplen, f'Make sure seperator has less than {seplen} characters'
sl = sep.ljust(seplen)
sr = sep.rjust(seplen)
string = sl
for item in itemlist:
string += item.center(str_len) + sr
self.print(string, flush=False)
def check_plot_hour(self):
''' Check if new hour started '''
current_hour = UTCDateTime().hour
if not current_hour > self.plot_hour:
return False
if current_hour == 23:
self.plot_hour = 0
else:
self.plot_hour += 1
return True
def get_fig_path_abs(self, nwst_id):
return pjoin(self.outpath_html, self.get_fig_path_rel(nwst_id))
def get_fig_path_rel(self, nwst_id, fig_format='png'):
return os.path.join(self.html_fig_dir, f'{nwst_id.rstrip(".")}.{fig_format}')
def check_fig_dir(self):
fdir = pjoin(self.outpath_html, self.html_fig_dir)
if not os.path.isdir(fdir):
os.mkdir(fdir)
def check_html_dir(self):
if not os.path.isdir(self.outpath_html):
os.mkdir(self.outpath_html)
def write_html_figures(self, check_plot_time=True):
""" Write figures for html (e.g. hourly) """
if check_plot_time and not self.check_plot_hour():
return
for nwst_id in self.station_list:
self.write_html_figure(nwst_id)
def write_html_figure(self, nwst_id, save_bytes=False):
""" Write figure for html for specified station """
self.check_fig_dir()
fig = plt.figure(figsize=(16, 9))
fnames_out = [self.get_fig_path_abs(nwst_id), io.BytesIO()]
st = self.data.get(get_full_seed_id(nwst_id))
if st:
# TODO: this section failed once, adding try-except block for analysis and to prevent program from crashing
try:
endtime = UTCDateTime()
starttime = endtime - self.parameters.get('timespan') * 24 * 3600
st = modify_stream_for_plot(st, parameters=self.parameters)
st.plot(fig=fig, show=False, draw=False, block=False, equal_scale=False, method='full',
starttime=starttime, endtime=endtime)
# set_axis_ylabels(fig, self.parameters, self.verbosity)
set_axis_yticks(fig, self.parameters, self.verbosity)
set_axis_color(fig)
plot_axis_thresholds(fig, self.parameters, self.verbosity)
except Exception as e:
print(f'Could not generate plot for {nwst_id}:')
print(traceback.format_exc())
if len(fig.axes) > 0:
ax = fig.axes[0]
ax.set_title(f'Plot refreshed at (UTC) {UTCDateTime.now().strftime("%Y-%m-%d %H:%M:%S")}. '
f'Refreshed hourly or on FAIL status.')
for ax in fig.axes:
ax.grid(True, alpha=0.1)
for fnout in fnames_out:
fig.savefig(fnout, dpi=150., bbox_inches='tight')
# if needed save figure as virtual object (e.g. for mailing)
if save_bytes:
fnames_out[-1].seek(0)
self.active_figures[nwst_id] = fnames_out[-1]
plt.close(fig)
def get_html_class(self, hide_keys_mobile=None, status=None, check_key=None):
""" helper function for html class if a certain condition is fulfilled """
html_class = None
if status and status.is_active:
html_class = 'blink-bg'
if hide_keys_mobile and check_key in hide_keys_mobile:
html_class = 'hidden-mobile'
return html_class
def make_html_table_header(self, default_header_color, hide_keys_mobile=None, add_links=True):
# First write header items
header = self.keys.copy()
# add columns for additional links
if add_links:
for key in self.add_links:
header.insert(-1, key)
header_items = [dict(text='Station', color=default_header_color)]
for check_key in header:
html_class = self.get_html_class(hide_keys_mobile, check_key=check_key)
item = dict(text=check_key, color=default_header_color, html_class=html_class)
header_items.append(item)
return header, header_items
def get_html_row_items(self, status_dict, nwst_id, header, default_color, hide_keys_mobile=None,
hyperlinks=True):
''' create a html table row for the different keys '''
fig_name = self.get_fig_path_rel(nwst_id)
nwst_id_str = nwst_id.rstrip('.')
col_items = [dict(text=nwst_id_str, color=default_color, hyperlink=fig_name if hyperlinks else None,
bold=True, tooltip=f'Show plot of {nwst_id_str}', font_color='#000000')]
for check_key in header:
if check_key in self.keys:
status = status_dict.get(check_key)
message, detailed_message = status.get_status_str()
# get background color
dt_thresh = [timedelta(seconds=sec) for sec in self.dt_thresh]
bg_color = get_bg_color(check_key, status, dt_thresh, hex=True)
if not bg_color:
bg_color = default_color
font_color = get_font_color(bg_color, hex=True)
# add degree sign for temp
if check_key == 'temp':
if not type(message) in [str]:
message = str(message) + deg_str
html_class = self.get_html_class(hide_keys_mobile, status=status, check_key=check_key)
item = dict(text=str(message), tooltip=str(detailed_message), color=bg_color,
html_class=html_class, font_color=font_color)
elif check_key in self.add_links:
value = self.add_links.get(check_key).get('URL')
link_text = self.add_links.get(check_key).get('text')
if not value:
continue
nw, st = nwst_id.split('.')[:2]
hyperlink_dict = dict(nw=nw, st=st, nwst_id=nwst_id)
link = value.format(**hyperlink_dict)
item = dict(text=link_text, tooltip=link, hyperlink=link if hyperlinks else None, color=default_color)
else:
item = dict(text='', tooltip='')
col_items.append(item)
return col_items
def write_html_table(self, default_color='#e6e6e6', default_header_color='#999999', hide_keys_mobile=('other',)):
self.check_html_dir()
fnout = pjoin(self.outpath_html, 'survBot_out.html')
if not fnout:
return
try:
with open(fnout, 'w') as outfile:
outfile.write(get_html_header(self.refresh_period))
# write_html_table_title(self.parameters)
outfile.write(init_html_table())
# write html header row
header, header_items = self.make_html_table_header(default_header_color, hide_keys_mobile)
html_row = get_html_row(header_items, html_key='th')
outfile.write(html_row)
# Write all cells (row after row)
for nwst_id in self.station_list:
# get list with column-wise items to write as a html row
status_dict = self.analysis_results.get(nwst_id)
col_items = self.get_html_row_items(status_dict, nwst_id, header, default_color, hide_keys_mobile)
outfile.write(get_html_row(col_items))
outfile.write(finish_html_table())
# add optional links below html table
for dct in self.add_global_links:
link_str = get_html_link(dct.get('text'), dct.get('URL'))
outfile.write(get_html_text(link_str))
# add status message
outfile.write(get_html_text(self.status_message))
# write footer with optional logo
logo_file = self.parameters.get('html_logo')
if not os.path.isfile(pjoin(self.outpath_html, logo_file)):
print(f'Specified file {logo_file} not found.')
logo_file = None
outfile.write(html_footer(footer_logo=logo_file))
except Exception as e:
print(f'Could not write HTML table to {fnout}:')
print(traceback.format_exc())
if self.verbosity:
print(f'Wrote html table to {fnout}')
def update_status_message(self):
timespan = timedelta(seconds=int(self.parameters.get('timespan') * 24 * 3600))
self.status_message = f'Program starttime (UTC) {self.starttime.strftime("%Y-%m-%d %H:%M:%S")} | ' \
f'Current time (UTC) {UTCDateTime().strftime("%Y-%m-%d %H:%M:%S")} | ' \
f'Refresh period: {self.refresh_period}s | ' \
f'Showing data of last {timespan}'
def print(self, string, **kwargs):
clear_end = CLR + '\n'
n_nl = string.count('\n')
string.replace('\n', clear_end)
print(string, end=clear_end, **kwargs)
self.print_count += n_nl + 1 # number of newlines + actual print with end='\n' (no check for kwargs end!)
# print('pc:', self.print_count)
def clear_prints(self):
print(UP.format(length=self.print_count), end='')
self.print_count = 0
class StationQC(object):
def __init__(self, parent, stream, nsl, parameters, keys, starttime, verbosity, print_func, status_track=None):
"""
Station Quality Check class.
:param nsl: dictionary containing network, station and location (key: str)
:param parameters: parameters dictionary from parameters.yaml file
"""
if status_track is None:
status_track = {}
self.parent = parent
self.stream = stream
self.nsl = nsl
self.network = nsl.get('network')
self.station = nsl.get('station')
self.location = nsl.get('location')
# make a copy of parameters object to prevent accidental changes
self.parameters = copy.deepcopy(parameters)
self.program_starttime = starttime
self.verbosity = verbosity
self.last_active = False
self.print = print_func
self.keys = keys
self.status_dict = {key: Status() for key in self.keys}
if not status_track:
status_track = {}
self.status_track = status_track
self.start()
@property
def nwst_id(self):
return f'{self.network}.{self.station}'
def status_ok(self, key, detailed_message="Everything OK", status_message='OK', overwrite=False):
current_status = self.status_dict.get(key)
# do not overwrite existing warnings or errors
if not overwrite and (current_status.is_warn or current_status.is_error):
return
self.status_dict[key] = StatusOK(message=status_message, detailed_messages=[detailed_message])
def warn(self, key, detailed_message, last_occurrence=None, count=1):
if key == 'other':
self.status_other(detailed_message, last_occurrence, count)
new_warn = StatusWarn(count=count, show_count=self.parameters.get('warn_count'))
current_status = self.status_dict.get(key)
# change this to something more useful, SMS/EMAIL/PUSH
if self.verbosity:
self.print(f'{UTCDateTime()}: {detailed_message}', flush=False)
# if error, do not overwrite with warning
if current_status.is_error:
return
if current_status.is_warn:
current_status.count += count
else:
current_status = new_warn
self._update_status(key, current_status, detailed_message, last_occurrence)
# warnings.warn(message)
# # update detailed status if already existing
# current_message = self.detailed_status_dict.get(key)
# current_message = '' if current_message in [None, '-'] else current_message + ' | '
# self.detailed_status_dict[key] = current_message + detailed_message
#
# # this is becoming a little bit too complicated (adding warnings to existing)
# current_status_message = self.status_dict.get(key)
# current_status_message = '' if current_status_message in [None, 'OK', '-'] else current_status_message + ' | '
# self.status_dict[key] = current_status_message + status_message
def error(self, key, detailed_message, last_occurrence=None, count=1, disc=False):
send_mail = False
new_error = StatusError(count=count, show_count=self.parameters.get('warn_count'))
if disc:
new_error.set_disconnected()
current_status = self.status_dict.get(key)
if current_status.is_error:
current_status.count += count
else:
current_status = new_error
# if error is new and not on program-startup set active and refresh plot (using parent class)
if self.status_track.get(key) and not self.status_track.get(key)[-1]:
self.parent.write_html_figure(self.nwst_id, save_bytes=True)
if self.verbosity:
self.print(f'{UTCDateTime()}: {detailed_message}', flush=False)
# do not send error mail if this is the first run (e.g. program startup) or state was already error (unchanged)
if self.search_previous_errors(key) is True:
send_mail = True
# set status to "inactive" when info mail is sent
current_status.is_active = False
elif self.search_previous_errors(key) == 'active':
current_status.is_active = True
# first update status, then send mail
self._update_status(key, current_status, detailed_message, last_occurrence)
if send_mail:
self.send_mail(key, status_type='FAIL', additional_message=detailed_message)
def search_previous_errors(self, key, n_errors=None):
"""
Check n_track + 1 previous statuses for errors.
If first item in list is no error but all others are return True
(first time n_track errors appeared if ALL n_track + 1 are error: error is old)
If last item is error but not all items are error yet return keyword 'active' -> error active, no message sent
In all other cases return False.
This also prevents sending status (e.g. mail) in case of program startup
"""
if n_errors is None:
n_errors = self.parameters.get('n_track')
# +1 to check whether n_errors + 1 was no error (error is new)
n_errors += 1
# simulate an error specified in json file (dictionary: {nwst_id: key} )
if self._simulated_error_check(key) is True:
print(f'Simulating Error on {self.nwst_id}, {key}')
return True
previous_errors = self.status_track.get(key)
# only if error list is filled n_track times
if previous_errors and len(previous_errors) == n_errors:
# if first entry was no error but all others are, return True (-> new Fail n_track times)
if not previous_errors[0] and all(previous_errors[1:]):
return True
# in case previous_errors exist, last item is error but not all items are error, error still active
if previous_errors and previous_errors[-1] and not all(previous_errors):
return 'active'
return False
def _simulated_error_check(self, key, fname='simulate_fail.json'):
if not os.path.isfile(fname):
return
with open(fname) as fid:
d = json.load(fid)
if d.get(self.nwst_id) == key:
return True
def send_mail(self, key, status_type, additional_message=''):
""" Send info mail using parameters specified in parameters file """
if not mail_functionality:
if self.verbosity:
print('Mail functionality disabled. Return')
return
mail_params = self.parameters.get('EMAIL')
if not mail_params:
if self.verbosity:
print('parameter "EMAIL" not set in parameter file. Return')
return
stations_blacklist = mail_params.get('stations_blacklist')
if stations_blacklist and self.station in stations_blacklist:
if self.verbosity:
print(f'Station {self.station} listed in blacklist. Return')
return
networks_blacklist = mail_params.get('networks_blacklist')
if networks_blacklist and self.network in networks_blacklist:
if self.verbosity:
print(f'Station {self.station} of network {self.network} listed in blacklist. Return')
return
sender = mail_params.get('sender')
addresses = mail_params.get('addresses')
add_addresses = self.get_additional_mail_recipients(mail_params)
if add_addresses:
# create copy of addresses ( [:] ) to prevent changing original, general list with addresses
addresses = addresses[:] + list(add_addresses)
server = mail_params.get('mailserver')
if not sender or not addresses:
if self.verbosity:
print('Mail sender or addresses not (correctly) defined. Return')
return
dt = self.get_dt_for_action()
text = f'{key}: Status {status_type} longer than {dt}: ' + additional_message
msg = EmailMessage()
msg['Subject'] = f'new message on station {self.nwst_id}'
msg['From'] = sender
msg['To'] = ', '.join(addresses)
msg.set_content(text)
# html mail version
html_str = self.add_html_mail_body(text)
msg.add_alternative(html_str, subtype='html')
# send message via SMTP server
s = smtplib.SMTP(server)
s.send_message(msg)
s.quit()
def add_html_mail_body(self, text, default_color='#e6e6e6'):
parent = self.parent
header, header_items = parent.make_html_table_header('#999999', add_links=False)
col_items = parent.get_html_row_items(self.status_dict, self.nwst_id, header, default_color, hyperlinks=False)
# set general status text
html_str = get_html_text(text)
# init html header and table
html_str += get_mail_html_header()
html_str += init_html_table()
# add table header and row of current station
html_str += get_html_row(header_items, html_key='th')
html_str += get_html_row(col_items)
html_str += finish_html_table()
if self.nwst_id in self.parent.active_figures.keys():
fid = self.parent.active_figures.pop(self.nwst_id)
html_str += add_html_image(img_data=fid.read())
html_str += html_footer()
return html_str
def get_additional_mail_recipients(self, mail_params):
""" return additional recipients from external mail list if this station (self.nwst_id) is specified """
eml_filename = mail_params.get('external_mail_list')
if eml_filename:
# try to open file
try:
with open(eml_filename, 'r') as fid:
address_dict = yaml.safe_load(fid)
for address, nwst_ids in address_dict.items():
if self.nwst_id in nwst_ids:
yield address
# file not existing
except FileNotFoundError as e:
if self.verbosity:
print(e)
# no dictionary
except AttributeError as e:
if self.verbosity:
print(f'Could not read dictionary from file {eml_filename}: {e}')
# other exceptions
except Exception as e:
if self.verbosity:
print(f'Could not open file {eml_filename}: {e}')
# no file specified
else:
if self.verbosity:
print('No external mail list set.')
return []
def get_dt_for_action(self):
n_track = self.parameters.get('n_track')
interval = self.parameters.get('interval')
dt = timedelta(seconds=n_track * interval)
return dt
def status_other(self, detailed_message, status_message, last_occurrence=None, count=1):
key = 'other'
new_status = StatusOther(count=count, messages=[status_message])
current_status = self.status_dict.get(key)
if current_status.is_other:
current_status.count += count
current_status.messages.append(status_message)
else:
current_status = new_status
self._update_status(key, current_status, detailed_message, last_occurrence)
def _update_status(self, key, current_status, detailed_message, last_occurrence):
current_status.detailed_messages.append(detailed_message)
current_status.last_occurrence = last_occurrence
self.status_dict[key] = current_status
def activity_check(self, key='last_active'):
self.last_active = self.last_activity()
if not self.last_active:
status = StatusError()
else:
dt_active = timedelta(seconds=int(self.program_starttime - self.last_active))
status = Status(message=dt_active)
self.check_for_inactive_message(key, dt_active)
self.status_dict['last active'] = status
def last_activity(self):
if not self.stream:
return
endtimes = []
for trace in self.stream:
endtimes.append(trace.stats.endtime)
if len(endtimes) > 0:
return max(endtimes)
def check_for_inactive_message(self, key, dt_active):
""" send mail if station is inactive longer than dt_action and no FAIL status is present """
# check if any error is present in status_dict and not disconnected (in that case an email is sent already)
if self.check_for_any_error_no_dcn():
return
dt_action = self.get_dt_for_action()
interval = self.parameters.get('interval')
if dt_action <= dt_active < dt_action + timedelta(seconds=interval):
detailed_message = f'\n{self.nwst_id}\n\n'
for key, status in self.status_dict.items():
detailed_message += f'{key}: {status.message}\n'
self.send_mail(key, status_type='Inactive', additional_message=detailed_message)
def check_for_any_error_no_dcn(self):
return any([status.is_error and not status.connection_error for status in self.status_dict.values()])
def start(self):
self.analyse_channels()
def analyse_channels(self):
timespan = self.parameters.get('timespan') * 24 * 3600
self.analysis_starttime = self.program_starttime - timespan
if self.verbosity > 0:
self.print(150 * '#')
self.print('This is StationQT. Calculating quality for station'
' {network}.{station}.{location}'.format(**self.nsl))
self.voltage_analysis()
self.pb_temp_analysis()
self.pb_power_analysis()
self.pb_rout_charge_analysis()
self.mass_analysis()
self.clock_quality_analysis()
self.gaps_analysis()
# activity check should be done last for useful status output (e.g. email)
self.activity_check()
self._simulate_error()
def _simulate_error(self):
for key in self.keys:
if self._simulated_error_check(key):
self.error(key, 'SIMULATED ERROR')
def return_print_analysis(self):
items = [self.nwst_id]
for key in self.keys:
status = self.status_dict[key]
message = status.message
if key == 'last active':
items.append(fancy_timestr(message))
elif key == 'temp':
items.append(str(message) + deg_str)
else:
items.append(str(message))
return items
def return_analysis(self):
return self.status_dict
def get_unit_factor(self, channel):
""" Get channel multiplier for unit from parameters. If none is specified return 1 """
channel_params = self.parameters.get('CHANNELS').get(channel)
if channel_params:
multiplier = channel_params.get('unit')
if multiplier:
return float(multiplier)
return 1
def get_last_occurrence_timestring(self, trace, indices):
""" returns a nicely formatted string of the timedelta since program starttime and occurrence and abs time"""
last_occur = self.get_last_occurrence(trace, indices)
if not last_occur:
return ''
last_occur_dt = timedelta(seconds=int(self.program_starttime - last_occur))
return f', Last occurrence: {last_occur_dt} ({last_occur.strftime("%Y-%m-%d %H:%M:%S")})'
def get_last_occurrence(self, trace, indices):
return self.get_time(trace, indices[-1])
def clock_quality_analysis(self, channel='LCQ', n_sample_average=10):
""" Analyse clock quality """
key = 'clock'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace:
return
clock_quality = trace.data
clock_quality_warn_level = self.parameters.get('THRESHOLDS').get('clockquality_warn')
clock_quality_fail_level = self.parameters.get('THRESHOLDS').get('clockquality_fail')
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing Clock Quality check', flush=False)
clockQuality_warn = np.where(clock_quality < clock_quality_warn_level)[0]
clockQuality_fail = np.where(clock_quality < clock_quality_fail_level)[0]
if len(clockQuality_warn) == 0 and len(clockQuality_fail) == 0:
self.status_ok(key, detailed_message=f'ClockQuality={(clock_quality[-1])}')
return
last_val_average = np.nanmean(clock_quality[-n_sample_average:])
# keep OK status if there are only minor warnings (lower warn level)
warn_message = f'Trace {trace.get_id()}:'
if len(clockQuality_warn) > 0:
# try calculate number of warn peaks from gaps between indices
n_qc_warn = self.calc_occurrences(clockQuality_warn)
detailed_message = warn_message + f' {n_qc_warn}x Clock quality less then {clock_quality_warn_level}%' \
+ self.get_last_occurrence_timestring(trace, clockQuality_warn)
self.status_ok(key, detailed_message=detailed_message)
# set WARN status for severe warnings in the past
if len(clockQuality_fail) > 0:
# try calculate number of fail peaks from gaps between indices
n_qc_fail = self.calc_occurrences(clockQuality_fail)
detailed_message = warn_message + f' {n_qc_fail}x Clock quality less then {clock_quality_fail_level}%' \
+ self.get_last_occurrence_timestring(trace, clockQuality_fail)
self.warn(key, detailed_message=detailed_message, count=n_qc_fail,
last_occurrence=self.get_last_occurrence(trace, clockQuality_fail))
# set FAIL state if last value is less than fail level
if last_val_average < clock_quality_fail_level:
self.error(key, detailed_message=f'ClockQuality={(clock_quality[-1])}')
def voltage_analysis(self, channel='VEI'):
""" Analyse voltage channel for over/undervoltage """
key = 'voltage'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
low_volt = self.parameters.get('THRESHOLDS').get('low_volt')
high_volt = self.parameters.get('THRESHOLDS').get('high_volt')
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing Voltage check', flush=False)
overvolt = np.where(voltage > high_volt)[0]
undervolt = np.where(voltage < low_volt)[0]
if len(overvolt) == 0 and len(undervolt) == 0:
self.status_ok(key, detailed_message=f'U={(voltage[-1])}V')
return
warn_message = f'Trace {trace.get_id()}:'
if len(overvolt) > 0:
# try calculate number of voltage peaks from gaps between indices
n_overvolt = len(np.where(np.diff(overvolt) > 1)[0]) + 1
detailed_message = warn_message + f' {n_overvolt}x Voltage over {high_volt}V' \
+ self.get_last_occurrence_timestring(trace, overvolt)
self.warn(key, detailed_message=detailed_message, count=n_overvolt,
last_occurrence=self.get_last_occurrence(trace, overvolt))
if len(undervolt) > 0:
# try calculate number of voltage peaks from gaps between indices
n_undervolt = len(np.where(np.diff(undervolt) > 1)[0]) + 1
detailed_message = warn_message + f' {n_undervolt}x Voltage under {low_volt}V ' \
+ self.get_last_occurrence_timestring(trace, undervolt)
self.warn(key, detailed_message=detailed_message, count=n_undervolt,
last_occurrence=self.get_last_occurrence(trace, undervolt))
def pb_temp_analysis(self, channel='EX1'):
""" Analyse PowBox temperature output. """
key = 'temp'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
thresholds = self.parameters.get('THRESHOLDS')
temp = 20. * voltage - 20
# average temp
timespan = min([self.parameters.get('timespan') * 24 * 3600, int(len(temp) / trace.stats.sampling_rate)])
nsamp_av = int(trace.stats.sampling_rate) * timespan
av_temp_str = str(round(np.nanmean(temp[-nsamp_av:]), 1)) + deg_str
# dt of average
dt_t_str = str(timedelta(seconds=int(timespan))).replace(', 0:00:00', '')
# current temp
cur_temp = round(temp[-1], 1)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox temperature check (EX1)', flush=False)
self.print(f'Average temperature at {np.nanmean(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Peak temperature at {max(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Min temperature at {min(temp)}\N{DEGREE SIGN}', flush=False)
max_temp = thresholds.get('max_temp')
t_check = np.where(temp > max_temp)[0]
if len(t_check) > 0:
self.warn(key=key,
detailed_message=f'Trace {trace.get_id()}: '
f'Temperature over {max_temp}\N{DEGREE SIGN} at {trace.get_id()}!'
+ self.get_last_occurrence_timestring(trace, t_check),
last_occurrence=self.get_last_occurrence(trace, t_check))
else:
self.status_ok(key,
status_message=cur_temp,
detailed_message=f'Average temperature of last {dt_t_str}: {av_temp_str}')
def mass_analysis(self, channels=('VM1', 'VM2', 'VM3'), n_samp_mean=10):
""" Analyse datalogger mass channels. """
key = 'mass'
# build stream with all channels
st = Stream()
for channel in channels:
st += self.stream.select(channel=channel).copy()
st.merge()
# return if there are no three components
if not len(st) == 3:
return
# correct for channel unit
for trace in st:
trace.data = trace.data * self.get_unit_factor(trace.stats.channel)
# calculate average of absolute maximum of mass offset of last n_samp_mean
last_values = np.array([trace.data[-n_samp_mean:] for trace in st])
last_val_mean = np.nanmean(last_values, axis=1)
common_highest_val = np.nanmax(abs(last_val_mean))
common_highest_val = round(common_highest_val, 1)
# get thresholds for WARN (max_vm_warn) and FAIL (max_vm_fail)
thresholds = self.parameters.get('THRESHOLDS')
max_vm_warn = thresholds.get('max_vm_warn')
max_vm_fail = thresholds.get('max_vm_fail')
if not max_vm_warn or not max_vm_fail:
return
# change status depending on common_highest_val
if common_highest_val < max_vm_warn:
self.status_ok(key, detailed_message=f'{common_highest_val}V')
elif max_vm_warn <= common_highest_val < max_vm_fail:
self.warn(key=key,
detailed_message=f'Warning raised for mass centering. Highest val (abs) {common_highest_val}V', )
else:
self.error(key=key,
detailed_message=f'Fail status for mass centering. Highest val (abs) {common_highest_val}V',)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing mass position check', flush=False)
self.print(f'Average mass position at {common_highest_val}', flush=False)
def pb_power_analysis(self, channel='EX2', pb_dict_key='pb_SOH2'):
""" Analyse EX2 channel of PowBox """
keys = ['230V', '12V']
st = self.stream.select(channel=channel)
trace = self.get_trace(st, keys)
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox 12V/230V check (EX2)', flush=False)
voltage_check, voltage_dict, last_val = self.pb_voltage_ok(trace, voltage, pb_dict_key, channel=channel)
if voltage_check:
for key in keys:
self.status_ok(key)
return
soh2_params = self.parameters.get('POWBOX').get(pb_dict_key)
self.in_depth_voltage_check(trace, voltage_dict, soh2_params, last_val)
def pb_rout_charge_analysis(self, channel='EX3', pb_dict_key='pb_SOH3'):
""" Analyse EX3 channel of PowBox """
keys = ['router', 'charger']
pb_thresh = self.parameters.get('THRESHOLDS').get('pb_1v')
st = self.stream.select(channel=channel)
trace = self.get_trace(st, keys)
if not trace:
return
voltage = trace.data * self.get_unit_factor(channel)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox Router/Charger check (EX3)', flush=False)
voltage_check, voltage_dict, last_val = self.pb_voltage_ok(trace, voltage, pb_dict_key, channel=channel)
if voltage_check:
for key in keys:
self.status_ok(key)
return
soh3_params = self.parameters.get('POWBOX').get(pb_dict_key)
self.in_depth_voltage_check(trace, voltage_dict, soh3_params, last_val)
def in_depth_voltage_check(self, trace, voltage_dict, soh_params, last_val):
""" Associate values in voltage_dict to error messages specified in SOH_params and warn."""
for volt_lvl, ind_array in voltage_dict.items():
if volt_lvl == 1:
continue # No need to do anything here
if len(ind_array) > 0:
# get result from parameter dictionary for voltage level
result = soh_params.get(volt_lvl)
for key, message in result.items():
# if result is OK, continue with next voltage level
if message == 'OK':
self.status_ok(key)
continue
if volt_lvl != 1:
n_occurrences = self.calc_occurrences(ind_array)
self.warn(key=key,
detailed_message=f'Trace {trace.get_id()}: '
f'Found {n_occurrences} occurrence(s) of {volt_lvl}V: {key}: {message}'
+ self.get_last_occurrence_timestring(trace, ind_array),
count=n_occurrences,
last_occurrence=self.get_last_occurrence(trace, ind_array))
# if last_val == current voltage (which is not 1) -> FAIL or last_val < 1: PBox no data
if volt_lvl == last_val:
self.error(key, detailed_message=f'Last PowBox voltage state {last_val}V: {message}')
elif volt_lvl == -1 and last_val < 1:
self.error(key, detailed_message=f'PowBox under 1V - connection error', disc=True)
def gaps_analysis(self, key='gaps'):
""" return gaps of a given nwst_id """
gaps = []
for gap_list in self.parent.gaps:
nw_gap, st_gap = gap_list[:2]
if nw_gap == self.network and st_gap == self.station:
gaps.append(gap_list)
if not gaps:
self.status_ok(key=key)
return
detailed_message = ''
for gap_list in gaps:
text = '{}.{}.{}.{}: last sample - {}, next sample - {}, delta {}, samples {}\n'.format(*gap_list)
detailed_message += text
self.warn(key=key, detailed_message=detailed_message, count=len(gaps))
def calc_occurrences(self, ind_array):
# try calculate number of voltage peaks/plateaus from gaps between indices
if len(ind_array) == 0:
return 0
else:
# start index at 1 if there are gaps (n_peaks = n_gaps + 1)
n_occurrences = 1
min_samples = self.parameters.get('min_sample')
if not min_samples:
min_samples = 1
# calculated differences in index array, diff > 1: gap, diff == 1: within peak/plateau
diffs = np.diff(ind_array)
gap_start_inds = np.where(np.diff(ind_array) > 1)[0]
# iterate over all gaps and check "min_samples" before the gap
for gsi in gap_start_inds:
# right boundary index of peak (gap index - 1)
peak_rb_ind = gsi - 1
# left boundary index of peak
peak_lb_ind = max([0, peak_rb_ind - min_samples])
if all(diffs[peak_lb_ind: peak_rb_ind] == 1):
n_occurrences += 1
return n_occurrences
def get_trace(self, stream, keys):
if not type(keys) == list:
keys = [keys]
if len(stream) == 0:
for key in keys:
self.warn(key, 'NO DATA', 'NO DATA')
return
if len(stream) > 1:
raise Exception('Ambiguity error')
trace = stream[0]
if trace.stats.endtime < self.analysis_starttime:
for key in keys:
self.warn(key, 'NO DATA', 'NO DATA')
return
return trace
def pb_voltage_ok(self, trace, voltage, pb_dict_key, channel=None):
"""
Checks if voltage level is ok everywhere and returns True. If it is not okay it returns a dictionary
with each voltage value associated to the different steps specified in POWBOX > pb_steps. Also raises
self.warn in case there are unassociated voltage values recorded.
"""
pb_thresh = self.parameters.get('THRESHOLDS').get('pb_thresh')
pb_ok = self.parameters.get('POWBOX').get('pb_ok')
# possible voltage levels are keys of pb voltage level dict
voltage_levels = list(self.parameters.get('POWBOX').get(pb_dict_key).keys())
# get mean voltage value of last samples
last_voltage = np.nanmean(voltage[-3:])
# check if voltage is over or under OK-level (1V), if not return True
over = np.where(voltage > pb_ok + pb_thresh)[0]
under = np.where(voltage < pb_ok - pb_thresh)[0]
if len(over) == 0 and len(under) == 0:
return True, {}, last_voltage
# Get voltage levels for classification
voltage_dict = {}
classified_indices = np.array([])
# add classified levels to voltage_dict
for volt in voltage_levels:
indices = np.where((voltage < volt + pb_thresh) & (voltage > volt - pb_thresh))[0]
voltage_dict[volt] = indices
classified_indices = np.append(classified_indices, indices)
# Warn in case of voltage under OK-level (1V)
if len(under) > 0:
# try calculate number of occurences from gaps between indices
n_occurrences = len(np.where(np.diff(under) > 1)[0]) + 1
voltage_dict[-1] = under
self.status_other(detailed_message=f'Trace {trace.get_id()}: '
f'Voltage below {pb_ok}V in {len(under)} samples, {n_occurrences} time(s). '
f'Mean voltage: {np.mean(voltage):.2}'
+ self.get_last_occurrence_timestring(trace, under),
status_message='under 1V ({})'.format(n_occurrences))
# classify last voltage values
for volt in voltage_levels:
if (last_voltage < volt + pb_thresh) and (last_voltage > volt - pb_thresh):
last_val = volt
break
else:
last_val = round(last_voltage, 2)
# in case not all voltage values could be classified
if not len(classified_indices) == len(voltage):
all_indices = np.arange(len(voltage))
unclassified_indices = all_indices[~np.isin(all_indices, classified_indices)]
n_unclassified = len(unclassified_indices)
max_uncl = self.parameters.get('THRESHOLDS').get('unclassified')
if max_uncl and n_unclassified > max_uncl:
self.status_other(detailed_message=f'Trace {trace.get_id()}: '
f'{n_unclassified}/{len(all_indices)} '
f'unclassified voltage values in channel {trace.get_id()}',
status_message=f'{channel}: {n_unclassified} uncl.')
return False, voltage_dict, last_val
def get_time(self, trace, index):
""" get UTCDateTime from trace and index"""
return trace.stats.starttime + trace.stats.delta * index
class Status(object):
""" Basic Status class. All status classes are derived from this class."""
def __init__(self, message=None, detailed_messages=None, count: int = 0, last_occurrence=None, show_count=True):
if message is None:
message = '-'
if detailed_messages is None:
detailed_messages = []
self.show_count = show_count
self.message = message
self.messages = [message]
self.detailed_messages = detailed_messages
self.count = count
self.last_occurrence = last_occurrence
self.is_warn = None
self.is_error = None
self.connection_error = None
self.is_other = False
self.is_active = False
def set_warn(self):
self.is_warn = True
def set_error(self):
self.is_warn = False
self.is_error = True
def set_ok(self):
self.is_warn = False
self.is_error = False
def get_status_str(self):
message = self.message
if self.count > 1 and self.show_count:
message += f' ({self.count})'
detailed_message = ''
for index, dm in enumerate(self.detailed_messages):
if index > 0:
detailed_message += ' | '
detailed_message += dm
return message, detailed_message
class StatusOK(Status):
def __init__(self, message='OK', detailed_messages=None):
super(StatusOK, self).__init__(message=message, detailed_messages=detailed_messages)
self.set_ok()
class StatusWarn(Status):
def __init__(self, message='WARN', count=1, last_occurence=None, detailed_messages=None, show_count=False):
super(StatusWarn, self).__init__(message=message, count=count, last_occurrence=last_occurence,
detailed_messages=detailed_messages, show_count=show_count)
self.set_warn()
class StatusError(Status):
def __init__(self, message='FAIL', count=1, last_occurence=None, detailed_messages=None, show_count=False):
super(StatusError, self).__init__(message=message, count=count, last_occurrence=last_occurence,
detailed_messages=detailed_messages, show_count=show_count)
self.set_error()
self.default_message = message
def set_disconnected(self, message='DCN'):
self.connection_error = True
self.message = message
def set_connected(self):
self.connection_error = False
self.message = self.default_message
class StatusOther(Status):
def __init__(self, messages=None, count=1, last_occurence=None, detailed_messages=None):
super(StatusOther, self).__init__(count=count, last_occurrence=last_occurence,
detailed_messages=detailed_messages)
if messages is None:
messages = []
self.messages = messages
self.is_other = True
def get_status_str(self):
if self.messages == []:
return '-'
message = ''
for index, mes in enumerate(self.messages):
if index > 0:
message += ' | '
message += mes
detailed_message = ''
for index, dm in enumerate(self.detailed_messages):
if index > 0:
detailed_message += ' | '
detailed_message += dm
return message, detailed_message
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Call survBot')
parser.add_argument('-html', dest='html_path', default=None, help='filepath for HTML output')
parser.add_argument('-parfile', dest='parfile', default='parameters.yaml',
help='parameter file (default: parameters.yaml)')
args = parser.parse_args()
survBot = SurveillanceBot(parameter_path=args.parfile, outpath_html=args.html_path)
survBot.start()