Merge branch 'refs/heads/develop' into correlation_picker

This commit is contained in:
Marcel Paffrath 2024-09-11 10:31:50 +02:00
commit 28f75cedcb
12 changed files with 153 additions and 100 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
*~
.idea
pylot/RELEASE-VERSION
/tests/test_autopicker/dmt_database_test/

View File

@ -41,6 +41,7 @@ global #extent# %extent of a
1150.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
True #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
iasp91 #taup_model# %define TauPy model for traveltime estimation. Possible values: 1066a, 1066b, ak135, ak135f, herrin, iasp91, jb, prem, pwdk, sp6
P,Pdiff #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
0.05 0.5 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
0.001 0.5 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
0.05 0.5 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@ -40,7 +40,8 @@ local #extent# %extent of a
-0.5 #sstart# %start time [s] relative to P-onset for calculating CF for S-picking
10.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
False #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
iasp91 #taup_model# %define TauPy model for traveltime estimation
iasp91 #taup_model# %define TauPy model for traveltime estimation
P #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
2.0 20.0 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
2.0 30.0 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
2.0 10.0 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@ -40,7 +40,8 @@ local #extent# %extent of a
-1.0 #sstart# %start time [s] relative to P-onset for calculating CF for S-picking
10.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
True #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
iasp91 #taup_model# %define TauPy model for traveltime estimation
iasp91 #taup_model# %define TauPy model for traveltime estimation
P #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
2.0 10.0 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
2.0 12.0 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
2.0 8.0 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@ -262,6 +262,10 @@ class AutopickStation(object):
self.metadata = metadata
self.origin = origin
# initialize TauPy pick estimates
self.estFirstP = None
self.estFirstS = None
# initialize picking results
self.p_results = PickingResults()
self.s_results = PickingResults()
@ -443,15 +447,15 @@ class AutopickStation(object):
for arr in arrivals:
phases[identifyPhaseID(arr.phase.name)].append(arr)
# get first P and S onsets from arrivals list
estFirstP = 0
estFirstS = 0
arrival_time_p = 0
arrival_time_s = 0
if len(phases['P']) > 0:
arrP, estFirstP = min([(arr, arr.time) for arr in phases['P']], key=lambda t: t[1])
arrP, arrival_time_p = min([(arr, arr.time) for arr in phases['P']], key=lambda t: t[1])
if len(phases['S']) > 0:
arrS, estFirstS = min([(arr, arr.time) for arr in phases['S']], key=lambda t: t[1])
arrS, arrival_time_s = min([(arr, arr.time) for arr in phases['S']], key=lambda t: t[1])
print('autopick: estimated first arrivals for P: {} s, S:{} s after event'
' origin time using TauPy'.format(estFirstP, estFirstS))
return estFirstP, estFirstS
' origin time using TauPy'.format(arrival_time_p, arrival_time_s))
return arrival_time_p, arrival_time_s
def exit_taupy():
"""If taupy failed to calculate theoretical starttimes, picking continues.
@ -477,10 +481,13 @@ class AutopickStation(object):
raise AttributeError('No source origins given!')
arrivals = create_arrivals(self.metadata, self.origin, self.pickparams["taup_model"])
estFirstP, estFirstS = first_PS_onsets(arrivals)
arrival_P, arrival_S = first_PS_onsets(arrivals)
self.estFirstP = (self.origin[0].time + arrival_P) - self.ztrace.stats.starttime
# modifiy pstart and pstop relative to estimated first P arrival (relative to station time axis)
self.pickparams["pstart"] += (self.origin[0].time + estFirstP) - self.ztrace.stats.starttime
self.pickparams["pstop"] += (self.origin[0].time + estFirstP) - self.ztrace.stats.starttime
self.pickparams["pstart"] += self.estFirstP
self.pickparams["pstop"] += self.estFirstP
print('autopick: CF calculation times respectively:'
' pstart: {} s, pstop: {} s'.format(self.pickparams["pstart"], self.pickparams["pstop"]))
# make sure pstart and pstop are inside the starttime/endtime of vertical trace
@ -491,9 +498,10 @@ class AutopickStation(object):
# for the two horizontal components take earliest and latest time to make sure that the s onset is not clipped
# if start and endtime of horizontal traces differ, the s windowsize will automatically increase
trace_s_start = min([self.etrace.stats.starttime, self.ntrace.stats.starttime])
self.estFirstS = (self.origin[0].time + arrival_S) - trace_s_start
# modifiy sstart and sstop relative to estimated first S arrival (relative to station time axis)
self.pickparams["sstart"] += (self.origin[0].time + estFirstS) - trace_s_start
self.pickparams["sstop"] += (self.origin[0].time + estFirstS) - trace_s_start
self.pickparams["sstart"] += self.estFirstS
self.pickparams["sstop"] += self.estFirstS
print('autopick: CF calculation times respectively:'
' sstart: {} s, sstop: {} s'.format(self.pickparams["sstart"], self.pickparams["sstop"]))
# make sure pstart and pstop are inside the starttime/endtime of horizontal traces
@ -609,6 +617,12 @@ class AutopickStation(object):
# plot tapered trace filtered with bpz2 filter settings
ax1.plot(tdata, self.tr_filt_z_bpz2.data / max(self.tr_filt_z_bpz2.data), color=linecolor, linewidth=0.7,
label='Data')
# plot pickwindows for P
pstart, pstop = self.pickparams['pstart'], self.pickparams['pstop']
if pstart is not None and pstop is not None:
ax1.axvspan(pstart, pstop, color='r', alpha=0.1, zorder=0, label='P window')
if self.estFirstP is not None:
ax1.axvline(self.estFirstP, ls='dashed', color='r', alpha=0.4, label='TauPy estimate')
if self.p_results.weight < 4:
# plot CF of initial onset (HOScf or ARZcf)
ax1.plot(self.cf1.getTimeArray(), self.cf1.getCF() / max(self.cf1.getCF()), 'b', label='CF1')
@ -713,6 +727,15 @@ class AutopickStation(object):
ax3.plot([refSpick.getpick() - 0.5, refSpick.getpick() + 0.5], [-1.3, -1.3], 'g', linewidth=2)
ax3.plot([self.s_results.lpp, self.s_results.lpp], [-1.1, 1.1], 'g--', label='lpp')
ax3.plot([self.s_results.epp, self.s_results.epp], [-1.1, 1.1], 'g--', label='epp')
# plot pickwindows for S
sstart, sstop = self.pickparams['sstart'], self.pickparams['sstop']
if sstart is not None and sstop is not None:
for axis in [ax2, ax3]:
axis.axvspan(sstart, sstop, color='b', alpha=0.1, zorder=0, label='S window')
if self.estFirstS is not None:
axis.axvline(self.estFirstS, ls='dashed', color='b', alpha=0.4, label='TauPy estimate')
ax3.legend(loc=1)
ax3.set_yticks([])
ax3.set_ylim([-1.5, 1.5])
@ -835,14 +858,21 @@ class AutopickStation(object):
self.cf1 = None
assert isinstance(self.cf1, CharacteristicFunction), 'cf1 is not set correctly: maybe the algorithm name ({})' \
' is corrupted'.format(self.pickparams["algoP"])
# get the original waveform stream from first CF class cut to identical length as CF for plotting
cut_ogstream = self.cf1.getDataArray(self.cf1.getCut())
# MP: Rename to cf_stream for further use of z_copy and to prevent chaos when z_copy suddenly becomes a cf
# stream and later again a waveform stream
cf_stream = z_copy.copy()
cf_stream[0].data = self.cf1.getCF()
# calculate AIC cf from first cf (either HOS or ARZ)
z_copy[0].data = self.cf1.getCF()
aiccf = AICcf(z_copy, cuttimes)
aiccf = AICcf(cf_stream, cuttimes)
# get preliminary onset time from AIC-CF
self.set_current_figure('aicFig')
aicpick = AICPicker(aiccf, self.pickparams["tsnrz"], self.pickparams["pickwinP"], self.iplot,
Tsmooth=self.pickparams["aictsmooth"], fig=self.current_figure,
linecolor=self.current_linecolor)
linecolor=self.current_linecolor, ogstream=cut_ogstream)
# save aicpick for plotting later
self.p_data.aicpick = aicpick
# add pstart and pstop to aic plot
@ -855,7 +885,7 @@ class AutopickStation(object):
label='P stop')
ax.legend(loc=1)
Pflag = self._pick_p_quality_control(aicpick, z_copy, tr_filt)
Pflag = self._pick_p_quality_control(aicpick, cf_stream, tr_filt)
# go on with processing if AIC onset passes quality control
slope = aicpick.getSlope()
if not slope: slope = 0
@ -894,7 +924,7 @@ class AutopickStation(object):
refPpick = PragPicker(self.cf2, self.pickparams["tsnrz"], self.pickparams["pickwinP"], self.iplot,
self.pickparams["ausP"],
self.pickparams["tsmoothP"], aicpick.getpick(), self.current_figure,
self.current_linecolor)
self.current_linecolor, ogstream=cut_ogstream)
# save PragPicker result for plotting
self.p_data.refPpick = refPpick
self.p_results.mpp = refPpick.getpick()
@ -1146,11 +1176,14 @@ class AutopickStation(object):
# calculate AIC cf
haiccf = self._calculate_aic_cf_s_pick(cuttimesh)
# get the original waveform stream cut to identical length as CF for plotting
ogstream = haiccf.getDataArray(haiccf.getCut())
# get preliminary onset time from AIC cf
self.set_current_figure('aicARHfig')
aicarhpick = AICPicker(haiccf, self.pickparams["tsnrh"], self.pickparams["pickwinS"], self.iplot,
Tsmooth=self.pickparams["aictsmoothS"], fig=self.current_figure,
linecolor=self.current_linecolor)
linecolor=self.current_linecolor, ogstream=ogstream)
# save pick for later plotting
self.aicarhpick = aicarhpick

View File

@ -60,7 +60,7 @@ class CharacteristicFunction(object):
self.setOrder(order)
self.setFnoise(fnoise)
self.setARdetStep(t2)
self.calcCF(self.getDataArray())
self.calcCF()
self.arpara = np.array([])
self.xpred = np.array([])
@ -212,17 +212,15 @@ class CharacteristicFunction(object):
data = self.orig_data.copy()
return data
def calcCF(self, data=None):
self.cf = data
def calcCF(self):
pass
class AICcf(CharacteristicFunction):
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate the Akaike Information Criterion (AIC) after Maeda (1985).
:param data: data, time series (whether seismogram or CF)
:type data: tuple
:return: AIC function
:rtype:
"""
@ -260,13 +258,11 @@ class HOScf(CharacteristicFunction):
"""
super(HOScf, self).__init__(data, cut, pickparams["tlta"], pickparams["hosorder"])
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate skewness (statistics of order 3) or kurtosis
(statistics of order 4), using one long moving window, as published
in Kueperkoch et al. (2010), or order 2, i.e. STA/LTA.
:param data: data, time series (whether seismogram or CF)
:type data: tuple
:return: HOS cf
:rtype:
"""
@ -281,47 +277,28 @@ class HOScf(CharacteristicFunction):
elif self.getOrder() == 4: # this is kurtosis
y = np.power(xnp, 4)
y1 = np.power(xnp, 2)
elif self.getOrder() == 2: # this is variance, used for STA/LTA processing
y = np.power(xnp, 2)
y1 = np.power(xnp, 2)
# Initialisation
# t2: long term moving window
ilta = int(round(self.getTime2() / self.getIncrement()))
ista = int(round((self.getTime2() / 10) / self.getIncrement())) # TODO: still hard coded!!
lta = y[0]
lta1 = y1[0]
sta = y[0]
# moving windows
LTA = np.zeros(len(xnp))
STA = np.zeros(len(xnp))
for j in range(0, len(xnp)):
if j < 4:
LTA[j] = 0
STA[j] = 0
elif j <= ista and self.getOrder() == 2:
lta = (y[j] + lta * (j - 1)) / j
if self.getOrder() == 2:
sta = (y[j] + sta * (j - 1)) / j
# elif j < 4:
elif j <= ilta:
lta = (y[j] + lta * (j - 1)) / j
lta1 = (y1[j] + lta1 * (j - 1)) / j
if self.getOrder() == 2:
sta = (y[j] - y[j - ista]) / ista + sta
else:
lta = (y[j] - y[j - ilta]) / ilta + lta
lta1 = (y1[j] - y1[j - ilta]) / ilta + lta1
if self.getOrder() == 2:
sta = (y[j] - y[j - ista]) / ista + sta
# define LTA
if self.getOrder() == 3:
LTA[j] = lta / np.power(lta1, 1.5)
elif self.getOrder() == 4:
LTA[j] = lta / np.power(lta1, 2)
else:
LTA[j] = lta
STA[j] = sta
# remove NaN's with first not-NaN-value,
# so autopicker doesnt pick discontinuity at start of the trace
@ -330,10 +307,7 @@ class HOScf(CharacteristicFunction):
first = ind[0]
LTA[:first] = LTA[first]
if self.getOrder() > 2:
self.cf = LTA
else: # order 2 means STA/LTA!
self.cf = STA / LTA
self.cf = LTA
self.xcf = x
@ -343,12 +317,10 @@ class ARZcf(CharacteristicFunction):
super(ARZcf, self).__init__(data, cut, t1=t1, t2=t2, order=pickparams["Parorder"],
fnoise=pickparams["addnoise"])
def calcCF(self, data):
def calcCF(self):
"""
function used to calculate the AR prediction error from a single vertical trace. Can be used to pick
P onsets.
:param data:
:type data: ~obspy.core.stream.Stream
:return: ARZ cf
:rtype:
"""
@ -479,14 +451,12 @@ class ARHcf(CharacteristicFunction):
super(ARHcf, self).__init__(data, cut, t1=t1, t2=t2, order=pickparams["Sarorder"],
fnoise=pickparams["addnoise"])
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate a characteristic function using autoregressive modelling of the waveform of
both horizontal traces.
The waveform is predicted in a moving time window using the calculated AR parameters. The difference
between the predicted and the actual waveform servers as a characteristic function.
:param data: wavefor stream
:type data: ~obspy.core.stream.Stream
:return: ARH cf
:rtype:
"""
@ -635,14 +605,12 @@ class AR3Ccf(CharacteristicFunction):
super(AR3Ccf, self).__init__(data, cut, t1=t1, t2=t2, order=pickparams["Sarorder"],
fnoise=pickparams["addnoise"])
def calcCF(self, data):
def calcCF(self):
"""
Function to calculate a characteristic function using autoregressive modelling of the waveform of
all three traces.
The waveform is predicted in a moving time window using the calculated AR parameters. The difference
between the predicted and the actual waveform servers as a characteristic function
:param data: stream holding all three traces
:type data: ~obspy.core.stream.Stream
:return: AR3C cf
:rtype:
"""

View File

@ -37,7 +37,8 @@ class AutoPicker(object):
warnings.simplefilter('ignore')
def __init__(self, cf, TSNR, PickWindow, iplot=0, aus=None, Tsmooth=None, Pick1=None, fig=None, linecolor='k'):
def __init__(self, cf, TSNR, PickWindow, iplot=0, aus=None, Tsmooth=None, Pick1=None,
fig=None, linecolor='k', ogstream=None):
"""
Create AutoPicker object
:param cf: characteristic function, on which the picking algorithm is applied
@ -59,12 +60,15 @@ class AutoPicker(object):
:type fig: `~matplotlib.figure.Figure`
:param linecolor: matplotlib line color string
:type linecolor: str
:param ogstream: original stream (waveform), e.g. for plotting purposes
:type ogstream: `~obspy.core.stream.Stream`
"""
assert isinstance(cf, CharacteristicFunction), "%s is not a CharacteristicFunction object" % str(cf)
self._linecolor = linecolor
self._pickcolor_p = 'b'
self.cf = cf.getCF()
self.ogstream = ogstream
self.Tcf = cf.getTimeArray()
self.Data = cf.getXCF()
self.dt = cf.getIncrement()
@ -173,7 +177,7 @@ class AICPicker(AutoPicker):
nn = np.isnan(self.cf)
if len(nn) > 1:
self.cf[nn] = 0
# taper AIC-CF to get rid off side maxima
# taper AIC-CF to get rid of side maxima
tap = np.hanning(len(self.cf))
aic = tap * self.cf + max(abs(self.cf))
# smooth AIC-CF
@ -316,16 +320,7 @@ class AICPicker(AutoPicker):
plt.close(fig)
return
iislope = islope[0][0:imax + 1]
# MP MP change slope calculation
# get all maxima of aicsmooth
iaicmaxima = argrelmax(aicsmooth)[0]
# get first index of maximum after pickindex (indices saved in iaicmaxima)
aicmax = iaicmaxima[np.where(iaicmaxima > pickindex)[0]]
if len(aicmax) > 0:
iaicmax = aicmax[0]
else:
iaicmax = -1
dataslope = aicsmooth[pickindex: iaicmax]
dataslope = self.Data[0].data[iislope]
# calculate slope as polynomal fit of order 1
xslope = np.arange(0, len(dataslope), 1)
try:
@ -336,7 +331,7 @@ class AICPicker(AutoPicker):
else:
self.slope = 1 / (len(dataslope) * self.Data[0].stats.delta) * (datafit[-1] - datafit[0])
# normalize slope to maximum of cf to make it unit independent
self.slope /= aicsmooth[iaicmax]
self.slope /= self.Data[0].data[icfmax]
except Exception as e:
print("AICPicker: Problems with data fitting! {}".format(e))
@ -356,6 +351,12 @@ class AICPicker(AutoPicker):
self.Tcf = self.Tcf[0:len(self.Tcf) - 1]
ax1.plot(self.Tcf, cf / max(cf), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data')
ax1.plot(self.Tcf, aicsmooth / max(aicsmooth), 'r', label='Smoothed AIC-CF')
# plot the original waveform also for evaluation of the CF and pick
if self.ogstream:
data = self.ogstream[0].data
if len(data) == len(self.Tcf):
ax1.plot(self.Tcf, 0.5 * data / max(data), 'k', label='Seismogram', alpha=0.3, zorder=0,
lw=0.5)
if self.Pick is not None:
ax1.plot([self.Pick, self.Pick], [-0.1, 0.5], 'b', linewidth=2, label='AIC-Pick')
ax1.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime)
@ -376,7 +377,7 @@ class AICPicker(AutoPicker):
label='Signal Window')
ax2.axvspan(self.Tcf[iislope[0]], self.Tcf[iislope[-1]], color='g', alpha=0.2, lw=0,
label='Slope Window')
ax2.plot(self.Tcf[pickindex: iaicmax], datafit, 'g', linewidth=2,
ax2.plot(self.Tcf[iislope], datafit, 'g', linewidth=2,
label='Slope') # MP MP changed temporarily!
if self.slope is not None:

View File

@ -15,7 +15,7 @@ import numpy as np
from obspy.core import Stream, UTCDateTime
from scipy.signal import argrelmax
from pylot.core.util.utils import get_bool, get_none, SetChannelComponents
from pylot.core.util.utils import get_bool, get_none, SetChannelComponents, common_range
def earllatepicker(X, nfac, TSNR, Pick1, iplot=0, verbosity=1, fig=None, linecolor='k'):
@ -828,14 +828,22 @@ def checksignallength(X, pick, minsiglength, pickparams, iplot=0, fig=None, line
if len(X) > 1:
# all three components available
# make sure, all components have equal lengths
ilen = min([len(X[0].data), len(X[1].data), len(X[2].data)])
x1 = X[0][0:ilen]
x2 = X[1][0:ilen]
x3 = X[2][0:ilen]
earliest_starttime = min(tr.stats.starttime for tr in X)
cuttimes = common_range(X)
X = X.slice(cuttimes[0], cuttimes[1])
x1, x2, x3 = X[:3]
if not (len(x1) == len(x2) == len(x3)):
raise PickingFailedException('checksignallength: unequal lengths of components!')
# get RMS trace
rms = np.sqrt((np.power(x1, 2) + np.power(x2, 2) + np.power(x3, 2)) / 3)
ilen = len(rms)
dt = earliest_starttime - X[0].stats.starttime
pick -= dt
else:
x1 = X[0].data
x2 = x3 = None
ilen = len(x1)
rms = abs(x1)
@ -874,6 +882,10 @@ def checksignallength(X, pick, minsiglength, pickparams, iplot=0, fig=None, line
fig._tight = True
ax = fig.add_subplot(111)
ax.plot(t, rms, color=linecolor, linewidth=0.7, label='RMS Data')
ax.plot(t, x1, 'k', alpha=0.3, lw=0.3, zorder=0)
if x2 is not None and x3 is not None:
ax.plot(t, x2, 'r', alpha=0.3, lw=0.3, zorder=0)
ax.plot(t, x3, 'g', alpha=0.3, lw=0.3, zorder=0)
ax.axvspan(t[inoise[0]], t[inoise[-1]], color='y', alpha=0.2, lw=0, label='Noise Window')
ax.axvspan(t[isignal[0]], t[isignal[-1]], color='b', alpha=0.2, lw=0, label='Signal Window')
ax.plot([t[isignal[0]], t[isignal[len(isignal) - 1]]],
@ -883,6 +895,7 @@ def checksignallength(X, pick, minsiglength, pickparams, iplot=0, fig=None, line
ax.set_xlabel('Time [s] since %s' % X[0].stats.starttime)
ax.set_ylabel('Counts')
ax.set_title('Check for Signal Length, Station %s' % X[0].stats.station)
ax.set_xlim(pickparams["pstart"], pickparams["pstop"])
ax.set_yticks([])
if plt_flag == 1:
fig.show()

View File

@ -1076,7 +1076,7 @@ def check4rotated(data, metadata=None, verbosity=1):
return wfs_in
# check metadata quality
t_start = full_range(wfs_in)
t_start = full_range(wfs_in)[0]
try:
azimuths = []
dips = []

View File

@ -41,6 +41,7 @@ global #extent# %extent of a
875.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
False #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
IASP91 #taup_model# %define TauPy model for traveltime estimation. Possible values: 1066a, 1066b, ak135, ak135f, herrin, iasp91, jb, prem, pwdk, sp6
P,Pdiff,S,Sdiff #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
0.01 0.1 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
0.001 0.5 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
0.01 0.5 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@ -41,6 +41,7 @@ global #extent# %extent of a
875.0 #sstop# %end time [s] after P-onset for calculating CF for S-picking
True #use_taup# %use estimated traveltimes from TauPy for calculating windows for CF
IASP91 #taup_model# %define TauPy model for traveltime estimation. Possible values: 1066a, 1066b, ak135, ak135f, herrin, iasp91, jb, prem, pwdk, sp6
P,Pdiff,S,Sdiff #taup_phases# %Specify possible phases for TauPy (comma separated). See Obspy TauPy documentation for possible values.
0.01 0.1 #bpz1# %lower/upper corner freq. of first band pass filter Z-comp. [Hz]
0.001 0.5 #bpz2# %lower/upper corner freq. of second band pass filter Z-comp. [Hz]
0.01 0.5 #bph1# %lower/upper corner freq. of first band pass filter H-comp. [Hz]

View File

@ -1,6 +1,7 @@
import os
import sys
import unittest
import pytest
import obspy
from obspy import UTCDateTime
@ -105,7 +106,6 @@ class TestAutopickStation(unittest.TestCase):
# show complete diff when difference in results dictionaries are found
self.maxDiff = None
# @skip("Works")
def test_autopickstation_taupy_disabled_gra1(self):
expected = {
'P': {'picker': 'auto', 'snrdb': 15.405649120980094, 'weight': 0, 'Mo': None, 'marked': [], 'Mw': None,
@ -121,8 +121,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra1, pickparam=self.pickparam_taupy_disabled,
metadata=(None, None))
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA1', station)
def test_autopickstation_taupy_enabled_gra1(self):
@ -140,8 +140,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra1, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA1', station)
def test_autopickstation_taupy_disabled_gra2(self):
@ -157,8 +157,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra2, pickparam=self.pickparam_taupy_disabled,
metadata=(None, None))
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA2', station)
def test_autopickstation_taupy_enabled_gra2(self):
@ -175,8 +175,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.gra2, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA2', station)
def test_autopickstation_taupy_disabled_ech(self):
@ -190,8 +190,8 @@ class TestAutopickStation(unittest.TestCase):
'fm': None, 'spe': None, 'channel': u'LHE'}}
with HidePrints():
result, station = autopickstation(wfstream=self.ech, pickparam=self.pickparam_taupy_disabled)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('ECH', station)
def test_autopickstation_taupy_enabled_ech(self):
@ -208,8 +208,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.ech, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('ECH', station)
def test_autopickstation_taupy_disabled_fiesa(self):
@ -224,8 +224,8 @@ class TestAutopickStation(unittest.TestCase):
'fm': None, 'spe': None, 'channel': u'LHE'}}
with HidePrints():
result, station = autopickstation(wfstream=self.fiesa, pickparam=self.pickparam_taupy_disabled)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('FIESA', station)
def test_autopickstation_taupy_enabled_fiesa(self):
@ -242,8 +242,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.fiesa, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertDictContainsSubset(expected=expected['P'], actual=result['P'])
self.assertDictContainsSubset(expected=expected['S'], actual=result['S'])
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('FIESA', station)
def test_autopickstation_gra1_z_comp_missing(self):
@ -272,7 +272,8 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=wfstream, pickparam=self.pickparam_taupy_disabled,
metadata=(None, None))
self.assertEqual(expected, result)
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
self.assertEqual('GRA1', station)
def test_autopickstation_a106_taupy_enabled(self):
@ -290,7 +291,9 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.a106, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertEqual(expected, result)
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
def test_autopickstation_station_missing_in_metadata(self):
"""This station is not in the metadata, but Taupy is enabled. Taupy should exit cleanly and modify the starttime
@ -311,8 +314,37 @@ class TestAutopickStation(unittest.TestCase):
with HidePrints():
result, station = autopickstation(wfstream=self.a005a, pickparam=self.pickparam_taupy_enabled,
metadata=self.metadata, origin=self.origin)
self.assertEqual(expected, result)
compare_dicts(expected=expected['P'], result=result['P'], hint='P-')
compare_dicts(expected=expected['S'], result=result['S'], hint='S-')
def run_dict_comparison(result, expected):
for key, expected_value in expected.items():
if isinstance(expected_value, dict):
run_dict_comparison(result[key], expected[key])
else:
res = result[key]
if isinstance(res, UTCDateTime) and isinstance(expected_value, UTCDateTime):
res = res.timestamp
expected_value = expected_value.timestamp
assert expected_value == pytest.approx(res), f'{key}: {expected_value} != {res}'
def compare_dicts(result, expected, hint=''):
try:
run_dict_comparison(result, expected)
except AssertionError:
raise AssertionError(f'{hint}Dictionaries not equal.'
f'\n\n<<Expected>>\n{pretty_print_dict(expected)}'
f'\n\n<<Result>>\n{pretty_print_dict(result)}')
def pretty_print_dict(dct):
retstr = ''
for key, value in sorted(dct.items(), key=lambda x: x[0]):
retstr += f"{key} : {value}\n"
return retstr
if __name__ == '__main__':
unittest.main()