Merge branch 'develop' into feature/refactor

This commit is contained in:
Darius Arnold 2018-07-13 09:28:50 +02:00
commit 2c92f6f2fd
25 changed files with 306 additions and 88 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
*~
pylot/RELEASE-VERSION
*.idea
autopylot.sh*

View File

@ -110,21 +110,8 @@ class MainWindow(QMainWindow):
def __init__(self, parent=None, infile=None):
super(MainWindow, self).__init__(parent)
# check for default pylot.in-file
if not infile:
infile = os.path.join(os.path.expanduser('~'), '.pylot', 'pylot.in')
print('Using default input file {}'.format(infile))
if os.path.isfile(infile) == False:
infile = QFileDialog().getOpenFileName(caption='Choose PyLoT-input file')
self.init_config_files(infile)
if not os.path.exists(infile[0]):
QMessageBox.warning(self, "PyLoT Warning",
"No PyLoT-input file declared!")
sys.exit(0)
self.infile = infile[0]
else:
self.infile = infile
self._inputs = PylotParameter(infile)
self._props = None
self.dirty = False
@ -208,6 +195,22 @@ class MainWindow(QMainWindow):
self.loc = False
def init_config_files(self, infile):
pylot_config_dir = os.path.join(os.path.expanduser('~'), '.pylot')
if not os.path.exists(pylot_config_dir):
os.mkdir(pylot_config_dir)
self._inputs = PylotParameter(infile)
if not infile:
self._inputs.reset_defaults()
# check for default pylot.in-file
infile = os.path.join(pylot_config_dir, '.pylot.in')
print('Using default input file {}'.format(infile))
self._inputs.export2File(infile)
self.infile = infile
def setupUi(self):
try:
self.startTime = min(
@ -1058,9 +1061,12 @@ class MainWindow(QMainWindow):
eventlist = ed.selectedFiles()
basepath = eventlist[0].split(os.path.basename(eventlist[0]))[0]
if check_obspydmt_structure(basepath):
print('Recognized obspyDMT structure in selected files.')
print('Recognized obspyDMT structure in selected files. Settings Datastructure to ObspyDMT')
self.dataStructure = DATASTRUCTURE['obspyDMT']()
eventlist = check_all_obspy(eventlist)
else:
print('Settings Datastructure to PILOT')
self.dataStructure = DATASTRUCTURE['PILOT']()
eventlist = check_all_pylot(eventlist)
if not eventlist:
print('No events found! Expected structure for event folders: [eEVID.DOY.YR],\n'
@ -1407,7 +1413,7 @@ class MainWindow(QMainWindow):
self.get_data().resetPicks()
return self.saveData(event, directory, outformats)
fcheck = ['manual', 'origins', 'magnitude']
fcheck = ['auto', 'manual', 'origins', 'magnitude']
saved_as = str()
for outformat in outformats:
@ -2631,17 +2637,20 @@ class MainWindow(QMainWindow):
elif type == 'auto':
event.addAutopicks(picksdict['auto'])
def drawPicks(self, station=None, picktype=None):
def drawPicks(self, station=None, picktype=None, stime=None):
# if picktype not specified, draw both
if not stime:
stime = self.getStime()
if not picktype:
self.drawPicks(station, 'manual')
self.drawPicks(station, 'auto')
self.drawPicks(station, 'manual', stime)
self.drawPicks(station, 'auto', stime)
return
# if picks to draw not specified, draw all picks available
if not station:
for station in self.getPicks(type=picktype):
self.drawPicks(station, picktype=picktype)
self.drawPicks(station, picktype=picktype, stime=stime)
return
# check for station key in dictionary, else return
@ -2659,7 +2668,6 @@ class MainWindow(QMainWindow):
ylims = np.array([-.5, +.5]) + plotID
stat_picks = self.getPicks(type=picktype)[station]
stime = self.getStime()
for phase in stat_picks:
if phase == 'SPt': continue # wadati SP time

View File

@ -314,7 +314,7 @@ def autoPyLoT(input_dict=None, parameter=None, inputfile=None, fnames=None, even
ttpat)
# locate the event
nll.locate(ctrfile, inputfile)
nll.locate(ctrfile, parameter)
# !iterative picking if traces remained unpicked or occupied with bad picks!
# get theoretical onset times for picks with weights >= 4
@ -402,7 +402,7 @@ def autoPyLoT(input_dict=None, parameter=None, inputfile=None, fnames=None, even
# remove actual NLLoc-location file to keep only the last
os.remove(nllocfile)
# locate the event
nll.locate(ctrfile, inputfile)
nll.locate(ctrfile, parameter)
print("autoPyLoT: Iteration No. %d finished." % nlloccounter)
# get updated NLLoc-location file
nllocfile = max(glob.glob(locsearch), key=os.path.getctime)

1
autopylot.sh Normal file
View File

@ -0,0 +1 @@
python ./autoPyLoT.py -i /home/marcel/.pylot/pylot_global.in -dmt processed -c 80

View File

@ -733,6 +733,22 @@ class PilotDataStructure(GenericDataStructure):
self.setExpandFields(['root', 'database'])
class ObspyDMTdataStructure(GenericDataStructure):
"""
Object containing the data access information for the old PILOT data
structure.
"""
def __init__(self, **fields):
if not fields:
fields = {'database': '',
'root': ''}
GenericDataStructure.__init__(self, **fields)
self.setExpandFields(['root', 'database'])
class SeiscompDataStructure(GenericDataStructure):
"""
Dictionary containing the data access information for an SDS data archive:

View File

@ -63,7 +63,7 @@ defaults = {'rootpath': {'type': str,
'ctrfile': {'type': str,
'tooltip': 'name of autoPyLoT-output control file for NLLoc',
'value': 'Insheim_min1d2015_auto.in',
'value': '',
'namestring': 'Control filename'},
'ttpatter': {'type': str,

View File

@ -48,6 +48,7 @@ class PylotParameter(object):
self.__init_default_paras()
self.__init_subsettings()
self.__filename = fnin
self.__parameter = {}
self._verbosity = verbosity
self._parFileCont = {}
# io from parsed arguments alternatively
@ -273,8 +274,8 @@ class PylotParameter(object):
:rtype: None
"""
defaults = self.get_defaults()
for param in defaults:
self.setParamKV(param, defaults[param]['value'])
for param_name, param in defaults.items():
self.setParamKV(param_name, param['value'])
def from_file(self, fnin=None):
"""

View File

@ -73,17 +73,14 @@ def modify_inputs(ctrfn, root, nllocoutn, phasefn, tttn):
nllfile.close()
def locate(fnin, infile=None):
def locate(fnin, parameter=None):
"""
takes an external program name and tries to run it
:param fnin: external program name
:return: None
"""
if infile is None:
exe_path = which('NLLoc')
else:
exe_path = which('NLLoc', infile)
exe_path = which('NLLoc', parameter)
if exe_path is None:
raise NLLocError('NonLinLoc executable not found; check your '
'environment variables')

View File

@ -89,14 +89,22 @@ def autopickevent(data, param, iplot=0, fig_dict=None, fig_dict_wadatijack=None,
'stations on {} cores.'.format(len(input_tuples), ncores_str))
pool = gen_Pool(ncores)
result = pool.map(call_autopickstation, input_tuples)
results = pool.map(call_autopickstation, input_tuples)
pool.close()
for pick in result:
if pick:
station = pick['station']
pick.pop('station')
all_onsets[station] = pick
for result, wfstream in results:
if type(result) == dict:
station = result['station']
result.pop('station')
all_onsets[station] = result
else:
if result == None:
result = 'Picker exited unexpectedly.'
if len(wfstream) > 0:
station = wfstream[0].stats.station
else:
station = None
print('Could not pick a station: {}\nReason: {}'.format(station, result))
# quality control
# median check and jackknife on P-onset times
@ -116,7 +124,10 @@ def call_autopickstation(input_tuple):
"""
wfstream, pickparam, verbose, metadata, origin = input_tuple
# multiprocessing not possible with interactive plotting
return autopickstation(wfstream, pickparam, verbose, iplot=0, metadata=metadata, origin=origin)
try:
return autopickstation(wfstream, pickparam, verbose, iplot=0, metadata=metadata, origin=origin), wfstream
except Exception as e:
return e, wfstream
def get_source_coords(parser, station_id):

View File

@ -23,7 +23,7 @@ import warnings
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import argrelmax
from scipy.signal import argrelmax, argrelmin
from pylot.core.pick.charfuns import CharacteristicFunction
from pylot.core.pick.utils import getnoisewin, getsignalwin
@ -197,10 +197,15 @@ class AICPicker(AutoPicker):
# find minimum in AIC-CF front of maximum of HOS/AR-CF
lpickwindow = int(round(self.PickWindow / self.dt))
for i in range(icfmax - 1, max([icfmax - lpickwindow, 2]), -1):
if aicsmooth[i - 1] >= aicsmooth[i]:
self.Pick = self.Tcf[i]
break
tsafety = self.TSNR[1] # safety gap, AIC is usually a little bit too late
left_corner_ind = max([icfmax - lpickwindow, 2])
right_corner_ind = icfmax + int(tsafety / self.dt)
aic_snip = aicsmooth[left_corner_ind : right_corner_ind]
minima = argrelmin(aic_snip)[0] # 0th entry of tuples for axes
if len(minima) > 0:
pickindex = minima[-1] + left_corner_ind
self.Pick = self.Tcf[pickindex]
# if no minimum could be found:
# search in 1st derivative of AIC-CF
if self.Pick is None:
@ -215,17 +220,12 @@ class AICPicker(AutoPicker):
for i in range(icfmax - 1, max([icfmax - lpickwindow, 2]), -1):
if diffcf[i - 1] >= diffcf[i]:
self.Pick = self.Tcf[i]
pickindex = i
break
if self.Pick is not None:
# get noise window
inoise = getnoisewin(self.Tcf, self.Pick, self.TSNR[0], self.TSNR[1])
# check, if these are counts or m/s, important for slope estimation!
# this is quick and dirty, better solution? #Todo wtf
if max(self.Data[0].data < 1e-3) and max(self.Data[0].data >= 1e-6):
self.Data[0].data = self.Data[0].data * 1000000.
elif max(self.Data[0].data < 1e-6):
self.Data[0].data = self.Data[0].data * 1e13
# get signal window
isignal = getsignalwin(self.Tcf, self.Pick, self.TSNR[2])
if len(isignal) == 0:
@ -244,7 +244,6 @@ class AICPicker(AutoPicker):
# calculate slope from CF after initial pick
# get slope window
tslope = self.TSNR[3] # slope determination window
tsafety = self.TSNR[1] # safety gap, AIC is usually a little bit too late
if tsafety >= 0:
islope = np.where((self.Tcf <= min([self.Pick + tslope + tsafety, self.Tcf[-1]])) \
& (self.Tcf >= self.Pick)) # TODO: put this in a seperate function like getsignalwin
@ -263,7 +262,6 @@ class AICPicker(AutoPicker):
return
try:
imaxs, = argrelmax(dataslope)
imaxs.size
imax = imaxs[0]
except ValueError as e:
print(e, 'picker: argrelmax not working!')
@ -280,7 +278,7 @@ class AICPicker(AutoPicker):
if self.iplot > 1:
if self.fig == None or self.fig == 'None':
fig = plt.figure()
plt_flag = 1
plt_flag = iplot
else:
fig = self.fig
ax = fig.add_subplot(111)
@ -291,7 +289,7 @@ class AICPicker(AutoPicker):
ax.set_xlabel('Time [s] since %s' % self.Data[0].stats.starttime)
ax.set_yticks([])
ax.set_title(self.Data[0].stats.station)
if plt_flag == 1:
if plt_flag in [1, 2]:
fig.show()
try: input()
except SyntaxError: pass
@ -307,6 +305,8 @@ class AICPicker(AutoPicker):
print('AICPicker: Negative slope, bad onset skipped!')
else:
self.slope = 1 / (len(dataslope) * self.Data[0].stats.delta) * (datafit[-1] - datafit[0])
# normalize slope to maximum of cf to make it unit independent
self.slope /= self.Data[0].data[icfmax]
else:
self.SNR = None
@ -315,7 +315,7 @@ class AICPicker(AutoPicker):
if iplot > 1:
if self.fig == None or self.fig == 'None':
fig = plt.figure() # self.iplot)
plt_flag = 1
plt_flag = iplot
else:
fig = self.fig
fig._tight = True
@ -359,11 +359,15 @@ class AICPicker(AutoPicker):
else:
ax1.set_title(self.Data[0].stats.station)
if plt_flag == 1:
if plt_flag in [1, 2]:
fig.show()
try: input()
except SyntaxError: pass
plt.close(fig)
if plt_flag == 3:
stats = self.Data[0].stats
netstlc = '{}.{}.{}'.format(stats.network, stats.station, stats.location)
fig.savefig('aicfig_{}_{}.png'.format(netstlc, stats.channel))
if self.Pick == None:
print('AICPicker: Could not find minimum, picking window too short?')

View File

@ -705,7 +705,7 @@ def wadaticheck(pickdic, dttolerance, iplot=0, fig_dict=None):
wfitflag = 1
# plot results
if iplot > 0:
if iplot > 0 or fig_dict:
if fig_dict:
fig = fig_dict['wadati']
linecolor = fig_dict['plot_style']['linecolor']['rgba_mpl']
@ -947,7 +947,7 @@ def checkPonsets(pickdic, dttolerance, jackfactor=5, iplot=0, fig_dict=None):
checkedonsets = pickdic
if iplot > 0:
if iplot > 0 or fig_dict:
if fig_dict:
fig = fig_dict['jackknife']
plt_flag = 0

View File

@ -15,33 +15,81 @@ from pylot.core.util.utils import key_for_set_value, find_in_list, \
class Metadata(object):
def __init__(self, inventory=None):
self.inventories = []
if os.path.isdir(inventory):
self.add_inventory(inventory)
if os.path.isfile(inventory):
self.add_inventory_file(inventory)
self.seed_ids = {}
# saves read metadata objects (Parser/inventory) for a filename
self.inventory_files = {}
# saves filenames holding metadata for a seed_id
self.seed_ids = {}
if inventory:
if os.path.isdir(inventory):
self.add_inventory(inventory)
if os.path.isfile(inventory):
self.add_inventory_file(inventory)
def __str__(self):
repr = 'PyLoT Metadata object including the following inventories:\n\n'
ntotal = len(self.inventories)
for index, inventory in enumerate(self.inventories):
if index < 2 or (ntotal - index) < 3:
repr += '{}\n'.format(inventory)
if ntotal > 4 and int(ntotal/2) == index:
repr += '...\n'
if ntotal > 4:
repr += '\nTotal of {} inventories. Use Metadata.inventories to see all.'.format(ntotal)
return repr
def __repr__(self):
return self.__str__()
def add_inventory(self, path_to_inventory):
# add paths to list of inventories
'''
add paths to list of inventories
:param path_to_inventory:
:return:
'''
assert (os.path.isdir(path_to_inventory)), '{} is no directory'.format(path_to_inventory)
if not path_to_inventory in self.inventories:
self.inventories.append(path_to_inventory)
def add_inventory_file(self, path_to_inventory_file):
assert (os.path.isfile(path_to_inventory_file)), '{} is no directory'.format(path_to_inventory_file)
'''
add a single file to inventory files
:param path_to_inventory_file:
:return:
'''
assert (os.path.isfile(path_to_inventory_file)), '{} is no file'.format(path_to_inventory_file)
self.add_inventory(os.path.split(path_to_inventory_file)[0])
if not path_to_inventory_file in self.inventory_files.keys():
self.read_single_file(path_to_inventory_file)
def remove_all_inventories(self):
self.__init__()
def remove_inventory(self, path_to_inventory):
'''
remove a path from inventories list
:param path_to_inventory:
:return:
'''
if not path_to_inventory in self.inventories:
print('Path {} not in inventories list.'.format(path_to_inventory))
return
self.inventories.remove(path_to_inventory)
for filename in self.inventory_files.keys():
if filename.startswith(path_to_inventory):
del(self.inventory_files[filename])
for seed_id in self.seed_ids.keys():
if self.seed_ids[seed_id].startswith(path_to_inventory):
del(self.seed_ids[seed_id])
def get_metadata(self, seed_id):
@ -54,9 +102,12 @@ class Metadata(object):
self.read_all()
for inv_fname, metadata in self.inventory_files.items():
# use get_coordinates to check for seed_id
if metadata['data'].get_coordinates(seed_id):
try:
metadata['data'].get_coordinates(seed_id)
self.seed_ids[seed_id] = inv_fname
return metadata
except Exception as e:
continue
print('Could not find metadata for station {}'.format(seed_id))
return None
fname = self.seed_ids[seed_id]
@ -64,6 +115,10 @@ class Metadata(object):
def read_all(self):
'''
read all metadata files found in all inventories
:return:
'''
for inventory in self.inventories:
for inv_fname in os.listdir(inventory):
inv_fname = os.path.join(inventory, inv_fname)
@ -93,17 +148,18 @@ class Metadata(object):
def get_coordinates(self, seed_id):
metadata = self.get_metadata(seed_id)
if not metadata:
return
return metadata['data'].get_coordinates(seed_id)
def get_paz(self, seed_id, time=None):
def get_paz(self, seed_id, time):
metadata = self.get_metadata(seed_id)
if not metadata:
return
if metadata['invtype'] in ['dless', 'dseed']:
return metadata['data'].get_paz(seed_id)
return metadata['data'].get_paz(seed_id, time)
elif metadata['invtype'] in ['resp', 'xml']:
if not time:
print('Time needed to extract metadata from station inventory.')
return None
resp = metadata['data'].get_response(seed_id, time)
return resp.get_paz(seed_id)
@ -404,11 +460,16 @@ def read_metadata(path_to_inventory):
def restitute_trace(input_tuple):
tr, invtype, inobj, unit, force = input_tuple
tr, metadata, unit, force = input_tuple
remove_trace = False
seed_id = tr.get_id()
mdata = metadata.get_metadata(seed_id)
invtype = mdata['invtype']
inobj = mdata['data']
# check, whether this trace has already been corrected
if 'processing' in tr.stats.keys() \
and np.any(['remove' in p for p in tr.stats.processing]) \
@ -476,7 +537,7 @@ def restitute_trace(input_tuple):
return tr, remove_trace
def restitute_data(data, invtype, inobj, unit='VEL', force=False, ncores=0):
def restitute_data(data, metadata, unit='VEL', force=False, ncores=0):
"""
takes a data stream and a path_to_inventory and returns the corrected
waveform data stream
@ -497,7 +558,7 @@ def restitute_data(data, invtype, inobj, unit='VEL', force=False, ncores=0):
# loop over traces
input_tuples = []
for tr in data:
input_tuples.append((tr, invtype, inobj, unit, force))
input_tuples.append((tr, metadata, unit, force))
data.remove(tr)
pool = gen_Pool(ncores)

View File

@ -6,7 +6,7 @@ Created on Wed Jan 26 17:47:25 2015
@author: sebastianw
"""
from pylot.core.io.data import SeiscompDataStructure, PilotDataStructure
from pylot.core.io.data import SeiscompDataStructure, PilotDataStructure, ObspyDMTdataStructure
DATASTRUCTURE = {'PILOT': PilotDataStructure, 'SeisComP': SeiscompDataStructure,
'obspyDMT': None, None: None}
'obspyDMT': ObspyDMTdataStructure, None: PilotDataStructure}

View File

@ -1024,7 +1024,7 @@ def check4rotated(data, metadata=None, verbosity=1):
for trace_id in trace_ids:
dip, az = get_dip_azimut(parser, trace_id)
trace = wfstream.select(id=trace_id)[0]
if az > 315 and az <= 45 or az > 135 and az <= 225:
if az > 315 or az <= 45 or az > 135 and az <= 225:
trace.data = n
trace.stats.channel = trace.stats.channel[0:-1] + 'N'
elif az > 45 and az <= 135 or az > 225 and az <= 315:
@ -1094,7 +1094,7 @@ def runProgram(cmd, parameter=None):
subprocess.check_output('{} | tee /dev/stderr'.format(cmd), shell=True)
def which(program, infile=None):
def which(program, parameter):
"""
takes a program name and returns the full path to the executable or None
modified after: http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
@ -1109,16 +1109,9 @@ def which(program, infile=None):
for key in settings.allKeys():
if 'binPath' in key:
os.environ['PATH'] += ':{0}'.format(settings.value(key))
if infile is None:
# use default parameter-file name
bpath = os.path.join(os.path.expanduser('~'), '.pylot', 'pylot.in')
else:
bpath = os.path.join(os.path.expanduser('~'), '.pylot', infile)
if os.path.exists(bpath):
nllocpath = ":" + PylotParameter(bpath).get('nllocbin')
os.environ['PATH'] += nllocpath
except ImportError as e:
nllocpath = ":" + parameter.get('nllocbin')
os.environ['PATH'] += nllocpath
except Exception as e:
print(e.message)
def is_exe(fpath):

View File

@ -2832,10 +2832,20 @@ class MultiEventWidget(QWidget):
self.rb_layout.insertWidget(index, rb)
self.rb_layout.setStretch(index, 0)
self.pb = QtGui.QProgressBar()
self.pb.setRange(0, 0)
self.pb.setVisible(False)
#space holder for progressbar
self._pb_space = QtGui.QWidget()
self.rb_layout.addWidget(self.start_button)
self.rb_layout.addWidget(QtGui.QWidget())
self.rb_layout.addWidget(self.pb)
self.rb_layout.addWidget(self._pb_space)
self.rb_layout.setStretch(len(self.options) + 1, 1)
self.rb_layout.setStretch(len(self.options) + 2, 1)
self.main_layout.insertLayout(0, self.rb_layout)
@ -2866,6 +2876,8 @@ class MultiEventWidget(QWidget):
for rb in self.rb_dict.values():
rb.setEnabled(bool)
self.start_button.setEnabled(bool)
self.pb.setVisible(not(bool))
self._pb_space.setVisible(bool)
self.eventbox.setEnabled(bool)
self.button_clear.setEnabled(bool)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,104 @@
import unittest
import os
from pylot.core.util.dataprocessing import Metadata
class TestMetadata(unittest.TestCase):
def setUp(self):
self.station_id = 'BW.WETR..HH'
metadata_folder = 'metadata1'
self.m = Metadata(metadata_folder)
def test_get_coordinates_sucess(self):
expected = {'Z': {u'elevation': 607.0, u'longitude': 12.87571, u'local_depth': 0.0, u'azimuth': 0.0, u'latitude': 49.14502, u'dip': -90.0},
'E': {u'azimuth': 90.0, u'dip': 0.0, u'elevation': 607.0, u'latitude': 49.14502, u'local_depth': 0.0, u'longitude': 12.87571},
'N': {u'azimuth': 0.0, u'dip': 0.0, u'elevation': 607.0, u'latitude': 49.14502, u'local_depth': 0.0, u'longitude': 12.87571}
}
result = {}
for channel in ('Z', 'N', 'E'):
coords = self.m.get_coordinates(self.station_id+channel)
result[channel] = coords
self.assertDictEqual(result[channel], expected[channel])
class TestMetadataAdding(unittest.TestCase):
"""Tests if adding files and directories to a metadata object works."""
def setUp(self):
self.station_id = 'BW.WETR..HH'
self.metadata_folders = ('metadata1', 'metadata2')
self.m = Metadata()
def test_add_inventory_folder(self):
"""Test if add_inventory adds the folder to the list of inventories"""
self.m.add_inventory(self.metadata_folders[0])
# adding an inventory folder should append it to the list of inventories
self.assertDictEqual({}, self.m.inventory_files)
self.assertDictEqual({}, self.m.seed_ids)
self.assertEqual([self.metadata_folders[0]], self.m.inventories)
def test_add_inventory_file(self):
"""Test if add_inventory_file adds the folder containing the file to the list of inventories and
if the files is added to inventory_files"""
fpath = os.path.join(self.metadata_folders[0], 'DATALESS.BW.WETR..HHZ')
self.m.add_inventory_file(fpath)
# adding an inventory file should append its folder to the list of inventories and the file to the
self.assertEqual(['metadata1/DATALESS.BW.WETR..HHZ'], self.m.inventory_files.keys()) # does the filename exist in inventory files?
self.assertEqual(['data', 'invtype'], self.m.inventory_files['metadata1/DATALESS.BW.WETR..HHZ'].keys()) # is the required information attacht to the filename?
self.assertDictEqual({}, self.m.seed_ids)
self.assertEqual([self.metadata_folders[0]], self.m.inventories)
def test_add_inventory_invalid_path(self):
"""Test if adding an inventory that is not an existing directory fails with an exception"""
with self.assertRaises(Exception):
self.m.add_inventory('InvalidDirName')
self.assertEqual([], self.m.inventories) # inventory list should still be empty
def test_add_inventory_file_invalid_path(self):
"""Test if adding a inventory file with an invalid path fails with an exception"""
with self.assertRaises(Exception):
self.m.add_inventory_file('/invalid/file/name')
self.assertEqual([], self.m.inventories) # inventory list should still be empty
class TestMetadataRemoval(unittest.TestCase):
"""Tests if removing files and directories to a metadata object works."""
def setUp(self):
self.station_id = 'BW.WETR..HH'
self.metadata_folders = ('metadata1', 'metadata2')
self.m = Metadata()
def test_remove_all_inventories(self):
"""Test if function remove_inventory cleans the Metadata object """
# add multiple inventories
for folder in self.metadata_folders:
self.m.add_inventory(folder)
self.m.remove_all_inventories()
self.isEmpty(self.m)
def test_remove_inventory(self):
"""Test if remove_inventory removes single inventories"""
# add multiple inventories
for folder in self.metadata_folders:
self.m.add_inventory(folder)
self.m.remove_inventory(self.metadata_folders[0])
self.assertNotIn(self.metadata_folders[0], self.m.inventories)
self.m.remove_inventory(self.metadata_folders[1])
self.assertNotIn(self.metadata_folders[1], self.m.inventories)
self.isEmpty(self.m)
def test_remove_inventory_not_in_inventory_list(self):
"""Test if remove_inventory does not modify the metadata instance if the given inventory to remove does not
exist in the instance."""
# add multiple inventories
self.m.add_inventory(self.metadata_folders[0])
self.m.remove_inventory('metadata_not_existing')
self.assertIn(self.metadata_folders[0], self.m.inventories)
def isEmpty(self, metadata):
"""Asserts if the given metadata object is empty"""
self.assertDictEqual({}, metadata.inventory_files)
self.assertDictEqual({}, metadata.seed_ids)
self.assertEqual([], metadata.inventories)