WIP: Simplify data structure #39
@ -167,11 +167,11 @@ def clims(lim1, lim2):
|
||||
"""
|
||||
takes two pairs of limits and returns one pair of common limts
|
||||
:param lim1: limit 1
|
||||
:type lim1: int
|
||||
:type lim1: List[int]
|
||||
:param lim2: limit 2
|
||||
:type lim2: int
|
||||
:type lim2: List[int]
|
||||
:return: new upper and lower limit common to both given limits
|
||||
:rtype: [int, int]
|
||||
:rtype: List[int]
|
||||
|
||||
>>> clims([0, 4], [1, 3])
|
||||
[0, 4]
|
||||
@ -539,8 +539,7 @@ def isSorted(iterable):
|
||||
>>> isSorted([2,3,1,4])
|
||||
False
|
||||
"""
|
||||
assert isIterable(iterable), 'object is not iterable; object: {' \
|
||||
'}'.format(iterable)
|
||||
assert isIterable(iterable), "object is not iterable; object: {}".format(iterable)
|
||||
if type(iterable) is str:
|
||||
iterable = [s for s in iterable]
|
||||
return sorted(iterable) == iterable
|
||||
@ -551,7 +550,7 @@ def isIterable(obj):
|
||||
takes a python object and returns True is the object is iterable and
|
||||
False otherwise
|
||||
:param obj: a python object
|
||||
:type obj: object
|
||||
:type obj: obj
|
||||
:return: True of False
|
||||
:rtype: bool
|
||||
"""
|
||||
@ -959,66 +958,87 @@ def check4rotated(data, metadata=None, verbosity=1):
|
||||
:rtype: `~obspy.core.stream.Stream`
|
||||
"""
|
||||
|
||||
def rotate_components(wfstream, metadata=None):
|
||||
def rotation_required(trace_ids):
|
||||
"""
|
||||
Derive if any rotation is required from the orientation code of the input.
|
||||
|
||||
:param trace_ids: string identifier of waveform data trace
|
||||
:type trace_ids: List(str)
|
||||
:return: boolean representing if rotation is necessary for any of the traces
|
||||
:rtype: bool
|
||||
"""
|
||||
orientations = [trace_id[-1] for trace_id in trace_ids]
|
||||
return any([orientation.isnumeric() for orientation in orientations])
|
||||
|
||||
def rotate_components(wfs_in, metadata=None):
|
||||
"""
|
||||
Rotate components if orientation code is numeric (= non traditional orientation).
|
||||
|
||||
Azimut and dip are fetched from metadata. To be rotated, traces of a station have to be cut to the same length.
|
||||
Returns unrotated traces of no metadata is provided
|
||||
:param wfstream: stream containing seismic traces of a station
|
||||
:type wfstream: `~obspy.core.stream.Stream`
|
||||
:param wfs_in: stream containing seismic traces of a station
|
||||
:type wfs_in: `~obspy.core.stream.Stream`
|
||||
:param metadata: tuple containing metadata type string and metadata parser object
|
||||
:type metadata: (str, `~obspy.io.xseed.parser.Parser`)
|
||||
:return: stream object with traditionally oriented traces (ZNE)
|
||||
:rtype: `~obspy.core.stream.Stream`
|
||||
"""
|
||||
|
||||
if len(wfs_in) < 3:
|
||||
print(f"Stream {wfs_in=}, has not enough components to rotate.")
|
||||
return wfs_in
|
||||
|
||||
# check if any traces in this station need to be rotated
|
||||
trace_ids = [trace.id for trace in wfstream]
|
||||
orientations = [trace_id[-1] for trace_id in trace_ids]
|
||||
rotation_required = [orientation.isnumeric() for orientation in orientations]
|
||||
if any(rotation_required):
|
||||
t_start = full_range(wfstream)
|
||||
try:
|
||||
azimuts = []
|
||||
dips = []
|
||||
for tr_id in trace_ids:
|
||||
azimuts.append(metadata.get_coordinates(tr_id, t_start)['azimuth'])
|
||||
dips.append(metadata.get_coordinates(tr_id, t_start)['dip'])
|
||||
except (KeyError, TypeError) as e:
|
||||
print('Failed to rotate trace {}, no azimuth or dip available in metadata'.format(tr_id))
|
||||
return wfstream
|
||||
if len(wfstream) < 3:
|
||||
print('Failed to rotate Stream {}, not enough components available.'.format(wfstream))
|
||||
return wfstream
|
||||
# to rotate all traces must have same length, so trim them
|
||||
wfstream = trim_station_components(wfstream, trim_start=True, trim_end=True)
|
||||
try:
|
||||
z, n, e = rotate2zne(wfstream[0], azimuts[0], dips[0],
|
||||
wfstream[1], azimuts[1], dips[1],
|
||||
wfstream[2], azimuts[2], dips[2])
|
||||
print('check4rotated: rotated trace {} to ZNE'.format(trace_ids))
|
||||
# replace old data with rotated data, change the channel code to ZNE
|
||||
z_index = dips.index(min(
|
||||
dips)) # get z-trace index, z has minimum dip of -90 (dip is measured from 0 to -90, with -90 being vertical)
|
||||
wfstream[z_index].data = z
|
||||
wfstream[z_index].stats.channel = wfstream[z_index].stats.channel[0:-1] + 'Z'
|
||||
del trace_ids[z_index]
|
||||
for trace_id in trace_ids:
|
||||
coordinates = metadata.get_coordinates(trace_id, t_start)
|
||||
dip, az = coordinates['dip'], coordinates['azimuth']
|
||||
trace = wfstream.select(id=trace_id)[0]
|
||||
if az > 315 or az <= 45 or az > 135 and az <= 225:
|
||||
trace.data = n
|
||||
trace.stats.channel = trace.stats.channel[0:-1] + 'N'
|
||||
elif az > 45 and az <= 135 or az > 225 and az <= 315:
|
||||
trace.data = e
|
||||
trace.stats.channel = trace.stats.channel[0:-1] + 'E'
|
||||
except (ValueError) as e:
|
||||
print(e)
|
||||
return wfstream
|
||||
trace_ids = [trace.id for trace in wfs_in]
|
||||
if not rotation_required(trace_ids):
|
||||
print(f"Stream does not need any rotation: Traces are {trace_ids=}")
|
||||
return wfs_in
|
||||
|
||||
return wfstream
|
||||
# check metadata quality
|
||||
t_start = full_range(wfs_in)
|
||||
try:
|
||||
azimuths = []
|
||||
dips = []
|
||||
for tr_id in trace_ids:
|
||||
azimuths.append(metadata.get_coordinates(tr_id, t_start)['azimuth'])
|
||||
dips.append(metadata.get_coordinates(tr_id, t_start)['dip'])
|
||||
except (KeyError, TypeError) as err:
|
||||
print(f"{type(err)=} occurred: {err=} Rotating not possible, not all azimuth and dip information "
|
||||
f"available in metadata. Stream remains unchanged.")
|
||||
return wfs_in
|
||||
except Exception as err:
|
||||
print(f"Unexpected {err=}, {type(err)=}")
|
||||
raise
|
||||
|
||||
# to rotate all traces must have same length, so trim them
|
||||
wfs_out = trim_station_components(wfs_in, trim_start=True, trim_end=True)
|
||||
try:
|
||||
z, n, e = rotate2zne(wfs_out[0], azimuths[0], dips[0],
|
||||
wfs_out[1], azimuths[1], dips[1],
|
||||
wfs_out[2], azimuths[2], dips[2])
|
||||
print('check4rotated: rotated trace {} to ZNE'.format(trace_ids))
|
||||
# replace old data with rotated data, change the channel code to ZNE
|
||||
z_index = dips.index(min(
|
||||
dips)) # get z-trace index, z has minimum dip of -90 (dip is measured from 0 to -90, with -90
|
||||
# being vertical)
|
||||
wfs_out[z_index].data = z
|
||||
wfs_out[z_index].stats.channel = wfs_out[z_index].stats.channel[0:-1] + 'Z'
|
||||
del trace_ids[z_index]
|
||||
for trace_id in trace_ids:
|
||||
coordinates = metadata.get_coordinates(trace_id, t_start)
|
||||
dip, az = coordinates['dip'], coordinates['azimuth']
|
||||
trace = wfs_out.select(id=trace_id)[0]
|
||||
if az > 315 or az <= 45 or 135 < az <= 225:
|
||||
trace.data = n
|
||||
trace.stats.channel = trace.stats.channel[0:-1] + 'N'
|
||||
elif 45 < az <= 135 or 225 < az <= 315:
|
||||
trace.data = e
|
||||
trace.stats.channel = trace.stats.channel[0:-1] + 'E'
|
||||
except ValueError as err:
|
||||
print(f"{err=} Rotation failed. Stream remains unchanged.")
|
||||
return wfs_in
|
||||
|
||||
return wfs_out
|
||||
|
||||
if metadata is None:
|
||||
if verbosity:
|
||||
|
Loading…
Reference in New Issue
Block a user