[new] initial commit, moving repository from code base

This commit is contained in:
Marcel Paffrath 2022-11-03 15:39:23 +01:00
parent 68e12afc61
commit cc8c1833f1
7 changed files with 1122 additions and 1 deletions

View File

@ -1,3 +1,41 @@
# survBot # survBot
small program used to track station quality channels of DSEBRA stations using PowBox version: 0.1
survBot is a small program used to track station quality channels of DSEBRA stations via PowBox output over SOH channels
by analysing contents of a Seiscomp3 datapath.
## Requirements
The following packages are required:
* Python 3
* obspy
(the following are dependencies of the above):
* numpy
* matplotlib
to use the GUI:
* PySide2, PyQt4 or PyQt5
## Usage
Configurations of *datapath*, *networks*, *stations* etc. can be done in the **parameters.yaml** input file.
The main program is executed by entering
```shell script
python survBot.py
```
The GUI can be loaded via
```shell script
python survBotGui.py
```
## Staff
Original author: M.Paffrath (marcel.paffrath@rub.de)
November 2022

0
__init__.py Normal file
View File

37
parameters.yaml Normal file
View File

@ -0,0 +1,37 @@
# Parameters file for Surveillance Bot
datapath: '/data/SDS/' # SC3 Datapath
networks: ['1Y', 'HA']
stations: '*'
locations: '*'
channels: ['EX1', 'EX2', 'EX3', 'VEI'] # Specify SOH channels, currently supported EX[1-3] and VEI
stations_blacklist: ['TEST', 'EREA']
networks_blacklist: []
interval: 20 # Perform checks every x seconds
timespan: 7 # Check data of the recent x days
verbosity: 0
track_changes: True # tracks all changes since GUI startup by text highlighting (GUI only)
POWBOX:
pb_ok: 1 # Voltage for PowBox OK
pb_SOH2: # PowBox channel 2 voltage translations
1: {"230V": 'OK', "12V": "OK"}
2: {"230V": "OFF", "12V": "OK"}
3: {"230V": "OK", "12V": "overvoltage"}
4: {"230V": "OK", "12V": "undervoltage"}
4.5: {"230V": "OFF", "12V": "overvoltage"}
5: {"230V": "OFF", "12V": "undervoltage"}
pb_SOH3: # PowBox channel 3 voltage translations
1: {"router": "OK", "charger": "OK"}
2: {"router": "OK", "charger": "0 < resets < 3"}
2.5: {"router": "OK", "charger": "locked"}
3: {"router": "FAIL", "charger": "OK"}
4: {"router": "FAIL", "charger": "0 < resets < 3"}
5: {"router": "FAIL", "charger": "locked"}
THRESHOLDS:
pb_thresh: 0.2 # Threshold for PowBox Voltage check +/- (V)
max_temp: 50 # max temperature for temperature warning
low_volt: 12 # min voltage for low voltage warning
high_volt: 14.8 # max voltage for over voltage warning
unclassified: 5 # min voltage samples not classified for warning

15
submit_bot.sh Normal file
View File

@ -0,0 +1,15 @@
#!/bin/bash
ulimit -s 8192
#$ -l low
#$ -l os=*stretch
#$ -cwd
#$ -pe smp 1
##$ -q "*@minos15"
export PYTHONPATH="$PYTHONPATH:/home/marcel/git/"
source /opt/anaconda3/etc/profile.d/conda.sh
conda activate py37
python survBot.py

548
survBot.py Executable file
View File

@ -0,0 +1,548 @@
#! /usr/bin/env python
__version__ = '0.1'
__author__ = 'Marcel Paffrath'
import os
import yaml
import time
from datetime import timedelta
import numpy as np
from obspy import read, UTCDateTime, Stream
from obspy.clients.filesystem.sds import Client
pjoin = os.path.join
UP = "\x1B[{length}A"
CLR = "\x1B[0K"
deg_str = '\N{DEGREE SIGN}C'
def read_yaml(file_path):
with open(file_path, "r") as f:
return yaml.safe_load(f)
def nsl_from_id(st_id):
network, station, location = st_id.split('.')
return dict(network=network, station=station, location=location)
def get_st_id(trace):
stats = trace.stats
return f'{stats.network}.{stats.station}.'#{stats.location}'
def fancy_timestr(dt, thresh=600, modif='+'):
if dt > timedelta(seconds=thresh):
value = f'{modif} ' + str(dt) + f' {modif}'
else:
value = str(dt)
return value
class SurveillanceBot(object):
def __init__(self, parameter_path):
self.keys = ['last active', '230V', '12V', 'router', 'charger', 'voltage', 'temp', 'other']
self.parameters = read_yaml(parameter_path)
self.transform_parameters()
self.starttime = UTCDateTime()
self.verbosity = self.parameters.get('verbosity')
self.filenames = []
self.filenames_read = []
self.station_list = []
self.analysis_print_list = []
self.analysis_results = {}
self.stations_blacklist = self.parameters.get('stations_blacklist')
self.networks_blacklist = self.parameters.get('networks_blacklist')
self.dataStream = Stream()
self.data = {}
self.print_count = 0
self.refresh_period = 0
self.cl = Client(self.parameters.get('datapath')) #TODO: Check if this has to be loaded again on update
self.get_stations()
def transform_parameters(self):
for key in ['networks', 'stations', 'locations', 'channels']:
parameter = self.parameters.get(key)
if type(parameter) == str:
self.parameters[key] = list(self.parameters[key])
elif type(parameter) not in [list]:
raise TypeError(f'Bad input type for {key}: {type(key)}')
def get_stations(self):
networks = self.parameters.get('networks')
stations = self.parameters.get('stations')
self.station_list = []
nwst_list = self.cl.get_all_stations()
for nw, st in nwst_list:
if self.stations_blacklist and st in self.stations_blacklist:
continue
if self.networks_blacklist and nw in self.networks_blacklist:
continue
if (networks == ['*'] or nw in networks) and (stations == ['*'] or st in stations):
st_id = f'{nw}.{st}.'
self.station_list.append(st_id)
def get_filenames(self):
self.filenames = []
time_now = UTCDateTime()
t1 = time_now - self.parameters.get('timespan') * 24 * 3600
networks = self.parameters.get('networks')
stations = self.parameters.get('stations')
locations = self.parameters.get('locations')
channels = self.parameters.get('channels')
for network in networks:
for station in stations:
for location in locations:
for channel in channels:
self.filenames += list(self.cl._get_filenames(network, station, location, channel,
starttime=t1, endtime=time_now))
def read_data(self):
self.data = {}
# add all data to current stream
for filename in self.filenames:
if filename in self.filenames_read:
continue
try:
st_new = read(filename)
julday = UTCDateTime().julday
# add file to read filenames to prevent re-reading in case it is not the current dayfile
if not filename.endswith(str(julday)):
self.filenames_read.append(filename)
except Exception as e:
print(f'Could not read file {filename}:', e)
continue
self.dataStream += st_new
self.dataStream.merge()
# organise data in dictionary with key for each station
for trace in self.dataStream:
st_id = get_st_id(trace)
if not st_id in self.data.keys():
self.data[st_id] = Stream()
self.data[st_id].append(trace)
def execute_qc(self):
self.starttime = UTCDateTime()
self.get_filenames()
self.read_data()
self.analysis_print_list = []
self.analysis_results = {}
for st_id in sorted(self.station_list):
stream = self.data.get(st_id)
if stream:
nsl = nsl_from_id(st_id)
station_qc = StationQC(stream, nsl, self.parameters, self.keys, self.starttime, self.verbosity,
print_func=self.print)
analysis_print_result = station_qc.return_print_analysis()
station_dict, warn_dict = station_qc.return_analysis()
else:
analysis_print_result = self.get_no_data_station(st_id, to_print=True)
station_dict, warn_dict = self.get_no_data_station(st_id)
self.analysis_print_list.append(analysis_print_result)
self.analysis_results[st_id] = (station_dict, warn_dict)
return 'ok'
def get_no_data_station(self, st_id, no_data='-', to_print=False):
delay = self.get_station_delay(st_id)
if not to_print:
status_dict = {}
warn_dict = {}
for key in self.keys:
if key == 'last active':
status_dict[key] = timedelta(seconds=int(delay))
warn_dict[key] = 'No data within set timespan'
else:
status_dict[key] = no_data
warn_dict[key] = 'No data'
return status_dict, warn_dict
else:
items = [st_id.rstrip('.')] + [fancy_timestr(timedelta(seconds=int(delay)))]
for _ in range(len(self.keys) - 1):
items.append(no_data)
return items
def get_station_delay(self, st_id):
""" try to get station delay from SDS archive using client"""
locations = ['', '0', '00']
channels = ['HHZ', 'HHE', 'HHN', 'VEI', 'EX1', 'EX2', 'EX3']
network, station = st_id.split('.')[:2]
times = []
for channel in channels:
for location in locations:
t = self.cl.get_latency(network, station, location, channel)
if t:
times.append(t)
if len(times) > 0:
return min(times)
def print_analysis(self):
timespan = self.parameters.get('timespan') * 24 * 3600
self.print(200*'+')
tdelta_str = str(timedelta(seconds=int(timespan)))
title_str = f'Analysis table of router quality within the last {tdelta_str}'
self.print(title_str)
if self.refresh_period > 0:
self.print(f'Refreshing every {self.refresh_period}s.')
items = ['Station'] + self.keys
self.console_print(items, sep='---')
for items in self.analysis_print_list:
self.console_print(items)
def start(self, refresh_period=30):
'''
Perform qc periodically.
:param refresh_period: Update every x seconds
:return:
'''
self.refresh_period = refresh_period
status = 'ok'
while status == 'ok' and self.refresh_period > 0:
status = self.execute_qc()
self.print_analysis()
time.sleep(self.refresh_period)
self.clear_prints()
def console_print(self, itemlist, str_len=21, sep='|', seplen=3):
assert len(sep) <= seplen, f'Make sure seperator has less than {seplen} characters'
sl = sep.ljust(seplen)
sr = sep.rjust(seplen)
string = sl
for item in itemlist:
string += item.center(str_len) + sr
self.print(string, flush=False)
def print(self, string, **kwargs):
clear_end = CLR + '\n'
n_nl = string.count('\n')
string.replace('\n', clear_end)
print(string, end=clear_end, **kwargs)
self.print_count += n_nl + 1 #number of newlines + actual print with end='\n' (no check for kwargs end!)
#print('pc:', self.print_count)
def clear_prints(self):
print(UP.format(length=self.print_count), end='')
self.print_count = 0
class StationQC(object):
def __init__(self, stream, nsl, parameters, keys, starttime, verbosity, print_func):
"""
Station Quality Check class.
:param nsl: dictionary containing network, station and location (key: str)
:param parameters: parameters dictionary from parameters.yaml file
"""
self.stream = stream
self.nsl = nsl
self.network = nsl.get('network')
self.station = nsl.get('station')
self.location = nsl.get('location')
self.parameters = parameters
self.program_starttime = starttime
self.verbosity = verbosity
self.last_active = False
self.print = print_func
timespan = self.parameters.get('timespan') * 24 * 3600
self.analysis_starttime = self.program_starttime - timespan
self.keys = keys
self.detailed_status_dict = {key: None for key in self.keys}
self.status_dict = {key: '-' for key in self.keys}
self.activity_check()
self.analyse_channels()
def status_ok(self, key, message=None, status_message='OK'):
self.status_dict[key] = status_message
if message:
self.detailed_status_dict[key] = message
def warn(self, key, message, status_message='WARN'):
self.detailed_status_dict[key] = message
self.status_dict[key] = status_message
# change this to something more useful, SMS/EMAIL/PUSH
if self.verbosity:
self.print(f'{UTCDateTime()}: {message}', flush=False)
# warnings.warn(message)
def error(self, key, message):
self.detailed_status_dict[key] = message
self.status_dict[key] = 'FAIL'
# change this to something more useful, SMS/EMAIL/PUSH
if self.verbosity:
self.print(f'{UTCDateTime()}: {message}', flush=False)
# warnings.warn(message)
def activity_check(self):
self.last_active = self.last_activity()
if not self.last_active:
message = 'FAIL'
else:
message = timedelta(seconds=int(self.program_starttime - self.last_active))
self.status_dict['last active'] = message
def last_activity(self):
if not self.stream:
return
endtimes = []
for trace in self.stream:
endtimes.append(trace.stats.endtime)
if len(endtimes) > 0:
return max(endtimes)
def analyse_channels(self):
if self.verbosity > 0:
self.print(150*'#')
self.print('This is StationQT. Calculating quality for station'
' {network}.{station}.{location}'.format(**self.nsl))
self.voltage_analysis()
self.pb_temp_analysis()
self.pb_power_analysis()
self.pb_rout_charge_analysis()
def return_print_analysis(self):
items = [f'{self.network}.{self.station}']
for key in self.keys:
item = self.status_dict[key]
if key == 'last active':
items.append(fancy_timestr(item))
elif key == 'temp':
items.append(str(item) + deg_str)
else:
items.append(str(item))
return items
def return_analysis(self):
return self.status_dict, self.detailed_status_dict
def get_last_occurrence_timestring(self, trace, indices):
""" returns a nicely formatted string of the timedelta since program starttime and occurrence and abs time"""
last_occur = self.get_time(trace, indices[-1])
if not last_occur:
return ''
last_occur_dt = timedelta(seconds=int(self.program_starttime - last_occur))
return f', Last occurrence: {last_occur_dt} ({last_occur.strftime("%Y-%m-%d %H:%M:%S")})'
def voltage_analysis(self, channel='VEI'):
""" Analyse voltage channel for over/undervoltage """
key='voltage'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
voltage = trace.data * 1e-3
low_volt = self.parameters.get('THRESHOLDS').get('low_volt')
high_volt = self.parameters.get('THRESHOLDS').get('high_volt')
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing Voltage check', flush=False)
overvolt = np.where(voltage > high_volt)[0]
undervolt = np.where(voltage < low_volt)[0]
if len(overvolt) == 0 and len(undervolt) == 0:
self.status_ok(key, message=f'U={(voltage[-1])}V')
return
# try calculate number of voltage peaks from gaps between indices
n_overvolt = len(np.where(np.diff(overvolt) > 1)[0]) + 1
n_undervolt = len(np.where(np.diff(undervolt) > 1)[0]) + 1
warn_message = f'Trace {trace.get_id()}:'
if len(overvolt) > 0:
warn_message += f' {n_overvolt}x Voltage over {high_volt}V' \
+ self.get_last_occurrence_timestring(trace, overvolt)
if len(undervolt) > 0:
warn_message += f' {n_undervolt}x Voltage under {low_volt}V ' \
+ self.get_last_occurrence_timestring(trace, undervolt)
self.warn(key, message=warn_message, status_message='WARN ({})'.format(n_overvolt + n_undervolt))
def pb_temp_analysis(self, channel='EX1'):
""" Analyse PowBox temperature output. """
key='temp'
st = self.stream.select(channel=channel)
trace = self.get_trace(st, key)
if not trace: return
voltage = trace.data * 1e-6
thresholds = self.parameters.get('THRESHOLDS')
temp = 20. * voltage - 20
# average temp
timespan = min([self.parameters.get('timespan') * 24 * 3600, int(len(temp)/trace.stats.sampling_rate)])
nsamp_av = int(trace.stats.sampling_rate) * timespan
av_temp_str = str(round(np.mean(temp[-nsamp_av:]), 1)) + deg_str
# dt of average
dt_t_str = str(timedelta(seconds=int(timespan)))
# current temp
cur_temp = round(temp[-1], 1)
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox temperature check (EX1)', flush=False)
self.print(f'Average temperature at {np.mean(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Peak temperature at {max(temp)}\N{DEGREE SIGN}', flush=False)
self.print(f'Min temperature at {min(temp)}\N{DEGREE SIGN}', flush=False)
max_temp = thresholds.get('max_temp')
t_check = np.where(temp > max_temp)[0]
if len(t_check) > 0:
self.warn(key=key,
status_message=cur_temp,
message=f'Trace {trace.get_id()}: '
f'Temperature over {max_temp}\N{DEGREE SIGN} at {trace.get_id()}!'
+ self.get_last_occurrence_timestring(trace, t_check))
else:
self.status_ok(key,
status_message=cur_temp,
message=f'Average temperature of last {dt_t_str}: {av_temp_str}')
def pb_power_analysis(self, channel='EX2', pb_dict_key='pb_SOH2'):
""" Analyse EX2 channel of PowBox """
keys = ['230V', '12V']
st = self.stream.select(channel=channel)
trace = self.get_trace(st, keys)
if not trace: return
voltage = trace.data * 1e-6
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox 12V/230V check (EX2)', flush=False)
voltage_check, voltage_dict, last_val = self.pb_voltage_ok(trace, voltage, pb_dict_key)
if voltage_check:
for key in keys:
self.status_ok(key)
return
soh2_params = self.parameters.get('POWBOX').get(pb_dict_key)
self.in_depth_voltage_check(trace, voltage_dict, soh2_params, last_val)
def pb_rout_charge_analysis(self, channel='EX3', pb_dict_key='pb_SOH3'):
""" Analyse EX3 channel of PowBox """
keys = ['router', 'charger']
pb_thresh = self.parameters.get('THRESHOLDS').get('pb_1v')
st = self.stream.select(channel=channel)
trace = self.get_trace(st, keys)
if not trace: return
voltage = trace.data * 1e-6
if self.verbosity > 1:
self.print(40 * '-')
self.print('Performing PowBox Router/Charger check (EX3)', flush=False)
voltage_check, voltage_dict, last_val = self.pb_voltage_ok(trace, voltage, pb_dict_key)
if voltage_check:
for key in keys:
self.status_ok(key)
return
soh3_params = self.parameters.get('POWBOX').get(pb_dict_key)
self.in_depth_voltage_check(trace, voltage_dict, soh3_params, last_val)
def in_depth_voltage_check(self, trace, voltage_dict, soh_params, last_val):
""" Associate values in voltage_dict to error messages specified in SOH_params and warn."""
for volt_lvl, ind_array in voltage_dict.items():
if volt_lvl == 1: continue # No need to do anything here
if len(ind_array) > 0:
result = soh_params.get(volt_lvl)
for key, message in result.items():
if message == 'OK':
self.status_ok(key)
continue
# try calculate number of voltage peaks from gaps between indices
n_occurrences = len(np.where(np.diff(ind_array) > 1)[0]) + 1
self.warn(key=key,
message=f'Trace {trace.get_id()}: '
f'Found {n_occurrences} occurrence(s) of {volt_lvl}V: {key}: {message}'
+ self.get_last_occurrence_timestring(trace, ind_array),
status_message='WARN ({})'.format(n_occurrences))
if last_val != 1:
self.error(key, message=f'Last PowBox voltage state {last_val}V: {message}')
def get_trace(self, stream, keys):
if not type(keys) == list:
keys = [keys]
if len(stream) == 0:
for key in keys:
self.warn(key, 'NO DATA', 'NO DATA')
return
if len(stream) > 1:
raise Exception('Ambiguity error')
trace = stream[0]
if trace.stats.endtime < self.analysis_starttime:
for key in keys:
self.warn(key, 'NO DATA', 'NO DATA')
return
return trace
def pb_voltage_ok(self, trace, voltage, pb_dict_key):
"""
Checks if voltage level is ok everywhere and returns True. If it is not okay it returns a dictionary
with each voltage value associated to the different steps specified in POWBOX > pb_steps. Also raises
self.warn in case there are unassociated voltage values recorded.
"""
pb_thresh = self.parameters.get('THRESHOLDS').get('pb_thresh')
pb_ok = self.parameters.get('POWBOX').get('pb_ok')
# possible voltage levels are keys of pb voltage level dict
voltage_levels = list(self.parameters.get('POWBOX').get(pb_dict_key).keys())
# get mean voltage value of last samples
last_voltage = np.nanmean(voltage[-3:])
# check if voltage is over or under OK-level (1V), if not return True
over = np.where(voltage > pb_ok + pb_thresh)[0]
under = np.where(voltage < pb_ok - pb_thresh)[0]
if len(over) == 0 and len(under) == 0:
return True, {}, last_voltage
# Warn in case of voltage under OK-level (1V)
if len(under) > 0:
self.warn(key='other',
message=f'Trace {trace.get_id()}: '
f'Voltage below {pb_ok}V {len(under)} times. '
f'Mean voltage: {np.mean(voltage)}'
+ self.get_last_occurrence_timestring(trace, under),
status_message='UNDER 1V')
# Get voltage levels for classification
voltage_dict = {}
classified_indices = np.array([])
# add classified levels to voltage_dict
for volt in voltage_levels:
indices = np.where((voltage < volt + pb_thresh) & (voltage > volt - pb_thresh))[0]
voltage_dict[volt] = indices
classified_indices = np.append(classified_indices, indices)
# classify last voltage values
for volt in voltage_levels:
if (last_voltage < volt + pb_thresh) and (last_voltage > volt - pb_thresh):
last_val = volt
break
else:
last_val = np.nan
# in case not all voltage values could be classified
if not len(classified_indices) == len(voltage):
all_indices = np.arange(len(voltage))
unclassified_indices = all_indices[~np.isin(all_indices, classified_indices)]
n_unclassified = len(unclassified_indices)
max_uncl = self.parameters.get('THRESHOLDS').get('unclassified')
if max_uncl and n_unclassified > max_uncl:
self.warn(key='other', message=f'Trace {trace.get_id()}: '
f'{n_unclassified}/{len(all_indices)} '
f'unclassified voltage values in channel {trace.get_id()}',
status_message=f'{n_unclassified} UNCLASSIFIED')
return False, voltage_dict, last_val
def get_time(self, trace, index):
""" get UTCDateTime from trace and index"""
return trace.stats.starttime + trace.stats.delta * index
if __name__ == '__main__':
survBot = SurveillanceBot(parameter_path='parameters.yaml')
survBot.start(refresh_period=30)

469
survBotGUI.py Executable file
View File

@ -0,0 +1,469 @@
#! /usr/bin/env python
"""
GUI overlay for the main survBot to show quality control of different stations specified in parameters.yaml file.
"""
__version__ = '0.1'
__author__ = 'Marcel Paffrath'
import os
import sys
import traceback
from datetime import timedelta
try:
from PySide2 import QtGui, QtCore, QtWidgets
except ImportError:
try:
from PySide6 import QtGui, QtCore, QtWidgets
except ImportError:
try:
from PyQt5 import QtGui, QtCore, QtWidgets
except ImportError:
raise ImportError('Could import neither of PySide2, PySide6 or PyQt5')
import matplotlib
from matplotlib.figure import Figure
if QtGui.__package__ in ['PySide2', 'PyQt5']:
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT
else:
raise Exception('Not implemented')
from obspy import UTCDateTime
from survBot import SurveillanceBot
try:
from rest_api.utils import get_station_iccid
from rest_api.rest_api_utils import get_last_messages, send_message, get_default_params
sms_funcs = True
except ImportError:
print('Could not load rest_api utils, SMS functionality disabled.')
sms_funcs = False
deg_str = '\N{DEGREE SIGN}C'
class Thread(QtCore.QThread):
"""
A simple thread that runs outside of the main event loop. Executes the function "runnable" and prevents
freezing of the GUI. Run method is executed outside main event loop when called with thread.start().
"""
update = QtCore.Signal()
def __init__(self, parent, runnable, verbosity=0):
super(Thread, self).__init__(parent=parent)
self.setParent(parent)
self.verbosity = verbosity
self.runnable = runnable
self.is_active = True
def run(self):
""" Try to run self.runnable and emit update signal, or print Exception if failed. """
try:
t0 = UTCDateTime()
self.runnable()
self.update.emit()
except Exception as e:
self.is_active = False
print(e)
print(traceback.format_exc())
finally:
if self.verbosity > 0:
print(f'Time for Thread execution: {UTCDateTime() - t0}')
class MainWindow(QtWidgets.QMainWindow):
def __init__(self, parameters='parameters.yaml', dt_thresh=(300, 1800)):
"""
Main window of survBot GUI.
:param parameters: Parameters dictionary file (yaml format)
:param dt_thresh: threshold for timing delay colourisation (yellow/red)
"""
super(MainWindow, self).__init__()
# some GUI default colors
self.colors_dict = {'FAIL': (255, 50, 0, 255),
'NO DATA': (255, 255, 125, 255),
'WARN': (255, 255, 125, 255),
'WARNX': lambda x: (min([255, 200 + x**2]), 255, 125, 255),
'OK': (125, 255, 125, 255),
'undefined': (230, 230, 230, 255)}
# init some attributes
self.dt_thresh = dt_thresh
self.last_mouse_loc = None
self.starttime = UTCDateTime()
# setup main layout of the GUI
self.main_layout = QtWidgets.QVBoxLayout()
self.centralWidget = QtWidgets.QWidget()
self.centralWidget.setLayout(self.main_layout)
self.setCentralWidget(self.centralWidget)
# init new survBot instance, set parameters and refresh
self.survBot = SurveillanceBot(parameter_path=parameters)
self.parameters = self.survBot.parameters
self.refresh_period = self.parameters.get('interval')
# create thread that is used to update
self.thread = Thread(parent=self, runnable=self.survBot.execute_qc)
self.thread.update.connect(self.fill_table)
self.init_table()
self.init_buttons()
# These filters were used to track current mouse position if an event (i.e. mouseclick) is triggered
self.table.installEventFilter(self)
self.installEventFilter(self)
# initiate clear_on_refresh flag and set status bar text
self.clear_on_refresh = False
self.fill_status_bar()
# start thread that executes qc at first initiation, then activate timer for further thread activation
self.thread.start()
self.run_refresh_timer()
def init_table(self):
self.table = QtWidgets.QTableWidget()
keys = self.survBot.keys
station_list = self.survBot.station_list
self.table.setColumnCount(len(keys))
self.table.setRowCount(len(station_list))
self.table.setHorizontalHeaderLabels(keys)
for index, st_id in enumerate(station_list):
item = QtWidgets.QTableWidgetItem()
item.setText(str(st_id.rstrip('.')))
item.setData(QtCore.Qt.UserRole, st_id)
self.table.setVerticalHeaderItem(index, item)
self.main_layout.addWidget(self.table)
self.table.itemDoubleClicked.connect(self.plot_stream)
self.table.setEditTriggers(QtWidgets.QTableWidget.NoEditTriggers)
if sms_funcs:
self.table.verticalHeader().sectionClicked.connect(self.sms_context_menu)
self.set_stretch()
def init_buttons(self):
if self.parameters.get('track_changes'):
button_text = 'Clear track and refresh'
else:
button_text = 'Refresh'
self.clear_button = QtWidgets.QPushButton(button_text)
self.clear_button.setToolTip('Reset track changes and refresh table')
self.clear_button.clicked.connect(self.refresh)
self.main_layout.addWidget(self.clear_button)
def refresh(self):
self.set_clear_on_refresh()
self.run_refresh_timer()
self.thread.start()
def run_refresh_timer(self):
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.thread.start)
self.timer.start(int(self.refresh_period * 1e3))
def eventFilter(self, object, event):
"""
An event filter that stores last mouse position if an event is raised by the table. All events are passed
to the parent class of the Mainwindow afterwards.
"""
print(object, event)
if hasattr(event, 'pos'):
self.last_mouse_loc = event.pos()
return super(QtWidgets.QMainWindow, self).eventFilter(object, event)
def sms_context_menu(self, row_ind):
""" Open a context menu when left-clicking vertical header item """
header_item = self.table.verticalHeaderItem(row_ind)
if not header_item:
return
st_id = header_item.data(QtCore.Qt.UserRole)
context_menu = QtWidgets.QMenu()
read_sms = context_menu.addAction('Get last SMS')
send_sms = context_menu.addAction('Send SMS')
action = context_menu.exec_(self.mapToGlobal(self.last_mouse_loc))
if action == read_sms:
self.read_sms(st_id)
elif action == send_sms:
self.send_sms(st_id)
def read_sms(self, st_id):
""" Read recent SMS over rest_api using whereversim portal """
station = st_id.split('.')[1]
iccid = get_station_iccid(station)
if not iccid:
print('Could not find iccid for station', st_id)
return
sms_widget = ReadSMSWidget(parent=self, iccid=iccid)
sms_widget.setWindowTitle(f'Recent SMS of station: {st_id}')
if sms_widget.data:
sms_widget.show()
else:
self.notification('No recent messages found.')
def send_sms(self, st_id):
""" Send SMS over rest_api using whereversim portal """
station = st_id.split('.')[1]
iccid = get_station_iccid(station)
sms_widget = SendSMSWidget(parent=self, iccid=iccid)
sms_widget.setWindowTitle(f'Send SMS to station: {st_id}')
sms_widget.show()
def set_clear_on_refresh(self):
self.clear_on_refresh = True
def fill_status_bar(self):
""" Set status bar text """
status_bar = self.statusBar()
timespan = timedelta(seconds=int(self.parameters.get('timespan') * 24 * 3600))
status_bar.showMessage(f'Program starttime (UTC) {self.starttime.strftime("%Y-%m-%d %H:%M:%S")} | '
f'Refresh period: {self.refresh_period}s | '
f'Showing data of last {timespan}')
def fill_table(self):
""" Fills the table with most recent information. Executed after execute_qc thread is done or on refresh. """
for col_ind, check_key in enumerate(self.survBot.keys):
for row_ind, st_id in enumerate(self.survBot.station_list):
status_dict, detailed_dict = self.survBot.analysis_results.get(st_id)
status = status_dict.get(check_key)
detailed_message = detailed_dict.get(check_key)
if check_key == 'last active':
bg_color = self.get_time_delay_color(status)
elif check_key == 'temp':
bg_color = self.get_temp_color(status)
if not type(status) in [str]:
status = str(status) + deg_str
else:
statussplit = status.split(' ')
if len(statussplit) > 1 and statussplit[0] == 'WARN':
x = int(status.split(' ')[-1].lstrip('(').rstrip(')'))
bg_color = self.colors_dict.get('WARNX')(x)
else:
bg_color = self.colors_dict.get(status)
if not bg_color:
bg_color = self.colors_dict.get('undefined')
# Continue if nothing changed
text = str(status)
cur_item = self.table.item(row_ind, col_ind)
if cur_item and text == cur_item.text():
if not self.parameters.get('track_changes') or self.clear_on_refresh:
# set item to default color/font and continue
self.set_font(cur_item)
self.set_fg_color(cur_item)
continue
# Create new data item
item = QtWidgets.QTableWidgetItem()
item.setText(str(status))
item.setTextAlignment(QtCore.Qt.AlignCenter)
item.setData(QtCore.Qt.UserRole, (st_id, check_key))
# if text changed (known from above) set highlight color/font else (new init) set to default
cur_item = self.table.item(row_ind, col_ind)
if cur_item and check_key != 'last active':
self.set_fg_color(item, (0, 0, 0, 255))
self.set_font_bold(item)
else:
self.set_fg_color(item)
self.set_font(item)
# set item tooltip
if detailed_message:
item.setToolTip(str(detailed_message))
# set bg color corresponding to current text (OK/WARN/ERROR etc.)
self.set_bg_color(item, bg_color)
# insert new item
self.table.setItem(row_ind, col_ind, item)
# table filling/refreshing done, set clear_on_refresh to False
self.clear_on_refresh = False
def get_time_delay_color(self, dt):
""" Set color of time delay after thresholds specified in self.dt_thresh """
dt_thresh = [timedelta(seconds=sec) for sec in self.dt_thresh]
if dt < dt_thresh[0]:
return self.colors_dict.get('OK')
elif dt_thresh[0] <= dt < dt_thresh[1]:
return self.colors_dict.get('WARN')
return self.colors_dict.get('FAIL')
def get_temp_color(self, temp, vmin=-10, vmax=60, cmap='coolwarm'):
""" Get an rgba temperature value back from specified cmap, linearly interpolated between vmin and vmax. """
if type(temp) in [str]:
return self.colors_dict.get('undefined')
cmap = matplotlib.cm.get_cmap(cmap)
val = (temp - vmin) / (vmax - vmin)
rgba = [int(255 * c) for c in cmap(val)]
return rgba
def set_font_bold(self, item):
""" Set item font bold """
f = item.font()
f.setWeight(QtGui.QFont.Bold)
item.setFont(f)
def set_font(self, item):
""" Set item font normal """
f = item.font()
f.setWeight(QtGui.QFont.Normal)
item.setFont(f)
def set_bg_color(self, item, color):
""" Set background color of item, color is RGBA tuple """
color = QtGui.QColor(*color)
item.setBackground(color)
def set_fg_color(self, item, color=(20, 20, 20, 255)):
""" Set foreground (font) color of item, color is RGBA tuple """
color = QtGui.QColor(*color)
item.setForeground(color)
def set_stretch(self):
hheader = self.table.horizontalHeader()
for index in range(hheader.count()):
hheader.setSectionResizeMode(index, QtWidgets.QHeaderView.Stretch)
vheader = self.table.verticalHeader()
for index in range(vheader.count()):
vheader.setSectionResizeMode(index, QtWidgets.QHeaderView.Stretch)
def plot_stream(self, item):
st_id, check = item.data(QtCore.Qt.UserRole)
st = self.survBot.data.get(st_id)
if st:
self.plot_widget = PlotWidget(self)
self.plot_widget.setWindowTitle(st_id)
st.plot(equal_scale=False, method='full', block=False, fig=self.plot_widget.canvas.fig)
self.plot_widget.show()
def notification(self, text):
mbox = QtWidgets.QMessageBox()
mbox.setWindowTitle('Notification')
#mbox.setDetailedText()
mbox.setText(text)
mbox.exec_()
def closeEvent(self, event):
self.thread.exit()
event.accept()
class PlotCanvas(FigureCanvas):
def __init__(self, parent=None, width=10, height=8, dpi=100):
self.fig = Figure(figsize=(width, height), dpi=dpi)
FigureCanvas.__init__(self, self.fig)
self.setParent(parent)
FigureCanvas.setSizePolicy(self, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding)
FigureCanvas.updateGeometry(self)
class PlotWidget(QtWidgets.QDialog):
def __init__(self, *args, **kwargs):
QtWidgets.QDialog.__init__(self, *args, **kwargs)
self.setLayout(QtWidgets.QVBoxLayout())
self.canvas = PlotCanvas(self, width=10, height=8)
self.toolbar = NavigationToolbar2QT(self.canvas, self)
self.layout().addWidget(self.toolbar)
self.layout().addWidget(self.canvas)
class ReadSMSWidget(QtWidgets.QDialog):
def __init__(self, iccid, *args, **kwargs):
QtWidgets.QDialog.__init__(self, *args, **kwargs)
self.setLayout(QtWidgets.QVBoxLayout())
self.table = QtWidgets.QTableWidget()
self.layout().addWidget(self.table)
self.resize(1280, 400)
self.iccid = iccid
self.data = self.print_sms_table()
self.set_stretch()
def print_sms_table(self, n=5, ntextbreak=40):
messages = []
params = get_default_params(self.iccid)
for message in get_last_messages(params, n, only_delivered=False):
messages.append(message)
if not messages:
return
# pull dates to front
keys = ['dateSent', 'dateModified', 'dateReceived']
for item in messages[0].keys():
if not item in keys:
keys.append(item)
self.table.setRowCount(n)
self.table.setColumnCount(len(keys))
self.table.setHorizontalHeaderLabels(keys)
for row_index, message in enumerate(messages):
for col_index, key in enumerate(keys):
text = message.get(key)
if type(text) == str and len(text) > ntextbreak:
textlist = list(text)
for index in range(ntextbreak, len(text), ntextbreak):
textlist.insert(index, '\n')
text = ''.join(textlist)
item = QtWidgets.QTableWidgetItem()
item.setText(str(text))
self.table.setItem(row_index, col_index, item)
return True
def set_stretch(self):
hheader = self.table.horizontalHeader()
nheader = hheader.count()
for index in range(nheader):
if index < nheader - 1:
hheader.setSectionResizeMode(index, QtWidgets.QHeaderView.ResizeToContents)
else:
hheader.setSectionResizeMode(index, QtWidgets.QHeaderView.Stretch)
vheader = self.table.verticalHeader()
for index in range(vheader.count()):
vheader.setSectionResizeMode(index, QtWidgets.QHeaderView.Stretch)
class SendSMSWidget(QtWidgets.QDialog):
def __init__(self, iccid, *args, **kwargs):
QtWidgets.QDialog.__init__(self, *args, **kwargs)
self.main_layout = QtWidgets.QVBoxLayout()
self.setLayout(self.main_layout)
self.resize(400, 100)
self.line_edit = QtWidgets.QLineEdit()
self.main_layout.addWidget(self.line_edit)
self.iccid = iccid
self.buttonBox = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok |
QtWidgets.QDialogButtonBox.Close)
self.main_layout.addWidget(self.buttonBox)
self.buttonBox.accepted.connect(self.send_sms)
self.buttonBox.rejected.connect(self.reject)
def send_sms(self):
text = self.line_edit.text()
params = get_default_params(self.iccid)
send_message(params, text)
self.close()
if __name__ == '__main__':
program_path = sys.path[0]
parameters = os.path.join(program_path, 'parameters.yaml')
app = QtWidgets.QApplication([])
window = MainWindow(parameters=parameters)
window.showMaximized()
sys.exit(app.exec_())

14
survBotGUI.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/bash
ulimit -s 8192
#$ -l os=*stretch
##$ -cwd
#$ -pe smp 1
##$ -q "*@minos15"
export PYTHONPATH="$PYTHONPATH:/home/marcel/git/code_base"
source /opt/anaconda3/etc/profile.d/conda.sh
conda activate py37
python /home/marcel/git/code_base/station_surveillance_bot/survBotGUI.py