code rearrange and minor processing changes

This commit is contained in:
Marcel Paffrath 2016-05-12 14:01:18 +02:00
parent 2bb2ddbc7b
commit 019b801603

View File

@ -6,15 +6,28 @@ import datetime
import numpy as np import numpy as np
class Tomo3d(object): class Tomo3d(object):
def __init__(self, nproc, iterations): def __init__(self, nproc, iterations, citer = 0, overwrite = False):
'''
Class build from FMTOMO script tomo3d. Can be used to run several instances of FMM code in parallel.
:param: nproc, number of parallel processes
:type: integer
:param: iterations, number of iterations
:type: integer
:param: citer, current iteration (default = 0: start new model)
:type: integer
'''
self.setCWD() self.setCWD()
self.defParas() self.defParas()
self.nproc = nproc self.nproc = nproc
self.iter = iterations # number of iterations self.iter = iterations # number of iterations
self.citer = 0 # current iteration self.citer = citer # current iteration
self.sources = self.readSrcFile() self.sources = self.readSrcFile()
self.traces = self.readTraces() self.traces = self.readTraces()
self.directories = [] self.directories = []
self.overwrite = overwrite
def defParas(self): def defParas(self):
self.defFMMParas() self.defFMMParas()
@ -29,6 +42,7 @@ class Tomo3d(object):
self.pg = 'propgrid.in' self.pg = 'propgrid.in'
self.rec = 'receivers.in' self.rec = 'receivers.in'
self.frech = 'frechet.in' self.frech = 'frechet.in'
self.frechout = 'frechet.dat'
self.ot = 'otimes.dat' self.ot = 'otimes.dat'
self.ttim = 'arrivals.dat' self.ttim = 'arrivals.dat'
self.mode = 'mode_set.in' self.mode = 'mode_set.in'
@ -53,8 +67,8 @@ class Tomo3d(object):
self.resout = 'residuals.dat' self.resout = 'residuals.dat'
def copyRef(self): def copyRef(self):
# Attention: Copies reference grids to used grids (e.g. sourcesref.in to sources.in) # Copies reference grids to used grids (e.g. sourcesref.in to sources.in)
os.system('cp %s %s'%(self. ivg, self.cvg)) os.system('cp %s %s'%(self.ivg, self.cvg))
os.system('cp %s %s'%(self.iig, self.cig)) os.system('cp %s %s'%(self.iig, self.cig))
os.system('cp %s %s'%(self.isl, self.csl)) os.system('cp %s %s'%(self.isl, self.csl))
@ -65,6 +79,30 @@ class Tomo3d(object):
def runFrech(self): def runFrech(self):
os.system(self.frechgen) os.system(self.frechgen)
def runTOMO3D(self):
starttime = datetime.datetime.now()
print('Starting TOMO3D on %s parallel processes for %s iteration(s).'
%(self.nproc, self.iter))
if self.citer == 0:
self.makeInvIterDir()
self.startForward(self.cInvIterDir)
self.raiseIter()
while self.citer <= self.iter:
self.makeInvIterDir()
self.startInversion()
self.saveVgrid()
self.startForward(self.cInvIterDir)
self.raiseIter()
if self.citer > self.iter:
self.removeDirectories()
self.unlink(os.path.join(self.cwd, self.frechout))
self.unlink(os.path.join(self.cwd, self.ttim))
tdelta = datetime.datetime.now() - starttime
print('runTOMO3D: Finished %s iterations after %s.'%(self.iter, tdelta))
def runFmm(self, directory, logfile, processes): def runFmm(self, directory, logfile, processes):
os.chdir(directory) os.chdir(directory)
fout = open(logfile, 'w') fout = open(logfile, 'w')
@ -73,6 +111,47 @@ class Tomo3d(object):
os.chdir(self.cwd) os.chdir(self.cwd)
return processes return processes
def startForward(self, logdir):
self._printLine()
print('Starting forward simulation for iteration %s.'%(self.citer))
if self.citer == 0:
self.copyRef()
self.runFrech()
self.makeDirectories()
starttime = datetime.datetime.now()
processes = []
for procID in range(1, self.nproc + 1):
directory = self.getProcDir(procID)
logfn = 'fm3dlog_' + str(procID) + '.out'
log_out = os.path.join(logdir, logfn)
self.writeSrcFile(procID, directory)
self.writeTracesFile(procID, directory)
os.system('cp {cvg} {cig} {mode} {pg} {frechin} {dest}'
.format(cvg=self.cvg, cig=self.cig, frechin=self.frech,
mode=self.mode, pg=self.pg, dest=directory))
processes = self.runFmm(directory, log_out, processes)
for p in processes:
p.wait()
self.mergeOutput(self.cInvIterDir)
self.clearDirectories()
self.copyArrivals()
if self.citer == 0:
self.copyArrivals(self.rtrav)
self.calcRes()
tdelta = datetime.datetime.now() - starttime
print('Finished Forward calculation after %s'%tdelta)
def startInversion(self):
print('Calling %s...'%self.inv)
os.system(self.inv)
def calcRes(self): def calcRes(self):
resout = os.path.join(self.cwd, self.resout) resout = os.path.join(self.cwd, self.resout)
if self.citer == 0: if self.citer == 0:
@ -85,111 +164,91 @@ class Tomo3d(object):
RMS, var, chi2 = residuals[-1].split() RMS, var, chi2 = residuals[-1].split()
print('Residuals: RMS = %s, var = %s, Chi^2 = %s.'%(RMS, var, chi2)) print('Residuals: RMS = %s, var = %s, Chi^2 = %s.'%(RMS, var, chi2))
def runTOMO3D(self):
starttime = datetime.datetime.now()
print('Starting TOMO3D on %s parallel processes for %s iteration(s).'
%(self.nproc, self.iter))
if self.citer == 0:
self.startForward()
self.raiseIter()
while self.citer <= self.iter:
self.startInversion()
self.startForward()
self.raiseIter()
tdelta = datetime.datetime.now() - starttime
print('runTOMO3D: Finished %s iterations after %s.'%(self.iter, tdelta))
def _printLine(self):
print('----------------------------------------')
def raiseIter(self): def raiseIter(self):
self.citer +=1 self.citer +=1
self._printLine() self._printLine()
invfile = open(self.cwd + '/inviter.in', 'w')
invfile.write('%s'%self.citer)
invfile.close()
def startInversion(self): def makeDir(self, directory):
print('Calling %s...'%self.inv)
os.system(self.inv)
def startForward(self):
self._printLine()
print('Starting forward simulation for iteration %s.'%(self.citer))
self.makeDirectories()
starttime = datetime.datetime.now()
processes = []
if self.citer == 0:
self.copyRef()
self.runFrech()
for procID in range(1, self.nproc + 1):
directory = self.getProcDir(procID)
log_out = self.cwd + '/fm3dlog_' + str(procID) + '.out'
self.writeSrcFile(procID, directory)
self.writeTracesFile(procID, directory)
os.system('cp {cvg} {cig} {mode} {pg} {frechout} {dest}'
.format(cvg=self.cvg, cig=self.cig, frechout=self.frech,
mode=self.mode, pg=self.pg, dest=directory))
processes = self.runFmm(directory, log_out, processes)
for p in processes:
p.wait()
self.mergeOutput()
self.clearDirectories()
self.copyArrivals()
if self.citer == 0:
self.copyArrivals(self.rtrav)
self.calcRes()
tdelta = datetime.datetime.now() - starttime
print('Finished Forward calculation after %s'%tdelta)
def copyArrivals(self, target = None):
if target == None:
target = self.mtrav
os.system('cp %s %s'%(self.ttim, target))
def makeDIR(self, directory):
err = os.system('mkdir %s'%directory) err = os.system('mkdir %s'%directory)
if err is 0:
self.directories.append(directory)
return
if err is 256: if err is 256:
response = raw_input('Warning: Directory already existing. Continue (y/n)?\n') if self.overwrite == True:
if response == 'y':
print('Overwriting existing files.') print('Overwriting existing files.')
else: self.clearDir(directory)
sys.exit('Aborted') self.directories.append(directory)
return
self.directories.append(directory) raise RuntimeError('Could not create directory: %s'%directory)
def clearDIR(self, directory):
err = os.system('rm -r %s'%directory)
def makeDirectories(self): def makeDirectories(self):
for procID in range(1, self.nproc + 1): for procID in range(1, self.nproc + 1):
directory = self.getProcDir(procID) directory = self.getProcDir(procID)
self.makeDIR(directory) self.makeDir(directory)
def makeInvIterDir(self):
invIterDir = self.cwd + '/it_%s'%(self.citer)
err = os.system('mkdir %s'%invIterDir)
if err is 256:
if self.overwrite:
self.clearDir(invIterDir)
elif err is not 0:
raise RuntimeError('Could not create directory: %s'%invIterDir)
self.cInvIterDir = invIterDir
def clearDir(self, directory):
print('Wiping directory %s...'%directory)
for filename in os.listdir(directory):
filenp = os.path.join(directory, filename)
os.remove(filenp)
def clearDirectories(self): def clearDirectories(self):
for directory in self.directories: for directory in self.directories:
self.clearDIR(directory) self.clearDir(directory)
def rmDir(self, directory):
print('Removing directory %s...'%directory)
return os.rmdir(directory)
def removeDirectories(self):
for directory in self.directories:
self.rmDir(directory)
self.directories = [] self.directories = []
def readNsrc(self): def getProcDir(self, procID):
srcfile = open(self.csl, 'r') return os.path.join(self.cwd, self.folder) + str(procID)
nsrc = int(srcfile.readline())
srcfile.close()
return nsrc
def readNtraces(self): def getTraceIDs4Sources(self, sourceIDs):
recfile = open(self.rec, 'r') traceIDs = []
nrec = int(recfile.readline()) for traceID in self.traces.keys():
recfile.close() if self.traces[traceID]['source'] in sourceIDs:
return nrec traceIDs.append(traceID)
return traceIDs
def getTraceIDs4Source(self, sourceID):
traceIDs = []
for traceID in self.traces.keys():
if self.traces[traceID]['source'] == sourceID:
traceIDs.append(traceID)
return traceIDs
def copyArrivals(self, target = None):
if target == None:
target = os.path.join(self.cwd, self.mtrav)
os.system('cp %s %s'%(os.path.join(
self.cInvIterDir, self.ttim), target))
def saveVgrid(self):
vgpath = os.path.join(self.cwd, 'vgrids.in')
os.system('cp %s %s'%(vgpath, self.cInvIterDir))
def calcSrcPerKernel(self): def calcSrcPerKernel(self):
nsrc = self.readNsrc() nsrc = self.readNsrc()
if self.nproc > nsrc:
raise ValueError('Number of spawned processes higher than number of sources')
return nsrc/self.nproc, nsrc%self.nproc return nsrc/self.nproc, nsrc%self.nproc
def srcIDs4Kernel(self, procID): def srcIDs4Kernel(self, procID):
@ -208,6 +267,18 @@ class Tomo3d(object):
start = (srcPK + 1) * remain + srcPK * (proc - remain) + 1 start = (srcPK + 1) * remain + srcPK * (proc - remain) + 1
return range(start, start + srcPK) return range(start, start + srcPK)
def readNsrc(self):
srcfile = open(self.csl, 'r')
nsrc = int(srcfile.readline())
srcfile.close()
return nsrc
def readNtraces(self):
recfile = open(self.rec, 'r')
nrec = int(recfile.readline())
recfile.close()
return nrec
def readSrcFile(self): def readSrcFile(self):
nsrc = self.readNsrc() nsrc = self.readNsrc()
srcfile = open(self.csl, 'r') srcfile = open(self.csl, 'r')
@ -258,6 +329,62 @@ class Tomo3d(object):
return traces return traces
def readArrivals(self, procID):
directory = self.getProcDir(procID)
arrfile = open(directory + '/arrivals.dat', 'r')
sourceIDs = self.srcIDs4Kernel(procID)
arrivals = []
for sourceID in sourceIDs:
traceIDs = self.getTraceIDs4Source(sourceID)
for traceID in traceIDs:
line = arrfile.readline().split()
if line != []:
# recID and srcID for the individual processor will not be needed
recID_proc, srcID_proc, ray, normal, arrtime, diff, head = line
arrivals.append([traceID, sourceID, ray, normal, arrtime, diff, head])
return arrivals
def readRays(self, procID):
directory = self.getProcDir(procID)
raysfile = open(directory + '/rays.dat', 'r')
sourceIDs = self.srcIDs4Kernel(procID)
rays = {}
for sourceID in sourceIDs:
traceIDs = self.getTraceIDs4Source(sourceID)
for traceID in traceIDs:
line1 = raysfile.readline().split()
if line1 != []:
# recID and srcID for the individual processor will not be needed
recID_proc, srcID_proc, ray, normal, nsec = line1
raysecs = {}
for sec in range(int(nsec)):
line2 = raysfile.readline().split()
npoints, region, diff, head = line2
raypoints = []
for j in range(int(npoints)):
raypoints.append(raysfile.readline() + '\n')
raysecs[sec] = {'npoints': npoints,
'region': region,
'diff': diff,
'head': head,
'raypoints': raypoints
}
rays[traceID] = {'sourceID': sourceID,
'raypath': ray,
'normal': normal,
'nsec': nsec,
'raysections': raysecs
}
return rays
def writeSrcFile(self, procID, directory): def writeSrcFile(self, procID, directory):
srcfile = open('%s/sources.in'%directory, 'w') srcfile = open('%s/sources.in'%directory, 'w')
sourceIDs = self.srcIDs4Kernel(procID) sourceIDs = self.srcIDs4Kernel(procID)
@ -289,154 +416,44 @@ class Tomo3d(object):
recfile.write('%s\n'%source) recfile.write('%s\n'%source)
recfile.write('%s\n'%trace['path']) recfile.write('%s\n'%trace['path'])
def getTraceIDs4Sources(self, sourceIDs): def mergeArrivals(self, directory):
traceIDs = [] arrfn = os.path.join(directory, self.ttim)
for traceID in self.traces.keys(): arrivalsOut = open(arrfn, 'w')
if self.traces[traceID]['source'] in sourceIDs:
traceIDs.append(traceID)
return traceIDs
def getTraceIDs4Source(self, sourceID):
traceIDs = []
for traceID in self.traces.keys():
if self.traces[traceID]['source'] == sourceID:
traceIDs.append(traceID)
return traceIDs
def readArrivals(self, procID):
directory = self.getProcDir(procID)
arrfile = open(directory + '/arrivals.dat', 'r')
sourceIDs = self.srcIDs4Kernel(procID)
arrivals = []
for sourceID in sourceIDs:
traceIDs = self.getTraceIDs4Source(sourceID)
for traceID in traceIDs:
line = arrfile.readline().split()
if line != []:
# recID and srcID for the individual processor will not be needed
recID_proc, srcID_proc, ray, normal, arrtime, diff, head = line
arrivals.append([traceID, sourceID, ray, normal, arrtime, diff, head])
return arrivals
def mergeArrivals(self):
arrivalsOut = open(os.path.join(self.cwd, self.ttim), 'w')
print('Merging arrivals.dat...') print('Merging arrivals.dat...')
for procID in range(1, self.nproc + 1): for procID in range(1, self.nproc + 1):
arrivals = self.readArrivals(procID) arrivals = self.readArrivals(procID)
for line in arrivals: for line in arrivals:
arrivalsOut.write('%6s %6s %6s %6s %15s %5s %5s\n'%tuple(line)) arrivalsOut.write('%6s %6s %6s %6s %15s %5s %5s\n'%tuple(line))
# def mergeFrechet(self): os.system('ln -fs %s %s'%(arrfn, os.path.join(self.cwd, self.ttim)))
# print('Merging frechet.dat...')
# frechetOut = open(self.cwd + '/frechet.dat', 'w')
# for procID in range(1, self.nproc + 1):
# frechet = self.readFrechet(procID)
# traceIDs = frechet.keys()
# traceIDs.sort()
# for traceID in traceIDs:
# frech = frechet[traceID]
# frechetOut.write('%6s %6s %6s %6s %6s\n'%
# (traceID,
# frech['sourceID'],
# frech['raypath'],
# frech['normal'],
# frech['NPDEV']))
# for pdev in frech['PDEV']:
# frechetOut.writelines(pdev)
def readRays(self, procID): def mergeRays(self, directory):
directory = self.getProcDir(procID)
raysfile = open(directory + '/rays.dat', 'r')
sourceIDs = self.srcIDs4Kernel(procID)
rays = {}
for sourceID in sourceIDs:
traceIDs = self.getTraceIDs4Source(sourceID)
for traceID in traceIDs:
line1 = raysfile.readline().split()
if line1 != []:
# recID and srcID for the individual processor will not be needed
recID_proc, srcID_proc, ray, normal, nsec = line1
raysecs = {}
for sec in range(int(nsec)):
line2 = raysfile.readline.split()
npoints, region, diff, head = line2
raypoints = []
for j in range(int(npoints)):
raypoints.append(raysfile.readline() + '\n')
raysecs[sec] = {'npoints': npoints,
'region': region,
'diff': diff,
'head': head,
'raypoints': raypoints
}
rays[traceID] = {'sourceID': sourceID,
'raypath': ray,
'normal': normal,
'nsec': nsec,
'raysections': raysecs
}
return rays
def mergeRays(self):
print('Merging rays.dat...') print('Merging rays.dat...')
with open(self.cwd + 'rays.dat', 'w') as outfile: with open(directory + '/rays.dat', 'w') as outfile:
for procID in range(1, self.nproc + 1): for procID in range(1, self.nproc + 1):
rays = self.readRays(procID) rays = self.readRays(procID)
for traceID in rays: for traceID in rays:
ray = rays[traceID] ray = rays[traceID]
outfile.write('%6s %6s %6s %6s %6s'%(traceID, outfile.write('%6s %6s %6s %6s %6s\n'%(traceID,
ray['sourceID'], ray['sourceID'],
ray['raypath'], ray['raypath'],
ray['normal'], ray['normal'],
ray['nsec'])) ray['nsec']))
for sec in range(int(ray['nsec'])): for sec in range(int(ray['nsec'])):
raysec = ray['raysections'][sec] raysec = ray['raysections'][sec]
outfile.write('%6s %6s %6s %6s'%(raysec['npoints'], outfile.write('%6s %6s %6s %6s\n'%(raysec['npoints'],
raysec['region'], raysec['region'],
raysec['diff'], raysec['diff'],
raysec['head'])) raysec['head']))
outfile.writelines(raysec['raypoints']) outfile.writelines(raysec['raypoints'])
def mergeFrechet(self, directory):
# def readFrechet(self, procID):
# directory = self.getProcDir(procID)
# frechfile = open(directory + '/frechet.dat', 'r')
# sourceIDs = self.srcIDs4Kernel(procID)
# frechet = {}
# for sourceID in sourceIDs:
# traceIDs = self.getTraceIDs4Source(sourceID)
# for traceID in traceIDs:
# line = frechfile.readline().split()
# if line != []:
# # recID and srcID for the individual processor will not be needed
# PDEV = []
# recID_proc, srcID_proc, ray, normal, NPDEV = line
# for i in range(int(NPDEV)):
# PDEV.append(frechfile.readline())
# frechet[traceID] = {'sourceID': sourceID,
# 'raypath': ray,
# 'normal': normal,
# 'NPDEV': NPDEV,
# 'PDEV': PDEV
# }
# return frechet
def mergeFrechet(self):
print('Merging frechet.dat...') print('Merging frechet.dat...')
with open(self.cwd + '/frechet.dat', 'w') as outfile: frechfnout = os.path.join(directory, self.frechout)
with open(frechfnout, 'w') as outfile:
for procID in range(1, self.nproc + 1): for procID in range(1, self.nproc + 1):
filename = self.getProcDir(procID) + '/frechet.dat' filename = os.path.join(self.getProcDir(procID), self.frechout)
with open(filename) as infile: with open(filename) as infile:
for sourceID in self.srcIDs4Kernel(procID): for sourceID in self.srcIDs4Kernel(procID):
for traceID in self.getTraceIDs4Source(sourceID): for traceID in self.getTraceIDs4Source(sourceID):
@ -445,15 +462,18 @@ class Tomo3d(object):
for index in range(int(NPDEV)): for index in range(int(NPDEV)):
outfile.write(infile.readline()) outfile.write(infile.readline())
os.system('ln -fs %s %s'%(frechfnout, os.path.join(self.cwd, self.frechout)))
def getProcDir(self, procID): def mergeOutput(self, directory):
return os.path.join(self.cwd, self.folder) + str(procID) self.mergeArrivals(directory)
self.mergeFrechet(directory)
self.mergeRays(directory)
def mergeOutput(self): def unlink(self, filepath):
self.mergeArrivals() return os.system('unlink %s'%filepath)
self.mergeFrechet()
self.mergeRays()
def _printLine(self):
print('----------------------------------------')
def vgrids2VTK(inputfile='vgrids.in', outputfile='vgrids.vtk', absOrRel='abs', inputfileref='vgridsref.in'): def vgrids2VTK(inputfile='vgrids.in', outputfile='vgrids.vtk', absOrRel='abs', inputfileref='vgridsref.in'):
''' '''