Source Parameters Estimation code

This commit is contained in:
Joanna Kocot Test 2024-10-09 11:24:44 +02:00
commit a74c75eae0
12 changed files with 1272 additions and 0 deletions

571
SpectralAnalysis.py Normal file
View File

@ -0,0 +1,571 @@
import copy
import csv
from typing import List, Optional
from json_writer import JsonWriter
from scipy import integrate
from scipy.stats import zscore
import numpy as np
from obspy import Stream, UTCDateTime
from obspy.core.event import Event, Pick
from obspy.geodetics import gps2dist_azimuth
from tau_p_raytracing import TauPRayTracer
from sp_jk import sp_jk
from opt_algorithms import OptNelderMead
from SpectralParameters import SpectralParams, EventSpectralParams
from amplitude_spectra import calculate_amp_spectra
from spectral_definitions import (K_MADARIAGA_P, K_MADARIAGA_S, K_BRUNE, G_P, G_S, spectrum2moment, calc_stress_drop,
calc_source_size, mm)
class SpectralAnalysis:
"""
Application for spectral analysis for P- or S-waves. It uses JK integrals and spectral fitting with simplex
optimization algorithm.
JK integrals method returns seismic moment, moment magnitude and corner frequency.
Spectral fitting uses these results as input. It returns seismic moment, moment magnitude and corner frequency
and additionally Q parameter (i.e. quality=1/dumping). In fitting procedure theoretical spectrum
(Brune or Boatwright corrected for Q) is matched to calculated amplitude spectrum of seismic signal.
These methods are used for each station separately and finally mean values are calculated. Outliers (estimated with
z-sqore method) are not used for mean value calculations.
Application enables:
- 1D velocity model for travel time calculations. It uses tau_p raytracer from ObsPy
however for spectral parameters estimation it uses constant parameters i.e. constant S wave velocity and
constant gp, gs and density.
- constant velocities for travel time calculations in case of missing 1D velocity model.
Features of the algorithm:
1. Amplitude spectra are calculated as geometric mean of all traces (e.g. three channels) of seismic station.
2. Application requires P wave pick for P-waves analysis and P or S- wave pick for S wave analysis
(it calculates theoretical S wave pick in case it is not available).
3. Applications calculates signal-to-noise spectral ratio and removes either station (based on mean ratio)
or particular frequencies (based on S/N ratio for this frequency).
"""
def __init__(self, stream: Stream, event: Event, inventory, config, logger):
self.stream = stream
self.event = event
self.inventory = inventory
self.config = config
self.logger = logger
self.raytracer = self._initialize_raytracer()
self.stations = self._get_unique_stations()
self.solutions_jk_P = []
self.solutions_sp_fit_P = []
self.solutions_jk_S = []
self.solutions_sp_fit_S = []
# filenames
self.P_WAVE_SP_FITTING_RESULTS_JSON = config.get("P_WAVE_SP_FITTING_RESULTS_JSON")
self.S_WAVE_SP_FITTING_RESULTS_JSON = config.get("S_WAVE_SP_FITTING_RESULTS_JSON")
self.P_WAVE_JK_RESULTS = config.get("P_WAVE_JK_RESULTS")
self.S_WAVE_JK_RESULTS = config.get("S_WAVE_JK_RESULTS")
self.P_WAVE_SP_FITTING_RESULTS = config.get("P_WAVE_SP_FITTING_RESULTS")
self.S_WAVE_SP_FITTING_RESULTS = config.get("S_WAVE_SP_FITTING_RESULTS")
self.EVENT_RESULTS = config.get("EVENT_RESULTS")
def _initialize_raytracer(self) -> Optional[TauPRayTracer]:
if self.config.get("raytrace"):
return TauPRayTracer(self.config.get("velocity_mat_file"))
return None
def run(self):
for station in self.stations:
self.logger.info(80 * "#")
self.logger.info(f"Running spectral analysis for station: {station}")
# Selecting station data
station_stream = self.stream.select(station=station)
try:
station_inv = self.inventory[0].select(station=station, time=station_stream[0].stats.starttime)[0][0]
except IndexError:
self.logger.warning(f"{station} not in inventory")
continue
# Stream preprocessing
station_stream = self._preprocess_stream(station_stream)
if station_stream:
hor_distance, ver_distance = self._calculate_distance(station_inv)
p_travel_time, s_travel_time = self._calculate_travel_times(horizontal_distance=hor_distance,
vertical_distance=ver_distance,
station_inv=station_inv)
pick_p = self._find_pick(station, self.config.get("p_phase_hint"))
pick_s = self._find_pick(station, self.config.get("s_phase_hint"))
if pick_p:
delta_ps = s_travel_time - p_travel_time
if self.config.get("P_wave_analysis"):
self._perform_p_wave_analysis(station_stream=station_stream, pick_p=pick_p, delta_ps=delta_ps,
p_travel_time=p_travel_time, distance=hor_distance)
self.logger.info(80 * "-")
if self.config.get("S_wave_analysis"):
self._perform_s_wave_analysis(station_stream=station_stream, pick_s=pick_s, pick_p=pick_p,
delta_ps=delta_ps, s_travel_time=s_travel_time,
distance=hor_distance)
self.logger.info(80 * "-")
self._save_results_to_json()
self._save_results_to_csv(filename=self.EVENT_RESULTS)
self._save_results_to_station_csvs()
def _save_results_to_json(self):
# saving solutions to JSON file
solutions_data = [
(self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS_JSON),
(self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS_JSON)
]
for solutions, filename in solutions_data:
JsonWriter(solutions=solutions, filename=filename, logger=self.logger).save()
def _save_results_to_station_csvs(self):
# Zdefiniowanie danych dla poszczególnych plików
solutions_data = [
(self.solutions_jk_P, self.P_WAVE_JK_RESULTS),
(self.solutions_jk_S, self.S_WAVE_JK_RESULTS),
(self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS),
(self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS)
]
# Dla każdego zestawu wyników zapisujemy osobny plik CSV
for solutions, filename in solutions_data:
# Otwieranie pliku CSV do zapisu
with open(filename, mode='w', newline='') as csv_file:
writer = csv.writer(csv_file)
# Zapis nagłówków
headers = ["station_name", "mo", "fo", "q", "mw", "source_size", "stress_drop"]
writer.writerow(headers)
# Zapis danych dla każdej stacji
if len(solutions)>0:
for solution in solutions:
station_name = solution[0] # Zakładam, że nazwa stacji jest w solution[0]
parameters = solution[1] # Zakładam, że parametry są w solution[1]
# Tworzymy wiersz z wartościami parametrów
row = [
station_name,
parameters.mo,
parameters.fo,
parameters.q,
parameters.mw,
parameters.source_size,
parameters.stress_drop
]
# Zapisujemy wiersz do pliku CSV
writer.writerow(row)
def _save_results_to_csv(self, filename):
# Nagłówki kolumn
headers = ["Method", "M0", "E M0", "F0", "E F0", "Q", "E Q", "Mw", "r", "E r", "Stress drop", "E stress drop"]
# Zbieranie danych z czterech źródeł
solutions_jk_P = self._return_event_spec_params(solutions=self.solutions_jk_P)
solutions_jk_S = self._return_event_spec_params(solutions=self.solutions_jk_S)
solutions_sp_fit_P = self._return_event_spec_params(solutions=self.solutions_sp_fit_P)
solutions_sp_fit_S = self._return_event_spec_params(solutions=self.solutions_sp_fit_S)
# Checking for None values
if solutions_jk_P:
data_jk_P = [
"P-waves, JK integrals", solutions_jk_P.mo, solutions_jk_P.mo_e, solutions_jk_P.fo, solutions_jk_P.fo_e,
solutions_jk_P.q, solutions_jk_P.q_e, solutions_jk_P.mw, solutions_jk_P.source_size,
solutions_jk_P.source_size_e, solutions_jk_P.stress_drop, solutions_jk_P.stress_drop_e
]
else:
data_jk_P = [
"P-waves, JK integrals", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_jk_S:
data_jk_S = [
"S-waves, JK integrals", solutions_jk_S.mo, solutions_jk_S.mo_e, solutions_jk_S.fo, solutions_jk_S.fo_e,
solutions_jk_S.q, solutions_jk_S.q_e, solutions_jk_S.mw, solutions_jk_S.source_size,
solutions_jk_S.source_size_e, solutions_jk_S.stress_drop, solutions_jk_S.stress_drop_e
]
else:
data_jk_S = [
"S-waves, JK integrals", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_sp_fit_P:
data_sp_fit_P = [
"P-waves, spectrum fitting", solutions_sp_fit_P.mo, solutions_sp_fit_P.mo_e, solutions_sp_fit_P.fo, solutions_sp_fit_P.fo_e,
solutions_sp_fit_P.q, solutions_sp_fit_P.q_e, solutions_sp_fit_P.mw, solutions_sp_fit_P.source_size,
solutions_sp_fit_P.source_size_e, solutions_sp_fit_P.stress_drop, solutions_sp_fit_P.stress_drop_e
]
else:
data_sp_fit_P = [
"P-waves, spectrum fitting", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_sp_fit_S:
data_sp_fit_S = [
"S-waves, spectrum fitting", solutions_sp_fit_S.mo, solutions_sp_fit_S.mo_e, solutions_sp_fit_S.fo, solutions_sp_fit_S.fo_e,
solutions_sp_fit_S.q, solutions_sp_fit_S.q_e, solutions_sp_fit_S.mw, solutions_sp_fit_S.source_size,
solutions_sp_fit_S.source_size_e, solutions_sp_fit_S.stress_drop, solutions_sp_fit_S.stress_drop_e
]
else:
data_sp_fit_S = [
"S-waves, spectrum fitting", None, None, None, None,
None, None, None, None,
None, None, None
]
data = [data_jk_P, data_jk_S,data_sp_fit_P, data_sp_fit_S]
# Zapis do pliku CSV
with open(filename, mode='w', newline='') as file:
writer = csv.writer(file)
# Zapis nagłówków
writer.writerow(headers)
# Zapis danych
writer.writerows(data)
def _write_event_parameters_to_file(self):
# Define the content to be written to the file
content = (
f"{20 * '#'} RESULTS {20 * '#'}\n"
f"{80 * '-'}\n"
"Event parameters P_jk:\n"
f"{self._return_event_spec_params(solutions=self.solutions_jk_P)}\n"
"Event parameters P_sp_fit:\n"
f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_P)}\n"
"Event parameters S_jk:\n"
f"{self._return_event_spec_params(solutions=self.solutions_jk_S)}\n"
"Event parameters S_sp_fit:\n"
f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_S)}\n"
f"{80 * '-'}\n"
)
# Write the content to the file
with open(self.EVENT_RESULTS, 'w') as file:
file.write(content)
def _get_unique_stations(self) -> List[str]:
return sorted(set([tr.stats.station for tr in self.stream]))
def _preprocess_stream(self, stream: Stream) -> Stream:
stream.detrend("linear")
try:
stream.remove_response(inventory=self.inventory, output="DISP")
except Exception as e:
self.logger.warning(f"{e} No instrument correction - applied integration to obtain displacement signal")
stream.integrate(method="cumtrapz")
return stream.filter(
type="bandpass",
freqmin=self.config.get("freq_min"),
freqmax=self.config.get("freq_max"),
corners=2,
zerophase=True
)
def _calculate_distance(self, station_inv) -> list[float, float]:
try:
horizontal_distance = gps2dist_azimuth(
self.config.get("latitude"), self.config.get("longitude"), station_inv.latitude,
station_inv.longitude
)[0]
if self.config.get("allow_station_elev"):
receiver_depth_in_m = station_inv.elevation
else:
receiver_depth_in_m = 0
vertical_distance = receiver_depth_in_m + self.config.get("depth")
return horizontal_distance, vertical_distance
except AttributeError as err:
(self.logger.error
("No preferred origin in quakeML file. Cannot calculate distance between station and origin"))
raise AttributeError
def _calculate_travel_times(self, horizontal_distance: float, vertical_distance, station_inv) -> tuple:
if not self.raytracer:
vp = self.config.get("vp")
vs = self.config.get("vs")
distance = (horizontal_distance**2+vertical_distance**2)**0.5
p_travel_time = distance / vp
s_travel_time = distance / vs
else:
if self.config.get("allow_station_elev"):
receiver_depth_in_km = station_inv.elevation/1000
else:
receiver_depth_in_km = 0
try:
p_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000,
self.config.get("depth") / 1000,
receiver_depth_in_km, phase="p").time
s_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000,
self.config.get("depth") / 1000,
receiver_depth_in_km, phase="s").time
except IndexError:
raise Exception("Problem with velocity file. ")
return p_travel_time, s_travel_time
def _find_pick(self, station: str, phase_hint:list) -> Optional[Pick]:
for p in self.event.picks:
if p.waveform_id['station_code'] == station and p.phase_hint in phase_hint:
return p
return None
def _perform_p_wave_analysis(self, station_stream: Stream, pick_p: Pick, delta_ps: float, distance: float,
p_travel_time: float):
self.logger.info("P wave spectral analysis")
if delta_ps < self.config.get("window_len"):
self.logger.warning("P wave window may overlap S waves")
return
signal_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=self.config.get("taper_len"),
post_padding=self.config.get("window_len") -
self.config.get("taper_len"))
signal_window = self._window_preprocessing(signal_window)
noise_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=2 * self.config.get("taper_len") +
self.config.get("window_len"),
post_padding=-2 * self.config.get("taper_len"))
noise_window = self._window_preprocessing(noise_window)
if len(signal_window) > 0 and len(noise_window) > 0 and pick_p:
self._spectral_fitting(signal_window, noise_window, pick_p, distance, p_travel_time)
else:
self.logger.warning("Not enough data for P wave analysis")
def _perform_s_wave_analysis(self, station_stream: Stream, pick_p: Pick, pick_s: Pick, delta_ps, distance: float,
s_travel_time: float):
self.logger.info("S wave spectral analysis")
if pick_p:
noise_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=2 * self.config.get("taper_len") +
self.config.get("window_len"),
post_padding=-2 * self.config.get("taper_len"))
else:
noise_window = self._trim_seismogram(station_stream, station_stream[0].stats.starttime,
pre_padding=-2 * self.config.get("taper_len"),
post_padding=self.config.get("window_len") +
2 * self.config.get("taper_len"))
if not pick_s and pick_p:
pick_s = copy.copy(pick_p)
pick_s.time = pick_s.time + delta_ps
pick_s.phase_hint = "S"
if pick_s:
signal_window = self._trim_seismogram(station_stream, pick_s.time,
pre_padding=self.config.get("taper_len"),
post_padding=self.config.get("window_len")
- self.config.get("taper_len"))
signal_window = self._window_preprocessing(signal_window)
if len(signal_window) > 0 and len(noise_window) > 0:
self._spectral_fitting(signal_window, noise_window, pick_s, distance, s_travel_time)
@staticmethod
def _trim_seismogram(station_stream: Stream, pick_time: UTCDateTime, pre_padding=0.5, post_padding=2.6):
start_time = pick_time - pre_padding
end_time = pick_time + post_padding
return station_stream.slice(start_time, end_time)
def _window_preprocessing(self, signal_window: Stream) -> Stream:
return signal_window.taper(type=self.config.get("taper_type"), max_percentage=0.5,
max_length=self.config.get("taper_len"))
@staticmethod
def signal2noise(signal_amp_spectra, noise_amp_spectra):
# Calculate signal-to-noise ratio
noise_integral = integrate.trapezoid(noise_amp_spectra[1], x=noise_amp_spectra[0])
signal_integral = integrate.trapezoid(signal_amp_spectra[1], x=signal_amp_spectra[0])
signal2noise_ratio = signal_integral / noise_integral
return signal2noise_ratio
def _spectral_fitting(self, signal_window: Stream, noise_window: Stream, pick: Pick, distance: float,
travel_time: float):
# choosing appropriate K value which is related to the source model and type of seismic waves
source_model = self.config.get("sp_source_model")
if source_model == "Brune":
k = K_BRUNE
elif source_model == "Madariaga":
if pick.phase_hint in self.config.get("p_phase_hint"):
k = K_MADARIAGA_P
elif pick.phase_hint in self.config.get("s_phase_hint"):
k = K_MADARIAGA_S
else:
self.logger.warning(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S")
raise Exception(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S")
else:
self.logger.warning(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga")
raise Exception(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga")
# S wave velocity value
vs = self.config.get("vs")
# Calculation of amplitude spectra for signal and noise windows
xf, yf = calculate_amp_spectra(station_stream=signal_window)
xfn, yfn = calculate_amp_spectra(station_stream=noise_window)
# Calculation of signal-to-noise ratio
signal2noise_ratio = self.signal2noise([xf, yf], [xfn, yfn])
self.logger.info(f"S/N: {signal2noise_ratio}")
# Stopping the analysis when the ratio is too low
if signal2noise_ratio < self.config["min_energy_ratio"]:
self.logger.warning("Signal to noise ratio is too low")
return
# Setting the weights.
weights = np.ones(len(xf))
# If the noise on particular frequencies exceed the threshold value, the weights are equal to 0.
sn = yf / yfn
weights[sn < self.config.get("min_sn_ratio")] = 0
# Setup initial model from J/K integrals. Initial mo is in displacement units.
spectral_level, fo = sp_jk(xf, yf)
spectral_params_sp_jk = SpectralParams(mo=spectral_level, fo=fo, q=400, stress_drop=0, source_size=0)
# Spectral fitting with Simplex. Initial mo is in displacement units.
opt_sim = OptNelderMead(spectral_params_sp_jk, freq_bins=xf, amplitude_spectrum=yf, weights=weights,
travel_time=travel_time, config=self.config, logger=self.logger)
error_sp_fit, solution_sp_fit = opt_sim.run()
# Recalculation of amplitude spectrum level to seismic moment
# for spectral fitting method
solution_sp_fit.mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config,
spectral_level=solution_sp_fit.mo, distance=distance)
solution_sp_fit.mw = mm(solution_sp_fit.mo)
#Calculation of source size and stress drop for spectral fitting method
solution_sp_fit.source_size = calc_source_size(vs, solution_sp_fit.fo, k)
solution_sp_fit.stress_drop = calc_stress_drop(seismic_moment=solution_sp_fit.mo, source_radius=solution_sp_fit.source_size)
# for J-K integrals method
mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config,
spectral_level=spectral_level, distance=distance)
source_radius = calc_source_size(vs, fo, k)
stress_drop_value = calc_stress_drop(seismic_moment=mo, source_radius=source_radius)
spectral_params_sp_jk = SpectralParams(mo=mo, fo=fo, q=400, stress_drop=stress_drop_value, source_size=source_radius)
self.logger.info(spectral_params_sp_jk)
self.logger.info(solution_sp_fit)
# Appending solutions to the lists
if pick.phase_hint in self.config["p_phase_hint"]:
self.solutions_jk_P.append([signal_window[0].stats.station,
spectral_params_sp_jk,
None,
None,
None])
self.solutions_sp_fit_P.append([signal_window[0].stats.station,
solution_sp_fit,
opt_sim.f_norm.freq,
opt_sim.f_norm.th_amp_sp_damp,
opt_sim.f_norm.amp_spectrum])
elif pick.phase_hint in self.config["s_phase_hint"]:
self.solutions_jk_S.append([signal_window[0].stats.station,
spectral_params_sp_jk,
None,
None,
None])
self.solutions_sp_fit_S.append([signal_window[0].stats.station,
solution_sp_fit,
opt_sim.f_norm.freq,
opt_sim.f_norm.th_amp_sp_damp,
opt_sim.f_norm.amp_spectrum])
else:
self.logger.warning(f"Solutions not saved. Phase hint {self.config['p_phase_hint']} nor "
f"{self.config['s_phase_hint']} does not coincide "
f"with phase hint {pick.phase_hint} from the catalog")
@staticmethod
def _spectrum_to_moment(pick_phase, config, spectral_level, distance):
# Calculation of spectral amplitude in [Nm * s]
if pick_phase == "P":
v = config.get("vp")
g = G_P
else:
v = config.get("vs")
g = G_S
return spectrum2moment(spectral_level=spectral_level, density=config.get("density"), velocity=v,
distance=distance, g=g)
def _return_event_spec_params(self, solutions):
"""
Function calculates mean parameters for seismic event based on stations' results and removes outliers
:param solutions:
:return:
"""
if not solutions:
return None
def outliers_detection(M0, F0, Q, STRESS_DROP, SOURCE_SIZE, z_threshold):
"""Function calculates z_scores and removes outliers which has absolute z_score above z_threshold"""
# Calculate z-scores
z_scores_M0 = zscore(np.log(M0))
z_scores_F0 = zscore(F0)
z_scores_Q = zscore(Q)
# Identify outliers
outliers = (np.abs(z_scores_M0) > z_threshold) | (np.abs(z_scores_F0) > z_threshold) | (
np.abs(z_scores_Q) > z_threshold)
# Filter out outliers
M0_filtered = M0[~outliers]
F0_filtered = F0[~outliers]
Q_filtered = Q[~outliers]
STRESS_DROP_filtered = STRESS_DROP[~outliers]
SOURCE_SIZE_filtered = SOURCE_SIZE[~outliers]
return M0_filtered, F0_filtered, Q_filtered, STRESS_DROP_filtered, SOURCE_SIZE_filtered
M0 = np.array([solution[1].mo for solution in solutions])
F0 = np.array([solution[1].fo for solution in solutions])
Q = np.array([solution[1].q for solution in solutions])
STRESS_DROP = np.array([solution[1].stress_drop for solution in solutions])
SOURCE_SIZE = np.array([solution[1].source_size for solution in solutions])
M0, F0, Q, STRESS_DROP, SOURCE_SIZE = outliers_detection(M0=M0, F0=F0, Q=Q, SOURCE_SIZE=SOURCE_SIZE, STRESS_DROP=STRESS_DROP,
z_threshold=self.config.get("z_threshold"))
# Calculate mean values of parameters
mo, mo_e = self.mef(M0)
fo, fo_e = self.mef(F0)
q, q_e = self.mef(Q)
stress_drop, stress_drop_e = self.mef(STRESS_DROP)
source_size, source_size_e = self.mef(SOURCE_SIZE)
return EventSpectralParams(mo=mo, fo=fo, q=q, mo_e=mo_e, fo_e=fo_e, q_e=q_e, source_size=source_size, source_size_e=source_size_e,
stress_drop=stress_drop, stress_drop_e=stress_drop_e)
@staticmethod
def mef(x):
"""
Function for calculating average values and standard deviations of source parameters which are log-normally distributed.
Formulas based on article: https://doi.org/10.1016/j.pepi.2003.08.006
"""
x_mean = 10 ** np.mean(np.log10(x))
if len(x) == 1:
return x_mean, -999
x_std = ((1 / (len(x) - 1)) * np.sum((np.log10(x) - np.log10(x_mean)) ** 2)) ** 0.5
x_e = 10 ** x_std
return x_mean, x_e

138
SpectralAnalysisApp.py Normal file
View File

@ -0,0 +1,138 @@
import logging
from obspy import read, read_events, read_inventory
from SpectralAnalysis import SpectralAnalysis
def count_matching_stations(stream, inventory):
"""
Count how many stations from the given stream are present in the inventory.
Parameters:
- stream: ObsPy Stream object containing seismic data.
- inventory: ObsPy Inventory object containing network and station metadata.
Returns:
- count: Number of stations from the stream found in the inventory.
"""
# Extract unique station codes from the stream
stream_stations = set([trace.stats.station for trace in stream])
# Extract unique station codes from the inventory
inventory_stations = set([station.code for network in inventory for station in network.stations])
# Count how many stations from the stream are in the inventory
count = len(stream_stations.intersection(inventory_stations))
return count, len(stream_stations)
def main(waveforms, network_inventory, picks_qml, event_latitude, event_longitude, event_depth, vp, vs, density, taper_type, taper_len,
window_len, freq_min, freq_max, min_sn_ratio=1, min_energy_ratio=1, p_wave_analysis=True, s_wave_analysis=True,
raytracing=True, allow_station_elevations=False,
source_model="Madariaga", sp_fit_model="FBoatwright", norm="L2", z_threshold=6,
q_min=1, q_max=400, velocity_model=None):
"""
Main function for application: SPECTRALPARAMETERS
Arguments:
waveforms: path to input file of type 'seismogram'
Velocity model: path to input file of type 'velocity_model'
network_inventory: path to input file of type 'inventory'
event_qml: path to input file of type 'quakeml_seismogram_picks'
raytracing: parameter of type 'BOOLEAN'
save plots: parameter of type 'BOOLEAN'
Returns:
File(s) named 'Katalog sejsmiczny' of type 'quakeml_seismogram_picks' and format 'XML' from working directory
:param event_qml:
"""
logging.basicConfig(filename="application.log",
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO)
logger = logging.getLogger(__name__)
config = {
# filenames
"P_WAVE_SP_FITTING_RESULTS_JSON": "P_wave_analysis_sp_fit_solutions.json",
"S_WAVE_SP_FITTING_RESULTS_JSON": "S_wave_analysis_sp_fit_solutions.json",
"P_WAVE_JK_RESULTS": "P_wave_analysis_JK_solutions.csv",
"S_WAVE_JK_RESULTS": "S_wave_analysis_JK_solutions.csv",
"P_WAVE_SP_FITTING_RESULTS": "P_wave_analysis_sp_fit_solutions.csv",
"S_WAVE_SP_FITTING_RESULTS": "S_wave_analysis_sp_fit_solutions.csv",
"EVENT_RESULTS": "results.csv",
"velocity_mat_file": velocity_model,
# app options
"P_wave_analysis": p_wave_analysis,
"S_wave_analysis": s_wave_analysis,
"raytrace": raytracing,
"allow_station_elev": allow_station_elevations,
#event location
"longitude": event_longitude,
"latitude": event_latitude,
"depth": event_depth, # in meters
# phase hints
"p_phase_hint": ["P", "Pg"],
"s_phase_hint": ["S", "Sg"],
# source parameters
"vp": vp,
"vs": vs,
"density": density,
# window parameters
"taper_len": taper_len,
"window_len": window_len,
"taper_type": taper_type,
# filter parameters
"freq_min": freq_min,
"freq_max": freq_max,
# frequency signal-to-noise ratio
"min_sn_ratio": min_sn_ratio,
# if station signal-to-noise ratio
"min_energy_ratio": min_energy_ratio,
# L1 / L2
"norm": norm,
# Spectrum fitting model: FBrune / FBoatwright
"sp_fit_model": sp_fit_model,
# Source model: Madariaga / Brune
"sp_source_model": source_model,
# outliers detection
"z_threshold": z_threshold,
# q - damping limits
"q_min": q_min,
"q_max": q_max,
}
# reading files
stream = read(waveforms)
inv = read_inventory(network_inventory)
qml = read_events(picks_qml).events[0]
if not stream:
msg = "MSEED file is empty"
logger.error(msg)
raise Exception(msg)
if not inv or len(inv) == 0:
msg = "Inventory file is empty"
logger.error(msg)
raise Exception(msg)
else:
count, total = count_matching_stations(stream, inv)
logger.info(f"{count} out of {total} stations in stream are in the inventory")
if count == 0:
msg = "No stations from MSEED in the Network Inventory"
logger.error(msg)
raise Exception(msg)
else:
sa = SpectralAnalysis(stream=stream, event=qml, inventory=inv, config=config, logger=logger)
sa.run()

64
SpectralParameters.py Normal file
View File

@ -0,0 +1,64 @@
from spectral_definitions import mm
class SpectralParams:
def __init__(self, mo, fo, q, stress_drop=0, source_size=0):
self.mo = mo
self.fo = fo
self.q = q
self.mw = mm(mo)
self.stress_drop = stress_drop
self.source_size = source_size
def __str__(self):
return (f"SpectralAnalysis parameters: M0={self.mo:.2e}, F0={self.fo:.2f}, "
f"Q={self.q:.2f}, Mw={self.mw:.2f}, "
f"stress drop = {self.stress_drop:.2e}, source size = {self.source_size:.0f}")
class EventSpectralParams(SpectralParams):
def __init__(self, mo, fo, q, stress_drop, source_size, mo_e, fo_e, q_e, source_size_e, stress_drop_e):
super().__init__(mo, fo, q, stress_drop, source_size)
self.mo_e = mo_e
self.fo_e = fo_e
self.q_e = q_e
self.mw = mm(mo)
self.source_size_e = source_size_e
self.stress_drop_e = stress_drop_e
self.mo_1, self.mo_2 = self.calculate_uncertainties(self.mo, self.mo_e)
self.fo_1, self.fo_2 = self.calculate_uncertainties(self.fo, self.fo_e)
self.q_1, self.q_2 = self.calculate_uncertainties(self.q, self.q_e)
self.mw_1, self.mw_2 = self.calculate_uncertainties_mw(self.mo, self.mo_e)
self.source_size_1, self.source_size_2 = self.calculate_uncertainties(self.source_size, self.source_size_e)
self.stress_drop_1, self.stress_drop_2 = self.calculate_uncertainties(self.stress_drop, self.stress_drop_e)
def __str__(self):
return f"SpectralAnalysis parameters: " \
f"Seismic moment: {self.mo:.2e} <{self.mo_1:.2e}, {self.mo_2:.2e}> " \
f"Corner frequency: {self.fo:.2f} <{self.fo_1:.2f}, {self.fo_2:.2f}> " \
f"Q factor: {self.q:.2f} <{self.q_1:.2f}, {self.q_2:.2f}> " \
f"Mw: {self.mw:.2f} <{self.mw_1:.2f}, {self.mw_2:.2f}> " \
f"Stress drop: {self.stress_drop:.0f} <{self.stress_drop_1:.0f}, {self.stress_drop_2:.0f}> " \
f"Source size: {self.source_size:.0f} <{self.source_size_1:.0f}, {self.source_size_2:.0f}> "
@staticmethod
def calculate_uncertainties(x, x_e):
if x_e == -999:
unc1 = 0
unc2 = 0
else:
unc1 = x / x_e
unc2 = x * x_e
return unc1, unc2
@staticmethod
def calculate_uncertainties_mw(mo, mo_e):
if mo_e == -999:
unc1 = 0
unc2 = 0
else:
unc1 = mm(mo / mo_e)
unc2 = mm(mo * mo_e)
return unc1, unc2

31
amplitude_spectra.py Normal file
View File

@ -0,0 +1,31 @@
from obspy import Stream
from scipy.fft import rfft, rfftfreq
from scipy.interpolate import interp1d
import numpy as np
def calculate_amp_spectra(station_stream: Stream):
"""
Function returns frequencies and corresponding mean amplitude spectra (for all available traces in stream).
All spectra are interpolated
:param station_stream:
:return:
"""
# Fourier's transformation of signal for each trace (channel) in the stream
freq_bins = None
psd = 0
for tr in station_stream:
fft_signal = rfft(tr.data)
freq_bins = rfftfreq(len(tr.data), d=tr.stats.delta)
amp_spectrum = abs(tr.stats.delta * fft_signal) # Multiplication by sampling step
amp_spectrum = amp_spectrum * np.sqrt(2) # Multiplication by square root to calibrate energy
psd += amp_spectrum ** 2 # for calculation of root-mean-square
# Calculation of root-mean-square for all traces in the stream
mean_amp_spectra = psd ** 0.5
# Interpolation
fi = np.logspace(np.log10(freq_bins[1]), np.log10(freq_bins[-1]), len(freq_bins))
interpolation_function = interp1d(freq_bins, mean_amp_spectra, kind='linear', fill_value='extrapolate')
return fi, interpolation_function(fi)

34
f_models.py Normal file
View File

@ -0,0 +1,34 @@
from SpectralParameters import SpectralParams
class FModel:
def __init__(self, freq_bins, spectral_parameters: SpectralParams):
self.freq = freq_bins
self.sp_par = spectral_parameters
def fmodel(self):
"""Function return given model"""
return
class FBrune(FModel):
"""
Generate Brune's model source spectrum.
"""
def __init__(self, freq_bins, spectral_parameters: SpectralParams):
super().__init__(freq_bins, spectral_parameters)
def fmodel(self):
return self.sp_par.mo / (1 + (self.freq / self.sp_par.fo) ** 2)
class FBoatwright(FModel):
"""
Generate Boatwright's model source spectrum.
"""
def __init__(self, freq_bins, spectral_parameters: SpectralParams):
super().__init__(freq_bins, spectral_parameters)
def fmodel(self):
return self.sp_par.mo / (1 + (self.freq / self.sp_par.fo) ** 4) ** 0.5

62
f_norms.py Normal file
View File

@ -0,0 +1,62 @@
import numpy as np
from f_models import FModel, FBrune, FBoatwright
from SpectralParameters import SpectralParams
from spectral_definitions import damping
class FNormResults:
def __init__(self, misfit, freq_bins, amplitude_spectrum, amp_theor_spec):
self.misfit = misfit
self.freq = freq_bins
self.amplitude_spectrum = amplitude_spectrum
self.amplitude_theoretical_spectrum = amp_theor_spec
def __str__(self):
return f"Misfit: {self.misfit:.2f}"
class FNorm:
def __init__(self, norm, spectral_params: SpectralParams, freq_bins, amplitude_spectrum, weights, travel_time,
source_model: FModel, logger):
self.norm = norm
self.spectral_par = spectral_params
self.freq = freq_bins
self.amp_spectrum = amplitude_spectrum
self.th_amp_sp_damp = None
self.weights = weights
self.travel_time = travel_time
self.f_model = source_model
self.logger = logger
def calculate(self):
self.f_model.sp_par = self.spectral_par
self.f_model.freq = self.freq
# theoretical spectrum model (Brune or Boatwright)
amp_th = self.f_model.fmodel()
# calculation of quality (damping) values for all frequencies)
damping_val = damping(q_factor=self.spectral_par.q, frequencies=self.freq, travel_time=self.travel_time)
if self.norm == "L1":
misfit = self.f_norm_l1(damping_amp=damping_val, amp_th=amp_th)
elif self.norm == "L2":
misfit = self.f_norm_l2(damping_amp=damping_val, amp_th=amp_th)
else:
self.logger.error(f"{self.norm} is a wrong norm. It should be L1 or L2")
return None
# Correcting theoretical model (Brune or Boatwright) for damping (Q factor)
self.th_amp_sp_damp = amp_th / damping_val
return FNormResults(misfit=misfit, freq_bins=self.freq, amplitude_spectrum=self.amp_spectrum,
amp_theor_spec=self.th_amp_sp_damp)
def f_norm_l1(self, damping_amp, amp_th):
amp_residuals = self.weights * np.abs(np.log10(self.amp_spectrum * damping_amp) - np.log10(amp_th))
return np.sum(amp_residuals) / np.size(amp_residuals)
def f_norm_l2(self, damping_amp, amp_th):
amp_residuals = self.weights * (np.log10(self.amp_spectrum * damping_amp) - np.log10(amp_th)) ** 2
return np.sqrt(np.sum(amp_residuals) / amp_residuals.size)

49
json_writer.py Normal file
View File

@ -0,0 +1,49 @@
import json
class JsonWriter:
def __init__(self, solutions, filename, logger):
self.solutions = solutions
self.filename = filename
self.data = self.prepare_file(solutions)
self.logger = logger
def save(self):
with open(self.filename, 'w') as f:
json.dump(self.data, f)
def prepare_file(self, solutions):
# Initialize an empty dictionary to hold the data
stations_dict = {}
# Iterate over the solutions list
for solution in solutions:
station_name = solution[0]
parameters = {"mo": solution[1].mo, "fo": solution[1].fo, "q": solution[1].q, "source_size": solution[1].source_size, "stress_drop": solution[1].stress_drop}
freq = solution[2]
amp_th_sp_q = solution[3]
amp_spectrum = solution[4]
# Round the parameters to two decimal places
parameters = {key: round(value, 2) for key, value in parameters.items()}
# Check if the station already exists in the dictionary
if station_name not in stations_dict:
stations_dict[station_name] = {
"parameters": list(),
"frequency": list(),
"fitted_amplitude_spectrum": list(),
"amplitude_spectrum": list()
}
try:
# Append the data to the respective lists
stations_dict[station_name]["parameters"]=parameters
if freq is not None and amp_spectrum is not None and amp_th_sp_q is not None:
stations_dict[station_name]["frequency"].extend(freq)
stations_dict[station_name]["fitted_amplitude_spectrum"].extend(amp_th_sp_q)
stations_dict[station_name]["amplitude_spectrum"].extend(amp_spectrum)
except AttributeError as ae:
self.logger.error(ae)
return stations_dict

75
opt_algorithms.py Normal file
View File

@ -0,0 +1,75 @@
import scipy.optimize
import numpy as np
from SpectralParameters import SpectralParams
import f_models
from f_norms import FNorm
class OptAlgorithm:
"""Base class for optimization algorithms.
"""
def __init__(self, initial_model: SpectralParams, freq_bins, amplitude_spectrum, weights, travel_time, config,
logger):
self.initial_model = initial_model
self.config = config
self.travel_time = travel_time
self.f_model = getattr(f_models, config.get("sp_fit_model"))(freq_bins=freq_bins,
spectral_parameters=initial_model)
self.f_norm = FNorm(norm=self.config.get("norm"), spectral_params=initial_model, freq_bins=freq_bins,
amplitude_spectrum=amplitude_spectrum, weights=weights, travel_time=travel_time,
source_model=self.f_model, logger=logger)
self.solution = None
self.error = None
self.name = self.__class__.__name__
def run(self):
"""Run the optimization algorithm and return SpectralParams results
:return:
"""
return self.error, self.solution
def __repr__(self):
if self.solution:
output = f"{self.name} results:\n"
output += f" {self.solution.__str__()} \n"
output += f" Error: {self.error:.4f}"
return output
else:
return f"{self.name}: no solution"
class OptNelderMead(OptAlgorithm):
"""
Minimize a function using the downhill simplex algorithm from scipy.optimize.
"""
def __init__(self, initial_model: SpectralParams, freq_bins, amplitude_spectrum, weights, travel_time, config,
logger):
super().__init__(initial_model, freq_bins, amplitude_spectrum, weights, travel_time, config, logger)
self.initial_q = (self.config.get("q_min")+self.config.get("q_max"))/2
def run(self):
def prepare_fun(x):
self.f_norm.spectral_par = SpectralParams(mo=x[0], fo=x[1], q=x[2], )
return self.f_norm.calculate().misfit
# Initial model parameters
x0 = [self.initial_model.mo, self.initial_model.fo, self.initial_q]
# Optimization bounds
bounds = [(None, None), (1 / self.config.get("window_len"), self.config.get("freq_max")),
(self.config.get("q_min"), self.config.get("q_max"))]
# Perform optimization
xopt = scipy.optimize.minimize(method='Nelder-Mead', fun=prepare_fun, x0=np.array(x0), bounds=bounds)
# Store the results
self.solution = SpectralParams(mo=xopt.x[0], fo=xopt.x[1], q=xopt.x[2])
self.error = xopt.fun
return self.error, self.solution

24
sp_jk.py Normal file
View File

@ -0,0 +1,24 @@
import numpy as np
def sp_jk(freq_bins, amp_spectra):
# Calculate source parameters using J and K Snoke's integrals
# with correction for the limited frequency band. The routine
# ignores quality factor in calculations (ideally, the spectrum
# should be corrected for attenuation before).
# For each waveform, calculate the J and K integrals and source parameters.
Av = amp_spectra * 2 * np.pi * freq_bins
jf = (2 * np.trapz(Av ** 2, x=freq_bins) + 2 / 3 * (amp_spectra[0] * 2 * np.pi * freq_bins[0]) ** 2 *
freq_bins[0] + 2 * (amp_spectra[-1] * 2 * np.pi * freq_bins[-1]) ** 2 * freq_bins[-1])
kf = (2 * np.trapz(amp_spectra ** 2, x=freq_bins) + 2 * amp_spectra[0] ** 2 * freq_bins[0] +
2 / 3 * amp_spectra[-1] ** 2 * freq_bins[-1])
# Calculation of spectral level and corner frequency
mo = 2 * (kf ** 3 / jf) ** 0.25 # spectral level from Snoke's integrals
fo = np.sqrt(jf / kf) / (2 * np.pi) # corner frequency
return mo, fo

42
spectral_definitions.py Normal file
View File

@ -0,0 +1,42 @@
import numpy as np
# constants depending on source models
K_BRUNE = 0.37
K_MADARIAGA_P = 0.32
K_MADARIAGA_S = 0.21
# averages of radiation coefficients
G_P = 0.52
G_S = 0.63
def spectrum2moment(spectral_level, density, velocity, distance, g):
return spectral_level * 4.0 * np.pi * density * velocity ** 3 * distance / g
def damping(q_factor, frequencies, travel_time):
"""Exponential damping"""
return np.exp(np.pi * frequencies * travel_time / q_factor)
def mm(mo):
"""Calculate moment magnitude from the spectral level (Mo)
:return moment magnitude (float):
"""
return (np.log10(mo) - 9.1) / 1.5
def m0(mw):
"""Calculate the spectral level (Mo) from the moment magnitude (Mw)
:return spectral level (float):
"""
return 10 ** (mw * 1.5 + 9.1)
def calc_source_size(s_vel, corner_freq, k):
"""Calculate source radius"""
return k * s_vel / corner_freq
def calc_stress_drop(seismic_moment, source_radius):
"""Calculate stress drop"""
return 7 / 16 * seismic_moment / source_radius ** 3

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# -----------------
# Copyright © 2024 ACK Cyfronet AGH, Poland.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This work was partially funded by DT-GEO Project.
# -----------------
import sys
import argparse
from SpectralAnalysisApp import main as SpectralAnalysisApp
def main(argv):
def str2bool(v):
if v.lower() in ("True", "TRUE", "yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("False", "FALSE", "no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")
parser = argparse.ArgumentParser()
parser.add_argument("waveforms", help="Path to input file of type miniseed")
parser.add_argument("network_inventory", help="Path to input file of type inventory")
parser.add_argument("picks_qml", help="Path to input file of type phase_association_detections")
parser.add_argument("--velocity_model", help="Path to input file of type velocity_model", required=False)
parser.add_argument("--event_latitude", help="", type=float, default=4000, required=True)
parser.add_argument("--event_longitude", help="", type=float, default=4000, required=True)
parser.add_argument("--event_depth", help="", type=float, default=4000, required=True)
parser.add_argument("--vp", help="P wave velocity in the source",
type=float, default=4000, required=True)
parser.add_argument("--vs", help="S wave velocity in the source",
type=float, default=2500, required=True)
parser.add_argument("--density", help="Rock density in source",
type=float, default=2700, required=True)
parser.add_argument("--taper_type", help="Type of taper",
type=str, default='hann', required=True)
parser.add_argument("--taper_len", help="Maximum length of the taper",
type=float, default=0.1, required=True)
parser.add_argument("--window_len", help="Length of the time window used for spectrum calculation",
type=float, default=1, required=True)
parser.add_argument("--freq_min", help="Minimum frequency for bandpass filtering",
type=float, default=0.1, required=True)
parser.add_argument("--freq_max", help="Maximum frequency for bandpass filtering",
type=float, default=40, required=True)
parser.add_argument("--min_sn_ratio", help="Minimum signal-to-noise ratio for each frequency - for lower S/N values, a given frequency is not taken into account for spectral fitting",
type=float, default=1, required=True)
parser.add_argument("--min_energy_ratio", help="Minimum spectral signal-to-noise ratio for the entire time window for lower S/N values analysis is not performed",
type=float, default=1, required=True)
parser.add_argument("--p_wave_analysis", help="P wave analysis",
type=str2bool, default=True, required=True)
parser.add_argument("--s_wave_analysis", help="S wave analysis",
type=str2bool, default=True, required=True)
parser.add_argument("--raytracing", help="1D model. If False constant velocity values from the source are used for travel time calculations",
type=str2bool, default=True, required=True)
parser.add_argument("--allow_station_elevations", help="Use station elevations (otherwise set them to zero)",
type=str2bool, default=False, required=True)
parser.add_argument("--source_model", help="Source model. List: Madariaga or Brune",
type=str, default="Madariaga", required=True)
parser.add_argument("--sp_fit_model", help="Spectral fitting model. List: FBrune or FBoatwright",
type=str, default="FBoatwright", required=True)
parser.add_argument("--norm", help="Norm for spectral fitting calculations. List: L1 or L2",
type=str, default="L2", required=True)
parser.add_argument("--z_threshold", help="Z threshold for outliers removal (number of standard deviations)",
type=int, default=3, required=True)
parser.add_argument("--q_min", help="Lower bound of Q for spectral fitting",
type=int, default=1, required=True)
parser.add_argument("--q_max", help="Upper bound of Q for spectral fitting",
type=int, default=400, required=True)
args = parser.parse_args()
SpectralAnalysisApp(**vars(args))
return
if __name__ == '__main__':
main(sys.argv)

92
tau_p_raytracing.py Normal file
View File

@ -0,0 +1,92 @@
import os
import logging
import pandas as pd
import scipy.io
from obspy.geodetics import kilometer2degrees
from obspy.taup import TauPyModel
from obspy.taup.taup_create import build_taup_model
from obspy.taup.tau import TauModel
class CustomTauPyModel(TauPyModel):
"""
Custom TauPyModel for filepath modification.
"""
def __init__(self, filepath, model="iasp91", verbose=False, planet_flattening=0.0):
super().__init__(model, verbose, planet_flattening)
self.verbose = verbose
self.planet_flattening = planet_flattening
self.model = self.read_model(filepath)
@staticmethod
def read_model(path):
return TauModel.deserialize(path, cache=None)
class TauPRayTracer:
""" Class for raytracing seismic waves using TauP """
def __init__(self, velocity_mat_file):
logging.basicConfig(filename="application.log",
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO)
self.logger = logging.getLogger('converter')
self.path = "."
self.velocity_model = self._setup_velocity_model(velocity_mat_file)
def _setup_velocity_model(self, velocity_mat_file_name):
velocity_npz_file_path = f"{velocity_mat_file_name[:-3]}npz"
velocity_npz_file_name = velocity_npz_file_path.split('/')[-1]
nd_file = self._mat2nd(velocity_mat_file_name)
build_taup_model(nd_file, output_folder=f"{self.path}")
os.remove(nd_file)
try:
return CustomTauPyModel(filepath=f"{self.path}/{velocity_npz_file_name}")
except FileNotFoundError as er:
self.logger.warning(er)
@staticmethod
def _mat2nd(velocity_mat_file):
# Load the .mat file
mat_contents = scipy.io.loadmat(velocity_mat_file)
data = mat_contents["d"][0][0]
# Create DataFrame from the lists
data_dict = {
'depth': data[0].T[0],
'vp': data[1].T[0],
'vs': data[2].T[0],
'ro': data[3].T[0],
'qp': data[4].T[0],
'qs': data[5].T[0]
}
df = pd.DataFrame(data_dict)
# Adding two new rows to model file - required by TauP
# Get the last row of the DataFrame
last_row = df.iloc[-1].copy()
# Append the last row to the DataFrame
df = pd.concat([df, pd.DataFrame(last_row).T], ignore_index=True)
last_row['depth'] = 6178.1
# Append the modified last row to the DataFrame
df = pd.concat([df, pd.DataFrame(last_row).T], ignore_index=True)
# Specify the name of the text file to save
nd_filename = f'{velocity_mat_file[:-3]}nd'
# Save the DataFrame to a text file
df.to_csv(nd_filename, sep='\t', index=False, header=False)
return nd_filename
def calculate_arrival(self, distance_in_km, source_depth_in_km, receiver_depth_in_km, phase: str):
phase_list = [phase, phase.swapcase()]
return self.velocity_model.get_travel_times(source_depth_in_km=source_depth_in_km,
distance_in_degree=kilometer2degrees(distance_in_km),
receiver_depth_in_km=receiver_depth_in_km,
phase_list=phase_list
)[0]