changes while testing parallelization of autopicker

This commit is contained in:
Marcel Paffrath 2016-05-24 14:20:59 +02:00
parent a6eaac6c33
commit 8ca87bc777
4 changed files with 59 additions and 26 deletions

View File

@ -3,7 +3,11 @@ import sys
import numpy as np import numpy as np
from pylot.core.active import seismicshot from pylot.core.active import seismicshot
from pylot.core.active.surveyUtils import cleanUp from pylot.core.active.surveyUtils import cleanUp
import copy_reg
import types
from pylot.core.util.utils import worker, _pickle_method
copy_reg.pickle(types.MethodType, _pickle_method)
class Survey(object): class Survey(object):
def __init__(self, path, sourcefile, receiverfile, useDefaultParas=False): def __init__(self, path, sourcefile, receiverfile, useDefaultParas=False):
@ -196,6 +200,12 @@ class Survey(object):
plt.xlabel('Difference in time (auto - manual) [s]') plt.xlabel('Difference in time (auto - manual) [s]')
return diffs return diffs
def pickShot(self, shTr):
shotnumber, traceID = shTr
shot = self.getShotForShotnumber(shotnumber)
traceID, pick = shot.pickTrace(traceID)
return shotnumber, traceID, pick
def pickAllShots(self, vmin=333, vmax=5500, folm=0.6, HosAic='hos', def pickAllShots(self, vmin=333, vmax=5500, folm=0.6, HosAic='hos',
aicwindow=(10, 0)): aicwindow=(10, 0)):
''' '''
@ -218,12 +228,22 @@ class Survey(object):
count = 0 count = 0
tpicksum = starttime - starttime tpicksum = starttime - starttime
shTr = []
for shot in self.data.values(): for shot in self.data.values():
tstartpick = datetime.now() tstartpick = datetime.now()
shot.setVmin(vmin) shot.setVmin(vmin)
shot.setVmax(vmax) shot.setVmax(vmax)
count += 1 count += 1
shot.pickParallel(folm) #shot.pickParallel(folm)
shot.setPickParameters(folm = folm, method = HosAic, aicwindow = aicwindow)
for traceID in shot.getTraceIDlist():
shTr.append((shot.getShotnumber(), traceID))
picks = worker(self.pickShot, shTr, async = True)
for shotnumber, traceID, pick in picks.get():
self.getShotForShotnumber(shotnumber).setPick(traceID, pick)
# tpicksum += (datetime.now() - tstartpick); # tpicksum += (datetime.now() - tstartpick);
# tpick = tpicksum / count # tpick = tpicksum / count

View File

@ -280,7 +280,7 @@ class Tomo3d(object):
''' '''
Wipes a certain directory. Wipes a certain directory.
''' '''
print('Wiping directory %s...'%directory) #print('Wiping directory %s...'%directory)
for filename in os.listdir(directory): for filename in os.listdir(directory):
filenp = os.path.join(directory, filename) filenp = os.path.join(directory, filename)
os.remove(filenp) os.remove(filenp)

View File

@ -12,10 +12,14 @@ from pylot.core.pick.utils import getSNR
from pylot.core.pick.utils import earllatepicker from pylot.core.pick.utils import earllatepicker
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import warnings import warnings
import copy_reg
import types
from pylot.core.util.utils import worker, _pickle_method
copy_reg.pickle(types.MethodType, _pickle_method)
plt.interactive('True') plt.interactive('True')
class SeismicShot(object): class SeismicShot(object):
''' '''
SuperClass for a seismic shot object. SuperClass for a seismic shot object.
@ -325,27 +329,24 @@ class SeismicShot(object):
self.setPick(traceID, None) self.setPick(traceID, None)
warnings.warn('ambigious or empty traceID: %s' % traceID) warnings.warn('ambigious or empty traceID: %s' % traceID)
def pickParallel(self, folm, method = 'hos', aicwindow = (10, 0)): def setPickParameters(self, folm, method = 'hos', aicwindow = (10, 0)):
import multiprocessing
from pylot.core.util.utils import worker
self.setFolm(folm) self.setFolm(folm)
self.setMethod(method) self.setMethod(method)
self.setAicwindow(aicwindow) self.setAicwindow(aicwindow)
maxthreads = multiprocessing.cpu_count() # def pickParallel(self):
pool = multiprocessing.Pool(maxthreads) # traceIDs = self.getTraceIDlist()
# picks = []
# #picks = worker(self.pickTrace, traceIDs)
traceIDs = self.getTraceIDlist() # # for traceID, pick in picks:
# # self.setPick(traceID, pick)
# picks = worker(self.pickTrace, traceIDs, maxthreads) # for traceID in traceIDs:
# trID, pick = self.pickTrace(traceID)
# for traceID, pick in picks: # picks.append([trID, pick])
# self.setPick(traceID, pick) # #self.setPick(traceID, pick)
# return picks
for traceID in traceIDs:
trID, pick = self.pickTrace(traceID)
self.setPick(traceID, pick)
def pickTrace(self, traceID): def pickTrace(self, traceID):
''' '''

View File

@ -10,6 +10,25 @@ import numpy as np
from obspy.core import UTCDateTime from obspy.core import UTCDateTime
import obspy.core.event as ope import obspy.core.event as ope
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
def worker(func, input, cores = 'max', async = False):
import multiprocessing
if cores == 'max':
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(cores)
if async == True:
result = pool.map_async(func, input)
else:
result = pool.map(func, input)
pool.close()
return result
def createAmplitude(pickID, amp, unit, category, cinfo): def createAmplitude(pickID, amp, unit, category, cinfo):
''' '''
@ -450,13 +469,6 @@ def runProgram(cmd, parameter=None):
output = subprocess.check_output('{} | tee /dev/stderr'.format(cmd), output = subprocess.check_output('{} | tee /dev/stderr'.format(cmd),
shell=True) shell=True)
def worker(func, input, cores):
from multiprocessing import Pool
pool = Pool(cores)
result = pool.map(func, input)
pool.close()
return result
if __name__ == "__main__": if __name__ == "__main__":
import doctest import doctest