WIP: Simplify data structure #39

Draft
sebastianw wants to merge 31 commits from 38-simplify-data-structure into develop
3 changed files with 383 additions and 578 deletions
Showing only changes of commit 63dac0fff6 - Show all commits

View File

@ -1,605 +1,61 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy
import logging
import os
import fnmatch
from dataclasses import dataclass, field
from typing import List
from typing import List, Union
from PySide2.QtWidgets import QMessageBox
from obspy import read, read_events, Stream, Catalog, UTCDateTime
from obspy.core.event import Event as ObsPyEvent
from obspy.io.sac import SacIOError
from obspy import UTCDateTime
from pylot.core.io.event import EventData
from pylot.core.io.waveformdata import WaveformData
import pylot.core.loc.focmec as focmec
import pylot.core.loc.hypodd as hypodd
import pylot.core.loc.velest as velest
from pylot.core.io.phases import readPILOTEvent, picks_from_picksdict, \
picksdict_from_pilot, merge_picks, PylotParameter
from pylot.core.util.errors import FormatError, OverwriteError
from pylot.core.util.event import Event
from pylot.core.util.obspyDMT_interface import qml_from_obspyDMT
from pylot.core.util.utils import fnConstructor, full_range, check4rotated, \
check_for_gaps_and_merge, trim_station_components, check_for_nan
class Data(object):
"""
Data container with attributes wfdata holding ~obspy.core.stream.
:type parent: PySide2.QtWidgets.QWidget object, optional
:param parent: A PySide2.QtWidgets.QWidget object utilized when
called by a GUI to display a PySide2.QtWidgets.QMessageBox instead of printing
to standard out.
:type evtdata: ~obspy.core.event.Event object, optional
:param evtdata ~obspy.core.event.Event object containing all derived or
loaded event. Container object holding, e.g. phase arrivals, etc.
"""
@dataclass
class Data:
event_data: EventData = field(default_factory=EventData)
waveform_data: WaveformData = field(default_factory=WaveformData)
_parent: Union[None, 'QtWidgets.QWidget'] = None
def __init__(self, parent=None, evtdata=None):
self._parent = parent
if self.getParent():
self.comp = parent.getComponent()
else:
self.comp = 'Z'
self.wfdata = Stream()
self._new = False
if isinstance(evtdata, ObsPyEvent) or isinstance(evtdata, Event):
pass
elif isinstance(evtdata, dict):
evt = readPILOTEvent(**evtdata)
evtdata = evt
elif isinstance(evtdata, str):
try:
cat = read_events(evtdata)
if len(cat) != 1:
raise ValueError('ambiguous event information for file: '
'{file}'.format(file=evtdata))
evtdata = cat[0]
except TypeError as e:
if 'Unknown format for file' in e.message:
if 'PHASES' in evtdata:
picks = picksdict_from_pilot(evtdata)
evtdata = ObsPyEvent()
evtdata.picks = picks_from_picksdict(picks)
elif 'LOC' in evtdata:
raise NotImplementedError('PILOT location information '
'read support not yet '
'implemented.')
elif 'event.pkl' in evtdata:
evtdata = qml_from_obspyDMT(evtdata)
else:
raise e
else:
raise e
else: # create an empty Event object
self.setNew()
evtdata = ObsPyEvent()
evtdata.picks = []
self.evtdata = evtdata
self.wforiginal = None
self.cuttimes = None
self.dirty = False
self.processed = None
self.event_data = EventData(evtdata)
self.waveform_data = WaveformData()
def __str__(self):
return str(self.wfdata)
return str(self.waveform_data.wfdata)
def __add__(self, other):
assert isinstance(other, Data), "operands must be of same type 'Data'"
rs_id = self.get_evt_data().get('resource_id')
rs_id_other = other.get_evt_data().get('resource_id')
if other.isNew() and not self.isNew():
picks_to_add = other.get_evt_data().picks
old_picks = self.get_evt_data().picks
wf_ids_old = [pick.waveform_id for pick in old_picks]
for new_pick in picks_to_add:
wf_id = new_pick.waveform_id
if wf_id in wf_ids_old:
for old_pick in old_picks:
comparison = [old_pick.waveform_id == new_pick.waveform_id,
old_pick.phase_hint == new_pick.phase_hint,
old_pick.method_id == new_pick.method_id]
if all(comparison):
del (old_pick)
old_picks.append(new_pick)
elif not other.isNew() and self.isNew():
new = other + self
self.evtdata = new.get_evt_data()
elif self.isNew() and other.isNew():
if not isinstance(other, Data):
raise TypeError("Operands must be of type 'Data'")
if self.event_data.is_new() and other.event_data.is_new():
pass
elif rs_id == rs_id_other:
other.setNew()
elif other.event_data.is_new():
new_picks = other.event_data.evtdata.picks
old_picks = self.event_data.evtdata.picks
old_picks.extend([pick for pick in new_picks if pick not in old_picks])
elif self.event_data.is_new():
return other + self
elif self.event_data.get_id() == other.event_data.get_id():
other.event_data.set_new()
return self + other
else:
raise ValueError("both Data objects have differing "
"unique Event identifiers")
raise ValueError("Both Data objects have differing unique Event identifiers")
return self
def getPicksStr(self):
"""
Return picks in event data
:return: picks seperated by newlines
:rtype: str
"""
picks_str = ''
for pick in self.get_evt_data().picks:
picks_str += str(pick) + '\n'
return picks_str
def getParent(self):
"""
Get PySide.QtGui.QWidget parent object
"""
def get_parent(self):
return self._parent
def isNew(self):
return self._new
def filter_wf_data(self, **kwargs):
self.waveform_data.wfdata.detrend('linear')
self.waveform_data.wfdata.taper(0.02, type='cosine')
self.waveform_data.wfdata.filter(**kwargs)
self.waveform_data.dirty = True
def setNew(self):
self._new = True
def checkEvent(self, event, fcheck, forceOverwrite=False):
"""
Check information in supplied event and own event and replace with own
information if no other information are given or forced by forceOverwrite
:param event: Event that supplies information for comparison
:type event: pylot.core.util.event.Event
:param fcheck: check and delete existing information
can be a str or a list of strings of ['manual', 'auto', 'origin', 'magnitude']
:type fcheck: str, [str]
:param forceOverwrite: Set to true to force overwrite own information. If false,
supplied information from event is only used if there is no own information in that
category (given in fcheck: manual, auto, origin, magnitude)
:type forceOverwrite: bool
:return:
:rtype: None
"""
if 'origin' in fcheck:
self.replaceOrigin(event, forceOverwrite)
if 'magnitude' in fcheck:
self.replaceMagnitude(event, forceOverwrite)
if 'auto' in fcheck:
self.replacePicks(event, 'auto')
if 'manual' in fcheck:
self.replacePicks(event, 'manual')
def replaceOrigin(self, event, forceOverwrite=False):
"""
Replace own origin with the one supplied in event if own origin is not
existing or forced by forceOverwrite = True
:param event: Event that supplies information for comparison
:type event: pylot.core.util.event.Event
:param forceOverwrite: always replace own information with supplied one if true
:type forceOverwrite: bool
:return:
:rtype: None
"""
if self.get_evt_data().origins or forceOverwrite:
if event.origins:
print("Found origin, replace it by new origin.")
event.origins = self.get_evt_data().origins
def replaceMagnitude(self, event, forceOverwrite=False):
"""
Replace own magnitude with the one supplied in event if own magnitude is not
existing or forced by forceOverwrite = True
:param event: Event that supplies information for comparison
:type event: pylot.core.util.event.Event
:param forceOverwrite: always replace own information with supplied one if true
:type forceOverwrite: bool
:return:
:rtype: None
"""
if self.get_evt_data().magnitudes or forceOverwrite:
if event.magnitudes:
print("Found magnitude, replace it by new magnitude")
event.magnitudes = self.get_evt_data().magnitudes
def replacePicks(self, event, picktype):
"""
Replace picks in event with own picks
:param event: Event that supplies information for comparison
:type event: pylot.core.util.event.Event
:param picktype: 'auto' or 'manual' picks
:type picktype: str
:return:
:rtype: None
"""
checkflag = 1
picks = event.picks
# remove existing picks
for j, pick in reversed(list(enumerate(picks))):
try:
if picktype in str(pick.method_id.id):
picks.pop(j)
checkflag = 2
except AttributeError as e:
msg = '{}'.format(e)
print(e)
checkflag = 0
if checkflag > 0:
if checkflag == 1:
print("Write new %s picks to catalog." % picktype)
if checkflag == 2:
print("Found %s pick(s), remove them and append new picks to catalog." % picktype)
# append new picks
for pick in self.get_evt_data().picks:
if picktype in str(pick.method_id.id):
picks.append(pick)
def getID(self):
"""
Get unique resource id
"""
try:
return self.evtdata.get('resource_id').id
except:
return None
def filterWFData(self, kwargs):
"""
Filter waveform data
:param kwargs: arguments to pass through to filter function
"""
data = self.getWFData()
data.detrend('linear')
data.taper(0.02, type='cosine')
data.filter(**kwargs)
self.dirty = True
def setWFData(self, fnames, fnames_alt=None, checkRotated=False, metadata=None, tstart=0, tstop=0):
"""
Clear current waveform data and set given waveform data
:param fnames: waveform data names to append
:param fnames_alt: alternative data to show (e.g. synthetic/processed)
:type fnames: list
"""
def check_fname_exists(filenames: list) -> list:
if filenames:
filenames = [fn for fn in filenames if os.path.isfile(fn)]
return filenames
self.wfdata = Stream()
self.wforiginal = None
self.wf_alt = Stream()
if tstart == tstop:
tstart = tstop = None
self.tstart = tstart
self.tstop = tstop
# remove directories
fnames = check_fname_exists(fnames)
fnames_alt = check_fname_exists(fnames_alt)
if fnames is not None:
self.appendWFData(fnames)
if fnames_alt is not None:
self.appendWFData(fnames_alt, alternative=True)
else:
return False
# check for gaps and merge
self.wfdata, _ = check_for_gaps_and_merge(self.wfdata)
# check for nans
check_for_nan(self.wfdata)
# check for stations with rotated components
if checkRotated and metadata is not None:
self.wfdata = check4rotated(self.wfdata, metadata, verbosity=0)
# trim station components to same start value
trim_station_components(self.wfdata, trim_start=True, trim_end=False)
# make a copy of original data
self.wforiginal = self.getWFData().copy()
self.dirty = False
return True
def appendWFData(self, fnames, alternative=False):
"""
Read waveform data from fnames and append it to current wf data
:param fnames: waveform data to append
:type fnames: list
"""
assert isinstance(fnames, list), "input parameter 'fnames' is " \
"supposed to be of type 'list' " \
"but is actually" \
" {0}".format(type(fnames))
if self.dirty:
self.resetWFData()
orig_or_alternative_data = {True: self.wf_alt,
False: self.wfdata}
warnmsg = ''
for fname in set(fnames):
try:
orig_or_alternative_data[alternative] += read(fname, starttime=self.tstart, endtime=self.tstop)
except TypeError:
try:
orig_or_alternative_data[alternative] += read(fname, format='GSE2', starttime=self.tstart, endtime=self.tstop)
except Exception as e:
try:
orig_or_alternative_data[alternative] += read(fname, format='SEGY', starttime=self.tstart,
endtime=self.tstop)
except Exception as e:
warnmsg += '{0}\n{1}\n'.format(fname, e)
except SacIOError as se:
warnmsg += '{0}\n{1}\n'.format(fname, se)
if warnmsg:
warnmsg = 'WARNING in appendWFData: unable to read waveform data\n' + warnmsg
print(warnmsg)
def getWFData(self):
return self.wfdata
def getOriginalWFData(self):
return self.wforiginal
def getAltWFdata(self):
return self.wf_alt
def resetWFData(self):
"""
Set waveform data to original waveform data
"""
if self.getOriginalWFData():
self.wfdata = self.getOriginalWFData().copy()
else:
self.wfdata = Stream()
self.dirty = False
def resetPicks(self):
"""
Clear all picks from event
"""
self.get_evt_data().picks = []
def get_evt_data(self):
return self.evtdata
def setEvtData(self, event):
self.evtdata = event
def applyEVTData(self, data, typ='pick'):
"""
Either takes an `obspy.core.event.Event` object and applies all new
information on the event to the actual data if typ is 'event or
creates ObsPy pick objects and append it to the picks list from the
PyLoT dictionary contain all picks if type is pick
:param data: data to apply, either picks or complete event
:type data:
:param typ: which event data to apply, 'pick' or 'event'
:type typ: str
:param authority_id: (currently unused)
:type: str
:raise OverwriteError:
"""
def applyPicks(picks):
"""
Creates ObsPy pick objects and append it to the picks list from the
PyLoT dictionary contain all picks.
:param picks:
:raise OverwriteError: raises an OverwriteError if the picks list is
not empty. The GUI will then ask for a decision.
"""
# firstonset = find_firstonset(picks)
# check for automatic picks
print("Writing phases to ObsPy-quakeml file")
for key in picks:
if not picks[key].get('P'):
continue
if picks[key]['P']['picker'] == 'auto':
print("Existing auto-picks will be overwritten in pick-dictionary!")
picks = picks_from_picksdict(picks)
break
else:
if self.get_evt_data().picks:
raise OverwriteError('Existing picks would be overwritten!')
else:
picks = picks_from_picksdict(picks)
break
self.get_evt_data().picks = picks
def applyEvent(event):
"""
takes an `obspy.core.event.Event` object and applies all new
information on the event to the actual data
:param event:
"""
if event is None:
print("applyEvent: Received None")
return
if self.isNew():
self.setEvtData(event)
else:
# prevent overwriting original pick information
event_old = self.get_evt_data()
if not event_old.resource_id == event.resource_id:
print("WARNING: Missmatch in event resource id's: {} and {}".format(
event_old.resource_id,
event.resource_id))
else:
picks = copy.deepcopy(event_old.picks)
event = merge_picks(event, picks)
# apply event information from location
event_old.update(event)
applydata = {'pick': applyPicks,
'event': applyEvent}
applydata[typ](data)
self._new = False
@dataclass
class SeismicEventData:
event_id: str = ""
catalog: Catalog = field(default_factory=Catalog)
def find_event_files(self, directory: str, extensions: List[str]) -> List[str]:
"""
Browse the directory to find event files with specified extensions.
Parameters:
directory (str): The directory path to search for event files.
extensions (List[str]): List of file extensions to search for.
Returns:
List[str]: List of file paths that match the given extensions.
Example:
>>> sed = SeismicEventData()
>>> sed.find_event_files('test_directory', ['.xml', '.quakeml']) # doctest: +SKIP
['test_directory/event1.xml', 'test_directory/event2.quakeml']
"""
matches = []
for root, _, files in os.walk(directory):
for ext in extensions:
for filename in fnmatch.filter(files, f'*{ext}'):
matches.append(os.path.join(root, filename))
return matches
def read_event_from_directory(self, directory: str, extensions: List[str], format: str) -> None:
"""
Read a seismic event from the first found file in the directory with specified format.
Parameters:
directory (str): The directory path to search for event files.
extensions (List[str]): List of file extensions to search for.
format (str): The format to read the event file.
Example:
>>> sed = SeismicEventData()
>>> sed.read_event_from_directory('test_directory', ['.xml', '.quakeml'], 'QUAKEML') # doctest: +SKIP
"""
event_files = self.find_event_files(directory, extensions)
if event_files:
self.read_event(event_files[0], format)
else:
raise FileNotFoundError(f"No event files found in directory {directory} with extensions {extensions}.")
def read_event(self, file_path: str, format: str) -> None:
"""
Read a seismic event from a file with specified format.
Parameters:
file_path (str): The path to the event file.
format (str): The format to read the event file.
Example:
>>> sed = SeismicEventData()
>>> sed.read_event('test_directory/event1.xml', 'QUAKEML') # doctest: +SKIP
"""
if os.path.exists(file_path):
self.catalog = read_events(file_path, format=format)
self.event_id = self.catalog[0].resource_id.id.split('/')[-1] if self.catalog else ""
else:
raise FileNotFoundError(f"File {file_path} does not exist.")
def write_event(self, file_path: str, format: str) -> None:
"""
Write the seismic event to a file with specified format.
Parameters:
file_path (str): The path to the output file.
format (str): The format to write the event file.
Example:
>>> sed = SeismicEventData(event_id='12345')
>>> sed.write_event('output_directory/event1.xml', 'QUAKEML') # doctest: +SKIP
"""
self.catalog.write(file_path, format=format)
@dataclass
class WaveformData:
stream: Stream = field(default_factory=Stream)
def find_waveform_files(self, directory: str, extensions: List[str]) -> List[str]:
"""
Browse the directory to find waveform files with specified extensions.
Parameters:
directory (str): The directory path to search for waveform files.
extensions (List[str]): List of file extensions to search for.
Returns:
List[str]: List of file paths that match the given extensions.
Example:
>>> wd = WaveformData()
>>> wd.find_waveform_files('test_directory', ['.mseed']) # doctest: +SKIP
['test_directory/waveform1.mseed']
"""
matches = []
for root, _, files in os.walk(directory):
for ext in extensions:
for filename in fnmatch.filter(files, f'*{ext}'):
matches.append(os.path.join(root, filename))
return matches
def read_waveform_from_directory(self, directory: str, extensions: List[str], format: str) -> None:
"""
Read waveform data from the first found file in the directory with specified format.
Parameters:
directory (str): The directory path to search for waveform files.
extensions (List[str]): List of file extensions to search for.
format (str): The format to read the waveform file.
Example:
>>> wd = WaveformData()
>>> wd.read_waveform_from_directory('test_directory', ['.mseed'], 'MSEED') # doctest: +SKIP
"""
waveform_files = self.find_waveform_files(directory, extensions)
if waveform_files:
self.read_waveform(waveform_files[0], format)
else:
raise FileNotFoundError(f"No waveform files found in directory {directory} with extensions {extensions}.")
def read_waveform(self, file_path: str, format: str) -> None:
"""
Read waveform data from a file with specified format.
Parameters:
file_path (str): The path to the waveform file.
format (str): The format to read the waveform file.
Example:
>>> wd = WaveformData()
>>> wd.read_waveform('test_directory/waveform1.mseed', 'MSEED') # doctest: +SKIP
"""
if os.path.exists(file_path):
self.stream = read(file_path, format=format)
else:
raise FileNotFoundError(f"File {file_path} does not exist.")
def write_waveform(self, file_path: str, format: str) -> None:
"""
Write the waveform data to a file with specified format.
Parameters:
file_path (str): The path to the output file.
format (str): The format to write the waveform file.
Example:
>>> wd = WaveformData()
>>> wd.write_waveform('output_directory/waveform1.mseed', 'MSEED') # doctest: +SKIP
"""
self.stream.write(file_path, format=format)
# Example usage:
# seismic_event = SeismicEventData()
# seismic_event.read_event_from_directory("path_to_directory", extensions=[".xml", ".quakeml"], format="QUAKEML")
# seismic_event.write_event("output_event_file.xml", format="QUAKEML")
# waveform_data = WaveformData()
# waveform_data.read_waveform_from_directory("path_to_directory", extensions=[".mseed"], format="MSEED")
# waveform_data.write_waveform("output_waveform_file.mseed", format="MSEED")
def set_wf_data(self, fnames: List[str], fnames_alt: List[str] = None, check_rotated=False, metadata=None, tstart=0, tstop=0):
return self.waveform_data.set_wf_data(fnames, fnames_alt, check_rotated, metadata, tstart, tstop)
def reset_wf_data(self):
self.waveform_data.reset_wf_data()
class GenericDataStructure(object):
"""

106
pylot/core/io/event.py Normal file
View File

@ -0,0 +1,106 @@
import copy
from dataclasses import dataclass
from typing import Union
from obspy import read_events
from obspy.core.event import Event as ObsPyEvent
@dataclass
class EventData:
evtdata: Union[ObsPyEvent, None] = None
_new: bool = False
def __init__(self, evtdata=None):
self.set_event_data(evtdata)
def set_event_data(self, evtdata):
if isinstance(evtdata, ObsPyEvent):
self.evtdata = evtdata
elif isinstance(evtdata, dict):
self.evtdata = self.read_pilot_event(**evtdata)
elif isinstance(evtdata, str):
self.evtdata = self.read_event_file(evtdata)
else:
self.set_new()
self.evtdata = ObsPyEvent(picks=[])
def read_event_file(self, evtdata: str) -> ObsPyEvent:
try:
cat = read_events(evtdata)
if len(cat) != 1:
raise ValueError(f'ambiguous event information for file: {evtdata}')
return cat[0]
except TypeError as e:
self.handle_event_file_error(e, evtdata)
def handle_event_file_error(self, e: TypeError, evtdata: str):
if 'Unknown format for file' in str(e):
if 'PHASES' in evtdata:
picks = self.picksdict_from_pilot(evtdata)
evtdata = ObsPyEvent(picks=self.picks_from_picksdict(picks))
elif 'LOC' in evtdata:
raise NotImplementedError('PILOT location information read support not yet implemented.')
elif 'event.pkl' in evtdata:
evtdata = self.qml_from_obspy_dmt(evtdata)
else:
raise e
else:
raise e
def set_new(self):
self._new = True
def is_new(self) -> bool:
return self._new
def get_picks_str(self) -> str:
return '\n'.join(str(pick) for pick in self.evtdata.picks)
def replace_origin(self, event: ObsPyEvent, force_overwrite: bool = False):
if self.evtdata.origins or force_overwrite:
event.origins = self.evtdata.origins
def replace_magnitude(self, event: ObsPyEvent, force_overwrite: bool = False):
if self.evtdata.magnitudes or force_overwrite:
event.magnitudes = self.evtdata.magnitudes
def replace_picks(self, event: ObsPyEvent, picktype: str):
checkflag = 1
picks = event.picks
for j, pick in reversed(list(enumerate(picks))):
if picktype in str(pick.method_id.id):
picks.pop(j)
checkflag = 2
if checkflag > 0:
for pick in self.evtdata.picks:
if picktype in str(pick.method_id.id):
picks.append(pick)
def get_id(self) -> Union[str, None]:
try:
return self.evtdata.resource_id.id
except:
return None
def apply_event_data(self, data, typ='pick'):
if typ == 'pick':
self.apply_picks(data)
elif typ == 'event':
self.apply_event(data)
def apply_picks(self, picks):
self.evtdata.picks = picks
def apply_event(self, event: ObsPyEvent):
if self.is_new():
self.evtdata = event
else:
old_event = self.evtdata
if old_event.resource_id == event.resource_id:
picks = copy.deepcopy(old_event.picks)
event = self.merge_picks(event, picks)
old_event.update(event)
else:
print(f"WARNING: Mismatch in event resource id's: {old_event.resource_id} and {event.resource_id}")

View File

@ -0,0 +1,243 @@
import logging
import os
from dataclasses import dataclass, field
from typing import Union, List
import numpy as np
from obspy import Stream, read
from obspy.io.sac import SacIOError
from obspy.signal.rotate import rotate2zne
from pylot.core.util.utils import full_range, get_stations
@dataclass
class WaveformData:
wfdata: Stream = field(default_factory=Stream)
wforiginal: Union[Stream, None] = None
wf_alt: Stream = field(default_factory=Stream)
dirty: bool = False
def set_wf_data(self, fnames: List[str], fnames_alt: List[str] = None, check_rotated=False, metadata=None, tstart=0, tstop=0):
self.clear_data()
fnames = self.check_fname_exists(fnames)
fnames_alt = self.check_fname_exists(fnames_alt)
if fnames:
self.append_wf_data(fnames)
if fnames_alt:
self.append_wf_data(fnames_alt, alternative=True)
self.wfdata, _ = self.check_for_gaps_and_merge(self.wfdata)
self.check_for_nan(self.wfdata)
if check_rotated and metadata:
self.wfdata = self.check4rotated(self.wfdata, metadata, verbosity=0)
self.trim_station_components(self.wfdata, trim_start=True, trim_end=False)
self.wforiginal = self.wfdata.copy()
self.dirty = False
return True
return False
def append_wf_data(self, fnames: List[str], alternative: bool = False):
data_stream = self.wf_alt if alternative else self.wfdata
warnmsg = ''
for fname in set(fnames):
try:
data_stream += read(fname)
except TypeError:
try:
data_stream += read(fname, format='GSE2')
except Exception as e:
try:
data_stream += read(fname, format='SEGY')
except Exception as e:
warnmsg += f'{fname}\n{e}\n'
except SacIOError as se:
warnmsg += f'{fname}\n{se}\n'
if warnmsg:
print(f'WARNING in appendWFData: unable to read waveform data\n{warnmsg}')
def clear_data(self):
self.wfdata = Stream()
self.wforiginal = None
self.wf_alt = Stream()
def reset_wf_data(self):
if self.wforiginal:
self.wfdata = self.wforiginal.copy()
else:
self.wfdata = Stream()
self.dirty = False
def check_fname_exists(self, filenames: List[str]) -> List[str]:
return [fn for fn in filenames if os.path.isfile(fn)]
def check_for_gaps_and_merge(self, stream):
"""
check for gaps in Stream and merge if gaps are found
:param stream: stream of seismic data
:type stream: `~obspy.core.stream.Stream`
:return: data stream, gaps returned from obspy get_gaps
:rtype: `~obspy.core.stream.Stream`
"""
gaps = stream.get_gaps()
if gaps:
merged = ['{}.{}.{}.{}'.format(*gap[:4]) for gap in gaps]
stream.merge(method=1)
print('Merged the following stations because of gaps:')
for merged_station in merged:
print(merged_station)
return stream, gaps
def check_for_nan(self, stream):
"""
Replace all NaNs in data with nan_value (in place)
:param stream: stream of seismic data
:type stream: `~obspy.core.stream.Stream`
:param nan_value: value which all NaNs are set to
:type nan_value: float, int
:return: None
"""
if not stream:
return
for trace in stream:
np.nan_to_num(trace.data, copy=False, nan=0.)
def check4rotated(self, stream, metadata=None, verbosity=1):
"""
Check all traces in data. If a trace is not in ZNE rotation (last symbol of channel code is numeric) and the trace
is in the metadata with azimuth and dip, rotate it to classical ZNE orientation.
Rotating the traces requires them to be of the same length, so, all traces will be trimmed to a common length as a
side effect.
:param stream: stream object containing seismic traces
:type stream: `~obspy.core.stream.Stream`
:param metadata: tuple containing metadata type string and metadata parser object
:type metadata: (str, `~obspy.io.xseed.parser.Parser`)
:param verbosity: if 0 print no information at runtime
:type verbosity: int
:return: stream object with traditionally oriented traces (ZNE) for stations that had misaligned traces (123) before
:rtype: `~obspy.core.stream.Stream`
"""
def rotation_required(trace_ids):
"""
Derive if any rotation is required from the orientation code of the input.
:param trace_ids: string identifier of waveform data trace
:type trace_ids: List(str)
:return: boolean representing if rotation is necessary for any of the traces
:rtype: bool
"""
orientations = [trace_id[-1] for trace_id in trace_ids]
return any([orientation.isnumeric() for orientation in orientations])
def rotate_components(wfs_in, metadata=None):
"""
Rotate components if orientation code is numeric (= non traditional orientation).
Azimut and dip are fetched from metadata. To be rotated, traces of a station have to be cut to the same length.
Returns unrotated traces of no metadata is provided
:param wfs_in: stream containing seismic traces of a station
:type wfs_in: `~obspy.core.stream.Stream`
:param metadata: tuple containing metadata type string and metadata parser object
:type metadata: (str, `~obspy.io.xseed.parser.Parser`)
:return: stream object with traditionally oriented traces (ZNE)
:rtype: `~obspy.core.stream.Stream`
"""
if len(wfs_in) < 3:
print(f"Stream {wfs_in=}, has not enough components to rotate.")
return wfs_in
# check if any traces in this station need to be rotated
trace_ids = [trace.id for trace in wfs_in]
if not rotation_required(trace_ids):
logging.debug(f"Stream does not need any rotation: Traces are {trace_ids=}")
return wfs_in
# check metadata quality
t_start = full_range(wfs_in)
try:
azimuths = []
dips = []
for tr_id in trace_ids:
azimuths.append(metadata.get_coordinates(tr_id, t_start)['azimuth'])
dips.append(metadata.get_coordinates(tr_id, t_start)['dip'])
except (KeyError, TypeError) as err:
logging.error(
f"{type(err)=} occurred: {err=} Rotating not possible, not all azimuth and dip information "
f"available in metadata. Stream remains unchanged.")
return wfs_in
except Exception as err:
print(f"Unexpected {err=}, {type(err)=}")
raise
# to rotate all traces must have same length, so trim them
wfs_out = self.trim_station_components(wfs_in, trim_start=True, trim_end=True)
try:
z, n, e = rotate2zne(wfs_out[0], azimuths[0], dips[0],
wfs_out[1], azimuths[1], dips[1],
wfs_out[2], azimuths[2], dips[2])
print('check4rotated: rotated trace {} to ZNE'.format(trace_ids))
# replace old data with rotated data, change the channel code to ZNE
z_index = dips.index(min(
dips)) # get z-trace index, z has minimum dip of -90 (dip is measured from 0 to -90, with -90
# being vertical)
wfs_out[z_index].data = z
wfs_out[z_index].stats.channel = wfs_out[z_index].stats.channel[0:-1] + 'Z'
del trace_ids[z_index]
for trace_id in trace_ids:
coordinates = metadata.get_coordinates(trace_id, t_start)
dip, az = coordinates['dip'], coordinates['azimuth']
trace = wfs_out.select(id=trace_id)[0]
if az > 315 or az <= 45 or 135 < az <= 225:
trace.data = n
trace.stats.channel = trace.stats.channel[0:-1] + 'N'
elif 45 < az <= 135 or 225 < az <= 315:
trace.data = e
trace.stats.channel = trace.stats.channel[0:-1] + 'E'
except ValueError as err:
print(f"{err=} Rotation failed. Stream remains unchanged.")
return wfs_in
return wfs_out
if metadata is None:
if verbosity:
msg = 'Warning: could not rotate traces since no metadata was given\nset Inventory file!'
print(msg)
return stream
stations = get_stations(stream)
for station in stations: # loop through all stations and rotate data if neccessary
wf_station = stream.select(station=station)
rotate_components(wf_station, metadata)
return stream
def trim_station_components(stream, trim_start=True, trim_end=True):
"""
cut a stream so only the part common to all three traces is kept to avoid dealing with offsets
:param stream: stream of seismic data
:type stream: `~obspy.core.stream.Stream`
:param trim_start: trim start of stream
:type trim_start: bool
:param trim_end: trim end of stream
:type trim_end: bool
:return: data stream
:rtype: `~obspy.core.stream.Stream`
"""
starttime = {False: None}
endtime = {False: None}
stations = get_stations(stream)
print('trim_station_components: Will trim stream for trim_start: {} and for '
'trim_end: {}.'.format(trim_start, trim_end))
for station in stations:
wf_station = stream.select(station=station)
starttime[True] = max([trace.stats.starttime for trace in wf_station])
endtime[True] = min([trace.stats.endtime for trace in wf_station])
wf_station.trim(starttime=starttime[trim_start], endtime=endtime[trim_end])
return stream