Compare commits
No commits in common. "db11e125c04fca82515d721970f3214c5b50f4a7" and "61c3f40063933418a202829eb660728ba092f742" have entirely different histories.
@ -11,7 +11,7 @@ import traceback
import glob
import json
from datetime import datetime
from typing import Any, Optional
from typing import Any
import numpy as np
import matplotlib.pyplot as plt
@ -19,7 +19,6 @@ import yaml
from joblib import Parallel, delayed
from obspy import read, Stream, UTCDateTime, Trace
from obspy.core.event import Event
from obspy.taup import TauPyModel
from obspy.core.event.base import WaveformStreamID, ResourceIdentifier
from obspy.core.event.origin import Pick, Origin
@ -29,7 +28,6 @@ from obspy.signal.cross_correlation import correlate, xcorr_max
from import PylotParameter
from import picks_from_picksdict
from pylot.core.pick.autopick import autopickstation
from pylot.core.util.dataprocessing import Metadata
from pylot.core.util.utils import check4rotated
from pylot.core.util.utils import identifyPhaseID
from pylot.core.util.event import Event as PylotEvent
@ -693,8 +691,10 @@ def correlate_event(eventdir: str, pylot_parameter: PylotParameter, params: dict
# continue if there is not enough traces for stacking
if nstack < params[phase_type]['min_stack']:
logging.warning(f"Not enough traces to stack: {nstack} "
f"(min_stack = {params[phase_type]['min_stack']}). Skip this event")
logging.warning('Not enough traces to stack: {} (min_stack = {}). Skip this event'.format(nstack,
# write unfiltered trace
@ -773,7 +773,7 @@ def correlate_event(eventdir: str, pylot_parameter: PylotParameter, params: dict
return False
def remove_outliers(picks: list, corrected_taupy_picks: list, threshold: float) -> list:
def remove_outliers(picks, corrected_taupy_picks, threshold):
""" delete a pick if difference to corrected taupy pick is larger than threshold"""
n_picks = len(picks)
@ -802,7 +802,7 @@ def remove_outliers(picks: list, corrected_taupy_picks: list, threshold: float)
return picks
def remove_invalid_picks(picks: list) -> list:
def remove_invalid_picks(picks):
""" Remove picks without uncertainty (invalid PyLoT picks)"""
count = 0
deleted_picks_ids = []
@ -817,15 +817,15 @@ def remove_invalid_picks(picks: list) -> list:
return picks
def get_picks_median(picks: list) -> UTCDateTime:
def get_picks_median(picks):
return UTCDateTime(int(np.median([pick.time.timestamp for pick in picks if pick.time])))
def get_picks_mean(picks: list) -> UTCDateTime:
def get_picks_mean(picks):
return UTCDateTime(np.mean([pick.time.timestamp for pick in picks if pick.time]))
def get_corrected_taupy_picks(picks: list, taupypicks: list, all_available: bool = False) -> tuple:
def get_corrected_taupy_picks(picks, taupypicks, all_available=False):
""" get mean/median from picks taupy picks, correct latter for the difference """
def nwst_id_from_wfid(wfid):
@ -847,8 +847,9 @@ def get_corrected_taupy_picks(picks: list, taupypicks: list, all_available: bool
mean_diff = taupy_mean - picks_mean
||||'Calculated {len(taupypicks_new)} TauPy theoretical picks.')
||||'Calculated median difference from TauPy '
f'theoretical picks of {median_diff} seconds. (mean: {mean_diff})')
'Calculated median difference from TauPy theoretical picks of {} seconds. (mean: {})'.format(median_diff,
# return all available taupypicks corrected for median difference to autopicks
if all_available:
@ -861,7 +862,7 @@ def get_corrected_taupy_picks(picks: list, taupypicks: list, all_available: bool
return taupypicks_new, median_diff
def load_stacked_trace(eventdir: str, min_corr_stack: float) -> Optional[tuple]:
def load_stacked_trace(eventdir, min_corr_stack):
# load stacked stream (miniseed)
str_stack_fpaths = glob.glob(os.path.join(eventdir, 'correlation', '*_stacked.mseed'))
if not len(str_stack_fpaths) == 1:
@ -890,8 +891,7 @@ def load_stacked_trace(eventdir: str, min_corr_stack: float) -> Optional[tuple]:
return corr_dict, nwst_id, stacked_trace, nstack
def repick_master_trace(wfdata: Stream, trace_master: Trace, pylot_parameter: PylotParameter, event: Event,
event_id: str, metadata: Metadata, phase_type: str, corr_out_dir: str) -> Optional[Pick]:
def repick_master_trace(wfdata, trace_master, pylot_parameter, event, event_id, metadata, phase_type, corr_out_dir):
rename_lqt = phase_type == 'S'
# create an 'artificial' stream object which can be used as input for PyLoT
stream_master = modify_master_trace4pick(trace_master.copy(), wfdata, rename_lqt=rename_lqt)
@ -928,14 +928,14 @@ def create_correlation_output_dir(eventdir: str, fopts: dict, phase_type: str) -
return export_path
def create_correlation_figure_dir(correlation_out_dir: str) -> str:
def create_correlation_figure_dir(correlation_out_dir):
export_path = os.path.join(correlation_out_dir, 'figures')
if not os.path.isdir(export_path):
return export_path
def add_fpath_extension(fpath: str, fopts: dict, phase: str) -> str:
def add_fpath_extension(fpath, fopts, phase):
if fopts:
freqmin, freqmax = fopts['freqmin'], fopts['freqmax']
if freqmin:
@ -947,12 +947,12 @@ def add_fpath_extension(fpath: str, fopts: dict, phase: str) -> str:
return fpath
def write_correlation_output(export_path: str, correlations_dict: dict, correlations_dict_stacked: dict) -> None:
def write_correlation_output(export_path, correlations_dict, correlations_dict_stacked):
write_json(correlations_dict, os.path.join(export_path, 'correlations_for_stacking.json'))
write_json(correlations_dict_stacked, os.path.join(export_path, 'correlation_results.json'))
def modify_master_trace4pick(trace_master: Trace, wfdata: Stream, rename_lqt: bool = True) -> Stream:
def modify_master_trace4pick(trace_master, wfdata, rename_lqt=True):
Create an artificial Stream with master trace instead of the trace of its corresponding channel.
This is done to find metadata for correct station metadata which were modified when stacking (e.g. loc=ST)
@ -988,8 +988,7 @@ def modify_master_trace4pick(trace_master: Trace, wfdata: Stream, rename_lqt: bo
return stream_master
def export_picks(eventdir: str, correlations_dict: dict, picks: list, taupypicks: list, params: dict,
phase_type: str, pf_extension: str) -> None:
def export_picks(eventdir, correlations_dict, picks, taupypicks, params, phase_type, pf_extension):
threshold = params['export_threshold']
min_picks_export = params['min_picks_export']
# make copy so that modified picks are not exported
@ -1061,8 +1060,7 @@ def export_picks(eventdir: str, correlations_dict: dict, picks: list, taupypicks
||||'Wrote {} correlated picks to file {}'.format(len(event.picks), fpath))
def write_taupy_picks(event: Event, eventdir: str, taupypicks: list, time_shift: float,
extension: str = 'corrected_taup_times') -> None:
def write_taupy_picks(event, eventdir, taupypicks, time_shift, extension='corrected_taup_times'):
# make copies because both objects are being modified
event = copy.deepcopy(event)
taupypicks = copy.deepcopy(taupypicks)
@ -1084,17 +1082,17 @@ def write_taupy_picks(event: Event, eventdir: str, taupypicks: list, time_shift:
write_event(event, eventdir, fname)
def write_event(event: Event, eventdir: str, fname: str) -> str:
def write_event(event, eventdir, fname):
fpath = os.path.join(eventdir, fname)
event.write(fpath, format='QUAKEML')
return fpath
def get_pickfile_name(event_id: str, fname_extension: str) -> str:
def get_pickfile_name(event_id, fname_extension):
return 'PyLoT_{}_{}.xml'.format(event_id, fname_extension)
def get_pickfile_name_correlated(event_id: str, fopts: dict, phase_type: str) -> str:
def get_pickfile_name_correlated(event_id, fopts, phase_type):
fname_extension = add_fpath_extension('correlated', fopts, phase_type)
return get_pickfile_name(event_id, fname_extension)
@ -1115,9 +1113,8 @@ def get_pickfile_name_correlated(event_id: str, fopts: dict, phase_type: str) ->
# return trace_this
def prepare_correlation_input(wfdata: Stream, picks: list, channels: list, trace_master: Trace, pick_master: Pick,
phase_params: dict, plot: bool = None, fig_dir: str = None,
ncorr: int = 1, wfdata_highf: Stream = None, trace_master_highf: Stream = None) -> list:
def prepare_correlation_input(wfdata, picks, channels, trace_master, pick_master, phase_params, plot=None, fig_dir=None,
ncorr=1, wfdata_highf=None, trace_master_highf=None):
# prepare input for multiprocessing worker for all 'other' picks to correlate with current master-trace
input_list = []
@ -1158,7 +1155,7 @@ def prepare_correlation_input(wfdata: Stream, picks: list, channels: list, trace
def stack_mastertrace(wfdata_lowf: Stream, wfdata_highf: Stream, wfdata_raw: Stream, picks: list, params: dict,
channels: list, method: str, fig_dir: str) -> Optional[tuple]:
channels: list, method: str, fig_dir: str):
Correlate all stations with the first available station given in station_list, a list containing central,
permanent, long operating, low noise stations with descending priority.
@ -1193,7 +1190,7 @@ def stack_mastertrace(wfdata_lowf: Stream, wfdata_highf: Stream, wfdata_raw: Str
dt_pre, dt_post = params['dt_stacking']
trace_master, nstack = apply_stacking(trace_master, stations4stack, wfdata_raw, picks, method=method,
do_rms_check=params['check_RMS'], plot=params['plot'], fig_dir=fig_dir,
check_rms=params['check_RMS'], plot=params['plot'], fig_dir=fig_dir,
dt_pre=dt_pre, dt_post=dt_post)
return correlations_dict, nwst_id_master, trace_master, nstack
@ -1267,12 +1264,12 @@ def iterate_correlation(wfdata_lowf: Stream, wfdata_highf: Stream, channels: lis
def apply_stacking(trace_master: Trace, stations4stack: dict, wfdata: Stream, picks: list, method: str,
do_rms_check: bool, dt_pre: float = 250., dt_post: float = 250., plot: bool = False,
check_rms: bool, dt_pre: float = 250., dt_post: float = 250., plot: bool = False,
fig_dir: str = None) -> tuple:
def check_trace_length_and_cut(trace: Trace, pick_time: UTCDateTime = None):
starttime = pick_time - dt_pre
endtime = pick_time + dt_post
def check_trace_length_and_cut(trace: Trace):
starttime = correlated_midtime - dt_pre
endtime = correlated_midtime + dt_post
if trace.stats.starttime > starttime or trace.stats.endtime < endtime:
raise ValueError('Trace too short for safe cutting. Would create inconsistent stacking. Abort!')
@ -1285,7 +1282,7 @@ def apply_stacking(trace_master: Trace, stations4stack: dict, wfdata: Stream, pi
trace_master = trace_master.copy()
trace_master.stats.location = 'ST'
check_trace_length_and_cut(trace_master, pick.time)
# empty trace so that it does not stack twice
|||| = np.zeros(len(
@ -1299,7 +1296,7 @@ def apply_stacking(trace_master: Trace, stations4stack: dict, wfdata: Stream, pi
stream_other =, station=st, channel=channel)
correlated_midtime = pick_other.time - dpick
trace_other = stream_other[0].copy()
check_trace_length_and_cut(trace_other, correlated_midtime)
if not len(trace_other) == len(trace_master):
@ -1308,26 +1305,24 @@ def apply_stacking(trace_master: Trace, stations4stack: dict, wfdata: Stream, pi
if do_rms_check:
traces4stack = check_rms(traces4stack, plot=plot, fig_dir=fig_dir)
if check_rms:
traces4stack = checkRMS(traces4stack, plot=plot, fig_dir=fig_dir)
if plot:
fig = plt.figure(figsize=(16, 9))
ax1 = fig.add_subplot(211)
ax2 = fig.add_subplot(212)
fig = ax1 = ax2 = None
for trace_other in traces4stack:
# stack on mastertrace
trace_normalized = trace_other.copy().normalize()
|||| +=
count += 1
if plot and ax1 and ax2:
if plot:
ax1.plot(, label=count)
ax2.plot(, color='k', alpha=0.1)
if plot and fig:
if plot:
if fig_dir:
fig.savefig(os.path.join(fig_dir, 'traces_stacked.svg'), dpi=300.)
@ -1340,8 +1335,7 @@ def apply_stacking(trace_master: Trace, stations4stack: dict, wfdata: Stream, pi
return trace_master, count
def check_rms(traces4stack: list, plot: bool = False, fig_dir: str = None, max_it: int = 10,
ntimes_std: float = 5.) -> list:
def checkRMS(traces4stack, plot=False, fig_dir=None, max_it=10, ntimes_std=5.):
rms_list = []
trace_names = []
@ -1359,7 +1353,7 @@ def check_rms(traces4stack: list, plot: bool = False, fig_dir: str = None, max_i
while iterate:
count += 1
if count >= max_it:
logging.debug('Maximum iterations ({}) of check_rms function reached.'.format(max_it))
logging.debug('Maximum iterations ({}) of checkRMS function reached.'.format(max_it))
std = np.std(rms_list)
mean = np.mean(rms_list)
@ -1403,14 +1397,14 @@ def plot_rms(rms_list: list, trace_names: list, mean: float, std: float, fig_dir
def calc_rms(array: np.ndarray) -> float:
def calc_rms(X):
Returns root mean square of a given array LON
return np.sqrt(np.sum(np.power(array, 2)) / len(array))
return np.sqrt(np.sum(np.power(X, 2)) / len(X))
def resample_parallel(stream: Stream, freq: float, ncores: int) -> Stream:
def resample_parallel(stream, freq, ncores):
input_list = [{'trace': trace, 'freq': freq} for trace in stream.traces]
parallel = Parallel(n_jobs=ncores)
@ -1423,10 +1417,8 @@ def resample_parallel(stream: Stream, freq: float, ncores: int) -> Stream:
stream.traces = output_list
return stream
# TODO: Check why metadata, channels and ncores are not used
def rotate_stream(stream: Stream, metadata: Metadata, origin: Origin, stations_dict: dict, channels: list,
inclination: float, ncores: int) -> Stream:
def rotate_stream(stream, metadata, origin, stations_dict, channels, inclination, ncores):
""" Rotate stream to LQT. To do this all traces have to be cut to equal lenths"""
input_list = []
@ -1512,8 +1504,7 @@ def unpack_result(result: list) -> dict:
return result_dict
def get_taupy_picks(origin: Origin, stations_dict: dict, pylot_parameter: PylotParameter, phase_type: str,
ncores: int = None) -> list:
def get_taupy_picks(origin: Origin, stations_dict, pylot_parameter, phase_type, ncores=None):
input_list = []
taup_phases = pylot_parameter['taup_phases'].split(',')
@ -1541,7 +1532,7 @@ def get_taupy_picks(origin: Origin, stations_dict: dict, pylot_parameter: PylotP
return taupy_picks
def create_artificial_picks(taupy_result: dict) -> list:
def create_artificial_picks(taupy_result):
artificial_picks = []
for nwst_id, taupy_dict in taupy_result.items():
nw, st = nwst_id.split('.')
@ -1553,14 +1544,14 @@ def create_artificial_picks(taupy_result: dict) -> list:
return artificial_picks
def check_taupy_phases(taupy_result: dict) -> None:
def check_taupy_phases(taupy_result):
test_phase_name = list(taupy_result.values())[0]['phase_name']
phase_names_equal = [item['phase_name'] == test_phase_name for item in taupy_result.values()]
if not all(phase_names_equal):
||||'Different first arriving phases detected in TauPy phases for this event.')
def taupy_worker(input_dict: dict) -> dict:
def taupy_worker(input_dict):
model = input_dict['model']
taupy_input = input_dict['taupy_input']
source_time = input_dict['source_time']
@ -1580,7 +1571,7 @@ def taupy_worker(input_dict: dict) -> dict:
return output_dict
def resample_worker(input_dict: dict) -> Trace:
def resample_worker(input_dict):
trace = input_dict['trace']
freq = input_dict['freq']
if freq == trace.stats.sampling_rate:
@ -1588,7 +1579,7 @@ def resample_worker(input_dict: dict) -> Trace:
return trace.resample(freq, no_filter=freq > trace.stats.sampling_rate)
def rotation_worker(input_dict: dict) -> Optional[Stream]:
def rotation_worker(input_dict):
stream = input_dict['stream']
tstart = max([tr.stats.starttime for tr in stream])
tend = min([tr.stats.endtime for tr in stream])
@ -1605,7 +1596,7 @@ def rotation_worker(input_dict: dict) -> Optional[Stream]:
return stream
def correlation_worker(input_dict: dict) -> dict:
def correlation_worker(input_dict):
"""worker function for multiprocessing"""
# unpack input dictionary
@ -1673,7 +1664,6 @@ def get_stations_min_corr(corr_results: dict, min_corr: float) -> dict:
return corr_results
# TODO check if wfdata is needed
def plot_mastertrace_corr(nwst_id: str, corr_results: dict, wfdata: Stream, stations_dict: dict, picks: list,
trace_master: Trace, method: str, min_corr: float, title: str,
fig_dir: str = None) -> None:
@ -1802,7 +1792,7 @@ def plot_stacked_trace_pick(trace: Trace, pick: Pick, pylot_parameter: PylotPara
def plot_stations(stations_dict: dict, stations2compare: dict, coords_this: dict, corr_result: dict, trace: Trace,
pick: Pick, window_title: str = None) -> Optional[tuple]:
pick: Pick, window_title: str = None):
""" Plot routine to check proximity algorithm. """
title =
@ -1875,7 +1865,8 @@ def get_data(eventdir: str, data_dir: str, headonly: bool = False) -> Stream:
return wfdata
def get_station_distances(stations_dict: dict, coords: dict) -> dict:
def get_closest_stations(coords, stations_dict, n):
""" Calculate distances and return n closest stations in stations_dict for station at coords. """
distances = {}
for station_id, st_coords in stations_dict.items():
dist = gps2dist_azimuth(coords['latitude'], coords['longitude'], st_coords['latitude'], st_coords['longitude'],
@ -1885,28 +1876,28 @@ def get_station_distances(stations_dict: dict, coords: dict) -> dict:
distances[dist] = station_id
return distances
def get_closest_stations(coords: dict, stations_dict: dict, n: int) -> dict:
""" Calculate distances and return the n closest stations in stations_dict for station at coords. """
distances = get_station_distances(stations_dict, coords)
closest_distances = sorted(list(distances.keys()))[:n + 1]
closest_stations = {station: dist for dist, station in distances.items() if dist in closest_distances}
return closest_stations
def get_random_stations(coords: dict, stations_dict: dict, n: int) -> dict:
def get_random_stations(coords, stations_dict, n):
""" Calculate distances and return n randomly selected stations in stations_dict for station at coords. """
random_keys = random.sample(list(stations_dict.keys()), n)
distances = get_station_distances(stations_dict, coords)
distances = {}
for station_id, st_coords in stations_dict.items():
dist = gps2dist_azimuth(coords['latitude'], coords['longitude'], st_coords['latitude'], st_coords['longitude'],
a=6.371e6, f=0)[0]
# exclude same coordinate (self or other instrument at same location)
if dist == 0:
distances[dist] = station_id
random_stations = {station: dist for dist, station in distances.items() if station in random_keys}
return random_stations
def prepare_corr_params(parse_args: argparse, logger: logging.Logger) -> dict:
def prepare_corr_params(parse_args, logger):
with open(parse_args.params) as infile:
parameters_dict = yaml.safe_load(infile)
@ -1931,7 +1922,7 @@ def prepare_corr_params(parse_args: argparse, logger: logging.Logger) -> dict:
return corr_params
def init_logger() -> logging.Logger:
def init_logger():
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
fhandler = logging.FileHandler('pick_corr.out')
Reference in New Issue
Block a user