20 Commits

Author SHA1 Message Date
28f75cedcb Merge branch 'refs/heads/develop' into correlation_picker 2024-09-11 10:31:50 +02:00
e02b62696d [bugfix] no actual UTCDateTime object was used to check metadata availability for check4rotated 2024-09-10 16:59:02 +02:00
e4217f0e30 [critical] fixing a major bug in checksignallength, testing needed 2024-09-10 16:58:12 +02:00
8f154e70d7 [minor] plot coloring 2024-09-10 16:57:18 +02:00
6542b6cc4f [minor] slightly improved test output 2024-09-10 16:57:00 +02:00
5ab6c494c5 [update] increased code readability and improved figures created in autopick.py and picker.py 2024-09-10 16:16:46 +02:00
3da47c6f6b [revert] changed slope calculation in AICPicker back to older state (probably causing problems changing results in test_autopickstation.py) 2024-09-09 16:56:38 +02:00
cc7716a2b7 [minor] improved unittest result 2024-09-09 16:05:02 +02:00
03947d2363 [update] removed bad STA/LTA implementation from CF class 2024-09-09 14:42:54 +02:00
e1b0d48527 [refactor] removed unused parameter "data" from calcCF methods 2024-09-09 14:20:41 +02:00
431dbe8924 [testing] improved dictionary comparison. Failed tests have completely different picks (not only snrdb) 2024-08-30 15:07:31 +02:00
63810730e5 [bugfix] added missing parameter "taup_phases" introduced a long time ago into default parameters and parameters for unit tests 2024-08-30 14:51:30 +02:00
f2159c47f9 [testing] brought test_autopickstation up-to-date using, removing deprecated methods and using pytest.approx
Certain tests fail on snrdb calculation which has to be examined (WIP)
2024-08-30 12:41:16 +02:00
d0fbb91ffe [update] added test for AutoPyLoT, added test files for correlation picker as well 2024-08-29 16:46:30 +02:00
424d42aa1c Merge branch 'refs/heads/develop' into correlation_picker
# Conflicts:
#	pylot/core/pick/charfuns.py
#	tests/test_autopicker/pylot_alparray_mantle_corr_stack_0.03-0.5.in
#	tests/test_autopicker/test_autopylot.py
2024-08-29 16:37:15 +02:00
2cea10088d [update] added test for AutoPyLoT, added test files for correlation picker as well 2024-08-29 16:35:04 +02:00
5971508cab [bugfix] Metadata object did not find inventory for relative directory paths/unescaped backslashes 2024-08-29 16:34:37 +02:00
c765e7c66b [bugfix] fixed import for tukey in newer scipy versions which moved to signal.windows module 2024-08-29 16:33:39 +02:00
466f19eb2e [bugfix] fixed import for tukey in newer scipy versions 2024-08-28 18:01:03 +02:00
e6a4ba7ee2 [update] remove mean from picks (WIP for residual plotting) pt2 2024-08-28 10:37:31 +02:00
24 changed files with 66005 additions and 144 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@
*~
.idea
pylot/RELEASE-VERSION
/tests/test_autopicker/dmt_database_test/

View File

@@ -1031,22 +1031,23 @@ class MainWindow(QMainWindow):
if not fname:
return
qmb = QMessageBox(self, icon=QMessageBox.Question,
text='Do you want to overwrite this data?',)
overwrite_button = qmb.addButton('Overwrite', QMessageBox.YesRole)
merge_button = qmb.addButton('Merge', QMessageBox.NoRole)
qmb.exec_()
if qmb.clickedButton() == overwrite_button:
merge_strategy = 'Overwrite'
elif qmb.clickedButton() == merge_button:
merge_strategy = 'Merge'
else:
return
if not event:
event = self.get_current_event()
if event.picks:
qmb = QMessageBox(self, icon=QMessageBox.Question,
text='Do you want to overwrite the data?',)
overwrite_button = qmb.addButton('Overwrite', QMessageBox.YesRole)
merge_button = qmb.addButton('Merge', QMessageBox.NoRole)
qmb.exec_()
if qmb.clickedButton() == overwrite_button:
merge_strategy = 'Overwrite'
elif qmb.clickedButton() == merge_button:
merge_strategy = 'Merge'
else:
return
data = Data(self, event)
try:
data_new = Data(self, evtdata=str(fname))

View File

@@ -184,15 +184,15 @@ def autoPyLoT(input_dict=None, parameter=None, inputfile=None, fnames=None, even
if not input_dict:
# started in production mode
datapath = datastructure.expandDataPath()
if fnames == 'None' and parameter['eventID'] == '*':
if fnames in [None, 'None'] and parameter['eventID'] == '*':
# multiple event processing
# read each event in database
events = [event for event in glob.glob(os.path.join(datapath, '*')) if
(os.path.isdir(event) and not event.endswith('EVENTS-INFO'))]
elif fnames == 'None' and parameter['eventID'] != '*' and not type(parameter['eventID']) == list:
elif fnames in [None, 'None'] and parameter['eventID'] != '*' and not type(parameter['eventID']) == list:
# single event processing
events = glob.glob(os.path.join(datapath, parameter['eventID']))
elif fnames == 'None' and type(parameter['eventID']) == list:
elif fnames in [None, 'None'] and type(parameter['eventID']) == list:
# multiple event processing
events = []
for eventID in parameter['eventID']:
@@ -234,12 +234,15 @@ def autoPyLoT(input_dict=None, parameter=None, inputfile=None, fnames=None, even
data.get_evt_data().path = eventpath
print('Reading event data from filename {}...'.format(filename))
except Exception as e:
print('Could not read event from file {}: {}'.format(filename, e))
if type(e) == FileNotFoundError:
print('Creating new event file.')
else:
print('Could not read event from file {}: {}'.format(filename, e))
data = Data()
pylot_event = Event(eventpath) # event should be path to event directory
data.setEvtData(pylot_event)
if fnames == 'None':
data.setWFData(glob.glob(os.path.join(datapath, event_datapath, '*')))
if fnames in [None, 'None']:
data.setWFData(glob.glob(os.path.join(event_datapath, '*')))
# the following is necessary because within
# multiple event processing no event ID is provided
# in autopylot.in

View File

@@ -41,6 +41,7 @@ global #extent# %extent of a
1150.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
True #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
iasp91 #taup_model# %define TauPy model for traveltime estimation. Possible values: 1066a, 1066b, ak135, ak135f, herrin, iasp91, jb, prem, pwdk, sp6
P,Pdiff #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
0.05 0.5 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
0.001 0.5 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
0.05 0.5 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@@ -40,7 +40,8 @@ local #extent# %extent of a
-0.5 #sstart# %start time [s] relative to P-onset for calculating CF for S-picking
10.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
False #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
iasp91 #taup_model# %define TauPy model for traveltime estimation
iasp91 #taup_model# %define TauPy model for traveltime estimation
P #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
2.0 20.0 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
2.0 30.0 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
2.0 10.0 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@@ -40,7 +40,8 @@ local #extent# %extent of a
-1.0 #sstart# %start time [s] relative to P-onset for calculating CF for S-picking
10.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
True #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
iasp91 #taup_model# %define TauPy model for traveltime estimation
iasp91 #taup_model# %define TauPy model for traveltime estimation
P #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
2.0 10.0 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
2.0 12.0 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
2.0 8.0 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@@ -262,6 +262,10 @@ class AutopickStation(object):
self.metadata = metadata
self.origin = origin
# initialize TauPy pick estimates
self.estFirstP = None
self.estFirstS = None
# initialize picking results
self.p_results = PickingResults()
self.s_results = PickingResults()
@@ -443,15 +447,15 @@ class AutopickStation(object):
for arr in arrivals:
phases[identifyPhaseID(arr.phase.name)].append(arr)
# get first P and S onsets from arrivals list
estFirstP = 0
estFirstS = 0
arrival_time_p = 0
arrival_time_s = 0
if len(phases['P']) > 0:
arrP, estFirstP = min([(arr, arr.time) for arr in phases['P']], key=lambda t: t[1])
arrP, arrival_time_p = min([(arr, arr.time) for arr in phases['P']], key=lambda t: t[1])
if len(phases['S']) > 0:
arrS, estFirstS = min([(arr, arr.time) for arr in phases['S']], key=lambda t: t[1])
arrS, arrival_time_s = min([(arr, arr.time) for arr in phases['S']], key=lambda t: t[1])
print('autopick: estimated first arrivals for P: {} s, S:{} s after event'
' origin time using TauPy'.format(estFirstP, estFirstS))
return estFirstP, estFirstS
' origin time using TauPy'.format(arrival_time_p, arrival_time_s))
return arrival_time_p, arrival_time_s
def exit_taupy():
"""If taupy failed to calculate theoretical starttimes, picking continues.
@@ -477,10 +481,13 @@ class AutopickStation(object):
raise AttributeError('No source origins given!')
arrivals = create_arrivals(self.metadata, self.origin, self.pickparams["taup_model"])
estFirstP, estFirstS = first_PS_onsets(arrivals)
arrival_P, arrival_S = first_PS_onsets(arrivals)
self.estFirstP = (self.origin[0].time + arrival_P) - self.ztrace.stats.starttime
# modifiy pstart and pstop relative to estimated first P arrival (relative to station time axis)
self.pickparams["pstart"] += (self.origin[0].time + estFirstP) - self.ztrace.stats.starttime
self.pickparams["pstop"] += (self.origin[0].time + estFirstP) - self.ztrace.stats.starttime
self.pickparams["pstart"] += self.estFirstP
self.pickparams["pstop"] += self.estFirstP
print('autopick: CF calculation times respectively:'
' pstart: {} s, pstop: {} s'.format(self.pickparams["pstart"], self.pickparams["pstop"]))
# make sure pstart and pstop are inside the starttime/endtime of vertical trace
@@ -491,9 +498,10 @@ class AutopickStation(object):
# for the two horizontal components take earliest and latest time to make sure that the s onset is not clipped
# if start and endtime of horizontal traces differ, the s windowsize will automatically increase
trace_s_start = min([self.etrace.stats.starttime, self.ntrace.stats.starttime])
self.estFirstS = (self.origin[0].time + arrival_S) - trace_s_start
# modifiy sstart and sstop relative to estimated first S arrival (relative to station time axis)
self.pickparams["sstart"] += (self.origin[0].time + estFirstS) - trace_s_start
self.pickparams["sstop"] += (self.origin[0].time + estFirstS) - trace_s_start
self.pickparams["sstart"] += self.estFirstS
self.pickparams["sstop"] += self.estFirstS
print('autopick: CF calculation times respectively:'
' sstart: {} s, sstop: {} s'.format(self.pickparams["sstart"], self.pickparams["sstop"]))
# make sure pstart and pstop are inside the starttime/endtime of horizontal traces
@@ -609,6 +617,12 @@ class AutopickStation(object):
# plot tapered trace filtered with bpz2 filter settings
ax1.plot(tdata, self.tr_filt_z_bpz2.data / max(self.tr_filt_z_bpz2.data), color=linecolor, linewidth=0.7,
label='Data')
# plot pickwindows for P
pstart, pstop = self.pickparams['pstart'], self.pickparams['pstop']
if pstart is not None and pstop is not None:
ax1.axvspan(pstart, pstop, color='r', alpha=0.1, zorder=0, label='P window')
if self.estFirstP is not None:
ax1.axvline(self.estFirstP, ls='dashed', color='r', alpha=0.4, label='TauPy estimate')
if self.p_results.weight < 4:
# plot CF of initial onset (HOScf or ARZcf)
ax1.plot(self.cf1.getTimeArray(), self.cf1.getCF() / max(self.cf1.getCF()), 'b', label='CF1')
@@ -713,6 +727,15 @@ class AutopickStation(object):
ax3.plot([refSpick.getpick() - 0.5, refSpick.getpick() + 0.5], [-1.3, -1.3], 'g', linewidth=2)
ax3.plot([self.s_results.lpp, self.s_results.lpp], [-1.1, 1.1], 'g--', label='lpp')
ax3.plot([self.s_results.epp, self.s_results.epp], [-1.1, 1.1], 'g--', label='epp')
# plot pickwindows for S
sstart, sstop = self.pickparams['sstart'], self.pickparams['sstop']
if sstart is not None and sstop is not None:
for axis in [ax2, ax3]:
axis.axvspan(sstart, sstop, color='b', alpha=0.1, zorder=0, label='S window')
if self.estFirstS is not None:
axis.axvline(self.estFirstS, ls='dashed', color='b', alpha=0.4, label='TauPy estimate')
ax3.legend(loc=1)
ax3.set_yticks([])
ax3.set_ylim([-1.5, 1.5])
@@ -835,14 +858,21 @@ class AutopickStation(object):
self.cf1 = None
assert isinstance(self.cf1, CharacteristicFunction), 'cf1 is not set correctly: maybe the algorithm name ({})' \
' is corrupted'.format(self.pickparams["algoP"])
# get the original waveform stream from first CF class cut to identical length as CF for plotting
cut_ogstream = self.cf1.getDataArray(self.cf1.getCut())
# MP: Rename to cf_stream for further use of z_copy and to prevent chaos when z_copy suddenly becomes a cf
# stream and later again a waveform stream
cf_stream = z_copy.copy()
cf_stream[0].data = self.cf1.getCF()
# calculate AIC cf from first cf (either HOS or ARZ)
z_copy[0].data = self.cf1.getCF()
aiccf = AICcf(z_copy, cuttimes)
aiccf = AICcf(cf_stream, cuttimes)
# get preliminary onset time from AIC-CF
self.set_current_figure('aicFig')
aicpick = AICPicker(aiccf, self.pickparams["tsnrz"], self.pickparams["pickwinP"], self.iplot,
Tsmooth=self.pickparams["aictsmooth"], fig=self.current_figure,
linecolor=self.current_linecolor)
linecolor=self.current_linecolor, ogstream=cut_ogstream)
# save aicpick for plotting later
self.p_data.aicpick = aicpick
# add pstart and pstop to aic plot
@@ -855,7 +885,7 @@ class AutopickStation(object):
label='P stop')
ax.legend(loc=1)
Pflag = self._pick_p_quality_control(aicpick, z_copy, tr_filt)
Pflag = self._pick_p_quality_control(aicpick, cf_stream, tr_filt)
# go on with processing if AIC onset passes quality control
slope = aicpick.getSlope()
if not slope: slope = 0
@@ -894,7 +924,7 @@ class AutopickStation(object):
refPpick = PragPicker(self.cf2, self.pickparams["tsnrz"], self.pickparams["pickwinP"], self.iplot,
self.pickparams["ausP"],
self.pickparams["tsmoothP"], aicpick.getpick(), self.current_figure,
self.current_linecolor)
self.current_linecolor, ogstream=cut_ogstream)
# save PragPicker result for plotting
self.p_data.refPpick = refPpick
self.p_results.mpp = refPpick.getpick()
@@ -1146,11 +1176,14 @@ class AutopickStation(object):
# calculate AIC cf
haiccf = self._calculate_aic_cf_s_pick(cuttimesh)
# get the original waveform stream cut to identical length as CF for plotting
ogstream = haiccf.getDataArray(haiccf.getCut())
# get preliminary onset time from AIC cf
self.set_current_figure('aicARHfig')
aicarhpick = AICPicker(haiccf, self.pickparams["tsnrh"], self.pickparams["pickwinS"], self.iplot,
Tsmooth=self.pickparams["aictsmoothS"], fig=self.current_figure,
linecolor=self.current_linecolor)
linecolor=self.current_linecolor, ogstream=ogstream)
# save pick for later plotting
self.aicarhpick = aicarhpick

View File

@@ -17,7 +17,11 @@ autoregressive prediction: application ot local and regional distances, Geophys.
:author: MAGS2 EP3 working group
"""
import numpy as np
from scipy import signal
try:
from scipy.signal import tukey
except ImportError:
from scipy.signal.windows import tukey
from obspy.core import Stream
from pylot.core.pick.utils import PickingFailedException
@@ -56,7 +60,7 @@ class CharacteristicFunction(object):
self.setOrder(order)
self.setFnoise(fnoise)
self.setARdetStep(t2)
self.calcCF(self.getDataArray())
self.calcCF()
self.arpara = np.array([])
self.xpred = np.array([])
@@ -208,17 +212,15 @@ class CharacteristicFunction(object):
data = self.orig_data.copy()
return data
def calcCF(self, data=None):
self.cf = data
def calcCF(self):
pass
class AICcf(CharacteristicFunction):
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate the Akaike Information Criterion (AIC) after Maeda (1985).
:param data: data, time series (whether seismogram or CF)
:type data: tuple
:return: AIC function
:rtype:
"""
@@ -227,7 +229,7 @@ class AICcf(CharacteristicFunction):
ind = np.where(~np.isnan(xnp))[0]
if ind.size:
xnp[:ind[0]] = xnp[ind[0]]
xnp = signal.tukey(len(xnp), alpha=0.05) * xnp
xnp = tukey(len(xnp), alpha=0.05) * xnp
xnp = xnp - np.mean(xnp)
datlen = len(xnp)
k = np.arange(1, datlen)
@@ -256,13 +258,11 @@ class HOScf(CharacteristicFunction):
"""
super(HOScf, self).__init__(data, cut, pickparams["tlta"], pickparams["hosorder"])
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate skewness (statistics of order 3) or kurtosis
(statistics of order 4), using one long moving window, as published
in Kueperkoch et al. (2010), or order 2, i.e. STA/LTA.
:param data: data, time series (whether seismogram or CF)
:type data: tuple
:return: HOS cf
:rtype:
"""
@@ -277,47 +277,28 @@ class HOScf(CharacteristicFunction):
elif self.getOrder() == 4: # this is kurtosis
y = np.power(xnp, 4)
y1 = np.power(xnp, 2)
elif self.getOrder() == 2: # this is variance, used for STA/LTA processing
y = np.power(xnp, 2)
y1 = np.power(xnp, 2)
# Initialisation
# t2: long term moving window
ilta = int(round(self.getTime2() / self.getIncrement()))
ista = int(round((self.getTime2() / 10) / self.getIncrement())) # TODO: still hard coded!!
lta = y[0]
lta1 = y1[0]
sta = y[0]
# moving windows
LTA = np.zeros(len(xnp))
STA = np.zeros(len(xnp))
for j in range(0, len(xnp)):
if j < 4:
LTA[j] = 0
STA[j] = 0
elif j <= ista and self.getOrder() == 2:
lta = (y[j] + lta * (j - 1)) / j
if self.getOrder() == 2:
sta = (y[j] + sta * (j - 1)) / j
# elif j < 4:
elif j <= ilta:
lta = (y[j] + lta * (j - 1)) / j
lta1 = (y1[j] + lta1 * (j - 1)) / j
if self.getOrder() == 2:
sta = (y[j] - y[j - ista]) / ista + sta
else:
lta = (y[j] - y[j - ilta]) / ilta + lta
lta1 = (y1[j] - y1[j - ilta]) / ilta + lta1
if self.getOrder() == 2:
sta = (y[j] - y[j - ista]) / ista + sta
# define LTA
if self.getOrder() == 3:
LTA[j] = lta / np.power(lta1, 1.5)
elif self.getOrder() == 4:
LTA[j] = lta / np.power(lta1, 2)
else:
LTA[j] = lta
STA[j] = sta
# remove NaN's with first not-NaN-value,
# so autopicker doesnt pick discontinuity at start of the trace
@@ -326,10 +307,7 @@ class HOScf(CharacteristicFunction):
first = ind[0]
LTA[:first] = LTA[first]
if self.getOrder() > 2:
self.cf = LTA
else: # order 2 means STA/LTA!
self.cf = STA / LTA
self.cf = LTA
self.xcf = x
@@ -339,12 +317,10 @@ class ARZcf(CharacteristicFunction):
super(ARZcf, self).__init__(data, cut, t1=t1, t2=t2, order=pickparams["Parorder"],
fnoise=pickparams["addnoise"])
def calcCF(self, data):
def calcCF(self):
"""
function used to calculate the AR prediction error from a single vertical trace. Can be used to pick
P onsets.
:param data:
:type data: ~obspy.core.stream.Stream
:return: ARZ cf
:rtype:
"""
@@ -475,14 +451,12 @@ class ARHcf(CharacteristicFunction):
super(ARHcf, self).__init__(data, cut, t1=t1, t2=t2, order=pickparams["Sarorder"],
fnoise=pickparams["addnoise"])
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate a characteristic function using autoregressive modelling of the waveform of
both horizontal traces.
The waveform is predicted in a moving time window using the calculated AR parameters. The difference
between the predicted and the actual waveform servers as a characteristic function.
:param data: wavefor stream
:type data: ~obspy.core.stream.Stream
:return: ARH cf
:rtype:
"""
@@ -631,14 +605,12 @@ class AR3Ccf(CharacteristicFunction):
super(AR3Ccf, self).__init__(data, cut, t1=t1, t2=t2, order=pickparams["Sarorder"],
fnoise=pickparams["addnoise"])
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate a characteristic function using autoregressive modelling of the waveform of
all three traces.
The waveform is predicted in a moving time window using the calculated AR parameters. The difference
between the predicted and the actual waveform servers as a characteristic function
:param data: stream holding all three traces
:type data: ~obspy.core.stream.Stream
:return: AR3C cf
:rtype:
"""

View File

@@ -37,7 +37,8 @@ class AutoPicker(object):
warnings.simplefilter('ignore')
def __init__(self, cf, TSNR, PickWindow, iplot=0, aus=None, Tsmooth=None, Pick1=None, fig=None, linecolor='k'):
def __init__(self, cf, TSNR, PickWindow, iplot=0, aus=None, Tsmooth=None, Pick1=None,
fig=None, linecolor='k', ogstream=None):
"""
Create AutoPicker object
:param cf: characteristic function, on which the picking algorithm is applied
@@ -59,12 +60,15 @@ class AutoPicker(object):
:type fig: `~matplotlib.figure.Figure`
:param linecolor: matplotlib line color string
:type linecolor: str
:param ogstream: original stream (waveform), e.g. for plotting purposes
:type ogstream: `~obspy.core.stream.Stream`
"""
assert isinstance(cf, CharacteristicFunction), "%s is not a CharacteristicFunction object" % str(cf)
self._linecolor = linecolor
self._pickcolor_p = 'b'
self.cf = cf.getCF()
self.ogstream = ogstream
self.Tcf = cf.getTimeArray()
self.Data = cf.getXCF()
self.dt = cf.getIncrement()
@@ -173,7 +177,7 @@ class AICPicker(AutoPicker):
nn = np.isnan(self.cf)
if len(nn) > 1:
self.cf[nn] = 0
# taper AIC-CF to get rid off side maxima
# taper AIC-CF to get rid of side maxima
tap = np.hanning(len(self.cf))
aic = tap * self.cf + max(abs(self.cf))
# smooth AIC-CF
@@ -316,16 +320,7 @@ class AICPicker(AutoPicker):
plt.close(fig)
return
iislope = islope[0][0:imax + 1]
# MP MP change slope calculation
# get all maxima of aicsmooth
iaicmaxima = argrelmax(aicsmooth)[0]
# get first index of maximum after pickindex (indices saved in iaicmaxima)
aicmax = iaicmaxima[np.where(iaicmaxima > pickindex)[0]]
if len(aicmax) > 0:
iaicmax = aicmax[0]
else:
iaicmax = -1
dataslope = aicsmooth[pickindex: iaicmax]
dataslope = self.Data[0].data[iislope]
# calculate slope as polynomal fit of order 1
xslope = np.arange(0, len(dataslope), 1)
try:
@@ -336,7 +331,7 @@ class AICPicker(AutoPicker):
else:
self.slope = 1 / (len(dataslope) * self.Data[0].stats.delta) * (datafit[-1] - datafit[0])
# normalize slope to maximum of cf to make it unit independent
self.slope /= aicsmooth[iaicmax]
self.slope /= self.Data[0].data[icfmax]
except Exception as e:
print("AICPicker: Problems with data fitting! {}".format(e))
@@ -356,6 +351,12 @@ class AICPicker(AutoPicker):
self.Tcf = self.Tcf[0:len(self.Tcf) - 1]
ax1.plot(self.Tcf, cf / max(cf), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data')
ax1.plot(self.Tcf, aicsmooth / max(aicsmooth), 'r', label='Smoothed AIC-CF')
# plot the original waveform also for evaluation of the CF and pick
if self.ogstream:
data = self.ogstream[0].data
if len(data) == len(self.Tcf):
ax1.plot(self.Tcf, 0.5 * data / max(data), 'k', label='Seismogram', alpha=0.3, zorder=0,
lw=0.5)
if self.Pick is not None:
ax1.plot([self.Pick, self.Pick], [-0.1, 0.5], 'b', linewidth=2, label='AIC-Pick')
ax1.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime)
@@ -376,7 +377,7 @@ class AICPicker(AutoPicker):
label='Signal Window')
ax2.axvspan(self.Tcf[iislope[0]], self.Tcf[iislope[-1]], color='g', alpha=0.2, lw=0,
label='Slope Window')
ax2.plot(self.Tcf[pickindex: iaicmax], datafit, 'g', linewidth=2,
ax2.plot(self.Tcf[iislope], datafit, 'g', linewidth=2,
label='Slope') # MP MP changed temporarily!
if self.slope is not None:

View File

@@ -15,7 +15,7 @@ import numpy as np
from obspy.core import Stream, UTCDateTime
from scipy.signal import argrelmax
from pylot.core.util.utils import get_bool, get_none, SetChannelComponents
from pylot.core.util.utils import get_bool, get_none, SetChannelComponents, common_range
def earllatepicker(X, nfac, TSNR, Pick1, iplot=0, verbosity=1, fig=None, linecolor='k'):
@@ -828,14 +828,22 @@ def checksignallength(X, pick, minsiglength, pickparams, iplot=0, fig=None, line
if len(X) > 1:
# all three components available
# make sure, all components have equal lengths
ilen = min([len(X[0].data), len(X[1].data), len(X[2].data)])
x1 = X[0][0:ilen]
x2 = X[1][0:ilen]
x3 = X[2][0:ilen]
earliest_starttime = min(tr.stats.starttime for tr in X)
cuttimes = common_range(X)
X = X.slice(cuttimes[0], cuttimes[1])
x1, x2, x3 = X[:3]
if not (len(x1) == len(x2) == len(x3)):
raise PickingFailedException('checksignallength: unequal lengths of components!')
# get RMS trace
rms = np.sqrt((np.power(x1, 2) + np.power(x2, 2) + np.power(x3, 2)) / 3)
ilen = len(rms)
dt = earliest_starttime - X[0].stats.starttime
pick -= dt
else:
x1 = X[0].data
x2 = x3 = None
ilen = len(x1)
rms = abs(x1)
@@ -874,6 +882,10 @@ def checksignallength(X, pick, minsiglength, pickparams, iplot=0, fig=None, line
fig._tight = True
ax = fig.add_subplot(111)
ax.plot(t, rms, color=linecolor, linewidth=0.7, label='RMS Data')
ax.plot(t, x1, 'k', alpha=0.3, lw=0.3, zorder=0)
if x2 is not None and x3 is not None:
ax.plot(t, x2, 'r', alpha=0.3, lw=0.3, zorder=0)
ax.plot(t, x3, 'g', alpha=0.3, lw=0.3, zorder=0)
ax.axvspan(t[inoise[0]], t[inoise[-1]], color='y', alpha=0.2, lw=0, label='Noise Window')
ax.axvspan(t[isignal[0]], t[isignal[-1]], color='b', alpha=0.2, lw=0, label='Signal Window')
ax.plot([t[isignal[0]], t[isignal[len(isignal) - 1]]],
@@ -883,6 +895,7 @@ def checksignallength(X, pick, minsiglength, pickparams, iplot=0, fig=None, line
ax.set_xlabel('Time [s] since %s' % X[0].stats.starttime)
ax.set_ylabel('Counts')
ax.set_title('Check for Signal Length, Station %s' % X[0].stats.station)
ax.set_xlim(pickparams["pstart"], pickparams["pstop"])
ax.set_yticks([])
if plt_flag == 1:
fig.show()

View File

@@ -14,6 +14,8 @@ import obspy
from PySide2 import QtWidgets, QtGui
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from obspy import UTCDateTime
from pylot.core.util.utils import identifyPhaseID
from scipy.interpolate import griddata
@@ -60,6 +62,7 @@ class Array_map(QtWidgets.QWidget):
self.parameter = parameter if parameter else parent._inputs
self.picks_rel = {}
self.picks_rel_mean_corrected = {}
self.marked_stations = []
self.highlighted_stations = []
@@ -106,6 +109,7 @@ class Array_map(QtWidgets.QWidget):
self.map_reset_button = QtWidgets.QPushButton('Reset Map View')
self.save_map_button = QtWidgets.QPushButton('Save Map')
self.go2eq_button = QtWidgets.QPushButton('Go to Event Location')
self.subtract_mean_cb = QtWidgets.QCheckBox('Subtract mean')
self.main_box = QtWidgets.QVBoxLayout()
self.setLayout(self.main_box)
@@ -152,6 +156,7 @@ class Array_map(QtWidgets.QWidget):
self.bot_row.addWidget(self.map_reset_button, 2)
self.bot_row.addWidget(self.go2eq_button, 2)
self.bot_row.addWidget(self.save_map_button, 2)
self.bot_row.addWidget(self.subtract_mean_cb, 0)
self.bot_row.addWidget(self.status_label, 5)
def init_colormap(self):
@@ -212,6 +217,7 @@ class Array_map(QtWidgets.QWidget):
self.map_reset_button.clicked.connect(self.org_map_view)
self.go2eq_button.clicked.connect(self.go2eq)
self.save_map_button.clicked.connect(self.saveFigure)
self.subtract_mean_cb.stateChanged.connect(self.toggle_subtract_mean)
self.plotWidget.mpl_connect('motion_notify_event', self.mouse_moved)
self.plotWidget.mpl_connect('scroll_event', self.mouse_scroll)
@@ -368,12 +374,6 @@ class Array_map(QtWidgets.QWidget):
def get_max_from_stations(self, key):
return self._from_dict(max, key)
def get_min_from_picks(self):
return min(self.picks_rel.values())
def get_max_from_picks(self):
return max(self.picks_rel.values())
def current_picks_dict(self):
picktype = self.comboBox_am.currentText().split(' ')[0]
auto_manu = {'auto': self.autopicks_dict,
@@ -418,17 +418,17 @@ class Array_map(QtWidgets.QWidget):
print('Cannot display pick for station {}. Reason: {}'.format(station_name, e))
return picks, uncertainties
def get_picks_rel(picks):
def get_picks_rel(picks, func=min):
picks_rel = {}
picks_utc = []
for pick in picks.values():
if type(pick) is obspy.core.utcdatetime.UTCDateTime:
picks_utc.append(pick)
if type(pick) is UTCDateTime:
picks_utc.append(pick.timestamp)
if picks_utc:
self._earliest_picktime = min(picks_utc)
self._reference_picktime = UTCDateTime(func(picks_utc))
for st_id, pick in picks.items():
if type(pick) is obspy.core.utcdatetime.UTCDateTime:
pick -= self._earliest_picktime
if type(pick) is UTCDateTime:
pick -= self._reference_picktime
picks_rel[st_id] = pick
return picks_rel
@@ -437,6 +437,15 @@ class Array_map(QtWidgets.QWidget):
self.picks, self.uncertainties = get_picks(self.stations_dict)
self.picks_rel = get_picks_rel(self.picks)
self.picks_rel_mean_corrected = get_picks_rel_mean_corr(self.picks)
def toggle_subtract_mean(self):
if self.subtract_mean_cb.isChecked():
cmap = 'seismic'
else:
cmap = 'viridis'
self.cmaps_box.setCurrentIndex(self.cmaps_box.findText(cmap))
self._refresh_drawings()
def init_lat_lon_dimensions(self):
# init minimum and maximum lon and lat dimensions
@@ -467,11 +476,12 @@ class Array_map(QtWidgets.QWidget):
return stations, latitudes, longitudes
def get_picks_lat_lon(self):
picks_rel = self.picks_rel_mean_corrected if self.subtract_mean_cb.isChecked() else self.picks_rel
picks = []
uncertainties = []
latitudes = []
longitudes = []
for st_id, pick in self.picks_rel.items():
for st_id, pick in picks_rel.items():
picks.append(pick)
uncertainties.append(self.uncertainties.get(st_id))
latitudes.append(self.stations_dict[st_id]['latitude'])
@@ -538,13 +548,20 @@ class Array_map(QtWidgets.QWidget):
print(message, e)
print(traceback.format_exc())
def draw_contour_filled(self, nlevel=50):
levels = np.linspace(self.get_min_from_picks(), self.get_max_from_picks(), nlevel)
def draw_contour_filled(self, nlevel=51):
if self.subtract_mean_cb.isChecked():
abs_max = self.get_residuals_absmax()
levels = np.linspace(-abs_max, abs_max, nlevel)
else:
levels = np.linspace(min(self.picks_rel.values()), max(self.picks_rel.values()), nlevel)
self.contourf = self.ax.contourf(self.longrid, self.latgrid, self.picksgrid_active, levels,
linewidths=self.linewidth * 5, transform=ccrs.PlateCarree(),
alpha=0.4, zorder=8, cmap=self.get_colormap())
def get_residuals_absmax(self):
return np.max(np.absolute(list(self.picks_rel_mean_corrected.values())))
def get_colormap(self):
return plt.get_cmap(self.cmaps_box.currentText())
@@ -575,7 +592,12 @@ class Array_map(QtWidgets.QWidget):
for uncertainty in uncertainties])
cmap = self.get_colormap()
self.sc_picked = self.ax.scatter(lons, lats, s=sizes, edgecolors='white', cmap=cmap,
vmin = vmax = None
if self.subtract_mean_cb.isChecked():
vmin, vmax = -self.get_residuals_absmax(), self.get_residuals_absmax()
self.sc_picked = self.ax.scatter(lons, lats, s=sizes, edgecolors='white', cmap=cmap, vmin=vmin, vmax=vmax,
c=picks, zorder=11, label='Picked', transform=ccrs.PlateCarree())
def annotate_ax(self):
@@ -652,7 +674,9 @@ class Array_map(QtWidgets.QWidget):
if picks_available:
self.scatter_picked_stations()
if hasattr(self, 'sc_picked'):
self.cbar = self.add_cbar(label='Time relative to first onset ({}) [s]'.format(self._earliest_picktime))
self.cbar = self.add_cbar(
label='Time relative to reference onset ({}) [s]'.format(self._reference_picktime)
)
self.comboBox_phase.setEnabled(True)
else:
self.comboBox_phase.setEnabled(False)

View File

@@ -27,6 +27,10 @@ class Metadata(object):
# saves which metadata files are from obspy dmt
self.obspy_dmt_invs = []
if inventory:
# make sure that no accidental backslashes mess up the path
if isinstance(inventory, str):
inventory = inventory.replace('\\', '/')
inventory = os.path.abspath(inventory)
if os.path.isdir(inventory):
self.add_inventory(inventory)
if os.path.isfile(inventory):

View File

@@ -1076,7 +1076,7 @@ def check4rotated(data, metadata=None, verbosity=1):
return wfs_in
# check metadata quality
t_start = full_range(wfs_in)
t_start = full_range(wfs_in)[0]
try:
azimuths = []
dips = []

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -4,10 +4,8 @@
%Parameters are optimized for %extent data sets!
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#main settings#
#rootpath# %project path
#datapath# %data path
#database# %name of data base
20171010_063224.a #eventID# %event ID for single event processing (* for all events found in database)
dmt_database_test #datapath# %data path
20171010_063224.a #eventID# %event ID for single event processing (* for all events found in database)
#invdir# %full path to inventory or dataless-seed file
PILOT #datastructure# %choose data structure
True #apverbose# %choose 'True' or 'False' for terminal output

View File

@@ -1,6 +1,8 @@
import os
import pytest
from obspy import read_events
from autoPyLoT import autoPyLoT
@@ -8,7 +10,11 @@ class TestAutopickerGlobal():
def init(self):
self.params_infile = 'pylot_alparray_mantle_corr_stack_0.03-0.5.in'
self.test_event_dir = 'dmt_database_test'
self.fname_outfile_xml = os.path.join(
self.test_event_dir, '20171010_063224.a', 'PyLoT_20171010_063224.a_autopylot.xml'
)
# check if the input files exist
if not os.path.isfile(self.params_infile):
print(f'Test input file {os.path.abspath(self.params_infile)} not found.')
return False
@@ -24,4 +30,38 @@ class TestAutopickerGlobal():
def test_autopicker(self):
assert self.init(), 'Initialization failed due to missing input files.'
#autoPyLoT(inputfile=self.params_infile, eventid='20171010_063224.a')
# check for output file in test directory and remove it if necessary
if os.path.isfile(self.fname_outfile_xml):
os.remove(self.fname_outfile_xml)
autoPyLoT(inputfile=self.params_infile, eventid='20171010_063224.a', obspyDMT_wfpath='processed')
# test for different known output files if they are identical or not
compare_pickfiles(self.fname_outfile_xml, 'PyLoT_20171010_063224.a_autopylot.xml', True)
compare_pickfiles(self.fname_outfile_xml, 'PyLoT_20171010_063224.a_saved_from_GUI.xml', True)
compare_pickfiles(self.fname_outfile_xml, 'PyLoT_20171010_063224.a_corrected_taup_times_0.03-0.5_P.xml', False)
def compare_pickfiles(pickfile1: str, pickfile2: str, samefile: bool = True) -> None:
"""
Compare the pick times and errors from two pick files.
Parameters:
pickfile1 (str): The path to the first pick file.
pickfile2 (str): The path to the second pick file.
samefile (bool): A flag indicating whether the two files are expected to be the same. Defaults to True.
Returns:
None
"""
cat1 = read_events(pickfile1)
cat2 = read_events(pickfile2)
picks1 = sorted(cat1[0].picks, key=lambda pick: str(pick.waveform_id))
picks2 = sorted(cat2[0].picks, key=lambda pick: str(pick.waveform_id))
pick_times1 = [pick.time for pick in picks1]
pick_times2 = [pick.time for pick in picks2]
pick_terrs1 = [pick.time_errors for pick in picks1]
pick_terrs2 = [pick.time_errors for pick in picks2]
# check if times and errors are identical or not depending on the samefile flag
assert (pick_times1 == pick_times2) is samefile, 'Pick times error'
assert (pick_terrs1 == pick_terrs2) is samefile, 'Pick time errors errors'

View File

@@ -41,6 +41,7 @@ global #extent# %extent of a
875.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
False #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
IASP91 #taup_model# %define TauPy model for traveltime estimation. Possible values: 1066a, 1066b, ak135, ak135f, herrin, iasp91, jb, prem, pwdk, sp6
P,Pdiff,S,Sdiff #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
0.01 0.1 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
0.001 0.5 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
0.01 0.5 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@@ -41,6 +41,7 @@ global #extent# %extent of a
875.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
True #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
IASP91 #taup_model# %define TauPy model for traveltime estimation. Possible values: 1066a, 1066b, ak135, ak135f, herrin, iasp91, jb, prem, pwdk, sp6
P,Pdiff,S,Sdiff #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
0.01 0.1 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
0.001 0.5 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
0.01 0.5 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@@ -1,6 +1,7 @@
import os
import sys
import unittest
import pytest
import obspy
from obspy import UTCDateTime
@@ -105,7 +106,6 @@ class TestAutopickStation(unittest.TestCase):
# show complete diff when difference in results dictionaries are found
self.maxDiff = None
# @skip("Works")
def test_autopickstation_taupy_disabled_gra1(self):
expected = {
'P': {'picker': 'auto', 'snrdb': 15.405649120980094, 'weight': 0, 'Mo': None, 'marked': [], 'Mw': None,
@@ -121,8 +121,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra1, pickparam=self.pickparam_taupy_disabled,
metadata=(None, None))
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA1', station)
def test_autopickstation_taupy_enabled_gra1(self):
@@ -140,8 +140,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra1, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA1', station)
def test_autopickstation_taupy_disabled_gra2(self):
@@ -157,8 +157,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra2, pickparam=self.pickparam_taupy_disabled,
metadata=(None, None))
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA2', station)
def test_autopickstation_taupy_enabled_gra2(self):
@@ -175,8 +175,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra2, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA2', station)
def test_autopickstation_taupy_disabled_ech(self):
@@ -190,8 +190,8 @@ class TestAutopickStation(unittest.TestCase):
'fm': None, 'spe': None, 'channel': u'LHE'}}
with HidePrints():
result, station = autopickstation(wfstream=self.ech, pickparam=self.pickparam_taupy_disabled)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('ECH', station)
def test_autopickstation_taupy_enabled_ech(self):
@@ -208,8 +208,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.ech, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('ECH', station)
def test_autopickstation_taupy_disabled_fiesa(self):
@@ -224,8 +224,8 @@ class TestAutopickStation(unittest.TestCase):
'fm': None, 'spe': None, 'channel': u'LHE'}}
with HidePrints():
result, station = autopickstation(wfstream=self.fiesa, pickparam=self.pickparam_taupy_disabled)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('FIESA', station)
def test_autopickstation_taupy_enabled_fiesa(self):
@@ -242,8 +242,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.fiesa, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('FIESA', station)
def test_autopickstation_gra1_z_comp_missing(self):
@@ -272,7 +272,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=wfstream, pickparam=self.pickparam_taupy_disabled,
metadata=(None, None))
self.assertEqual(expected, result)
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA1', station)
def test_autopickstation_a106_taupy_enabled(self):
@@ -290,7 +291,9 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.a106, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertEqual(expected, result)
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
def test_autopickstation_station_missing_in_metadata(self):
"""This station is not in the metadata, but Taupy is enabled. Taupy should exit cleanly and modify the starttime
@@ -311,8 +314,37 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.a005a, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertEqual(expected, result)
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
def run_dict_comparison(result, expected):
for key, expected_value in expected.items():
if isinstance(expected_value, dict):
run_dict_comparison(result[key], expected[key])
else:
res = result[key]
if isinstance(res, UTCDateTime) and isinstance(expected_value, UTCDateTime):
res = res.timestamp
expected_value = expected_value.timestamp
assert expected_value == pytest.approx(res), f'{key}: {expected_value} != {res}'
def compare_dicts(result, expected, hint=''):
try:
run_dict_comparison(result, expected)
except AssertionError:
raise AssertionError(f'{hint}Dictionaries not equal.'
f'\n\n<<Expected>>\n{pretty_print_dict(expected)}'
f'\n\n<<Result>>\n{pretty_print_dict(result)}')
def pretty_print_dict(dct):
retstr = ''
for key, value in sorted(dct.items(), key=lambda x: x[0]):
retstr += f"{key} : {value}\n"
return retstr
if __name__ == '__main__':
unittest.main()