[refactor] rewrote/simplified getQualitiesfromxml code, used function already implemented in phases.py

This commit is contained in:
Marcel Paffrath 2022-03-16 16:00:14 +01:00
parent 3cd17ff364
commit dd685d5d5e
5 changed files with 94 additions and 269 deletions

View File

@ -89,7 +89,7 @@ from pylot.core.util.structure import DATASTRUCTURE
from pylot.core.util.thread import Thread, Worker
from pylot.core.util.version import get_git_version as _getVersionString
from pylot.core.io.getEventListFromXML import geteventlistfromxml
from pylot.core.io.getQualitiesfromxml import getQualitiesfromxml
from pylot.core.io.phases import getQualitiesfromxml
from pylot.styles import style_settings
@ -1669,8 +1669,8 @@ class MainWindow(QMainWindow):
self.cmpw.show()
def pickQualities(self):
path = self._inputs['rootpath'] + '/' + self._inputs['datapath'] + '/' + self._inputs['database']
getQualitiesfromxml(path)
path = self.get_current_event_path()
getQualitiesfromxml(path, self._inputs.get('timeerrorsP'), self._inputs.get('timeerrorsS'), plotflag=1)
return
def eventlistXml(self):

View File

@ -1,138 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Script to get onset uncertainties from Quakeml.xml files created by PyLoT.
Uncertainties are tranformed into quality classes and visualized via histogram if desired.
Ludger Küperkoch, BESTEC GmbH, 07/2017
rev.: Ludger Küperkoch, igem, 10/2020
Edited for usage in PyLoT: Jeldrik Gaal, igem, 01/2022
"""
import glob
import matplotlib.pyplot as plt
import numpy as np
from obspy.core.event import read_events
def getQualitiesfromxml(path):
# uncertainties
ErrorsP = [0.02, 0.04, 0.08, 0.16]
ErrorsS = [0.04, 0.08, 0.16, 0.32]
Pw0 = []
Pw1 = []
Pw2 = []
Pw3 = []
Pw4 = []
Sw0 = []
Sw1 = []
Sw2 = []
Sw3 = []
Sw4 = []
# data path
dp = path + '/e*/*.xml'
# list of all available xml-files
xmlnames = glob.glob(dp)
# read all onset weights
for names in xmlnames:
print("Getting onset weights from {}".format(names))
cat = read_events(names)
arrivals = cat.events[0].picks
for Pick in arrivals:
if Pick.phase_hint[0] == 'P':
if Pick.time_errors.uncertainty <= ErrorsP[0]:
Pw0.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsP[0] and \
Pick.time_errors.uncertainty <= ErrorsP[1]:
Pw1.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsP[1] and \
Pick.time_errors.uncertainty <= ErrorsP[2]:
Pw2.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsP[2] and \
Pick.time_errors.uncertainty <= ErrorsP[3]:
Pw3.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsP[3]:
Pw4.append(Pick.time_errors.uncertainty)
else:
pass
elif Pick.phase_hint[0] == 'S':
if Pick.time_errors.uncertainty <= ErrorsS[0]:
Sw0.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsS[0] and \
Pick.time_errors.uncertainty <= ErrorsS[1]:
Sw1.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsS[1] and \
Pick.time_errors.uncertainty <= ErrorsS[2]:
Sw2.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsS[2] and \
Pick.time_errors.uncertainty <= ErrorsS[3]:
Sw3.append(Pick.time_errors.uncertainty)
elif Pick.time_errors.uncertainty > ErrorsS[3]:
Sw4.append(Pick.time_errors.uncertainty)
else:
pass
else:
print("Phase hint not defined for picking!")
pass
# get percentage of weights
numPweights = np.sum([len(Pw0), len(Pw1), len(Pw2), len(Pw3), len(Pw4)])
numSweights = np.sum([len(Sw0), len(Sw1), len(Sw2), len(Sw3), len(Sw4)])
try:
P0perc = 100.0 / numPweights * len(Pw0)
except:
P0perc = 0
try:
P1perc = 100.0 / numPweights * len(Pw1)
except:
P1perc = 0
try:
P2perc = 100.0 / numPweights * len(Pw2)
except:
P2perc = 0
try:
P3perc = 100.0 / numPweights * len(Pw3)
except:
P3perc = 0
try:
P4perc = 100.0 / numPweights * len(Pw4)
except:
P4perc = 0
try:
S0perc = 100.0 / numSweights * len(Sw0)
except:
Soperc = 0
try:
S1perc = 100.0 / numSweights * len(Sw1)
except:
S1perc = 0
try:
S2perc = 100.0 / numSweights * len(Sw2)
except:
S2perc = 0
try:
S3perc = 100.0 / numSweights * len(Sw3)
except:
S3perc = 0
try:
S4perc = 100.0 / numSweights * len(Sw4)
except:
S4perc = 0
weights = ('0', '1', '2', '3', '4')
y_pos = np.arange(len(weights))
width = 0.34
p1, = plt.bar(0 - width, P0perc, width, color='black')
p2, = plt.bar(0, S0perc, width, color='red')
plt.bar(y_pos - width, [P0perc, P1perc, P2perc, P3perc, P4perc], width, color='black')
plt.bar(y_pos, [S0perc, S1perc, S2perc, S3perc, S4perc], width, color='red')
plt.ylabel('%')
plt.xticks(y_pos, weights)
plt.xlim([-0.5, 4.5])
plt.xlabel('Qualities')
plt.title('{0} P-Qualities, {1} S-Qualities'.format(numPweights, numSweights))
plt.legend([p1, p2], ['P-Weights', 'S-Weights'])
plt.show()

View File

@ -17,7 +17,7 @@ from pylot.core.io.location import create_event, \
create_magnitude
from pylot.core.pick.utils import select_for_phase, get_quality_class
from pylot.core.util.utils import getOwner, full_range, four_digits, transformFilterString4Export, \
backtransformFilterString
backtransformFilterString, loopIdentifyPhase, identifyPhase
def add_amplitudes(event, amplitudes):
@ -375,7 +375,6 @@ def picks_from_picksdict(picks, creation_info=None):
def reassess_pilot_db(root_dir, db_dir, out_dir=None, fn_param=None, verbosity=0):
import glob
# TODO: change root to datapath
db_root = os.path.join(root_dir, db_dir)
evt_list = glob.glob1(db_root, 'e????.???.??')
@ -1056,37 +1055,60 @@ def merge_picks(event, picks):
return event
def getQualitiesfromxml(xmlnames, ErrorsP, ErrorsS, plotflag=1):
def getQualitiesfromxml(path, errorsP, errorsS, plotflag=1, figure=None, verbosity=0):
"""
Script to get onset uncertainties from Quakeml.xml files created by PyLoT.
Uncertainties are tranformed into quality classes and visualized via histogram if desired.
Ludger Küperkoch, BESTEC GmbH, 07/2017
:param xmlnames: list of xml obspy event files containing picks
:type xmlnames: list
:param ErrorsP: time errors of P waves for the four discrete quality classes
:type ErrorsP:
:param ErrorsS: time errors of S waves for the four discrete quality classes
:type ErrorsS:
:param path: path containing xml files
:type path: str
:param errorsP: time errors of P waves for the four discrete quality classes
:type errorsP:
:param errorsS: time errors of S waves for the four discrete quality classes
:type errorsS:
:param plotflag:
:type plotflag:
:return:
:rtype:
"""
from pylot.core.pick.utils import get_quality_class
from pylot.core.util.utils import loopIdentifyPhase, identifyPhase
def calc_perc(uncertainties, ntotal):
if len(uncertainties) == 0:
return 0
else:
return 100 / ntotal * len(uncertainties)
def calc_weight_perc(psweights, weight_ids):
# count total number of list items for this phase
numWeights = np.sum([len(weight) for weight in psweights.values()])
# iterate over all available weights to return a list with percentages for plotting
plot_list = []
for weight_id in weight_ids:
plot_list.append(calc_perc(psweights[weight_id], numWeights))
return plot_list, numWeights
xmlnames = glob.glob(os.path.join(path, '*.xml'))
if len(xmlnames) == 0:
print(f'No files found in path {path}.')
return False
# first define possible phases here
phases = ['P', 'S']
# define possible weights (0-4)
weight_ids = list(range(5))
# put both error lists in a dictionary with P/S key so that amount of code can be halfed by simply using P/S as key
errors = dict(P=errorsP, S=errorsS)
# create dictionaries for each phase (P/S) with a dictionary of empty list for each weight defined in weights
# tuple above
weights = {}
for phase in phases:
weights[phase] = {weight_id: [] for weight_id in weight_ids}
# read all onset weights
Pw0 = []
Pw1 = []
Pw2 = []
Pw3 = []
Pw4 = []
Sw0 = []
Sw1 = []
Sw2 = []
Sw3 = []
Sw4 = []
for names in xmlnames:
print("Getting onset weights from {}".format(names))
cat = read_events(names)
@ -1094,119 +1116,60 @@ def getQualitiesfromxml(xmlnames, ErrorsP, ErrorsS, plotflag=1):
arrivals = cat.events[0].picks
arrivals_copy = cat_copy.events[0].picks
# Prefere manual picks if qualities are sufficient!
for Pick in arrivals:
if Pick.method_id.id.split('/')[1] == 'manual':
mstation = Pick.waveform_id.station_code
for pick in arrivals:
if pick.method_id.id.split('/')[1] == 'manual':
mstation = pick.waveform_id.station_code
mstation_ext = mstation + '_'
for mpick in arrivals_copy:
phase = identifyPhase(loopIdentifyPhase(Pick.phase_hint))
if phase == 'P':
if ((mpick.waveform_id.station_code == mstation) or
(mpick.waveform_id.station_code == mstation_ext)) and \
(mpick.method_id.id.split('/')[1] == 'auto') and \
(mpick.time_errors['uncertainty'] <= ErrorsP[3]):
del mpick
break
elif phase == 'S':
if ((mpick.waveform_id.station_code == mstation) or
(mpick.waveform_id.station_code == mstation_ext)) and \
(mpick.method_id.id.split('/')[1] == 'auto') and \
(mpick.time_errors['uncertainty'] <= ErrorsS[3]):
del mpick
break
phase = identifyPhase(loopIdentifyPhase(pick.phase_hint)) # MP MP catch if this fails?
if ((mpick.waveform_id.station_code == mstation) or
(mpick.waveform_id.station_code == mstation_ext)) and \
(mpick.method_id.id.split('/')[1] == 'auto') and \
(mpick.time_errors['uncertainty'] <= errors[phase][3]):
del mpick
break
lendiff = len(arrivals) - len(arrivals_copy)
if lendiff != 0:
print("Found manual as well as automatic picks, prefered the {} manual ones!".format(lendiff))
for Pick in arrivals_copy:
phase = identifyPhase(loopIdentifyPhase(Pick.phase_hint))
if phase == 'P':
Pqual = get_quality_class(Pick.time_errors.uncertainty, ErrorsP)
if Pqual == 0:
Pw0.append(Pick.time_errors.uncertainty)
elif Pqual == 1:
Pw1.append(Pick.time_errors.uncertainty)
elif Pqual == 2:
Pw2.append(Pick.time_errors.uncertainty)
elif Pqual == 3:
Pw3.append(Pick.time_errors.uncertainty)
elif Pqual == 4:
Pw4.append(Pick.time_errors.uncertainty)
elif phase == 'S':
Squal = get_quality_class(Pick.time_errors.uncertainty, ErrorsS)
if Squal == 0:
Sw0.append(Pick.time_errors.uncertainty)
elif Squal == 1:
Sw1.append(Pick.time_errors.uncertainty)
elif Squal == 2:
Sw2.append(Pick.time_errors.uncertainty)
elif Squal == 3:
Sw3.append(Pick.time_errors.uncertainty)
elif Squal == 4:
Sw4.append(Pick.time_errors.uncertainty)
else:
for pick in arrivals_copy:
phase = identifyPhase(loopIdentifyPhase(pick.phase_hint))
uncertainty = pick.time_errors.uncertainty
if not uncertainty:
if verbosity > 0:
print('No uncertainty, pick {} invalid!'.format(pick.method_id.id))
continue
# check P/S phase
if phase not in phases:
print("Phase hint not defined for picking!")
pass
continue
qual = get_quality_class(uncertainty, errors[phase])
weights[phase][qual].append(uncertainty)
if plotflag == 0:
Punc = [Pw0, Pw1, Pw2, Pw3, Pw4]
Sunc = [Sw0, Sw1, Sw2, Sw3, Sw4]
return Punc, Sunc
p_unc = [weights['P'][weight_id] for weight_id in weight_ids]
s_unc = [weights['S'][weight_id] for weight_id in weight_ids]
return p_unc, s_unc
else:
if not figure:
fig = plt.figure()
ax = fig.add_subplot(111)
# get percentage of weights
numPweights = np.sum([len(Pw0), len(Pw1), len(Pw2), len(Pw3), len(Pw4)])
numSweights = np.sum([len(Sw0), len(Sw1), len(Sw2), len(Sw3), len(Sw4)])
if len(Pw0) > 0:
P0perc = 100 / numPweights * len(Pw0)
else:
P0perc = 0
if len(Pw1) > 0:
P1perc = 100 / numPweights * len(Pw1)
else:
P1perc = 0
if len(Pw2) > 0:
P2perc = 100 / numPweights * len(Pw2)
else:
P2perc = 0
if len(Pw3) > 0:
P3perc = 100 / numPweights * len(Pw3)
else:
P3perc = 0
if len(Pw4) > 0:
P4perc = 100 / numPweights * len(Pw4)
else:
P4perc = 0
if len(Sw0) > 0:
S0perc = 100 / numSweights * len(Sw0)
else:
S0perc = 0
if len(Sw1) > 0:
S1perc = 100 / numSweights * len(Sw1)
else:
S1perc = 0
if len(Sw2) > 0:
S2perc = 100 / numSweights * len(Sw2)
else:
S2perc = 0
if len(Sw3) > 0:
S3perc = 100 / numSweights * len(Sw3)
else:
S3perc = 0
if len(Sw4) > 0:
S4perc = 100 / numSweights * len(Sw4)
else:
S4perc = 0
listP, numPweights = calc_weight_perc(weights['P'], weight_ids)
listS, numSweights = calc_weight_perc(weights['S'], weight_ids)
weights = ('0', '1', '2', '3', '4')
y_pos = np.arange(len(weights))
y_pos = np.arange(len(weight_ids))
width = 0.34
plt.bar(y_pos - width, [P0perc, P1perc, P2perc, P3perc, P4perc], width, color='black')
plt.bar(y_pos, [S0perc, S1perc, S2perc, S3perc, S4perc], width, color='red')
plt.ylabel('%')
plt.xticks(y_pos, weights)
plt.xlim([-0.5, 4.5])
plt.xlabel('Qualities')
plt.title('{0} P-Qualities, {1} S-Qualities'.format(numPweights, numSweights))
plt.show()
ax.bar(y_pos - width, listP, width, color='black')
ax.bar(y_pos, listS, width, color='red')
ax.set_ylabel('%')
ax.set_xticks(y_pos, weight_ids)
ax.set_xlim([-0.5, 4.5])
ax.set_xlabel('Qualities')
ax.set_title('{0} P-Qualities, {1} S-Qualities'.format(numPweights, numSweights))
return [P0perc, P1perc, P2perc, P3perc, P4perc], [S0perc, S1perc, S2perc, S3perc, S4perc]
if not figure:
fig.show()
return listP, listS

View File

@ -1320,7 +1320,7 @@ def get_quality_class(uncertainty, weight_classes):
:return: quality of pick (0-4)
:rtype: int
"""
if not uncertainty: return max(weight_classes)
if not uncertainty: return len(weight_classes)
try:
# create generator expression containing all indices of values in weight classes that are >= than uncertainty.
# call next on it once to receive first value

View File

@ -5,7 +5,7 @@ from pylot.core.io.phases import getQualitiesfromxml
class TestQualityFromXML(unittest.TestCase):
def setUp(self):
self.xmlpaths = ['PyLoT_e0019.048.13.xml']
self.path = '.'
self.ErrorsP = [0.02, 0.04, 0.08, 0.16]
self.ErrorsS = [0.04, 0.08, 0.16, 0.32]
self.test0_result = [[0.0136956521739, 0.0126, 0.0101612903226, 0.00734848484849, 0.0135069444444,
@ -23,10 +23,10 @@ class TestQualityFromXML(unittest.TestCase):
[92.0, 4.0, 4.0, 0, 0]
def test_result_plotflag0(self):
self.assertEqual(getQualitiesfromxml(self.xmlpaths, self.ErrorsP, self.ErrorsS, 0), self.test0_result)
self.assertEqual(getQualitiesfromxml(self.path, self.ErrorsP, self.ErrorsS, 0), self.test0_result)
def test_result_plotflag1(self):
self.assertEqual(getQualitiesfromxml(self.xmlpaths, self.ErrorsP, self.ErrorsS, 1), self.test1_result)
self.assertEqual(getQualitiesfromxml(self.path, self.ErrorsP, self.ErrorsS, 1), self.test1_result)
if __name__ == '__main__':