From 019b801603e793c1cb1903243827f630ce539cfb Mon Sep 17 00:00:00 2001 From: Marcel Paffrath Date: Thu, 12 May 2016 14:01:18 +0200 Subject: [PATCH] code rearrange and minor processing changes --- pylot/core/active/fmtomoUtils.py | 490 ++++++++++++++++--------------- 1 file changed, 255 insertions(+), 235 deletions(-) diff --git a/pylot/core/active/fmtomoUtils.py b/pylot/core/active/fmtomoUtils.py index 7dfe928b..f70b0b43 100644 --- a/pylot/core/active/fmtomoUtils.py +++ b/pylot/core/active/fmtomoUtils.py @@ -6,15 +6,28 @@ import datetime import numpy as np class Tomo3d(object): - def __init__(self, nproc, iterations): + def __init__(self, nproc, iterations, citer = 0, overwrite = False): + ''' + Class build from FMTOMO script tomo3d. Can be used to run several instances of FMM code in parallel. + + :param: nproc, number of parallel processes + :type: integer + + :param: iterations, number of iterations + :type: integer + + :param: citer, current iteration (default = 0: start new model) + :type: integer + ''' self.setCWD() self.defParas() self.nproc = nproc self.iter = iterations # number of iterations - self.citer = 0 # current iteration + self.citer = citer # current iteration self.sources = self.readSrcFile() self.traces = self.readTraces() self.directories = [] + self.overwrite = overwrite def defParas(self): self.defFMMParas() @@ -29,6 +42,7 @@ class Tomo3d(object): self.pg = 'propgrid.in' self.rec = 'receivers.in' self.frech = 'frechet.in' + self.frechout = 'frechet.dat' self.ot = 'otimes.dat' self.ttim = 'arrivals.dat' self.mode = 'mode_set.in' @@ -53,8 +67,8 @@ class Tomo3d(object): self.resout = 'residuals.dat' def copyRef(self): - # Attention: Copies reference grids to used grids (e.g. sourcesref.in to sources.in) - os.system('cp %s %s'%(self. ivg, self.cvg)) + # Copies reference grids to used grids (e.g. sourcesref.in to sources.in) + os.system('cp %s %s'%(self.ivg, self.cvg)) os.system('cp %s %s'%(self.iig, self.cig)) os.system('cp %s %s'%(self.isl, self.csl)) @@ -65,6 +79,30 @@ class Tomo3d(object): def runFrech(self): os.system(self.frechgen) + def runTOMO3D(self): + starttime = datetime.datetime.now() + print('Starting TOMO3D on %s parallel processes for %s iteration(s).' + %(self.nproc, self.iter)) + if self.citer == 0: + self.makeInvIterDir() + self.startForward(self.cInvIterDir) + self.raiseIter() + + while self.citer <= self.iter: + self.makeInvIterDir() + self.startInversion() + self.saveVgrid() + self.startForward(self.cInvIterDir) + self.raiseIter() + + if self.citer > self.iter: + self.removeDirectories() + self.unlink(os.path.join(self.cwd, self.frechout)) + self.unlink(os.path.join(self.cwd, self.ttim)) + + tdelta = datetime.datetime.now() - starttime + print('runTOMO3D: Finished %s iterations after %s.'%(self.iter, tdelta)) + def runFmm(self, directory, logfile, processes): os.chdir(directory) fout = open(logfile, 'w') @@ -73,6 +111,47 @@ class Tomo3d(object): os.chdir(self.cwd) return processes + def startForward(self, logdir): + self._printLine() + print('Starting forward simulation for iteration %s.'%(self.citer)) + + if self.citer == 0: + self.copyRef() + self.runFrech() + self.makeDirectories() + + starttime = datetime.datetime.now() + processes = [] + + for procID in range(1, self.nproc + 1): + directory = self.getProcDir(procID) + logfn = 'fm3dlog_' + str(procID) + '.out' + log_out = os.path.join(logdir, logfn) + + self.writeSrcFile(procID, directory) + self.writeTracesFile(procID, directory) + os.system('cp {cvg} {cig} {mode} {pg} {frechin} {dest}' + .format(cvg=self.cvg, cig=self.cig, frechin=self.frech, + mode=self.mode, pg=self.pg, dest=directory)) + processes = self.runFmm(directory, log_out, processes) + + for p in processes: + p.wait() + + self.mergeOutput(self.cInvIterDir) + self.clearDirectories() + self.copyArrivals() + if self.citer == 0: + self.copyArrivals(self.rtrav) + + self.calcRes() + tdelta = datetime.datetime.now() - starttime + print('Finished Forward calculation after %s'%tdelta) + + def startInversion(self): + print('Calling %s...'%self.inv) + os.system(self.inv) + def calcRes(self): resout = os.path.join(self.cwd, self.resout) if self.citer == 0: @@ -85,111 +164,91 @@ class Tomo3d(object): RMS, var, chi2 = residuals[-1].split() print('Residuals: RMS = %s, var = %s, Chi^2 = %s.'%(RMS, var, chi2)) - def runTOMO3D(self): - starttime = datetime.datetime.now() - print('Starting TOMO3D on %s parallel processes for %s iteration(s).' - %(self.nproc, self.iter)) - if self.citer == 0: - self.startForward() - self.raiseIter() - - while self.citer <= self.iter: - self.startInversion() - self.startForward() - self.raiseIter() - - tdelta = datetime.datetime.now() - starttime - print('runTOMO3D: Finished %s iterations after %s.'%(self.iter, tdelta)) - - def _printLine(self): - print('----------------------------------------') - def raiseIter(self): self.citer +=1 self._printLine() + invfile = open(self.cwd + '/inviter.in', 'w') + invfile.write('%s'%self.citer) + invfile.close() - def startInversion(self): - print('Calling %s...'%self.inv) - os.system(self.inv) - - def startForward(self): - self._printLine() - print('Starting forward simulation for iteration %s.'%(self.citer)) - self.makeDirectories() - starttime = datetime.datetime.now() - processes = [] - - if self.citer == 0: - self.copyRef() - self.runFrech() - - for procID in range(1, self.nproc + 1): - directory = self.getProcDir(procID) - log_out = self.cwd + '/fm3dlog_' + str(procID) + '.out' - - self.writeSrcFile(procID, directory) - self.writeTracesFile(procID, directory) - os.system('cp {cvg} {cig} {mode} {pg} {frechout} {dest}' - .format(cvg=self.cvg, cig=self.cig, frechout=self.frech, - mode=self.mode, pg=self.pg, dest=directory)) - processes = self.runFmm(directory, log_out, processes) - - for p in processes: - p.wait() - - self.mergeOutput() - self.clearDirectories() - self.copyArrivals() - if self.citer == 0: - self.copyArrivals(self.rtrav) - - self.calcRes() - tdelta = datetime.datetime.now() - starttime - print('Finished Forward calculation after %s'%tdelta) - - def copyArrivals(self, target = None): - if target == None: - target = self.mtrav - os.system('cp %s %s'%(self.ttim, target)) - - def makeDIR(self, directory): + def makeDir(self, directory): err = os.system('mkdir %s'%directory) + if err is 0: + self.directories.append(directory) + return if err is 256: - response = raw_input('Warning: Directory already existing. Continue (y/n)?\n') - if response == 'y': + if self.overwrite == True: print('Overwriting existing files.') - else: - sys.exit('Aborted') + self.clearDir(directory) + self.directories.append(directory) + return + raise RuntimeError('Could not create directory: %s'%directory) - self.directories.append(directory) - - def clearDIR(self, directory): - err = os.system('rm -r %s'%directory) - def makeDirectories(self): for procID in range(1, self.nproc + 1): directory = self.getProcDir(procID) - self.makeDIR(directory) + self.makeDir(directory) + def makeInvIterDir(self): + invIterDir = self.cwd + '/it_%s'%(self.citer) + err = os.system('mkdir %s'%invIterDir) + if err is 256: + if self.overwrite: + self.clearDir(invIterDir) + elif err is not 0: + raise RuntimeError('Could not create directory: %s'%invIterDir) + self.cInvIterDir = invIterDir + + def clearDir(self, directory): + print('Wiping directory %s...'%directory) + for filename in os.listdir(directory): + filenp = os.path.join(directory, filename) + os.remove(filenp) + def clearDirectories(self): for directory in self.directories: - self.clearDIR(directory) + self.clearDir(directory) + + def rmDir(self, directory): + print('Removing directory %s...'%directory) + return os.rmdir(directory) + + def removeDirectories(self): + for directory in self.directories: + self.rmDir(directory) self.directories = [] - def readNsrc(self): - srcfile = open(self.csl, 'r') - nsrc = int(srcfile.readline()) - srcfile.close() - return nsrc + def getProcDir(self, procID): + return os.path.join(self.cwd, self.folder) + str(procID) - def readNtraces(self): - recfile = open(self.rec, 'r') - nrec = int(recfile.readline()) - recfile.close() - return nrec + def getTraceIDs4Sources(self, sourceIDs): + traceIDs = [] + for traceID in self.traces.keys(): + if self.traces[traceID]['source'] in sourceIDs: + traceIDs.append(traceID) + return traceIDs + + def getTraceIDs4Source(self, sourceID): + traceIDs = [] + for traceID in self.traces.keys(): + if self.traces[traceID]['source'] == sourceID: + traceIDs.append(traceID) + return traceIDs + + def copyArrivals(self, target = None): + if target == None: + target = os.path.join(self.cwd, self.mtrav) + os.system('cp %s %s'%(os.path.join( + self.cInvIterDir, self.ttim), target)) + + def saveVgrid(self): + vgpath = os.path.join(self.cwd, 'vgrids.in') + os.system('cp %s %s'%(vgpath, self.cInvIterDir)) def calcSrcPerKernel(self): nsrc = self.readNsrc() + if self.nproc > nsrc: + raise ValueError('Number of spawned processes higher than number of sources') return nsrc/self.nproc, nsrc%self.nproc def srcIDs4Kernel(self, procID): @@ -207,6 +266,18 @@ class Tomo3d(object): elif proc > remain: start = (srcPK + 1) * remain + srcPK * (proc - remain) + 1 return range(start, start + srcPK) + + def readNsrc(self): + srcfile = open(self.csl, 'r') + nsrc = int(srcfile.readline()) + srcfile.close() + return nsrc + + def readNtraces(self): + recfile = open(self.rec, 'r') + nrec = int(recfile.readline()) + recfile.close() + return nrec def readSrcFile(self): nsrc = self.readNsrc() @@ -258,6 +329,62 @@ class Tomo3d(object): return traces + def readArrivals(self, procID): + directory = self.getProcDir(procID) + arrfile = open(directory + '/arrivals.dat', 'r') + sourceIDs = self.srcIDs4Kernel(procID) + + arrivals = [] + for sourceID in sourceIDs: + traceIDs = self.getTraceIDs4Source(sourceID) + for traceID in traceIDs: + line = arrfile.readline().split() + if line != []: + # recID and srcID for the individual processor will not be needed + recID_proc, srcID_proc, ray, normal, arrtime, diff, head = line + arrivals.append([traceID, sourceID, ray, normal, arrtime, diff, head]) + + return arrivals + + def readRays(self, procID): + directory = self.getProcDir(procID) + raysfile = open(directory + '/rays.dat', 'r') + sourceIDs = self.srcIDs4Kernel(procID) + + rays = {} + for sourceID in sourceIDs: + traceIDs = self.getTraceIDs4Source(sourceID) + for traceID in traceIDs: + line1 = raysfile.readline().split() + if line1 != []: + # recID and srcID for the individual processor will not be needed + recID_proc, srcID_proc, ray, normal, nsec = line1 + raysecs = {} + + for sec in range(int(nsec)): + line2 = raysfile.readline().split() + npoints, region, diff, head = line2 + raypoints = [] + + for j in range(int(npoints)): + raypoints.append(raysfile.readline() + '\n') + + raysecs[sec] = {'npoints': npoints, + 'region': region, + 'diff': diff, + 'head': head, + 'raypoints': raypoints + } + + + rays[traceID] = {'sourceID': sourceID, + 'raypath': ray, + 'normal': normal, + 'nsec': nsec, + 'raysections': raysecs + } + return rays + def writeSrcFile(self, procID, directory): srcfile = open('%s/sources.in'%directory, 'w') sourceIDs = self.srcIDs4Kernel(procID) @@ -289,154 +416,44 @@ class Tomo3d(object): recfile.write('%s\n'%source) recfile.write('%s\n'%trace['path']) - def getTraceIDs4Sources(self, sourceIDs): - traceIDs = [] - for traceID in self.traces.keys(): - if self.traces[traceID]['source'] in sourceIDs: - traceIDs.append(traceID) - return traceIDs - - def getTraceIDs4Source(self, sourceID): - traceIDs = [] - for traceID in self.traces.keys(): - if self.traces[traceID]['source'] == sourceID: - traceIDs.append(traceID) - return traceIDs - - def readArrivals(self, procID): - directory = self.getProcDir(procID) - arrfile = open(directory + '/arrivals.dat', 'r') - sourceIDs = self.srcIDs4Kernel(procID) - - arrivals = [] - for sourceID in sourceIDs: - traceIDs = self.getTraceIDs4Source(sourceID) - for traceID in traceIDs: - line = arrfile.readline().split() - if line != []: - # recID and srcID for the individual processor will not be needed - recID_proc, srcID_proc, ray, normal, arrtime, diff, head = line - arrivals.append([traceID, sourceID, ray, normal, arrtime, diff, head]) - - return arrivals - - def mergeArrivals(self): - arrivalsOut = open(os.path.join(self.cwd, self.ttim), 'w') + def mergeArrivals(self, directory): + arrfn = os.path.join(directory, self.ttim) + arrivalsOut = open(arrfn, 'w') print('Merging arrivals.dat...') for procID in range(1, self.nproc + 1): arrivals = self.readArrivals(procID) for line in arrivals: arrivalsOut.write('%6s %6s %6s %6s %15s %5s %5s\n'%tuple(line)) - # def mergeFrechet(self): - # print('Merging frechet.dat...') - # frechetOut = open(self.cwd + '/frechet.dat', 'w') - # for procID in range(1, self.nproc + 1): - # frechet = self.readFrechet(procID) - # traceIDs = frechet.keys() - # traceIDs.sort() - # for traceID in traceIDs: - # frech = frechet[traceID] - # frechetOut.write('%6s %6s %6s %6s %6s\n'% - # (traceID, - # frech['sourceID'], - # frech['raypath'], - # frech['normal'], - # frech['NPDEV'])) - # for pdev in frech['PDEV']: - # frechetOut.writelines(pdev) + os.system('ln -fs %s %s'%(arrfn, os.path.join(self.cwd, self.ttim))) - def readRays(self, procID): - directory = self.getProcDir(procID) - raysfile = open(directory + '/rays.dat', 'r') - sourceIDs = self.srcIDs4Kernel(procID) - - rays = {} - for sourceID in sourceIDs: - traceIDs = self.getTraceIDs4Source(sourceID) - for traceID in traceIDs: - line1 = raysfile.readline().split() - if line1 != []: - # recID and srcID for the individual processor will not be needed - recID_proc, srcID_proc, ray, normal, nsec = line1 - raysecs = {} - - for sec in range(int(nsec)): - line2 = raysfile.readline.split() - npoints, region, diff, head = line2 - raypoints = [] - - for j in range(int(npoints)): - raypoints.append(raysfile.readline() + '\n') - - raysecs[sec] = {'npoints': npoints, - 'region': region, - 'diff': diff, - 'head': head, - 'raypoints': raypoints - } - - - rays[traceID] = {'sourceID': sourceID, - 'raypath': ray, - 'normal': normal, - 'nsec': nsec, - 'raysections': raysecs - } - return rays - - def mergeRays(self): + def mergeRays(self, directory): print('Merging rays.dat...') - with open(self.cwd + 'rays.dat', 'w') as outfile: - for procID in range(1, self.nproc + 1): - rays = self.readRays(procID) - for traceID in rays: - ray = rays[traceID] - outfile.write('%6s %6s %6s %6s %6s'%(traceID, - ray['sourceID'], - ray['raypath'], - ray['normal'], - ray['nsec'])) - for sec in range(int(ray['nsec'])): - raysec = ray['raysections'][sec] - outfile.write('%6s %6s %6s %6s'%(raysec['npoints'], - raysec['region'], - raysec['diff'], - raysec['head'])) - outfile.writelines(raysec['raypoints']) - - - # def readFrechet(self, procID): - # directory = self.getProcDir(procID) - # frechfile = open(directory + '/frechet.dat', 'r') - # sourceIDs = self.srcIDs4Kernel(procID) - - # frechet = {} - # for sourceID in sourceIDs: - # traceIDs = self.getTraceIDs4Source(sourceID) - # for traceID in traceIDs: - # line = frechfile.readline().split() - # if line != []: - # # recID and srcID for the individual processor will not be needed - # PDEV = [] - # recID_proc, srcID_proc, ray, normal, NPDEV = line - # for i in range(int(NPDEV)): - # PDEV.append(frechfile.readline()) - - # frechet[traceID] = {'sourceID': sourceID, - # 'raypath': ray, - # 'normal': normal, - # 'NPDEV': NPDEV, - # 'PDEV': PDEV - # } - # return frechet - - def mergeFrechet(self): - print('Merging frechet.dat...') - - with open(self.cwd + '/frechet.dat', 'w') as outfile: + with open(directory + '/rays.dat', 'w') as outfile: for procID in range(1, self.nproc + 1): - filename = self.getProcDir(procID) + '/frechet.dat' + rays = self.readRays(procID) + for traceID in rays: + ray = rays[traceID] + outfile.write('%6s %6s %6s %6s %6s\n'%(traceID, + ray['sourceID'], + ray['raypath'], + ray['normal'], + ray['nsec'])) + for sec in range(int(ray['nsec'])): + raysec = ray['raysections'][sec] + outfile.write('%6s %6s %6s %6s\n'%(raysec['npoints'], + raysec['region'], + raysec['diff'], + raysec['head'])) + outfile.writelines(raysec['raypoints']) + + def mergeFrechet(self, directory): + print('Merging frechet.dat...') + + frechfnout = os.path.join(directory, self.frechout) + with open(frechfnout, 'w') as outfile: + for procID in range(1, self.nproc + 1): + filename = os.path.join(self.getProcDir(procID), self.frechout) with open(filename) as infile: for sourceID in self.srcIDs4Kernel(procID): for traceID in self.getTraceIDs4Source(sourceID): @@ -445,15 +462,18 @@ class Tomo3d(object): for index in range(int(NPDEV)): outfile.write(infile.readline()) + os.system('ln -fs %s %s'%(frechfnout, os.path.join(self.cwd, self.frechout))) - def getProcDir(self, procID): - return os.path.join(self.cwd, self.folder) + str(procID) - - def mergeOutput(self): - self.mergeArrivals() - self.mergeFrechet() - self.mergeRays() + def mergeOutput(self, directory): + self.mergeArrivals(directory) + self.mergeFrechet(directory) + self.mergeRays(directory) + def unlink(self, filepath): + return os.system('unlink %s'%filepath) + + def _printLine(self): + print('----------------------------------------') def vgrids2VTK(inputfile='vgrids.in', outputfile='vgrids.vtk', absOrRel='abs', inputfileref='vgridsref.in'): '''