suggestion: add new dataclasses; remove unused code

This commit is contained in:
Sebastian Wehling-Benatelli 2024-07-17 14:06:13 +02:00 committed by Sebastian Wehling-Benatelli
parent ef69fc429f
commit c4aeab0d89

View File

@ -4,13 +4,16 @@
import copy import copy
import logging import logging
import os import os
import fnmatch
from dataclasses import dataclass, field
from typing import List
from PySide2.QtWidgets import QMessageBox from PySide2.QtWidgets import QMessageBox
from obspy import read_events from obspy import read, read_events, Stream, Catalog, UTCDateTime
from obspy.core import read, Stream, UTCDateTime
from obspy.core.event import Event as ObsPyEvent from obspy.core.event import Event as ObsPyEvent
from obspy.io.sac import SacIOError from obspy.io.sac import SacIOError
import pylot.core.loc.focmec as focmec import pylot.core.loc.focmec as focmec
import pylot.core.loc.hypodd as hypodd import pylot.core.loc.hypodd as hypodd
import pylot.core.loc.velest as velest import pylot.core.loc.velest as velest
@ -139,29 +142,6 @@ class Data(object):
def setNew(self): def setNew(self):
self._new = True self._new = True
def getCutTimes(self):
"""
Returns earliest start and latest end of all waveform data
:return: minimum start time and maximum end time as a tuple
:rtype: (UTCDateTime, UTCDateTime)
"""
if self.cuttimes is None:
self.updateCutTimes()
return self.cuttimes
def updateCutTimes(self):
"""
Update cuttimes to contain earliest start and latest end time
of all waveform data
:rtype: None
"""
self.cuttimes = full_range(self.getWFData())
def getEventFileName(self):
ID = self.getID()
# handle forbidden filenames especially on windows systems
return fnConstructor(str(ID))
def checkEvent(self, event, fcheck, forceOverwrite=False): def checkEvent(self, event, fcheck, forceOverwrite=False):
""" """
Check information in supplied event and own event and replace with own Check information in supplied event and own event and replace with own
@ -252,184 +232,6 @@ class Data(object):
if picktype in str(pick.method_id.id): if picktype in str(pick.method_id.id):
picks.append(pick) picks.append(pick)
def exportEvent(self, fnout, fnext='.xml', fcheck='auto', upperErrors=None):
"""
Export event to file
:param fnout: basename of file
:param fnext: file extensions xml, cnv, obs, focmec, or/and pha
:param fcheck: check and delete existing information
can be a str or a list of strings of ['manual', 'auto', 'origin', 'magnitude']
"""
from pylot.core.util.defaults import OUTPUTFORMATS
if not type(fcheck) == list:
fcheck = [fcheck]
try:
evtformat = OUTPUTFORMATS[fnext]
except KeyError as e:
errmsg = '{0}; selected file extension {1} not ' \
'supported'.format(e, fnext)
raise FormatError(errmsg)
if hasattr(self.get_evt_data(), 'notes'):
try:
with open(os.path.join(os.path.dirname(fnout), 'notes.txt'), 'w') as notes_file:
notes_file.write(self.get_evt_data().notes)
except Exception as e:
print('Warning: Could not save notes.txt: ', str(e))
# check for already existing xml-file
if fnext == '.xml':
if os.path.isfile(fnout + fnext):
print("xml-file already exists! Check content ...")
cat = read_events(fnout + fnext)
if len(cat) > 1:
raise IOError('Ambigious event information in file {}'.format(fnout + fnext))
if len(cat) < 1:
raise IOError('No event information in file {}'.format(fnout + fnext))
event = cat[0]
if not event.resource_id == self.get_evt_data().resource_id:
QMessageBox.warning(self, 'Warning', 'Different resource IDs!')
return
self.checkEvent(event, fcheck)
self.setEvtData(event)
self.get_evt_data().write(fnout + fnext, format=evtformat)
# try exporting event
else:
evtdata_org = self.get_evt_data()
picks = evtdata_org.picks
eventpath = evtdata_org.path
picks_copy = copy.deepcopy(picks)
evtdata_copy = Event(eventpath)
evtdata_copy.picks = picks_copy
# check for stations picked automatically as well as manually
# Prefer manual picks!
for i in range(len(picks)):
if picks[i].method_id == 'manual':
mstation = picks[i].waveform_id.station_code
mstation_ext = mstation + '_'
for k in range(len(picks_copy)):
if ((picks_copy[k].waveform_id.station_code == mstation) or
(picks_copy[k].waveform_id.station_code == mstation_ext)) and \
(picks_copy[k].method_id == 'auto'):
del picks_copy[k]
break
lendiff = len(picks) - len(picks_copy)
if lendiff != 0:
print("Manual as well as automatic picks available. Prefered the {} manual ones!".format(lendiff))
no_uncertainties_p = []
no_uncertainties_s = []
if upperErrors:
# check for pick uncertainties exceeding adjusted upper errors
# Picks with larger uncertainties will not be saved in output file!
for j in range(len(picks)):
for i in range(len(picks_copy)):
if picks_copy[i].phase_hint[0] == 'P':
# Skipping pick if no upper_uncertainty is found and warning user
if picks_copy[i].time_errors['upper_uncertainty'] is None:
#print("{1} P-Pick of station {0} does not have upper_uncertainty and cant be checked".format(
# picks_copy[i].waveform_id.station_code,
# picks_copy[i].method_id))
if not picks_copy[i].waveform_id.station_code in no_uncertainties_p:
no_uncertainties_p.append(picks_copy[i].waveform_id.station_code)
continue
#print ("checking for upper_uncertainty")
if (picks_copy[i].time_errors['uncertainty'] is None) or \
(picks_copy[i].time_errors['upper_uncertainty'] >= upperErrors[0]):
print("Uncertainty exceeds or equal adjusted upper time error!")
print("Adjusted uncertainty: {}".format(upperErrors[0]))
print("Pick uncertainty: {}".format(picks_copy[i].time_errors['uncertainty']))
print("{1} P-Pick of station {0} will not be saved in outputfile".format(
picks_copy[i].waveform_id.station_code,
picks_copy[i].method_id))
del picks_copy[i]
break
if picks_copy[i].phase_hint[0] == 'S':
# Skipping pick if no upper_uncertainty is found and warning user
if picks_copy[i].time_errors['upper_uncertainty'] is None:
#print("{1} S-Pick of station {0} does not have upper_uncertainty and cant be checked".format(
#picks_copy[i].waveform_id.station_code,
#picks_copy[i].method_id))
if not picks_copy[i].waveform_id.station_code in no_uncertainties_s:
no_uncertainties_s.append(picks_copy[i].waveform_id.station_code)
continue
if (picks_copy[i].time_errors['uncertainty'] is None) or \
(picks_copy[i].time_errors['upper_uncertainty'] >= upperErrors[1]):
print("Uncertainty exceeds or equal adjusted upper time error!")
print("Adjusted uncertainty: {}".format(upperErrors[1]))
print("Pick uncertainty: {}".format(picks_copy[i].time_errors['uncertainty']))
print("{1} S-Pick of station {0} will not be saved in outputfile".format(
picks_copy[i].waveform_id.station_code,
picks_copy[i].method_id))
del picks_copy[i]
break
for s in no_uncertainties_p:
print("P-Pick of station {0} does not have upper_uncertainty and cant be checked".format(s))
for s in no_uncertainties_s:
print("S-Pick of station {0} does not have upper_uncertainty and cant be checked".format(s))
if fnext == '.obs':
try:
evtdata_copy.write(fnout + fnext, format=evtformat)
# write header afterwards
evid = str(evtdata_org.resource_id).split('/')[1]
header = '# EQEVENT: Label: EQ%s Loc: X 0.00 Y 0.00 Z 10.00 OT 0.00 \n' % evid
nllocfile = open(fnout + fnext)
l = nllocfile.readlines()
# Adding A0/Generic Amplitude to .obs file
# l2 = []
# for li in l:
# for amp in evtdata_org.amplitudes:
# if amp.waveform_id.station_code == li[0:5].strip():
# li = li[0:64] + '{:0.2e}'.format(amp.generic_amplitude) + li[73:-1] + '\n'
# l2.append(li)
# l = l2
nllocfile.close()
l.insert(0, header)
nllocfile = open(fnout + fnext, 'w')
nllocfile.write("".join(l))
nllocfile.close()
except KeyError as e:
raise KeyError('''{0} export format
not implemented: {1}'''.format(evtformat, e))
if fnext == '.cnv':
try:
velest.export(picks_copy, fnout + fnext, eventinfo=self.get_evt_data())
except KeyError as e:
raise KeyError('''{0} export format
not implemented: {1}'''.format(evtformat, e))
if fnext == '_focmec.in':
try:
parameter = PylotParameter()
logging.warning('Using default input parameter')
focmec.export(picks_copy, fnout + fnext, parameter, eventinfo=self.get_evt_data())
except KeyError as e:
raise KeyError('''{0} export format
not implemented: {1}'''.format(evtformat, e))
if fnext == '.pha':
try:
parameter = PylotParameter()
logging.warning('Using default input parameter')
hypodd.export(picks_copy, fnout + fnext, parameter, eventinfo=self.get_evt_data())
except KeyError as e:
raise KeyError('''{0} export format
not implemented: {1}'''.format(evtformat, e))
def getComp(self):
"""
Get component (ZNE)
"""
return self.comp
def getID(self): def getID(self):
""" """
Get unique resource id Get unique resource id
@ -470,23 +272,10 @@ class Data(object):
self.tstart = tstart self.tstart = tstart
self.tstop = tstop self.tstop = tstop
# remove directories
fnames = check_fname_exists(fnames) fnames = check_fname_exists(fnames)
fnames_alt = check_fname_exists(fnames_alt) fnames_alt = check_fname_exists(fnames_alt)
# if obspy_dmt:
# wfdir = 'raw'
# self.processed = False
# for fname in fnames:
# if fname.endswith('processed'):
# wfdir = 'processed'
# self.processed = True
# break
# for fpath in fnames:
# if fpath.endswith(wfdir):
# wffnames = [os.path.join(fpath, fname) for fname in os.listdir(fpath)]
# if 'syngine' in fpath.split('/')[-1]:
# wffnames_syn = [os.path.join(fpath, fname) for fname in os.listdir(fpath)]
# else:
# wffnames = fnames
if fnames is not None: if fnames is not None:
self.appendWFData(fnames) self.appendWFData(fnames)
if fnames_alt is not None: if fnames_alt is not None:
@ -494,9 +283,6 @@ class Data(object):
else: else:
return False return False
# various pre-processing steps:
# remove possible underscores in station names
# self.wfdata = remove_underscores(self.wfdata)
# check for gaps and merge # check for gaps and merge
self.wfdata, _ = check_for_gaps_and_merge(self.wfdata) self.wfdata, _ = check_for_gaps_and_merge(self.wfdata)
# check for nans # check for nans
@ -618,11 +404,6 @@ class Data(object):
picks = picks_from_picksdict(picks) picks = picks_from_picksdict(picks)
break break
self.get_evt_data().picks = picks self.get_evt_data().picks = picks
# if 'smi:local' in self.getID() and firstonset:
# fonset_str = firstonset.strftime('%Y_%m_%d_%H_%M_%S')
# ID = ResourceIdentifier('event/' + fonset_str)
# ID.convertIDToQuakeMLURI(authority_id=authority_id)
# self.get_evt_data().resource_id = ID
def applyEvent(event): def applyEvent(event):
""" """
@ -654,6 +435,171 @@ class Data(object):
applydata[typ](data) applydata[typ](data)
self._new = False self._new = False
@dataclass
class SeismicEventData:
event_id: str = ""
catalog: Catalog = field(default_factory=Catalog)
def find_event_files(self, directory: str, extensions: List[str]) -> List[str]:
"""
Browse the directory to find event files with specified extensions.
Parameters:
directory (str): The directory path to search for event files.
extensions (List[str]): List of file extensions to search for.
Returns:
List[str]: List of file paths that match the given extensions.
Example:
>>> sed = SeismicEventData()
>>> sed.find_event_files('test_directory', ['.xml', '.quakeml']) # doctest: +SKIP
['test_directory/event1.xml', 'test_directory/event2.quakeml']
"""
matches = []
for root, _, files in os.walk(directory):
for ext in extensions:
for filename in fnmatch.filter(files, f'*{ext}'):
matches.append(os.path.join(root, filename))
return matches
def read_event_from_directory(self, directory: str, extensions: List[str], format: str) -> None:
"""
Read a seismic event from the first found file in the directory with specified format.
Parameters:
directory (str): The directory path to search for event files.
extensions (List[str]): List of file extensions to search for.
format (str): The format to read the event file.
Example:
>>> sed = SeismicEventData()
>>> sed.read_event_from_directory('test_directory', ['.xml', '.quakeml'], 'QUAKEML') # doctest: +SKIP
"""
event_files = self.find_event_files(directory, extensions)
if event_files:
self.read_event(event_files[0], format)
else:
raise FileNotFoundError(f"No event files found in directory {directory} with extensions {extensions}.")
def read_event(self, file_path: str, format: str) -> None:
"""
Read a seismic event from a file with specified format.
Parameters:
file_path (str): The path to the event file.
format (str): The format to read the event file.
Example:
>>> sed = SeismicEventData()
>>> sed.read_event('test_directory/event1.xml', 'QUAKEML') # doctest: +SKIP
"""
if os.path.exists(file_path):
self.catalog = read_events(file_path, format=format)
self.event_id = self.catalog[0].resource_id.id.split('/')[-1] if self.catalog else ""
else:
raise FileNotFoundError(f"File {file_path} does not exist.")
def write_event(self, file_path: str, format: str) -> None:
"""
Write the seismic event to a file with specified format.
Parameters:
file_path (str): The path to the output file.
format (str): The format to write the event file.
Example:
>>> sed = SeismicEventData(event_id='12345')
>>> sed.write_event('output_directory/event1.xml', 'QUAKEML') # doctest: +SKIP
"""
self.catalog.write(file_path, format=format)
@dataclass
class WaveformData:
stream: Stream = field(default_factory=Stream)
def find_waveform_files(self, directory: str, extensions: List[str]) -> List[str]:
"""
Browse the directory to find waveform files with specified extensions.
Parameters:
directory (str): The directory path to search for waveform files.
extensions (List[str]): List of file extensions to search for.
Returns:
List[str]: List of file paths that match the given extensions.
Example:
>>> wd = WaveformData()
>>> wd.find_waveform_files('test_directory', ['.mseed']) # doctest: +SKIP
['test_directory/waveform1.mseed']
"""
matches = []
for root, _, files in os.walk(directory):
for ext in extensions:
for filename in fnmatch.filter(files, f'*{ext}'):
matches.append(os.path.join(root, filename))
return matches
def read_waveform_from_directory(self, directory: str, extensions: List[str], format: str) -> None:
"""
Read waveform data from the first found file in the directory with specified format.
Parameters:
directory (str): The directory path to search for waveform files.
extensions (List[str]): List of file extensions to search for.
format (str): The format to read the waveform file.
Example:
>>> wd = WaveformData()
>>> wd.read_waveform_from_directory('test_directory', ['.mseed'], 'MSEED') # doctest: +SKIP
"""
waveform_files = self.find_waveform_files(directory, extensions)
if waveform_files:
self.read_waveform(waveform_files[0], format)
else:
raise FileNotFoundError(f"No waveform files found in directory {directory} with extensions {extensions}.")
def read_waveform(self, file_path: str, format: str) -> None:
"""
Read waveform data from a file with specified format.
Parameters:
file_path (str): The path to the waveform file.
format (str): The format to read the waveform file.
Example:
>>> wd = WaveformData()
>>> wd.read_waveform('test_directory/waveform1.mseed', 'MSEED') # doctest: +SKIP
"""
if os.path.exists(file_path):
self.stream = read(file_path, format=format)
else:
raise FileNotFoundError(f"File {file_path} does not exist.")
def write_waveform(self, file_path: str, format: str) -> None:
"""
Write the waveform data to a file with specified format.
Parameters:
file_path (str): The path to the output file.
format (str): The format to write the waveform file.
Example:
>>> wd = WaveformData()
>>> wd.write_waveform('output_directory/waveform1.mseed', 'MSEED') # doctest: +SKIP
"""
self.stream.write(file_path, format=format)
# Example usage:
# seismic_event = SeismicEventData()
# seismic_event.read_event_from_directory("path_to_directory", extensions=[".xml", ".quakeml"], format="QUAKEML")
# seismic_event.write_event("output_event_file.xml", format="QUAKEML")
# waveform_data = WaveformData()
# waveform_data.read_waveform_from_directory("path_to_directory", extensions=[".mseed"], format="MSEED")
# waveform_data.write_waveform("output_waveform_file.mseed", format="MSEED")
class GenericDataStructure(object): class GenericDataStructure(object):
""" """
@ -837,22 +783,6 @@ class PilotDataStructure(GenericDataStructure):
self.setExpandFields(['root', 'database']) self.setExpandFields(['root', 'database'])
class ObspyDMTdataStructure(GenericDataStructure):
"""
Object containing the data access information for the old PILOT data
structure.
"""
def __init__(self, **fields):
if not fields:
fields = {'database': '',
'root': ''}
GenericDataStructure.__init__(self, **fields)
self.setExpandFields(['root', 'database'])
class SeiscompDataStructure(GenericDataStructure): class SeiscompDataStructure(GenericDataStructure):
""" """
Dictionary containing the data access information for an SDS data archive: Dictionary containing the data access information for an SDS data archive: