multiprocessing implemented for restitution and autopicker

This commit is contained in:
Marcel Paffrath 2017-04-06 15:37:54 +02:00
parent ef1755a80f
commit 393289245f
6 changed files with 125 additions and 92 deletions

View File

@ -1054,9 +1054,9 @@ class MainWindow(QMainWindow):
self.metadata = read_metadata(fninv)
wf_copy = self.get_data().getWFData().copy()
[corr_wf, rest_flag] = restitute_data(wf_copy, *self.metadata)
if not rest_flag:
raise ProcessingError('Restitution of waveform data failed!')
corr_wf = restitute_data(wf_copy, *self.metadata)
# if not rest_flag:
# raise ProcessingError('Restitution of waveform data failed!')
if type == 'ML':
local_mag = RichterMagnitude(corr_wf, self.get_data().get_evt_data(), self.inputs.get('sstop'), verbosity = True)
return local_mag.updated_event()

5
autoPyLoT.py Normal file → Executable file
View File

@ -29,6 +29,7 @@ from pylot.core.util.version import get_git_version as _getVersionString
__version__ = _getVersionString()
def autoPyLoT(inputfile, fnames=None, savepath=None):
"""
Determine phase onsets automatically utilizing the automatic picking
@ -146,7 +147,7 @@ def autoPyLoT(inputfile, fnames=None, savepath=None):
wfdat = data.getWFData() # all available streams
wfdat = remove_underscores(wfdat)
metadata = read_metadata(parameter.get('invdir'))
corr_dat, rest_flag = restitute_data(wfdat.copy(), *metadata)
corr_dat = restitute_data(wfdat.copy(), *metadata)
print('Working on event %s' % event)
print(data)
@ -319,7 +320,7 @@ if __name__ == "__main__":
action='store',
help='''optional, list of data file names''')
parser.add_argument('-s', '-S', '--spath', type=str,
action=store,
action='store',
help='''optional, save path for autoPyLoT output''')
parser.add_argument('-v', '-V', '--version', action='version',
version='autoPyLoT ' + __version__,

View File

@ -1 +1 @@
f5c0-dirty
ef17-dirty

View File

@ -17,13 +17,14 @@ from pylot.core.pick.charfuns import CharacteristicFunction
from pylot.core.pick.charfuns import HOScf, AICcf, ARZcf, ARHcf, AR3Ccf
from pylot.core.pick.utils import checksignallength, checkZ4S, earllatepicker, \
getSNR, fmpicker, checkPonsets, wadaticheck
from pylot.core.util.utils import getPatternLine
from pylot.core.util.utils import getPatternLine, gen_Pool
from pylot.core.io.data import Data
def autopickevent(data, param):
stations = []
all_onsets = {}
input_tuples = []
# get some parameters for quality control from
# parameter input file (usually autoPyLoT.in).
@ -40,8 +41,18 @@ def autopickevent(data, param):
for station in stations:
topick = data.select(station=station)
all_onsets[station] = autopickstation(topick, param, verbose=apverbose)
#all_onsets[station] = autopickstation(topick, param, verbose=apverbose)
input_tuples.append((topick, param, apverbose))
pool = gen_Pool()
result = pool.map(call_autopickstation, input_tuples)
pool.close()
for pick in result:
station = pick['station']
pick.pop('station')
all_onsets[station] = pick
# quality control
# median check and jackknife on P-onset times
jk_checked_onsets = checkPonsets(all_onsets, mdttolerance, iplot)
@ -49,6 +60,11 @@ def autopickevent(data, param):
return wadaticheck(jk_checked_onsets, wdttolerance, iplot)
def call_autopickstation(input_tuple):
wfstream, pickparam, verbose = input_tuple
return autopickstation(wfstream, pickparam, verbose)
def autopickstation(wfstream, pickparam, verbose=False):
"""
:param wfstream: `~obspy.core.stream.Stream` containing waveform
@ -789,7 +805,7 @@ def autopickstation(wfstream, pickparam, verbose=False):
spick = dict(channel=ccode, network=ncode, lpp=lpickS, epp=epickS, mpp=mpickS, spe=Serror, snr=SNRS,
snrdb=SNRSdB, weight=Sweight, fm=None, picker=picker, Ao=Ao)
# merge picks into returning dictionary
picks = dict(P=ppick, S=spick)
picks = dict(P=ppick, S=spick, station=zdat[0].stats.station)
return picks

View File

@ -11,7 +11,7 @@ import numpy as np
from obspy import UTCDateTime, read_inventory, read
from obspy.io.xseed import Parser
from pylot.core.util.utils import key_for_set_value, find_in_list, \
remove_underscores
remove_underscores, gen_Pool
def time_from_header(header):
@ -197,6 +197,79 @@ def read_metadata(path_to_inventory):
return invtype, robj
def restitute_trace(input_tuple):
tr, invtype, inobj, unit, force = input_tuple
remove_trace = False
seed_id = tr.get_id()
# check, whether this trace has already been corrected
if 'processing' in tr.stats.keys() \
and np.any(['remove' in p for p in tr.stats.processing]) \
and not force:
print("Trace {0} has already been corrected!".format(seed_id))
return tr, False
stime = tr.stats.starttime
prefilt = get_prefilt(tr)
if invtype == 'resp':
fresp = find_in_list(inobj, seed_id)
if not fresp:
raise IOError('no response file found '
'for trace {0}'.format(seed_id))
fname = fresp
seedresp = dict(filename=fname,
date=stime,
units=unit)
kwargs = dict(paz_remove=None, pre_filt=prefilt, seedresp=seedresp)
elif invtype == 'dless':
if type(inobj) is list:
fname = Parser(find_in_list(inobj, seed_id))
else:
fname = inobj
seedresp = dict(filename=fname,
date=stime,
units=unit)
kwargs = dict(pre_filt=prefilt, seedresp=seedresp)
elif invtype == 'xml':
invlist = inobj
if len(invlist) > 1:
finv = find_in_list(invlist, seed_id)
else:
finv = invlist[0]
inventory = read_inventory(finv, format='STATIONXML')
elif invtype == None:
print("No restitution possible, as there are no station-meta data available!")
return tr, True
else:
remove_trace = True
# apply restitution to data
print("Correcting instrument at station %s, channel %s" \
% (tr.stats.station, tr.stats.channel))
try:
if invtype in ['resp', 'dless']:
try:
tr.simulate(**kwargs)
except ValueError as e:
vmsg = '{0}'.format(e)
print(vmsg)
else:
tr.attach_response(inventory)
tr.remove_response(output=unit,
pre_filt=prefilt)
except ValueError as e:
msg0 = 'Response for {0} not found in Parser'.format(seed_id)
msg1 = 'evalresp failed to calculate response'
if msg0 not in e.message or msg1 not in e.message:
raise
else:
# restitution done to copies of data thus deleting traces
# that failed should not be a problem
remove_trace = True
return tr, remove_trace
def restitute_data(data, invtype, inobj, unit='VEL', force=False):
"""
takes a data stream and a path_to_inventory and returns the corrected
@ -216,82 +289,28 @@ def restitute_data(data, invtype, inobj, unit='VEL', force=False):
data = remove_underscores(data)
# loop over traces
input_tuples = []
for tr in data:
seed_id = tr.get_id()
# check, whether this trace has already been corrected
if 'processing' in tr.stats.keys() \
and np.any(['remove' in p for p in tr.stats.processing]) \
and not force:
print("Trace {0} has already been corrected!".format(seed_id))
continue
stime = tr.stats.starttime
prefilt = get_prefilt(tr)
if invtype == 'resp':
fresp = find_in_list(inobj, seed_id)
if not fresp:
raise IOError('no response file found '
'for trace {0}'.format(seed_id))
fname = fresp
seedresp = dict(filename=fname,
date=stime,
units=unit)
kwargs = dict(paz_remove=None, pre_filt=prefilt, seedresp=seedresp)
elif invtype == 'dless':
if type(inobj) is list:
fname = Parser(find_in_list(inobj, seed_id))
else:
fname = inobj
seedresp = dict(filename=fname,
date=stime,
units=unit)
kwargs = dict(pre_filt=prefilt, seedresp=seedresp)
elif invtype == 'xml':
invlist = inobj
if len(invlist) > 1:
finv = find_in_list(invlist, seed_id)
else:
finv = invlist[0]
inventory = read_inventory(finv, format='STATIONXML')
elif invtype == None:
print("No restitution possible, as there are no station-meta data available!")
break
else:
data.remove(tr)
continue
# apply restitution to data
print("Correcting instrument at station %s, channel %s" \
% (tr.stats.station, tr.stats.channel))
try:
if invtype in ['resp', 'dless']:
try:
tr.simulate(**kwargs)
except ValueError as e:
vmsg = '{0}'.format(e)
print(vmsg)
else:
tr.attach_response(inventory)
tr.remove_response(output=unit,
pre_filt=prefilt)
except ValueError as e:
msg0 = 'Response for {0} not found in Parser'.format(seed_id)
msg1 = 'evalresp failed to calculate response'
if msg0 not in e.message or msg1 not in e.message:
raise
else:
# restitution done to copies of data thus deleting traces
# that failed should not be a problem
data.remove(tr)
continue
restflag.append(True)
input_tuples.append((tr, invtype, inobj, unit, force))
data.remove(tr)
pool = gen_Pool()
result = pool.map(restitute_trace, input_tuples)
pool.close()
for tr, remove_trace in result:
if not remove_trace:
data.traces.append(tr)
# check if ALL traces could be restituted, take care of large datasets
# better try restitution for smaller subsets of data (e.g. station by
# station)
if len(restflag) > 0:
restflag = bool(np.all(restflag))
else:
restflag = False
return data, restflag
# if len(restflag) > 0:
# restflag = bool(np.all(restflag))
# else:
# restflag = False
return data
def get_prefilt(trace, tlow=(0.5, 0.9), thi=(5., 2.), verbosity=0):

View File

@ -29,19 +29,16 @@ def getindexbounds(f, eta):
u = find_nearest(f[mi:], b) + mi
return mi, l, u
def worker(func, input, cores='max', async=False):
def gen_Pool(ncores='max'):
import multiprocessing
if cores == 'max':
cores = multiprocessing.cpu_count()
if ncores=='max':
ncores=multiprocessing.cpu_count()
pool = multiprocessing.Pool(ncores)
return pool
pool = multiprocessing.Pool(cores)
if async == True:
result = pool.map_async(func, input)
else:
result = pool.map(func, input)
pool.close()
return result
def clims(lim1, lim2):
"""