From 393289245febc032bd40bc43742f6fcdeab9a500 Mon Sep 17 00:00:00 2001 From: Marcel Paffrath Date: Thu, 6 Apr 2017 15:37:54 +0200 Subject: [PATCH] multiprocessing implemented for restitution and autopicker --- QtPyLoT.py | 6 +- autoPyLoT.py | 5 +- pylot/RELEASE-VERSION | 2 +- pylot/core/pick/autopick.py | 22 +++- pylot/core/util/dataprocessing.py | 165 +++++++++++++++++------------- pylot/core/util/utils.py | 17 ++- 6 files changed, 125 insertions(+), 92 deletions(-) mode change 100644 => 100755 autoPyLoT.py diff --git a/QtPyLoT.py b/QtPyLoT.py index 49f47ad8..5cd37dfc 100755 --- a/QtPyLoT.py +++ b/QtPyLoT.py @@ -1054,9 +1054,9 @@ class MainWindow(QMainWindow): self.metadata = read_metadata(fninv) wf_copy = self.get_data().getWFData().copy() - [corr_wf, rest_flag] = restitute_data(wf_copy, *self.metadata) - if not rest_flag: - raise ProcessingError('Restitution of waveform data failed!') + corr_wf = restitute_data(wf_copy, *self.metadata) + # if not rest_flag: + # raise ProcessingError('Restitution of waveform data failed!') if type == 'ML': local_mag = RichterMagnitude(corr_wf, self.get_data().get_evt_data(), self.inputs.get('sstop'), verbosity = True) return local_mag.updated_event() diff --git a/autoPyLoT.py b/autoPyLoT.py old mode 100644 new mode 100755 index f7dbdd5d..df1c4d6b --- a/autoPyLoT.py +++ b/autoPyLoT.py @@ -29,6 +29,7 @@ from pylot.core.util.version import get_git_version as _getVersionString __version__ = _getVersionString() + def autoPyLoT(inputfile, fnames=None, savepath=None): """ Determine phase onsets automatically utilizing the automatic picking @@ -146,7 +147,7 @@ def autoPyLoT(inputfile, fnames=None, savepath=None): wfdat = data.getWFData() # all available streams wfdat = remove_underscores(wfdat) metadata = read_metadata(parameter.get('invdir')) - corr_dat, rest_flag = restitute_data(wfdat.copy(), *metadata) + corr_dat = restitute_data(wfdat.copy(), *metadata) print('Working on event %s' % event) print(data) @@ -319,7 +320,7 @@ if __name__ == "__main__": action='store', help='''optional, list of data file names''') parser.add_argument('-s', '-S', '--spath', type=str, - action=store, + action='store', help='''optional, save path for autoPyLoT output''') parser.add_argument('-v', '-V', '--version', action='version', version='autoPyLoT ' + __version__, diff --git a/pylot/RELEASE-VERSION b/pylot/RELEASE-VERSION index 70e4ee49..7f721368 100644 --- a/pylot/RELEASE-VERSION +++ b/pylot/RELEASE-VERSION @@ -1 +1 @@ -f5c0-dirty +ef17-dirty diff --git a/pylot/core/pick/autopick.py b/pylot/core/pick/autopick.py index 7d5553be..a062992f 100644 --- a/pylot/core/pick/autopick.py +++ b/pylot/core/pick/autopick.py @@ -17,13 +17,14 @@ from pylot.core.pick.charfuns import CharacteristicFunction from pylot.core.pick.charfuns import HOScf, AICcf, ARZcf, ARHcf, AR3Ccf from pylot.core.pick.utils import checksignallength, checkZ4S, earllatepicker, \ getSNR, fmpicker, checkPonsets, wadaticheck -from pylot.core.util.utils import getPatternLine +from pylot.core.util.utils import getPatternLine, gen_Pool from pylot.core.io.data import Data def autopickevent(data, param): stations = [] all_onsets = {} + input_tuples = [] # get some parameters for quality control from # parameter input file (usually autoPyLoT.in). @@ -40,8 +41,18 @@ def autopickevent(data, param): for station in stations: topick = data.select(station=station) - all_onsets[station] = autopickstation(topick, param, verbose=apverbose) + #all_onsets[station] = autopickstation(topick, param, verbose=apverbose) + input_tuples.append((topick, param, apverbose)) + + pool = gen_Pool() + result = pool.map(call_autopickstation, input_tuples) + pool.close() + for pick in result: + station = pick['station'] + pick.pop('station') + all_onsets[station] = pick + # quality control # median check and jackknife on P-onset times jk_checked_onsets = checkPonsets(all_onsets, mdttolerance, iplot) @@ -49,6 +60,11 @@ def autopickevent(data, param): return wadaticheck(jk_checked_onsets, wdttolerance, iplot) +def call_autopickstation(input_tuple): + wfstream, pickparam, verbose = input_tuple + return autopickstation(wfstream, pickparam, verbose) + + def autopickstation(wfstream, pickparam, verbose=False): """ :param wfstream: `~obspy.core.stream.Stream` containing waveform @@ -789,7 +805,7 @@ def autopickstation(wfstream, pickparam, verbose=False): spick = dict(channel=ccode, network=ncode, lpp=lpickS, epp=epickS, mpp=mpickS, spe=Serror, snr=SNRS, snrdb=SNRSdB, weight=Sweight, fm=None, picker=picker, Ao=Ao) # merge picks into returning dictionary - picks = dict(P=ppick, S=spick) + picks = dict(P=ppick, S=spick, station=zdat[0].stats.station) return picks diff --git a/pylot/core/util/dataprocessing.py b/pylot/core/util/dataprocessing.py index 516cd6eb..82a09caf 100644 --- a/pylot/core/util/dataprocessing.py +++ b/pylot/core/util/dataprocessing.py @@ -11,7 +11,7 @@ import numpy as np from obspy import UTCDateTime, read_inventory, read from obspy.io.xseed import Parser from pylot.core.util.utils import key_for_set_value, find_in_list, \ - remove_underscores + remove_underscores, gen_Pool def time_from_header(header): @@ -197,6 +197,79 @@ def read_metadata(path_to_inventory): return invtype, robj +def restitute_trace(input_tuple): + tr, invtype, inobj, unit, force = input_tuple + + remove_trace = False + + seed_id = tr.get_id() + # check, whether this trace has already been corrected + if 'processing' in tr.stats.keys() \ + and np.any(['remove' in p for p in tr.stats.processing]) \ + and not force: + print("Trace {0} has already been corrected!".format(seed_id)) + return tr, False + stime = tr.stats.starttime + prefilt = get_prefilt(tr) + if invtype == 'resp': + fresp = find_in_list(inobj, seed_id) + if not fresp: + raise IOError('no response file found ' + 'for trace {0}'.format(seed_id)) + fname = fresp + seedresp = dict(filename=fname, + date=stime, + units=unit) + kwargs = dict(paz_remove=None, pre_filt=prefilt, seedresp=seedresp) + elif invtype == 'dless': + if type(inobj) is list: + fname = Parser(find_in_list(inobj, seed_id)) + else: + fname = inobj + seedresp = dict(filename=fname, + date=stime, + units=unit) + kwargs = dict(pre_filt=prefilt, seedresp=seedresp) + elif invtype == 'xml': + invlist = inobj + if len(invlist) > 1: + finv = find_in_list(invlist, seed_id) + else: + finv = invlist[0] + inventory = read_inventory(finv, format='STATIONXML') + elif invtype == None: + print("No restitution possible, as there are no station-meta data available!") + return tr, True + else: + remove_trace = True + # apply restitution to data + print("Correcting instrument at station %s, channel %s" \ + % (tr.stats.station, tr.stats.channel)) + try: + if invtype in ['resp', 'dless']: + try: + tr.simulate(**kwargs) + except ValueError as e: + vmsg = '{0}'.format(e) + print(vmsg) + + else: + tr.attach_response(inventory) + tr.remove_response(output=unit, + pre_filt=prefilt) + except ValueError as e: + msg0 = 'Response for {0} not found in Parser'.format(seed_id) + msg1 = 'evalresp failed to calculate response' + if msg0 not in e.message or msg1 not in e.message: + raise + else: + # restitution done to copies of data thus deleting traces + # that failed should not be a problem + remove_trace = True + + return tr, remove_trace + + def restitute_data(data, invtype, inobj, unit='VEL', force=False): """ takes a data stream and a path_to_inventory and returns the corrected @@ -216,82 +289,28 @@ def restitute_data(data, invtype, inobj, unit='VEL', force=False): data = remove_underscores(data) # loop over traces + input_tuples = [] for tr in data: - seed_id = tr.get_id() - # check, whether this trace has already been corrected - if 'processing' in tr.stats.keys() \ - and np.any(['remove' in p for p in tr.stats.processing]) \ - and not force: - print("Trace {0} has already been corrected!".format(seed_id)) - continue - stime = tr.stats.starttime - prefilt = get_prefilt(tr) - if invtype == 'resp': - fresp = find_in_list(inobj, seed_id) - if not fresp: - raise IOError('no response file found ' - 'for trace {0}'.format(seed_id)) - fname = fresp - seedresp = dict(filename=fname, - date=stime, - units=unit) - kwargs = dict(paz_remove=None, pre_filt=prefilt, seedresp=seedresp) - elif invtype == 'dless': - if type(inobj) is list: - fname = Parser(find_in_list(inobj, seed_id)) - else: - fname = inobj - seedresp = dict(filename=fname, - date=stime, - units=unit) - kwargs = dict(pre_filt=prefilt, seedresp=seedresp) - elif invtype == 'xml': - invlist = inobj - if len(invlist) > 1: - finv = find_in_list(invlist, seed_id) - else: - finv = invlist[0] - inventory = read_inventory(finv, format='STATIONXML') - elif invtype == None: - print("No restitution possible, as there are no station-meta data available!") - break - else: - data.remove(tr) - continue - # apply restitution to data - print("Correcting instrument at station %s, channel %s" \ - % (tr.stats.station, tr.stats.channel)) - try: - if invtype in ['resp', 'dless']: - try: - tr.simulate(**kwargs) - except ValueError as e: - vmsg = '{0}'.format(e) - print(vmsg) - - else: - tr.attach_response(inventory) - tr.remove_response(output=unit, - pre_filt=prefilt) - except ValueError as e: - msg0 = 'Response for {0} not found in Parser'.format(seed_id) - msg1 = 'evalresp failed to calculate response' - if msg0 not in e.message or msg1 not in e.message: - raise - else: - # restitution done to copies of data thus deleting traces - # that failed should not be a problem - data.remove(tr) - continue - restflag.append(True) + input_tuples.append((tr, invtype, inobj, unit, force)) + data.remove(tr) + + pool = gen_Pool() + result = pool.map(restitute_trace, input_tuples) + pool.close() + + for tr, remove_trace in result: + if not remove_trace: + data.traces.append(tr) + # check if ALL traces could be restituted, take care of large datasets # better try restitution for smaller subsets of data (e.g. station by # station) - if len(restflag) > 0: - restflag = bool(np.all(restflag)) - else: - restflag = False - return data, restflag + + # if len(restflag) > 0: + # restflag = bool(np.all(restflag)) + # else: + # restflag = False + return data def get_prefilt(trace, tlow=(0.5, 0.9), thi=(5., 2.), verbosity=0): diff --git a/pylot/core/util/utils.py b/pylot/core/util/utils.py index fadd2f2d..aab1cee2 100644 --- a/pylot/core/util/utils.py +++ b/pylot/core/util/utils.py @@ -29,19 +29,16 @@ def getindexbounds(f, eta): u = find_nearest(f[mi:], b) + mi return mi, l, u -def worker(func, input, cores='max', async=False): + +def gen_Pool(ncores='max'): import multiprocessing - if cores == 'max': - cores = multiprocessing.cpu_count() + if ncores=='max': + ncores=multiprocessing.cpu_count() + + pool = multiprocessing.Pool(ncores) + return pool - pool = multiprocessing.Pool(cores) - if async == True: - result = pool.map_async(func, input) - else: - result = pool.map(func, input) - pool.close() - return result def clims(lim1, lim2): """