diff --git a/pylot/core/pick/picker.py b/pylot/core/pick/picker.py index baf36653..31ea86f8 100644 --- a/pylot/core/pick/picker.py +++ b/pylot/core/pick/picker.py @@ -23,44 +23,42 @@ import warnings import matplotlib.pyplot as plt import numpy as np -from scipy.signal import argrelmax from pylot.core.pick.charfuns import CharacteristicFunction -from pylot.core.pick.utils import getnoisewin, getsignalwin +from pylot.core.pick.utils import getnoisewin, getsignalwin, real_Bool, real_None, set_NaNs_to, taper_cf, cf_positive, \ + smooth_cf, check_counts_ms, getSNR, getslopewin, calcSlope class AutoPicker(object): - ''' + """ Superclass of different, automated picking algorithms applied on a CF determined using AIC, HOS, or AR prediction. - ''' + """ warnings.simplefilter('ignore') def __init__(self, cf, TSNR, PickWindow, iplot=0, aus=None, Tsmooth=None, Pick1=None, fig=None, linecolor='k'): - ''' - :param: cf, characteristic function, on which the picking algorithm is applied - :type: `~pylot.core.pick.CharFuns.CharacteristicFunction` object - - :param: TSNR, length of time windows around pick used to determine SNR [s] - :type: tuple (T_noise, T_gap, T_signal) - - :param: PickWindow, length of pick window [s] - :type: float - - :param: iplot, no. of figure window for plotting interims results - :type: integer - - :param: aus ("artificial uplift of samples"), find local minimum at i if aic(i-1)*(1+aus) >= aic(i) - :type: float - - :param: Tsmooth, length of moving smoothing window to calculate smoothed CF [s] - :type: float - - :param: Pick1, initial (prelimenary) onset time, starting point for PragPicker and - EarlLatePicker - :type: float - - ''' + """ + Create AutoPicker object + :param cf: characteristic function, on which the picking algorithm is applied + :type cf: `~pylot.core.pick.CharFuns.CharacteristicFunction` + :param TSNR: length of time windows around pick used to determine SNR [s], tuple (T_noise, T_gap, T_signal) + :type TSNR: (float, float, float) + :param PickWindow: length of pick window [s] + :type PickWindow: float + :param iplot: flag used for plotting, if > 1, results will be plotted. Use iplot = 0 to disable plotting + :type iplot: int + :param aus: ("artificial uplift of samples"), find local minimum at i if aic(i-1)*(1+aus) >= aic(i) + :type aus: float + :param Tsmooth: length of moving smoothing window to calculate smoothed CF [s] + :type Tsmooth: float + :param Pick1: initial (preliminary) onset time, starting point for PragPicker and EarlLatePicker + :type Pick1: float + :param fig: matplotlib figure used for plotting. If not given and plotting is enabled, a new figure will + be created + :type fig: `~matplotlib.figure.Figure` + :param linecolor: matplotlib line color string + :type linecolor: str + """ assert isinstance(cf, CharacteristicFunction), "%s is not a CharacteristicFunction object" % str(cf) self._linecolor = linecolor @@ -79,6 +77,11 @@ class AutoPicker(object): self.calcPick() def __str__(self): + """ + String representation of AutoPicker object + :return: + :rtype: str + """ return '''\n\t{name} object:\n TSNR:\t\t\t{TSNR}\n PickWindow:\t{PickWindow}\n @@ -129,6 +132,13 @@ class AutoPicker(object): return self.iplot def setiplot(self, iplot): + try: + iplot = int(iplot) + except ValueError: + if real_Bool(iplot): + self.iplot = 2 + else: + self.iplot = 0 self.iplot = iplot def getpick1(self): @@ -142,169 +152,117 @@ class AutoPicker(object): class AICPicker(AutoPicker): - ''' + """ Method to derive the onset time of an arriving phase based on CF - derived from AIC. In order to get an impression of the quality of this inital pick, + derived from AIC. In order to get an impression of the quality of this initial pick, a quality assessment is applied based on SNR and slope determination derived from the CF, from which the AIC has been calculated. - ''' + """ - def calcPick(self): + def prepareCF(self): + """ + data pre processing: remove invalid entries, taper cf, remove offset + """ - print('AICPicker: Get initial onset time (pick) from AIC-CF ...') - - self.Pick = None - self.slope = None - self.SNR = None - plt_flag = 0 - try: - iplot = int(self.iplot) - except: - if self.iplot == True or self.iplot == 'True': - iplot = 2 - else: - iplot = 0 - - # find NaN's - nn = np.isnan(self.cf) - if len(nn) > 1: - self.cf[nn] = 0 - # taper AIC-CF to get rid off side maxima - tap = np.hanning(len(self.cf)) - aic = tap * self.cf + max(abs(self.cf)) + self.cf = set_NaNs_to(self.cf, 0) + self.cf = taper_cf(self.cf) + self.cf = cf_positive(self.cf) # smooth AIC-CF - ismooth = int(round(self.Tsmooth / self.dt)) - aicsmooth = np.zeros(len(aic)) - if len(aic) < ismooth: + try: + self.aicsmooth = smooth_cf(self.cf, self.Tsmooth, self.dt) + except ValueError as e: print('AICPicker: Tsmooth larger than CF!') - return - else: - for i in range(1, len(aic)): - if i > ismooth: - ii1 = i - ismooth - aicsmooth[i] = aicsmooth[i - 1] + (aic[i] - aic[ii1]) / ismooth - else: - aicsmooth[i] = np.mean(aic[1: i]) - # remove offset in AIC function - offset = abs(min(aic) - min(aicsmooth)) - aicsmooth = aicsmooth - offset - # get maximum of HOS/AR-CF as startimg point for searching - # minimum in AIC function + raise e + + def findMinimum(self): + """ + Find minimum representing the preliminary onset, sets Pick to minimum + """ + + # get maximum of HOS/AR-CF as starting point for searching + # minimum in AIC function icfmax = np.argmax(self.Data[0].data) # find minimum in AIC-CF front of maximum of HOS/AR-CF lpickwindow = int(round(self.PickWindow / self.dt)) for i in range(icfmax - 1, max([icfmax - lpickwindow, 2]), -1): - if aicsmooth[i - 1] >= aicsmooth[i]: + if self.aicsmooth[i - 1] >= self.aicsmooth[i]: self.Pick = self.Tcf[i] break # if no minimum could be found: # search in 1st derivative of AIC-CF if self.Pick is None: - diffcf = np.diff(aicsmooth) - # find NaN's - nn = np.isnan(diffcf) - if len(nn) > 1: - diffcf[nn] = 0 + diffcf = np.diff(self.aicsmooth) + diffcf = set_NaNs_to(diffcf, 0) # taper CF to get rid off side maxima - tap = np.hanning(len(diffcf)) - diffcf = tap * diffcf * max(abs(aicsmooth)) + diffcf = taper_cf(diffcf) + diffcf = cf_positive(diffcf) for i in range(icfmax - 1, max([icfmax - lpickwindow, 2]), -1): if diffcf[i - 1] >= diffcf[i]: self.Pick = self.Tcf[i] break + def calcPick(self): + """ + Calculate pick using cf derived from AIC + :return: + :rtype: None + """ + + print('AICPicker: Get initial onset time (pick) from AIC-CF ...') + + self.Pick = None + self.slope = -1 + self.SNR = -1 + plt_flag = 0 + iplot = self.iplot + + try: + self.prepareCF() + except ValueError: + return + + self.findMinimum() + # quality assessment using SNR and slope from CF if self.Pick is not None: - # get noise window - inoise = getnoisewin(self.Tcf, self.Pick, self.TSNR[0], self.TSNR[1]) - # check, if these are counts or m/s, important for slope estimation! - # this is quick and dirty, better solution? - if max(self.Data[0].data < 1e-3) and max(self.Data[0].data >= 1e-6): - self.Data[0].data = self.Data[0].data * 1000000. - elif max(self.Data[0].data < 1e-6): - self.Data[0].data = self.Data[0].data * 1e13 - # get signal window - isignal = getsignalwin(self.Tcf, self.Pick, self.TSNR[2]) - if len(isignal) == 0: - return - ii = min([isignal[len(isignal) - 1], len(self.Tcf)]) - isignal = isignal[0:ii] - try: - self.Data[0].data[isignal] - except IndexError as e: - msg = "Time series out of bounds! {}".format(e) - print(msg) - return + self.Data[0].data = check_counts_ms(self.Data[0].data) # calculate SNR from CF - self.SNR = max(abs(self.Data[0].data[isignal] - np.mean(self.Data[0].data[isignal]))) / \ - max(abs(self.Data[0].data[inoise] - np.mean(self.Data[0].data[inoise]))) + self.SNR = getSNR(self.Data, self.TSNR, self.Pick)[0] # TODO Check wether this yields similar results # calculate slope from CF after initial pick - # get slope window - tslope = self.TSNR[3] # slope determination window - islope = np.where((self.Tcf <= min([self.Pick + tslope, self.Tcf[-1]])) \ - & (self.Tcf >= self.Pick)) # TODO: put this in a seperate function like getsignalwin - # find maximum within slope determination window - # 'cause slope should be calculated up to first local minimum only! try: - dataslope = self.Data[0].data[islope[0][0:-1]] - except IndexError: - print("Slope Calculation: empty array islope, check signal window") + self.slope, iislope, datafit = calcSlope(self.Data, self.aicsmooth, self.Tcf, self.Pick, self.TSNR) + except Exception: + if self.iplot > 1: + if real_None(self.fig) is None: + fig = plt.figure() + plt_flag = 1 + else: + fig = self.fig + ax = fig.add_subplot(111) + x = self.Data[0].data + ax.plot(self.Tcf, x / max(x), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data') + ax.plot(self.Tcf, self.aicsmooth / max(self.aicsmooth), 'r', label='Smoothed AIC-CF') + ax.legend(loc=1) + ax.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime) + ax.set_yticks([]) + ax.set_title(self.Data[0].stats.station) + if plt_flag == 1: + fig.show() + try: + input() + except SyntaxError: + pass + plt.close(fig) return - if len(dataslope) <= 1: - print('No or not enough data in slope window found!') - return - imaxs, = argrelmax(dataslope) - if imaxs.size: - imax = imaxs[0] - else: - imax = np.argmax(dataslope) - iislope = islope[0][0:imax + 1] - if len(iislope) < 2: - # calculate slope from initial onset to maximum of AIC function - print("AICPicker: Not enough data samples left for slope calculation!") - print("Calculating slope from initial onset to maximum of AIC function ...") - imax = np.argmax(aicsmooth[islope[0][0:-1]]) - if imax == 0: - print("AICPicker: Maximum for slope determination right at the beginning of the window!") - print("Choose longer slope determination window!") - if self.iplot > 1: - if self.fig == None or self.fig == 'None': - fig = plt.figure() - plt_flag = 1 - else: - fig = self.fig - ax = fig.add_subplot(111) - x = self.Data[0].data - ax.plot(self.Tcf, x / max(x), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data') - ax.plot(self.Tcf, aicsmooth / max(aicsmooth), 'r', label='Smoothed AIC-CF') - ax.legend(loc=1) - ax.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime) - ax.set_yticks([]) - ax.set_title(self.Data[0].stats.station) - if plt_flag == 1: - fig.show() - try: input() - except SyntaxError: pass - plt.close(fig) - return - iislope = islope[0][0:imax+1] - dataslope = self.Data[0].data[iislope] - # calculate slope as polynomal fit of order 1 - xslope = np.arange(0, len(dataslope), 1) - P = np.polyfit(xslope, dataslope, 1) - datafit = np.polyval(P, xslope) - if datafit[0] >= datafit[-1]: - print('AICPicker: Negative slope, bad onset skipped!') - return - self.slope = 1 / (len(dataslope) * self.Data[0].stats.delta) * (datafit[-1] - datafit[0]) - else: - self.SNR = None - self.slope = None + self.SNR = -1 + self.slope = -1 if iplot > 1: - if self.fig == None or self.fig == 'None': + inoise = getnoisewin(self.Tcf, self.Pick, self.TSNR[0], self.TSNR[1]) + isignal = getsignalwin(self.Tcf, self.Pick, self.TSNR[2]) + if real_None(self.fig) is None: fig = plt.figure() # self.iplot) plt_flag = 1 else: @@ -315,14 +273,14 @@ class AICPicker(AutoPicker): if len(self.Tcf) > len(self.Data[0].data): # why? LK self.Tcf = self.Tcf[0:len(self.Tcf)-1] ax1.plot(self.Tcf, x / max(x), color=self._linecolor, linewidth=0.7, label='(HOS-/AR-) Data') - ax1.plot(self.Tcf, aicsmooth / max(aicsmooth), 'r', label='Smoothed AIC-CF') + ax1.plot(self.Tcf, self.aicsmooth / max(self.aicsmooth), 'r', label='Smoothed AIC-CF') if self.Pick is not None: ax1.plot([self.Pick, self.Pick], [-0.1, 0.5], 'b', linewidth=2, label='AIC-Pick') ax1.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime) ax1.set_yticks([]) ax1.legend(loc=1) - if self.Pick is not None: + if self.Pick is not None and self.SNR is not None: ax2 = fig.add_subplot(2, 1, 2, sharex=ax1) ax2.plot(self.Tcf, x, color=self._linecolor, linewidth=0.7, label='Data') ax1.axvspan(self.Tcf[inoise[0]], self.Tcf[inoise[-1]], color='y', alpha=0.2, lw=0, label='Noise Window') @@ -364,19 +322,13 @@ class AICPicker(AutoPicker): class PragPicker(AutoPicker): - ''' + """ Method of pragmatic picking exploiting information given by CF. - ''' + """ def calcPick(self): - try: - iplot = int(self.getiplot()) - except: - if self.getiplot() == True or self.getiplot() == 'True': - iplot = 2 - else: - iplot = 0 + iplot = self.getiplot() if self.getpick1() is not None: print('PragPicker: Get most likely pick from HOS- or AR-CF using pragmatic picking algorithm ...') @@ -387,19 +339,7 @@ class PragPicker(AutoPicker): pickflag = 0 plt_flag = 0 # smooth CF - ismooth = int(round(self.Tsmooth / self.dt)) - cfsmooth = np.zeros(len(self.cf)) - if len(self.cf) < ismooth: - print('PragPicker: Tsmooth larger than CF!') - return - else: - for i in range(1, len(self.cf)): - if i > ismooth: - ii1 = i - ismooth - cfsmooth[i] = cfsmooth[i - 1] + (self.cf[i] - self.cf[ii1]) / ismooth - else: - cfsmooth[i] = np.mean(self.cf[1: i]) - + cfsmooth = smooth_cf(self.cf, self.Tsmooth, self.dt) # select picking window # which is centered around tpick1 ipick = np.where((self.Tcf >= self.getpick1() - self.PickWindow / 2) \ @@ -474,7 +414,7 @@ class PragPicker(AutoPicker): pickflag = 0 if iplot > 1: - if self.fig == None or self.fig == 'None': + if real_None(self.fig) is None: fig = plt.figure() # self.getiplot()) plt_flag = 1 else: diff --git a/pylot/core/pick/utils.py b/pylot/core/pick/utils.py index 3f5bcdcb..c8c68ff7 100644 --- a/pylot/core/pick/utils.py +++ b/pylot/core/pick/utils.py @@ -12,6 +12,7 @@ import warnings import matplotlib.pyplot as plt import numpy as np +from scipy.signal import argrelmax from obspy.core import Stream, UTCDateTime from pylot.core.util.utils import real_Bool, real_None @@ -413,9 +414,9 @@ def getSNR(X, TSNR, t1, tracenum=0): assert isinstance(X, Stream), "%s is not a stream object" % str(X) - SNR = None - SNRdB = None - noiselevel = None + SNR = -1 + SNRdB = -1 + noiselevel = -1 x = X[tracenum].data npts = X[tracenum].stats.npts @@ -485,13 +486,13 @@ def getsignalwin(t, t1, tsignal): Function to extract data out of time series for signal level calculation. Returns an array of indices. :param t: array of time stamps - :type t: `numpy.ndarray` + :type t: `~numpy.ndarray` :param t1: time from which relative to it signal window is extracted :type t1: float :param tsignal: length of time window [s] for signal level calculation :type tsignal: float - :return: indices of signal window i t - :rtype: `numpy.ndarray` + :return: indices of signal window in t + :rtype: `~numpy.ndarray` """ # get signal window @@ -503,6 +504,26 @@ def getsignalwin(t, t1, tsignal): return isignal +def getslopewin(Tcf, Pick, tslope): + """ + Function to extract slope window out of time series + + >>> (np.arange(15., 85.), 30.0, 10.0) + array([15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]) + + :param Tcf: + :type Tcf: + :param Pick: + :type Pick: + :param tslope:a + :type tslope: + :return: + :rtype: `numpy.ndarray` + """ + # TODO: fill out docstring + slope = np.where( (Tcf <= min(Pick + tslope, Tcf[-1])) & (Tcf >= Pick) ) + return slope[0] + def getResolutionWindow(snr, extent): """ Produce the half of the time resolution window width from given SNR value @@ -1206,6 +1227,145 @@ def getQualityFromUncertainty(uncertainty, Errors): return quality +def set_NaNs_to(data, nan_value): + """ + Replace all NaNs in data with nan_value + :param data: array holding data + :type data: `~numpy.ndarray` + :param nan_value: value which all NaNs are set to + :type nan_value: float, int + :return: data array with all NaNs replaced with nan_value + :rtype: `~numpy.ndarray` + """ + nn = np.isnan(data) + if np.any(nn): + data[nn] = nan_value + return data + +def taper_cf(cf): + """ + Taper cf data to get rid off of side maximas + :param cf: characteristic function data + :type cf: `~numpy.ndarray` + :return: tapered cf + :rtype: `~numpy.ndarray` + """ + tap = np.hanning(len(cf)) + return tap * cf + +def cf_positive(cf): + """ + Shifts cf so that all values are positive + :param cf: + :type cf: `~numpy.ndarray` + :return: + :rtype: `~numpy.ndarray` + """ + return cf + max(abs(cf)) + +def smooth_cf(cf, t_smooth, delta): + """ + Smooth cf by taking samples over t_smooth length + :param cf: characteristic function data + :type cf: `~numpy.ndarray` + :param t_smooth: Time from which samples for smoothing will be taken (s) + :type t_smooth: float + :param delta: Sample rate of cf + :type delta: float + :return: smoothed cf data + :rtype: `~numpy.ndarray` + """ + + ismooth = int(round(t_smooth / delta)) # smooth values this many indexes apart + cf_smooth = np.zeros(len(cf)) + + if len(cf) < ismooth: + raise ValueError + + for i, val in enumerate(cf): + if i <= ismooth: + cf_smooth[i] = np.mean(cf[0:i+1]) + elif i > ismooth: + ii1 = i - ismooth + cf_smooth[i] = cf_smooth[i - 1] + (cf[i] - cf[ii1]) / ismooth + offset = abs(min(cf) - min(cf_smooth)) + cf_smooth -= offset # remove offset from smoothed function + return cf_smooth + +def check_counts_ms(data): + """ + check if data is in counts or m/s + :param data: data array + :type data: `~numpy.ndarray` + :return: + :rtype: `~numpy.ndarray` + """ + # this is quick and dirty, better solution? + if max(data < 1e-3) and max(data >= 1e-6): + data = data * 1000000. + elif max(data < 1e-6): + data = data * 1e13 + return data + + +def calcSlope(Data, datasmooth, Tcf, Pick, TSNR): + """ + Calculate Slope for Data around a given time Pick. + + :param Data: trace containing data for which a slope will be calculated + :type Data: `~obspy.core.trace.Trace` + :param datasmooth: smoothed data array + :type datasmooth: ~numpy.ndarray` + :param Tcf: array of time indices for Data array + :type Tcf: ~numpy.ndarray` + :param Pick: onset time around which the slope should be calculated + :type Pick: float + :param TSNR: tuple containing (tnoise, tsafety, tsignal, tslope). Slope will be calculated in time + window tslope around the onset + :type TSNR: (float, float, float, float) + :return: tuple containing (slope of onset, slope index array, data fit information) + :rtype: (float, `~numpy.ndarray`, `~numpy.ndarray` + """ + islope = getslopewin(Tcf, Pick, TSNR[3]) + try: + dataslope = Data[0].data[islope] + except IndexError as e: + print("Slope Calculation: empty array islope, check signal window") + raise e + if len(dataslope) <= 1: + print('Slope window outside data. No or not enough data in slope window found!') + raise ValueError + # find maximum within slope determination window + # 'cause slope should be calculated up to first local minimum only! + imaxs, = argrelmax(dataslope) + if imaxs.size: + imax = imaxs[0] + else: + imax = np.argmax(dataslope) + iislope = islope[0:imax + 1] # cut index so it contains only the first maximum + if len(iislope) < 2: + # calculate slope from initial onset to maximum of AIC function + print("AICPicker: Not enough data samples left for slope calculation!") + print("Calculating slope from initial onset to maximum of AIC function ...") + imax = np.argmax(datasmooth[islope]) + if imax == 0: + print("AICPicker: Maximum for slope determination right at the beginning of the window!") + print("Choose longer slope determination window!") + raise IndexError + iislope = islope[0][0:imax + 1] # cut index so it contains only the first maximum + dataslope = Data[0].data[iislope] # slope will only be calculated to the first maximum + # calculate slope as polynomal fit of order 1 + xslope = np.arange(0, len(dataslope)) + P = np.polyfit(xslope, dataslope, 1) + datafit = np.polyval(P, xslope) + if datafit[0] >= datafit[-1]: + print('AICPicker: Negative slope, bad onset skipped!') + raise ValueError + + slope = 1 / (len(dataslope) * Data[0].stats.delta) * (datafit[-1] - datafit[0]) + return slope, iislope, datafit + + if __name__ == '__main__': import doctest