refactoring picker.py (AICPicker and PragPicker)

This commit is contained in:
Darius Arnold 2017-10-25 21:54:25 +02:00
parent deaa67bb30
commit 8f721da643
2 changed files with 290 additions and 190 deletions

View File

@ -23,44 +23,42 @@ import warnings
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import argrelmax
from pylot.core.pick.charfuns import CharacteristicFunction
from pylot.core.pick.utils import getnoisewin, getsignalwin
from pylot.core.pick.utils import getnoisewin, getsignalwin, real_Bool, real_None, set_NaNs_to, taper_cf, cf_positive, \
smooth_cf, check_counts_ms, getSNR, getslopewin, calcSlope
class AutoPicker(object):
'''
"""
Superclass of different, automated picking algorithms applied on a CF determined
using AIC, HOS, or AR prediction.
'''
"""
warnings.simplefilter('ignore')
def __init__(self, cf, TSNR, PickWindow, iplot=0, aus=None, Tsmooth=None, Pick1=None, fig=None, linecolor='k'):
'''
:param: cf, characteristic function, on which the picking algorithm is applied
:type: `~pylot.core.pick.CharFuns.CharacteristicFunction` object
:param: TSNR, length of time windows around pick used to determine SNR [s]
:type: tuple (T_noise, T_gap, T_signal)
:param: PickWindow, length of pick window [s]
:type: float
:param: iplot, no. of figure window for plotting interims results
:type: integer
:param: aus ("artificial uplift of samples"), find local minimum at i if aic(i-1)*(1+aus) >= aic(i)
:type: float
:param: Tsmooth, length of moving smoothing window to calculate smoothed CF [s]
:type: float
:param: Pick1, initial (prelimenary) onset time, starting point for PragPicker and
EarlLatePicker
:type: float
'''
"""
Create AutoPicker object
:param cf: characteristic function, on which the picking algorithm is applied
:type cf: `~pylot.core.pick.CharFuns.CharacteristicFunction`
:param TSNR: length of time windows around pick used to determine SNR [s], tuple (T_noise, T_gap, T_signal)
:type TSNR: (float, float, float)
:param PickWindow: length of pick window [s]
:type PickWindow: float
:param iplot: flag used for plotting, if > 1, results will be plotted. Use iplot = 0 to disable plotting
:type iplot: int
:param aus: ("artificial uplift of samples"), find local minimum at i if aic(i-1)*(1+aus) >= aic(i)
:type aus: float
:param Tsmooth: length of moving smoothing window to calculate smoothed CF [s]
:type Tsmooth: float
:param Pick1: initial (preliminary) onset time, starting point for PragPicker and EarlLatePicker
:type Pick1: float
:param fig: matplotlib figure used for plotting. If not given and plotting is enabled, a new figure will
be created
:type fig: `~matplotlib.figure.Figure`
:param linecolor: matplotlib line color string
:type linecolor: str
"""
assert isinstance(cf, CharacteristicFunction), "%s is not a CharacteristicFunction object" % str(cf)
self._linecolor = linecolor
@ -79,6 +77,11 @@ class AutoPicker(object):
self.calcPick()
def __str__(self):
"""
String representation of AutoPicker object
:return:
:rtype: str
"""
return '''\n\t{name} object:\n
TSNR:\t\t\t{TSNR}\n
PickWindow:\t{PickWindow}\n
@ -129,6 +132,13 @@ class AutoPicker(object):
return self.iplot
def setiplot(self, iplot):
try:
iplot = int(iplot)
except ValueError:
if real_Bool(iplot):
self.iplot = 2
else:
self.iplot = 0
self.iplot = iplot
def getpick1(self):
@ -142,169 +152,117 @@ class AutoPicker(object):
class AICPicker(AutoPicker):
'''
"""
Method to derive the onset time of an arriving phase based on CF
derived from AIC. In order to get an impression of the quality of this inital pick,
derived from AIC. In order to get an impression of the quality of this initial pick,
a quality assessment is applied based on SNR and slope determination derived from the CF,
from which the AIC has been calculated.
'''
"""
def calcPick(self):
def prepareCF(self):
"""
data pre processing: remove invalid entries, taper cf, remove offset
"""
print('AICPicker: Get initial onset time (pick) from AIC-CF ...')
self.Pick = None
self.slope = None
self.SNR = None
plt_flag = 0
try:
iplot = int(self.iplot)
except:
if self.iplot == True or self.iplot == 'True':
iplot = 2
else:
iplot = 0
# find NaN's
nn = np.isnan(self.cf)
if len(nn) > 1:
self.cf[nn] = 0
# taper AIC-CF to get rid off side maxima
tap = np.hanning(len(self.cf))
aic = tap * self.cf + max(abs(self.cf))
self.cf = set_NaNs_to(self.cf, 0)
self.cf = taper_cf(self.cf)
self.cf = cf_positive(self.cf)
# smooth AIC-CF
ismooth = int(round(self.Tsmooth / self.dt))
aicsmooth = np.zeros(len(aic))
if len(aic) < ismooth:
try:
self.aicsmooth = smooth_cf(self.cf, self.Tsmooth, self.dt)
except ValueError as e:
print('AICPicker: Tsmooth larger than CF!')
return
else:
for i in range(1, len(aic)):
if i > ismooth:
ii1 = i - ismooth
aicsmooth[i] = aicsmooth[i - 1] + (aic[i] - aic[ii1]) / ismooth
else:
aicsmooth[i] = np.mean(aic[1: i])
# remove offset in AIC function
offset = abs(min(aic) - min(aicsmooth))
aicsmooth = aicsmooth - offset
# get maximum of HOS/AR-CF as startimg point for searching
# minimum in AIC function
raise e
def findMinimum(self):
"""
Find minimum representing the preliminary onset, sets Pick to minimum
"""
# get maximum of HOS/AR-CF as starting point for searching
# minimum in AIC function
icfmax = np.argmax(self.Data[0].data)
# find minimum in AIC-CF front of maximum of HOS/AR-CF
lpickwindow = int(round(self.PickWindow / self.dt))
for i in range(icfmax - 1, max([icfmax - lpickwindow, 2]), -1):
if aicsmooth[i - 1] >= aicsmooth[i]:
if self.aicsmooth[i - 1] >= self.aicsmooth[i]:
self.Pick = self.Tcf[i]
break
# if no minimum could be found:
# search in 1st derivative of AIC-CF
if self.Pick is None:
diffcf = np.diff(aicsmooth)
# find NaN's
nn = np.isnan(diffcf)
if len(nn) > 1:
diffcf[nn] = 0
diffcf = np.diff(self.aicsmooth)
diffcf = set_NaNs_to(diffcf, 0)
# taper CF to get rid off side maxima
tap = np.hanning(len(diffcf))
diffcf = tap * diffcf * max(abs(aicsmooth))
diffcf = taper_cf(diffcf)
diffcf = cf_positive(diffcf)
for i in range(icfmax - 1, max([icfmax - lpickwindow, 2]), -1):
if diffcf[i - 1] >= diffcf[i]:
self.Pick = self.Tcf[i]
break
def calcPick(self):
"""
Calculate pick using cf derived from AIC
:return:
:rtype: None
"""
print('AICPicker: Get initial onset time (pick) from AIC-CF ...')
self.Pick = None
self.slope = -1
self.SNR = -1
plt_flag = 0
iplot = self.iplot
try:
self.prepareCF()
except ValueError:
return
self.findMinimum()
# quality assessment using SNR and slope from CF
if self.Pick is not None:
# get noise window
inoise = getnoisewin(self.Tcf, self.Pick, self.TSNR[0], self.TSNR[1])
# check, if these are counts or m/s, important for slope estimation!
# this is quick and dirty, better solution?
if max(self.Data[0].data < 1e-3) and max(self.Data[0].data >= 1e-6):
self.Data[0].data = self.Data[0].data * 1000000.
elif max(self.Data[0].data < 1e-6):
self.Data[0].data = self.Data[0].data * 1e13
# get signal window
isignal = getsignalwin(self.Tcf, self.Pick, self.TSNR[2])
if len(isignal) == 0:
return
ii = min([isignal[len(isignal) - 1], len(self.Tcf)])
isignal = isignal[0:ii]
try:
self.Data[0].data[isignal]
except IndexError as e:
msg = "Time series out of bounds! {}".format(e)
print(msg)
return
self.Data[0].data = check_counts_ms(self.Data[0].data)
# calculate SNR from CF
self.SNR = max(abs(self.Data[0].data[isignal] - np.mean(self.Data[0].data[isignal]))) / \
max(abs(self.Data[0].data[inoise] - np.mean(self.Data[0].data[inoise])))
self.SNR = getSNR(self.Data, self.TSNR, self.Pick)[0] # TODO Check wether this yields similar results
# calculate slope from CF after initial pick
# get slope window
tslope = self.TSNR[3] # slope determination window
islope = np.where((self.Tcf <= min([self.Pick + tslope, self.Tcf[-1]])) \
& (self.Tcf >= self.Pick)) # TODO: put this in a seperate function like getsignalwin
# find maximum within slope determination window
# 'cause slope should be calculated up to first local minimum only!
try:
dataslope = self.Data[0].data[islope[0][0:-1]]
except IndexError:
print("Slope Calculation: empty array islope, check signal window")
self.slope, iislope, datafit = calcSlope(self.Data, self.aicsmooth, self.Tcf, self.Pick, self.TSNR)
except Exception:
if self.iplot > 1:
if real_None(self.fig) is None:
fig = plt.figure()
plt_flag = 1
else:
fig = self.fig
ax = fig.add_subplot(111)
x = self.Data[0].data
ax.plot(self.Tcf, x / max(x), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data')
ax.plot(self.Tcf, self.aicsmooth / max(self.aicsmooth), 'r', label='Smoothed AIC-CF')
ax.legend(loc=1)
ax.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime)
ax.set_yticks([])
ax.set_title(self.Data[0].stats.station)
if plt_flag == 1:
fig.show()
try:
input()
except SyntaxError:
pass
plt.close(fig)
return
if len(dataslope) <= 1:
print('No or not enough data in slope window found!')
return
imaxs, = argrelmax(dataslope)
if imaxs.size:
imax = imaxs[0]
else:
imax = np.argmax(dataslope)
iislope = islope[0][0:imax + 1]
if len(iislope) < 2:
# calculate slope from initial onset to maximum of AIC function
print("AICPicker: Not enough data samples left for slope calculation!")
print("Calculating slope from initial onset to maximum of AIC function ...")
imax = np.argmax(aicsmooth[islope[0][0:-1]])
if imax == 0:
print("AICPicker: Maximum for slope determination right at the beginning of the window!")
print("Choose longer slope determination window!")
if self.iplot > 1:
if self.fig == None or self.fig == 'None':
fig = plt.figure()
plt_flag = 1
else:
fig = self.fig
ax = fig.add_subplot(111)
x = self.Data[0].data
ax.plot(self.Tcf, x / max(x), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data')
ax.plot(self.Tcf, aicsmooth / max(aicsmooth), 'r', label='Smoothed AIC-CF')
ax.legend(loc=1)
ax.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime)
ax.set_yticks([])
ax.set_title(self.Data[0].stats.station)
if plt_flag == 1:
fig.show()
try: input()
except SyntaxError: pass
plt.close(fig)
return
iislope = islope[0][0:imax+1]
dataslope = self.Data[0].data[iislope]
# calculate slope as polynomal fit of order 1
xslope = np.arange(0, len(dataslope), 1)
P = np.polyfit(xslope, dataslope, 1)
datafit = np.polyval(P, xslope)
if datafit[0] >= datafit[-1]:
print('AICPicker: Negative slope, bad onset skipped!')
return
self.slope = 1 / (len(dataslope) * self.Data[0].stats.delta) * (datafit[-1] - datafit[0])
else:
self.SNR = None
self.slope = None
self.SNR = -1
self.slope = -1
if iplot > 1:
if self.fig == None or self.fig == 'None':
inoise = getnoisewin(self.Tcf, self.Pick, self.TSNR[0], self.TSNR[1])
isignal = getsignalwin(self.Tcf, self.Pick, self.TSNR[2])
if real_None(self.fig) is None:
fig = plt.figure() # self.iplot)
plt_flag = 1
else:
@ -315,14 +273,14 @@ class AICPicker(AutoPicker):
if len(self.Tcf) > len(self.Data[0].data): # why? LK
self.Tcf = self.Tcf[0:len(self.Tcf)-1]
ax1.plot(self.Tcf, x / max(x), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data')
ax1.plot(self.Tcf, aicsmooth / max(aicsmooth), 'r', label='Smoothed AIC-CF')
ax1.plot(self.Tcf, self.aicsmooth / max(self.aicsmooth), 'r', label='Smoothed AIC-CF')
if self.Pick is not None:
ax1.plot([self.Pick, self.Pick], [-0.1, 0.5], 'b', linewidth=2, label='AIC-Pick')
ax1.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime)
ax1.set_yticks([])
ax1.legend(loc=1)
if self.Pick is not None:
if self.Pick is not None and self.SNR is not None:
ax2 = fig.add_subplot(2, 1, 2, sharex=ax1)
ax2.plot(self.Tcf, x, color=self._linecolor, linewidth=0.7, label='Data')
ax1.axvspan(self.Tcf[inoise[0]], self.Tcf[inoise[-1]], color='y', alpha=0.2, lw=0, label='Noise Window')
@ -364,19 +322,13 @@ class AICPicker(AutoPicker):
class PragPicker(AutoPicker):
'''
"""
Method of pragmatic picking exploiting information given by CF.
'''
"""
def calcPick(self):
try:
iplot = int(self.getiplot())
except:
if self.getiplot() == True or self.getiplot() == 'True':
iplot = 2
else:
iplot = 0
iplot = self.getiplot()
if self.getpick1() is not None:
print('PragPicker: Get most likely pick from HOS- or AR-CF using pragmatic picking algorithm ...')
@ -387,19 +339,7 @@ class PragPicker(AutoPicker):
pickflag = 0
plt_flag = 0
# smooth CF
ismooth = int(round(self.Tsmooth / self.dt))
cfsmooth = np.zeros(len(self.cf))
if len(self.cf) < ismooth:
print('PragPicker: Tsmooth larger than CF!')
return
else:
for i in range(1, len(self.cf)):
if i > ismooth:
ii1 = i - ismooth
cfsmooth[i] = cfsmooth[i - 1] + (self.cf[i] - self.cf[ii1]) / ismooth
else:
cfsmooth[i] = np.mean(self.cf[1: i])
cfsmooth = smooth_cf(self.cf, self.Tsmooth, self.dt)
# select picking window
# which is centered around tpick1
ipick = np.where((self.Tcf >= self.getpick1() - self.PickWindow / 2) \
@ -474,7 +414,7 @@ class PragPicker(AutoPicker):
pickflag = 0
if iplot > 1:
if self.fig == None or self.fig == 'None':
if real_None(self.fig) is None:
fig = plt.figure() # self.getiplot())
plt_flag = 1
else:

View File

@ -12,6 +12,7 @@ import warnings
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import argrelmax
from obspy.core import Stream, UTCDateTime
from pylot.core.util.utils import real_Bool, real_None
@ -413,9 +414,9 @@ def getSNR(X, TSNR, t1, tracenum=0):
assert isinstance(X, Stream), "%s is not a stream object" % str(X)
SNR = None
SNRdB = None
noiselevel = None
SNR = -1
SNRdB = -1
noiselevel = -1
x = X[tracenum].data
npts = X[tracenum].stats.npts
@ -485,13 +486,13 @@ def getsignalwin(t, t1, tsignal):
Function to extract data out of time series for signal level calculation.
Returns an array of indices.
:param t: array of time stamps
:type t: `numpy.ndarray`
:type t: `~numpy.ndarray`
:param t1: time from which relative to it signal window is extracted
:type t1: float
:param tsignal: length of time window [s] for signal level calculation
:type tsignal: float
:return: indices of signal window i t
:rtype: `numpy.ndarray`
:return: indices of signal window in t
:rtype: `~numpy.ndarray`
"""
# get signal window
@ -503,6 +504,26 @@ def getsignalwin(t, t1, tsignal):
return isignal
def getslopewin(Tcf, Pick, tslope):
"""
Function to extract slope window out of time series
>>> (np.arange(15., 85.), 30.0, 10.0)
array([15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25])
:param Tcf:
:type Tcf:
:param Pick:
:type Pick:
:param tslope:a
:type tslope:
:return:
:rtype: `numpy.ndarray`
"""
# TODO: fill out docstring
slope = np.where( (Tcf <= min(Pick + tslope, Tcf[-1])) & (Tcf >= Pick) )
return slope[0]
def getResolutionWindow(snr, extent):
"""
Produce the half of the time resolution window width from given SNR value
@ -1206,6 +1227,145 @@ def getQualityFromUncertainty(uncertainty, Errors):
return quality
def set_NaNs_to(data, nan_value):
"""
Replace all NaNs in data with nan_value
:param data: array holding data
:type data: `~numpy.ndarray`
:param nan_value: value which all NaNs are set to
:type nan_value: float, int
:return: data array with all NaNs replaced with nan_value
:rtype: `~numpy.ndarray`
"""
nn = np.isnan(data)
if np.any(nn):
data[nn] = nan_value
return data
def taper_cf(cf):
"""
Taper cf data to get rid off of side maximas
:param cf: characteristic function data
:type cf: `~numpy.ndarray`
:return: tapered cf
:rtype: `~numpy.ndarray`
"""
tap = np.hanning(len(cf))
return tap * cf
def cf_positive(cf):
"""
Shifts cf so that all values are positive
:param cf:
:type cf: `~numpy.ndarray`
:return:
:rtype: `~numpy.ndarray`
"""
return cf + max(abs(cf))
def smooth_cf(cf, t_smooth, delta):
"""
Smooth cf by taking samples over t_smooth length
:param cf: characteristic function data
:type cf: `~numpy.ndarray`
:param t_smooth: Time from which samples for smoothing will be taken (s)
:type t_smooth: float
:param delta: Sample rate of cf
:type delta: float
:return: smoothed cf data
:rtype: `~numpy.ndarray`
"""
ismooth = int(round(t_smooth / delta)) # smooth values this many indexes apart
cf_smooth = np.zeros(len(cf))
if len(cf) < ismooth:
raise ValueError
for i, val in enumerate(cf):
if i <= ismooth:
cf_smooth[i] = np.mean(cf[0:i+1])
elif i > ismooth:
ii1 = i - ismooth
cf_smooth[i] = cf_smooth[i - 1] + (cf[i] - cf[ii1]) / ismooth
offset = abs(min(cf) - min(cf_smooth))
cf_smooth -= offset # remove offset from smoothed function
return cf_smooth
def check_counts_ms(data):
"""
check if data is in counts or m/s
:param data: data array
:type data: `~numpy.ndarray`
:return:
:rtype: `~numpy.ndarray`
"""
# this is quick and dirty, better solution?
if max(data < 1e-3) and max(data >= 1e-6):
data = data * 1000000.
elif max(data < 1e-6):
data = data * 1e13
return data
def calcSlope(Data, datasmooth, Tcf, Pick, TSNR):
"""
Calculate Slope for Data around a given time Pick.
:param Data: trace containing data for which a slope will be calculated
:type Data: `~obspy.core.trace.Trace`
:param datasmooth: smoothed data array
:type datasmooth: ~numpy.ndarray`
:param Tcf: array of time indices for Data array
:type Tcf: ~numpy.ndarray`
:param Pick: onset time around which the slope should be calculated
:type Pick: float
:param TSNR: tuple containing (tnoise, tsafety, tsignal, tslope). Slope will be calculated in time
window tslope around the onset
:type TSNR: (float, float, float, float)
:return: tuple containing (slope of onset, slope index array, data fit information)
:rtype: (float, `~numpy.ndarray`, `~numpy.ndarray`
"""
islope = getslopewin(Tcf, Pick, TSNR[3])
try:
dataslope = Data[0].data[islope]
except IndexError as e:
print("Slope Calculation: empty array islope, check signal window")
raise e
if len(dataslope) <= 1:
print('Slope window outside data. No or not enough data in slope window found!')
raise ValueError
# find maximum within slope determination window
# 'cause slope should be calculated up to first local minimum only!
imaxs, = argrelmax(dataslope)
if imaxs.size:
imax = imaxs[0]
else:
imax = np.argmax(dataslope)
iislope = islope[0:imax + 1] # cut index so it contains only the first maximum
if len(iislope) < 2:
# calculate slope from initial onset to maximum of AIC function
print("AICPicker: Not enough data samples left for slope calculation!")
print("Calculating slope from initial onset to maximum of AIC function ...")
imax = np.argmax(datasmooth[islope])
if imax == 0:
print("AICPicker: Maximum for slope determination right at the beginning of the window!")
print("Choose longer slope determination window!")
raise IndexError
iislope = islope[0][0:imax + 1] # cut index so it contains only the first maximum
dataslope = Data[0].data[iislope] # slope will only be calculated to the first maximum
# calculate slope as polynomal fit of order 1
xslope = np.arange(0, len(dataslope))
P = np.polyfit(xslope, dataslope, 1)
datafit = np.polyval(P, xslope)
if datafit[0] >= datafit[-1]:
print('AICPicker: Negative slope, bad onset skipped!')
raise ValueError
slope = 1 / (len(dataslope) * Data[0].stats.delta) * (datafit[-1] - datafit[0])
return slope, iislope, datafit
if __name__ == '__main__':
import doctest