alphabetical sorting of functions and editing docstring

This commit is contained in:
sebastianp 2015-12-03 17:20:05 +01:00
parent 79829706be
commit 74794061f6

View File

@ -10,165 +10,60 @@ import numpy as np
from obspy.core import UTCDateTime from obspy.core import UTCDateTime
import obspy.core.event as ope import obspy.core.event as ope
def runProgram(cmd, parameter=None): def createAmplitude(pickID, amp, unit, category, cinfo):
"""
run an external program specified by cmd with parameters input returning the
stdout output
:param cmd: name of the command to run
:type cmd: str
:param parameter: filename of parameter file or parameter string
:type parameter: str
:return: stdout output
:rtype: str
"""
if parameter:
cmd.strip()
cmd += ' %s 2>&1' % parameter
output = subprocess.check_output('{} | tee /dev/stderr'.format(cmd),
shell = True)
def isSorted(iterable):
return sorted(iterable) == iterable
def fnConstructor(s):
if type(s) is str:
s = s.split(':')[-1]
else:
s = getHash(UTCDateTime())
badchars = re.compile(r'[^A-Za-z0-9_. ]+|^\.|\.$|^ | $|^$')
badsuffix = re.compile(r'(aux|com[1-9]|con|lpt[1-9]|prn)(\.|$)')
fn = badchars.sub('_', s)
if badsuffix.match(fn):
fn = '_' + fn
return fn
def getLogin():
return pwd.getpwuid(os.getuid())[0]
def getHash(time):
''' '''
:param time: time object for which a hash should be calculated
:type time: :class: `~obspy.core.utcdatetime.UTCDateTime` object :param pickID:
:return: str :param amp:
:param unit:
:param category:
:param cinfo:
:return:
''' '''
hg = hashlib.sha1() amplitude = ope.Amplitude()
hg.update(time.strftime('%Y-%m-%d %H:%M:%S.%f')) amplitude.creation_info = cinfo
return hg.hexdigest() amplitude.generic_amplitude = amp
amplitude.unit = ope.AmplitudeUnit(unit)
amplitude.type = ope.AmplitudeCategory(category)
amplitude.pick_id = pickID
return amplitude
def createArrival(pickresID, cinfo, phase, azimuth=None, dist=None):
'''
createArrival - function to create an Obspy Arrival
def getOwner(fn): :param pickresID: Resource identifier of the created pick
return pwd.getpwuid(os.stat(fn).st_uid).pw_name :type pickresID: :class: `~obspy.core.event.ResourceIdentifier` object
:param cinfo: An ObsPy :class: `~obspy.core.event.CreationInfo` object
def getPatternLine(fn, pattern): holding information on the creation of the returned object
""" :type cinfo: :class: `~obspy.core.event.CreationInfo` object
Takes a file name and a pattern string to search for in the file and :param phase: name of the arrivals seismic phase
returns the first line which contains the pattern string otherwise None. :type phase: str
:param azimuth: azimuth between source and receiver
:param fn: file name :type azimuth: float or int, optional
:type fn: str :param dist: distance between source and receiver
:param pattern: pattern string to search for :type dist: float or int, optional
:type pattern: str :return: An ObsPy :class: `~obspy.core.event.Arrival` object
:return: the complete line containing pattern or None '''
arrival = ope.Arrival()
>>> getPatternLine('utils.py', 'python') arrival.creation_info = cinfo
'#!/usr/bin/env python\\n' arrival.pick_id = pickresID
>>> print(getPatternLine('version.py', 'palindrome')) arrival.phase = phase
None if azimuth is not None:
""" arrival.azimuth = float(azimuth) if azimuth > -180 else azimuth + 360.
fobj = open(fn, 'r')
for line in fobj.readlines():
if pattern in line:
fobj.close()
return line
return None
def prepTimeAxis(stime, trace):
nsamp = trace.stats.npts
srate = trace.stats.sampling_rate
tincr = trace.stats.delta
etime = stime + nsamp / srate
time_ax = np.arange(stime, etime, tincr)
if len(time_ax) < nsamp:
print 'elongate time axes by one datum'
time_ax = np.arange(stime, etime + tincr, tincr)
elif len(time_ax) > nsamp:
print 'shorten time axes by one datum'
time_ax = np.arange(stime, etime - tincr, tincr)
if len(time_ax) != nsamp:
raise ValueError('{0} samples of data \n '
'{1} length of time vector \n'
'delta: {2}'.format(nsamp, len(time_ax), tincr))
return time_ax
def scaleWFData(data, factor=None, components='all'):
"""
produce scaled waveforms from given waveform data and a scaling factor,
waveform may be selected by their components name
:param data: waveform data to be scaled
:type data: `~obspy.core.stream.Stream` object
:param factor: scaling factor
:type factor: float
:param components: components labels for the traces in data to be scaled by
the scaling factor (optional, default: 'all')
:type components: tuple
:return: scaled waveform data
:rtype: `~obspy.core.stream.Stream` object
"""
if components is not 'all':
for comp in components:
if factor is None:
max_val = np.max(np.abs(data.select(component=comp)[0].data))
data.select(component=comp)[0].data /= 2 * max_val
else: else:
data.select(component=comp)[0].data /= 2 * factor arrival.azimuth = azimuth
else: arrival.distance = dist
for tr in data: return arrival
if factor is None:
max_val = float(np.max(np.abs(tr.data)))
tr.data /= 2 * max_val
else:
tr.data /= 2 * factor
return data
def demeanTrace(trace, window):
"""
returns the DATA where each trace is demean by the average value within
WINDOW
:param trace: waveform trace object
:type trace: `~obspy.core.stream.Trace`
:param inoise: range of indices of DATA within the WINDOW
:type window: tuple
:return: trace
:rtype: `~obspy.core.stream.Trace`
"""
trace.data -= trace.data[window].mean()
return trace
def getGlobalTimes(stream):
min_start = UTCDateTime()
max_end = None
for trace in stream:
if trace.stats.starttime < min_start:
min_start = trace.stats.starttime
if max_end is None or trace.stats.endtime > max_end:
max_end = trace.stats.endtime
return min_start, max_end
def createCreationInfo(agency_id=None, creation_time=None, author=None): def createCreationInfo(agency_id=None, creation_time=None, author=None):
'''
:param agency_id:
:param creation_time:
:param author:
:return:
'''
if author is None: if author is None:
author = getLogin() author = getLogin()
if creation_time is None: if creation_time is None:
@ -176,57 +71,6 @@ def createCreationInfo(agency_id=None, creation_time=None, author=None):
return ope.CreationInfo(agency_id=agency_id, author=author, return ope.CreationInfo(agency_id=agency_id, author=author,
creation_time=creation_time) creation_time=creation_time)
def createResourceID(timetohash, restype, authority_id=None, hrstr=None):
'''
:param timetohash:
:param restype: type of the resource, e.g. 'orig', 'earthquake' ...
:type restype: str
:param authority_id: name of the institution carrying out the processing
:type authority_id: str, optional
:return:
'''
assert isinstance(timetohash, UTCDateTime), "'timetohash' is not an ObsPy" \
"UTCDateTime object"
hid = getHash(timetohash)
if hrstr is None:
resID = ope.ResourceIdentifier(restype + '/' + hid[0:6])
else:
resID = ope.ResourceIdentifier(restype + '/' + hrstr + '_' + hid[0:6])
if authority_id is not None:
resID.convertIDToQuakeMLURI(authority_id=authority_id)
return resID
def createOrigin(origintime, cinfo, latitude, longitude, depth):
'''
createOrigin - function to create an ObsPy Origin
:param origintime: the origins time of occurence
:type origintime: :class: `~obspy.core.utcdatetime.UTCDateTime` object
:param latitude: latitude in decimal degree of the origins location
:type latitude: float
:param longitude: longitude in decimal degree of the origins location
:type longitude: float
:param depth: hypocentral depth of the origin
:type depth: float
:return: An ObsPy :class: `~obspy.core.event.Origin` object
'''
assert isinstance(origintime, UTCDateTime), "origintime has to be " \
"a UTCDateTime object, but " \
"actually is of type " \
"'%s'" % type(origintime)
origin = ope.Origin()
origin.time = origintime
origin.creation_info = cinfo
origin.latitude = latitude
origin.longitude = longitude
origin.depth = depth
return origin
def createEvent(origintime, cinfo, originloc=None, etype=None, resID=None, def createEvent(origintime, cinfo, originloc=None, etype=None, resID=None,
authority_id=None): authority_id=None):
''' '''
@ -271,14 +115,60 @@ def createEvent(origintime, cinfo, originloc=None, etype=None, resID=None,
event.origins = [o] event.origins = [o]
return event return event
def createMagnitude(originID, cinfo):
'''
createMagnitude - function to create an ObsPy Magnitude object
:param originID:
:type originID:
:param cinfo:
:type cinfo:
:return:
'''
magnitude = ope.Magnitude()
magnitude.creation_info = cinfo
magnitude.origin_id = originID
return magnitude
def createOrigin(origintime, cinfo, latitude, longitude, depth):
'''
createOrigin - function to create an ObsPy Origin
:param origintime: the origins time of occurence
:type origintime: :class: `~obspy.core.utcdatetime.UTCDateTime` object
:param cinfo:
:type cinfo:
:param latitude: latitude in decimal degree of the origins location
:type latitude: float
:param longitude: longitude in decimal degree of the origins location
:type longitude: float
:param depth: hypocentral depth of the origin
:type depth: float
:return: An ObsPy :class: `~obspy.core.event.Origin` object
'''
assert isinstance(origintime, UTCDateTime), "origintime has to be " \
"a UTCDateTime object, but " \
"actually is of type " \
"'%s'" % type(origintime)
origin = ope.Origin()
origin.time = origintime
origin.creation_info = cinfo
origin.latitude = latitude
origin.longitude = longitude
origin.depth = depth
return origin
def createPick(origintime, picknum, picktime, eventnum, cinfo, phase, station, def createPick(origintime, picknum, picktime, eventnum, cinfo, phase, station,
wfseedstr, authority_id): wfseedstr, authority_id):
''' '''
createPick - function to create an ObsPy Pick createPick - function to create an ObsPy Pick
:param origintime:
:type origintime:
:param picknum: number of the created pick :param picknum: number of the created pick
:type picknum: int :type picknum: int
:param picktime:
:type picktime:
:param eventnum: human-readable event identifier :param eventnum: human-readable event identifier
:type eventnum: str :type eventnum: str
:param cinfo: An ObsPy :class: `~obspy.core.event.CreationInfo` object :param cinfo: An ObsPy :class: `~obspy.core.event.CreationInfo` object
@ -306,64 +196,43 @@ def createPick(origintime, picknum, picktime, eventnum, cinfo, phase, station,
pick.waveform_id = ope.ResourceIdentifier(id=wfseedstr, prefix='file:/') pick.waveform_id = ope.ResourceIdentifier(id=wfseedstr, prefix='file:/')
return pick return pick
def createResourceID(timetohash, restype, authority_id=None, hrstr=None):
def createArrival(pickresID, cinfo, phase, azimuth=None, dist=None):
''' '''
createArrival - function to create an Obspy Arrival
:param pickresID: Resource identifier of the created pick :param timetohash:
:type pickresID: :class: `~obspy.core.event.ResourceIdentifier` object :type timetohash
:param eventnum: human-readable event identifier :param restype: type of the resource, e.g. 'orig', 'earthquake' ...
:type eventnum: str :type restype: str
:param cinfo: An ObsPy :class: `~obspy.core.event.CreationInfo` object
holding information on the creation of the returned object
:type cinfo: :class: `~obspy.core.event.CreationInfo` object
:param phase: name of the arrivals seismic phase
:type phase: str
:param station: name of the station at which the seismic phase has been
picked
:type station: str
:param authority_id: name of the institution carrying out the processing :param authority_id: name of the institution carrying out the processing
:type authority_id: str :type authority_id: str, optional
:param azimuth: azimuth between source and receiver :param hrstr:
:type azimuth: float or int, optional :type hrstr:
:param dist: distance between source and receiver
:type dist: float or int, optional
:return: An ObsPy :class: `~obspy.core.event.Arrival` object
'''
arrival = ope.Arrival()
arrival.creation_info = cinfo
arrival.pick_id = pickresID
arrival.phase = phase
if azimuth is not None:
arrival.azimuth = float(azimuth) if azimuth > -180 else azimuth + 360.
else:
arrival.azimuth = azimuth
arrival.distance = dist
return arrival
def createMagnitude(originID, cinfo):
'''
createMagnitude - function to create an ObsPy Magnitude object
:param originID:
:param cinfo:
:param authority_id:
:return: :return:
''' '''
magnitude = ope.Magnitude() assert isinstance(timetohash, UTCDateTime), "'timetohash' is not an ObsPy" \
magnitude.creation_info = cinfo "UTCDateTime object"
magnitude.origin_id = originID hid = getHash(timetohash)
return magnitude if hrstr is None:
resID = ope.ResourceIdentifier(restype + '/' + hid[0:6])
else:
resID = ope.ResourceIdentifier(restype + '/' + hrstr + '_' + hid[0:6])
if authority_id is not None:
resID.convertIDToQuakeMLURI(authority_id=authority_id)
return resID
def demeanTrace(trace, window):
def createAmplitude(pickID, amp, unit, category, cinfo): """
amplitude = ope.Amplitude() returns the DATA where each trace is demean by the average value within
amplitude.creation_info = cinfo WINDOW
amplitude.generic_amplitude = amp :param trace: waveform trace object
amplitude.unit = ope.AmplitudeUnit(unit) :type trace: `~obspy.core.stream.Trace`
amplitude.type = ope.AmplitudeCategory(category) :param window:
amplitude.pick_id = pickID :type window: tuple
return amplitude :return: trace
:rtype: `~obspy.core.stream.Trace`
"""
trace.data -= trace.data[window].mean()
return trace
def findComboBoxIndex(combo_box, val): def findComboBoxIndex(combo_box, val):
""" """
@ -372,11 +241,184 @@ def findComboBoxIndex(combo_box, val):
:param combo_box: Combo box object. :param combo_box: Combo box object.
:type combo_box: QComboBox :type combo_box: QComboBox
:param val: Name of a combo box to search for. :param val: Name of a combo box to search for.
:type val:
:return: index value of item with name val or 0 :return: index value of item with name val or 0
""" """
return combo_box.findText(val) if combo_box.findText(val) is not -1 else 0 return combo_box.findText(val) if combo_box.findText(val) is not -1 else 0
def fnConstructor(s):
'''
:param s:
:type s:
:return:
'''
if type(s) is str:
s = s.split(':')[-1]
else:
s = getHash(UTCDateTime())
badchars = re.compile(r'[^A-Za-z0-9_. ]+|^\.|\.$|^ | $|^$')
badsuffix = re.compile(r'(aux|com[1-9]|con|lpt[1-9]|prn)(\.|$)')
fn = badchars.sub('_', s)
if badsuffix.match(fn):
fn = '_' + fn
return fn
def getGlobalTimes(stream):
'''
:param stream:
:type stream
:return:
'''
min_start = UTCDateTime()
max_end = None
for trace in stream:
if trace.stats.starttime < min_start:
min_start = trace.stats.starttime
if max_end is None or trace.stats.endtime > max_end:
max_end = trace.stats.endtime
return min_start, max_end
def getHash(time):
'''
:param time: time object for which a hash should be calculated
:type time: :class: `~obspy.core.utcdatetime.UTCDateTime` object
:return: str
'''
hg = hashlib.sha1()
hg.update(time.strftime('%Y-%m-%d %H:%M:%S.%f'))
return hg.hexdigest()
def getLogin():
'''
:return:
'''
return pwd.getpwuid(os.getuid())[0]
def getOwner(fn):
'''
:param fn:
:type fn:
:return:
'''
return pwd.getpwuid(os.stat(fn).st_uid).pw_name
def getPatternLine(fn, pattern):
"""
Takes a file name and a pattern string to search for in the file and
returns the first line which contains the pattern string otherwise None.
:param fn: file name
:type fn: str
:param pattern: pattern string to search for
:type pattern: str
:return: the complete line containing pattern or None
>>> getPatternLine('utils.py', 'python')
'#!/usr/bin/env python\\n'
>>> print(getPatternLine('version.py', 'palindrome'))
None
"""
fobj = open(fn, 'r')
for line in fobj.readlines():
if pattern in line:
fobj.close()
return line
return None
def isSorted(iterable):
'''
:param iterable:
:type iterable:
:return:
'''
return sorted(iterable) == iterable
def prepTimeAxis(stime, trace):
'''
:param stime:
:type stime:
:param trace:
:type trace:
:return:
'''
nsamp = trace.stats.npts
srate = trace.stats.sampling_rate
tincr = trace.stats.delta
etime = stime + nsamp / srate
time_ax = np.arange(stime, etime, tincr)
if len(time_ax) < nsamp:
print 'elongate time axes by one datum'
time_ax = np.arange(stime, etime + tincr, tincr)
elif len(time_ax) > nsamp:
print 'shorten time axes by one datum'
time_ax = np.arange(stime, etime - tincr, tincr)
if len(time_ax) != nsamp:
raise ValueError('{0} samples of data \n '
'{1} length of time vector \n'
'delta: {2}'.format(nsamp, len(time_ax), tincr))
return time_ax
def scaleWFData(data, factor=None, components='all'):
"""
produce scaled waveforms from given waveform data and a scaling factor,
waveform may be selected by their components name
:param data: waveform data to be scaled
:type data: `~obspy.core.stream.Stream` object
:param factor: scaling factor
:type factor: float
:param components: components labels for the traces in data to be scaled by
the scaling factor (optional, default: 'all')
:type components: tuple
:return: scaled waveform data
:rtype: `~obspy.core.stream.Stream` object
"""
if components is not 'all':
for comp in components:
if factor is None:
max_val = np.max(np.abs(data.select(component=comp)[0].data))
data.select(component=comp)[0].data /= 2 * max_val
else:
data.select(component=comp)[0].data /= 2 * factor
else:
for tr in data:
if factor is None:
max_val = float(np.max(np.abs(tr.data)))
tr.data /= 2 * max_val
else:
tr.data /= 2 * factor
return data
def runProgram(cmd, parameter=None):
"""
run an external program specified by cmd with parameters input returning the
stdout output
:param cmd: name of the command to run
:type cmd: str
:param parameter: filename of parameter file or parameter string
:type parameter: str
:return: stdout output
:rtype: str
"""
if parameter:
cmd.strip()
cmd += ' %s 2>&1' % parameter
output = subprocess.check_output('{} | tee /dev/stderr'.format(cmd),
shell = True)
if __name__ == "__main__": if __name__ == "__main__":
import doctest import doctest
doctest.testmod() doctest.testmod()