# platform-demo-scripts/scripts/mseeds_to_seisbench.py

"""
-----------------
Copyright © 2023 ACK Cyfronet AGH, Poland.
This work was partially funded by EPOS Project funded in frame of PL-POIR4.2
-----------------
"""
import os
import pandas as pd
import obspy
from obspy.core.event import read_events
import seisbench.data as sbd
import seisbench.util as sbu
import logging
import argparse
# Log everything to output.out (appending across runs); timestamps carry
# milliseconds so individual event conversions can be timed.
_LOG_FORMAT = '%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s'
logging.basicConfig(filename="output.out",
                    filemode='a',
                    format=_LOG_FORMAT,
                    datefmt='%H:%M:%S',
                    level=logging.DEBUG)
# Root stays at INFO; the file handler above was configured with DEBUG.
logging.root.setLevel(logging.INFO)
logger = logging.getLogger('converter')
def split_events(events, input_path):
    """Assign each event to the train/dev/test split by cumulative pick count.

    Only picks whose mseed file actually exists on disk are counted. Events
    are split roughly 70/15/15 (train/dev/test) by cumulative pick count.

    :param events: iterable of obspy Event objects
    :param input_path: root folder of the mseed archive
    :return: DataFrame indexed by event position with columns
             pick_count, pick_count_cumsum and split
    """
    logger.info("Splitting available events into train, dev and test sets ...")
    stats = pd.DataFrame()
    stats.index.name = "event"
    for idx, ev in enumerate(events):
        # Count only picks whose waveform file is present on disk.
        found = sum(
            1
            for pick in ev.picks
            if os.path.isfile(get_trace_path(input_path, get_trace_params(pick)))
        )
        stats.loc[idx, "pick_count"] = found
    stats['pick_count_cumsum'] = stats.pick_count.cumsum()
    total_picks = stats.pick_count_cumsum.values[-1]
    train_th = 0.7 * total_picks
    dev_th = 0.85 * total_picks
    stats['split'] = 'test'
    for idx, row in stats.iterrows():
        if row['pick_count_cumsum'] < train_th:
            stats.loc[idx, 'split'] = 'train'
        elif row['pick_count_cumsum'] < dev_th:
            stats.loc[idx, 'split'] = 'dev'
        else:
            # cumsum is monotonic, so everything after this stays 'test'
            break
    logger.info(f"Split: {stats['split'].value_counts()}")
    return stats
def get_event_params(event):
    """Extract SeisBench "source_*" metadata from an ObsPy event.

    :param event: obspy Event; its preferred origin/magnitude are used
    :return: dict of source metadata, or an empty dict when the event has
             no preferred origin
    """
    origin = event.preferred_origin()
    if origin is None:
        return {}
    mag = event.preferred_magnitude()
    source_id = str(event.resource_id)
    event_params = {
        "source_id": source_id,
        "source_origin_uncertainty_sec": origin.time_errors["uncertainty"],
        "source_latitude_deg": origin.latitude,
        "source_latitude_uncertainty_km": origin.latitude_errors["uncertainty"],
        "source_longitude_deg": origin.longitude,
        "source_longitude_uncertainty_km": origin.longitude_errors["uncertainty"],
        # bug fix: origin.depth may be None in sparse catalogs; the original
        # divided unconditionally and raised TypeError
        "source_depth_km": origin.depth / 1e3 if origin.depth is not None else None,
        # bug fix: also guard depth_errors itself being None, not only its value
        "source_depth_uncertainty_km": origin.depth_errors["uncertainty"] / 1e3
        if origin.depth_errors is not None and origin.depth_errors["uncertainty"] is not None
        else None,
    }
    if mag is not None:
        event_params["source_magnitude"] = mag.mag
        event_params["source_magnitude_uncertainty"] = mag.mag_errors["uncertainty"]
        event_params["source_magnitude_type"] = mag.magnitude_type
        event_params["source_magnitude_author"] = mag.creation_info.agency_id if mag.creation_info is not None else None
    return event_params
def get_trace_params(pick):
    """Collect station/channel identifiers and the pick time for one pick.

    :param pick: obspy Pick with a populated waveform_id
    :return: dict with station_* keys, trace_channel and time
    """
    wid = pick.waveform_id
    return {
        "station_network_code": wid.network_code,
        "station_code": wid.station_code,
        "trace_channel": wid.channel_code,
        "station_location_code": wid.location_code,
        "time": pick.time,
    }
def find_trace(pick_time, traces):
    """Return the first trace whose [starttime, endtime] window contains pick_time.

    Logs a warning and returns None when no trace matches.
    """
    for candidate in traces:
        stats = candidate.stats
        if stats.starttime <= pick_time <= stats.endtime:
            return candidate
    logger.warning(f"no matching trace for peak: {pick_time}")
    return None
def get_trace_path(input_path, trace_params):
    """Build the SDS-layout path of the mseed file for one pick's channel.

    :param input_path: root folder of the mseed archive
    :param trace_params: dict from get_trace_params (uses time and station keys)
    :return: path string YEAR/NET/STA/CHAN.D/NET.STA..CHAN.D.YEAR.DOY
    """
    year = trace_params["time"].year
    day_of_year = pd.Timestamp(str(trace_params["time"])).day_of_year
    net = trace_params["station_network_code"]
    station = trace_params["station_code"]
    tr_channel = trace_params["trace_channel"]
    # bug fix: day-of-year must be zero-padded to 3 digits (SDS convention),
    # matching get_three_channels_trace_paths; the unpadded form made
    # split_events miss files for days 1-99
    path = f"{input_path}/{year}/{net}/{station}/{tr_channel}.D/{net}.{station}..{tr_channel}.D.{year}.{day_of_year:03}"
    return path
def get_three_channels_trace_paths(input_path, trace_params):
    """Build SDS-layout paths for the E, N and Z components of one station/day.

    :param input_path: root folder of the mseed archive
    :param trace_params: dict from get_trace_params (uses time and station keys)
    :return: list of three path strings, in E, N, Z order
    """
    t = trace_params["time"]
    year = t.year
    doy = pd.Timestamp(str(t)).day_of_year
    net = trace_params["station_network_code"]
    sta = trace_params["station_code"]
    # replace the last character of the channel code with each component letter
    base = trace_params["trace_channel"][:-1]
    return [
        f"{input_path}/{year}/{net}/{sta}/{base}{comp}.D/{net}.{sta}..{base}{comp}.D.{year}.{doy:03}"
        for comp in "ENZ"
    ]
def load_trace(input_path, trace_params):
    """Read the mseed file for one pick's channel and return the matching Trace.

    Returns None (with a warning logged) when the file is missing or empty;
    when the file holds several traces, the one covering the pick time is used.
    """
    trace_path = get_trace_path(input_path, trace_params)
    if not os.path.isfile(trace_path):
        logger.warning(trace_path + " not found")
        return None
    stream = obspy.read(trace_path)
    if len(stream.traces) == 0:
        logger.warning(f"no data in: {trace_path}")
        return None
    if len(stream.traces) == 1:
        return stream.traces[0]
    return find_trace(trace_params["time"], stream.traces)
def load_stream(input_path, trace_params, time_before=60, time_after=60):
    """Load and merge the three-component stream around a pick time.

    :param input_path: root folder of the SDS mseed archive
    :param trace_params: dict from get_trace_params (uses "time" and station keys)
    :param time_before: seconds of data kept before the pick
    :param time_after: seconds of data kept after the pick
    :return: (sampling_rate, stream); stream is None when no file could be
             read, and sampling_rate is None when the sliced stream is empty
    """
    sampling_rate, stream = None, None
    pick_time = trace_params["time"]
    trace_paths = get_three_channels_trace_paths(input_path, trace_params)
    for trace_path in trace_paths:
        if not os.path.isfile(trace_path):
            logger.warning(trace_path + " not found")
        elif stream is None:
            stream = obspy.read(trace_path)
        else:
            stream += obspy.read(trace_path)
    if stream is not None:
        stream = stream.slice(pick_time - time_before, pick_time + time_after)
        if len(stream.traces) == 0:
            # bug fix: this used print() instead of the module logger and
            # interpolated the stale loop variable (last path tried only)
            logger.warning(
                f"no data around pick {pick_time} for station "
                f"{trace_params['station_code']}")
        else:
            sampling_rate = stream.traces[0].stats.sampling_rate
            stream.merge()
    return sampling_rate, stream
def convert_mseed_to_seisbench_format(input_path, catalog_path, output_path):
    """
    Convert mseed files to seisbench dataset format
    :param input_path: folder with mseed files
    :param catalog_path: path to events catalog in quakeml format
    :param output_path: folder to save seisbench dataset
    :return:
    """
    logger.info("Loading events catalog ...")
    events = read_events(catalog_path)
    events_stats = split_events(events, input_path)
    metadata_path = output_path + "/metadata.csv"
    waveforms_path = output_path + "/waveforms.hdf5"
    events_to_convert = events_stats[events_stats['pick_count'] > 0]
    # bug fix: the original message lacked the f-prefix (printed the literal
    # "{events_to_convert}") and would have interpolated a whole DataFrame;
    # log the event count instead
    logger.debug(f"Catalog loaded, starting converting {len(events_to_convert)} events ...")
    with sbd.WaveformDataWriter(metadata_path, waveforms_path) as writer:
        writer.data_format = {
            "dimension_order": "CW",
            "component_order": "ZNE",
        }
        for i, event in enumerate(events):
            logger.debug(f"Converting {i} event")
            event_params = get_event_params(event)
            event_params["split"] = events_stats.loc[i, "split"]
            # group picks by station so each station's stream is read only once
            picks_per_station = {}
            for pick in event.picks:
                picks_per_station.setdefault(pick.waveform_id.station_code, []).append(pick)
            for picks in picks_per_station.values():
                trace_params = get_trace_params(picks[0])
                sampling_rate, stream = load_stream(input_path, trace_params)
                if stream is None or len(stream.traces) == 0:
                    continue
                actual_t_start, data, _ = sbu.stream_to_array(
                    stream,
                    component_order=writer.data_format["component_order"],
                )
                trace_params["trace_sampling_rate_hz"] = sampling_rate
                trace_params["trace_start_time"] = str(actual_t_start)
                # one dataset row per station; each pick becomes a
                # trace_<phase>_arrival_sample column
                for pick in picks:
                    pick_time = obspy.core.utcdatetime.UTCDateTime(pick.time)
                    pick_idx = (pick_time - actual_t_start) * sampling_rate
                    trace_params[f"trace_{pick.phase_hint}_arrival_sample"] = int(pick_idx)
                writer.add_trace({**event_params, **trace_params}, data)
if __name__ == "__main__":
    # Command-line entry point: convert an mseed archive plus a QuakeML
    # catalog into a SeisBench dataset directory.
    cli = argparse.ArgumentParser(description='Convert mseed files to seisbench format')
    cli.add_argument('--input_path', type=str, help='Path to mseed files')
    cli.add_argument('--catalog_path', type=str, help='Path to events catalog in quakeml format')
    cli.add_argument('--output_path', type=str, help='Path to output files')
    opts = cli.parse_args()
    convert_mseed_to_seisbench_format(opts.input_path, opts.catalog_path, opts.output_path)