SourceParametersEstimation/SpectralAnalysis.py

572 lines
28 KiB
Python
Raw Permalink Normal View History

2024-10-09 11:24:44 +02:00
import copy
import csv
from typing import List, Optional
from json_writer import JsonWriter
from scipy import integrate
from scipy.stats import zscore
import numpy as np
from obspy import Stream, UTCDateTime
from obspy.core.event import Event, Pick
from obspy.geodetics import gps2dist_azimuth
from tau_p_raytracing import TauPRayTracer
from sp_jk import sp_jk
from opt_algorithms import OptNelderMead
from SpectralParameters import SpectralParams, EventSpectralParams
from amplitude_spectra import calculate_amp_spectra
from spectral_definitions import (K_MADARIAGA_P, K_MADARIAGA_S, K_BRUNE, G_P, G_S, spectrum2moment, calc_stress_drop,
calc_source_size, mm)
class SpectralAnalysis:
"""
Application for spectral analysis for P- or S-waves. It uses JK integrals and spectral fitting with simplex
optimization algorithm.
JK integrals method returns seismic moment, moment magnitude and corner frequency.
Spectral fitting uses these results as input. It returns seismic moment, moment magnitude and corner frequency
and additionally Q parameter (i.e. quality=1/dumping). In fitting procedure theoretical spectrum
(Brune or Boatwright corrected for Q) is matched to calculated amplitude spectrum of seismic signal.
These methods are used for each station separately and finally mean values are calculated. Outliers (estimated with
z-sqore method) are not used for mean value calculations.
Application enables:
- 1D velocity model for travel time calculations. It uses tau_p raytracer from ObsPy
however for spectral parameters estimation it uses constant parameters i.e. constant S wave velocity and
constant gp, gs and density.
- constant velocities for travel time calculations in case of missing 1D velocity model.
Features of the algorithm:
1. Amplitude spectra are calculated as geometric mean of all traces (e.g. three channels) of seismic station.
2. Application requires P wave pick for P-waves analysis and P or S- wave pick for S wave analysis
(it calculates theoretical S wave pick in case it is not available).
3. Applications calculates signal-to-noise spectral ratio and removes either station (based on mean ratio)
or particular frequencies (based on S/N ratio for this frequency).
"""
def __init__(self, stream: Stream, event: Event, inventory, config, logger):
self.stream = stream
self.event = event
self.inventory = inventory
self.config = config
self.logger = logger
self.raytracer = self._initialize_raytracer()
self.stations = self._get_unique_stations()
self.solutions_jk_P = []
self.solutions_sp_fit_P = []
self.solutions_jk_S = []
self.solutions_sp_fit_S = []
# filenames
self.P_WAVE_SP_FITTING_RESULTS_JSON = config.get("P_WAVE_SP_FITTING_RESULTS_JSON")
self.S_WAVE_SP_FITTING_RESULTS_JSON = config.get("S_WAVE_SP_FITTING_RESULTS_JSON")
self.P_WAVE_JK_RESULTS = config.get("P_WAVE_JK_RESULTS")
self.S_WAVE_JK_RESULTS = config.get("S_WAVE_JK_RESULTS")
self.P_WAVE_SP_FITTING_RESULTS = config.get("P_WAVE_SP_FITTING_RESULTS")
self.S_WAVE_SP_FITTING_RESULTS = config.get("S_WAVE_SP_FITTING_RESULTS")
self.EVENT_RESULTS = config.get("EVENT_RESULTS")
def _initialize_raytracer(self) -> Optional[TauPRayTracer]:
if self.config.get("raytrace"):
return TauPRayTracer(self.config.get("velocity_mat_file"))
return None
def run(self):
for station in self.stations:
self.logger.info(80 * "#")
self.logger.info(f"Running spectral analysis for station: {station}")
# Selecting station data
station_stream = self.stream.select(station=station)
try:
station_inv = self.inventory[0].select(station=station, time=station_stream[0].stats.starttime)[0][0]
except IndexError:
self.logger.warning(f"{station} not in inventory")
continue
# Stream preprocessing
station_stream = self._preprocess_stream(station_stream)
if station_stream:
hor_distance, ver_distance = self._calculate_distance(station_inv)
p_travel_time, s_travel_time = self._calculate_travel_times(horizontal_distance=hor_distance,
vertical_distance=ver_distance,
station_inv=station_inv)
pick_p = self._find_pick(station, self.config.get("p_phase_hint"))
pick_s = self._find_pick(station, self.config.get("s_phase_hint"))
if pick_p:
delta_ps = s_travel_time - p_travel_time
if self.config.get("P_wave_analysis"):
self._perform_p_wave_analysis(station_stream=station_stream, pick_p=pick_p, delta_ps=delta_ps,
p_travel_time=p_travel_time, distance=hor_distance)
self.logger.info(80 * "-")
if self.config.get("S_wave_analysis"):
self._perform_s_wave_analysis(station_stream=station_stream, pick_s=pick_s, pick_p=pick_p,
delta_ps=delta_ps, s_travel_time=s_travel_time,
distance=hor_distance)
self.logger.info(80 * "-")
self._save_results_to_json()
self._save_results_to_csv(filename=self.EVENT_RESULTS)
self._save_results_to_station_csvs()
def _save_results_to_json(self):
# saving solutions to JSON file
solutions_data = [
(self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS_JSON),
(self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS_JSON)
]
for solutions, filename in solutions_data:
JsonWriter(solutions=solutions, filename=filename, logger=self.logger).save()
def _save_results_to_station_csvs(self):
# Zdefiniowanie danych dla poszczególnych plików
solutions_data = [
(self.solutions_jk_P, self.P_WAVE_JK_RESULTS),
(self.solutions_jk_S, self.S_WAVE_JK_RESULTS),
(self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS),
(self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS)
]
# Dla każdego zestawu wyników zapisujemy osobny plik CSV
for solutions, filename in solutions_data:
# Otwieranie pliku CSV do zapisu
with open(filename, mode='w', newline='') as csv_file:
writer = csv.writer(csv_file)
# Zapis nagłówków
headers = ["station_name", "mo", "fo", "q", "mw", "source_size", "stress_drop"]
writer.writerow(headers)
# Zapis danych dla każdej stacji
if len(solutions)>0:
for solution in solutions:
station_name = solution[0] # Zakładam, że nazwa stacji jest w solution[0]
parameters = solution[1] # Zakładam, że parametry są w solution[1]
# Tworzymy wiersz z wartościami parametrów
row = [
station_name,
parameters.mo,
parameters.fo,
parameters.q,
parameters.mw,
parameters.source_size,
parameters.stress_drop
]
# Zapisujemy wiersz do pliku CSV
writer.writerow(row)
def _save_results_to_csv(self, filename):
# Nagłówki kolumn
headers = ["Method", "M0", "E M0", "F0", "E F0", "Q", "E Q", "Mw", "r", "E r", "Stress drop", "E stress drop"]
# Zbieranie danych z czterech źródeł
solutions_jk_P = self._return_event_spec_params(solutions=self.solutions_jk_P)
solutions_jk_S = self._return_event_spec_params(solutions=self.solutions_jk_S)
solutions_sp_fit_P = self._return_event_spec_params(solutions=self.solutions_sp_fit_P)
solutions_sp_fit_S = self._return_event_spec_params(solutions=self.solutions_sp_fit_S)
# Checking for None values
if solutions_jk_P:
data_jk_P = [
"P-waves, JK integrals", solutions_jk_P.mo, solutions_jk_P.mo_e, solutions_jk_P.fo, solutions_jk_P.fo_e,
solutions_jk_P.q, solutions_jk_P.q_e, solutions_jk_P.mw, solutions_jk_P.source_size,
solutions_jk_P.source_size_e, solutions_jk_P.stress_drop, solutions_jk_P.stress_drop_e
]
else:
data_jk_P = [
"P-waves, JK integrals", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_jk_S:
data_jk_S = [
"S-waves, JK integrals", solutions_jk_S.mo, solutions_jk_S.mo_e, solutions_jk_S.fo, solutions_jk_S.fo_e,
solutions_jk_S.q, solutions_jk_S.q_e, solutions_jk_S.mw, solutions_jk_S.source_size,
solutions_jk_S.source_size_e, solutions_jk_S.stress_drop, solutions_jk_S.stress_drop_e
]
else:
data_jk_S = [
"S-waves, JK integrals", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_sp_fit_P:
data_sp_fit_P = [
"P-waves, spectrum fitting", solutions_sp_fit_P.mo, solutions_sp_fit_P.mo_e, solutions_sp_fit_P.fo, solutions_sp_fit_P.fo_e,
solutions_sp_fit_P.q, solutions_sp_fit_P.q_e, solutions_sp_fit_P.mw, solutions_sp_fit_P.source_size,
solutions_sp_fit_P.source_size_e, solutions_sp_fit_P.stress_drop, solutions_sp_fit_P.stress_drop_e
]
else:
data_sp_fit_P = [
"P-waves, spectrum fitting", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_sp_fit_S:
data_sp_fit_S = [
"S-waves, spectrum fitting", solutions_sp_fit_S.mo, solutions_sp_fit_S.mo_e, solutions_sp_fit_S.fo, solutions_sp_fit_S.fo_e,
solutions_sp_fit_S.q, solutions_sp_fit_S.q_e, solutions_sp_fit_S.mw, solutions_sp_fit_S.source_size,
solutions_sp_fit_S.source_size_e, solutions_sp_fit_S.stress_drop, solutions_sp_fit_S.stress_drop_e
]
else:
data_sp_fit_S = [
"S-waves, spectrum fitting", None, None, None, None,
None, None, None, None,
None, None, None
]
data = [data_jk_P, data_jk_S,data_sp_fit_P, data_sp_fit_S]
# Zapis do pliku CSV
with open(filename, mode='w', newline='') as file:
writer = csv.writer(file)
# Zapis nagłówków
writer.writerow(headers)
# Zapis danych
writer.writerows(data)
def _write_event_parameters_to_file(self):
# Define the content to be written to the file
content = (
f"{20 * '#'} RESULTS {20 * '#'}\n"
f"{80 * '-'}\n"
"Event parameters P_jk:\n"
f"{self._return_event_spec_params(solutions=self.solutions_jk_P)}\n"
"Event parameters P_sp_fit:\n"
f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_P)}\n"
"Event parameters S_jk:\n"
f"{self._return_event_spec_params(solutions=self.solutions_jk_S)}\n"
"Event parameters S_sp_fit:\n"
f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_S)}\n"
f"{80 * '-'}\n"
)
# Write the content to the file
with open(self.EVENT_RESULTS, 'w') as file:
file.write(content)
def _get_unique_stations(self) -> List[str]:
return sorted(set([tr.stats.station for tr in self.stream]))
def _preprocess_stream(self, stream: Stream) -> Stream:
stream.detrend("linear")
try:
stream.remove_response(inventory=self.inventory, output="DISP")
except Exception as e:
self.logger.warning(f"{e} No instrument correction - applied integration to obtain displacement signal")
stream.integrate(method="cumtrapz")
return stream.filter(
type="bandpass",
freqmin=self.config.get("freq_min"),
freqmax=self.config.get("freq_max"),
corners=2,
zerophase=True
)
def _calculate_distance(self, station_inv) -> list[float, float]:
try:
horizontal_distance = gps2dist_azimuth(
self.config.get("latitude"), self.config.get("longitude"), station_inv.latitude,
station_inv.longitude
)[0]
if self.config.get("allow_station_elev"):
receiver_depth_in_m = station_inv.elevation
else:
receiver_depth_in_m = 0
vertical_distance = receiver_depth_in_m + self.config.get("depth")
return horizontal_distance, vertical_distance
except AttributeError as err:
(self.logger.error
("No preferred origin in quakeML file. Cannot calculate distance between station and origin"))
raise AttributeError
def _calculate_travel_times(self, horizontal_distance: float, vertical_distance, station_inv) -> tuple:
if not self.raytracer:
vp = self.config.get("vp")
vs = self.config.get("vs")
distance = (horizontal_distance**2+vertical_distance**2)**0.5
p_travel_time = distance / vp
s_travel_time = distance / vs
else:
if self.config.get("allow_station_elev"):
receiver_depth_in_km = station_inv.elevation/1000
else:
receiver_depth_in_km = 0
try:
p_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000,
self.config.get("depth") / 1000,
receiver_depth_in_km, phase="p").time
s_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000,
self.config.get("depth") / 1000,
receiver_depth_in_km, phase="s").time
except IndexError:
raise Exception("Problem with velocity file. ")
return p_travel_time, s_travel_time
def _find_pick(self, station: str, phase_hint:list) -> Optional[Pick]:
for p in self.event.picks:
if p.waveform_id['station_code'] == station and p.phase_hint in phase_hint:
return p
return None
def _perform_p_wave_analysis(self, station_stream: Stream, pick_p: Pick, delta_ps: float, distance: float,
p_travel_time: float):
self.logger.info("P wave spectral analysis")
if delta_ps < self.config.get("window_len"):
self.logger.warning("P wave window may overlap S waves")
return
signal_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=self.config.get("taper_len"),
post_padding=self.config.get("window_len") -
self.config.get("taper_len"))
signal_window = self._window_preprocessing(signal_window)
noise_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=2 * self.config.get("taper_len") +
self.config.get("window_len"),
post_padding=-2 * self.config.get("taper_len"))
noise_window = self._window_preprocessing(noise_window)
if len(signal_window) > 0 and len(noise_window) > 0 and pick_p:
self._spectral_fitting(signal_window, noise_window, pick_p, distance, p_travel_time)
else:
self.logger.warning("Not enough data for P wave analysis")
def _perform_s_wave_analysis(self, station_stream: Stream, pick_p: Pick, pick_s: Pick, delta_ps, distance: float,
s_travel_time: float):
self.logger.info("S wave spectral analysis")
if pick_p:
noise_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=2 * self.config.get("taper_len") +
self.config.get("window_len"),
post_padding=-2 * self.config.get("taper_len"))
else:
noise_window = self._trim_seismogram(station_stream, station_stream[0].stats.starttime,
pre_padding=-2 * self.config.get("taper_len"),
post_padding=self.config.get("window_len") +
2 * self.config.get("taper_len"))
if not pick_s and pick_p:
pick_s = copy.copy(pick_p)
pick_s.time = pick_s.time + delta_ps
pick_s.phase_hint = "S"
if pick_s:
signal_window = self._trim_seismogram(station_stream, pick_s.time,
pre_padding=self.config.get("taper_len"),
post_padding=self.config.get("window_len")
- self.config.get("taper_len"))
signal_window = self._window_preprocessing(signal_window)
if len(signal_window) > 0 and len(noise_window) > 0:
self._spectral_fitting(signal_window, noise_window, pick_s, distance, s_travel_time)
@staticmethod
def _trim_seismogram(station_stream: Stream, pick_time: UTCDateTime, pre_padding=0.5, post_padding=2.6):
start_time = pick_time - pre_padding
end_time = pick_time + post_padding
return station_stream.slice(start_time, end_time)
def _window_preprocessing(self, signal_window: Stream) -> Stream:
return signal_window.taper(type=self.config.get("taper_type"), max_percentage=0.5,
max_length=self.config.get("taper_len"))
@staticmethod
def signal2noise(signal_amp_spectra, noise_amp_spectra):
# Calculate signal-to-noise ratio
noise_integral = integrate.trapezoid(noise_amp_spectra[1], x=noise_amp_spectra[0])
signal_integral = integrate.trapezoid(signal_amp_spectra[1], x=signal_amp_spectra[0])
signal2noise_ratio = signal_integral / noise_integral
return signal2noise_ratio
def _spectral_fitting(self, signal_window: Stream, noise_window: Stream, pick: Pick, distance: float,
travel_time: float):
# choosing appropriate K value which is related to the source model and type of seismic waves
source_model = self.config.get("sp_source_model")
if source_model == "Brune":
k = K_BRUNE
elif source_model == "Madariaga":
if pick.phase_hint in self.config.get("p_phase_hint"):
k = K_MADARIAGA_P
elif pick.phase_hint in self.config.get("s_phase_hint"):
k = K_MADARIAGA_S
else:
self.logger.warning(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S")
raise Exception(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S")
else:
self.logger.warning(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga")
raise Exception(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga")
# S wave velocity value
vs = self.config.get("vs")
# Calculation of amplitude spectra for signal and noise windows
xf, yf = calculate_amp_spectra(station_stream=signal_window)
xfn, yfn = calculate_amp_spectra(station_stream=noise_window)
# Calculation of signal-to-noise ratio
signal2noise_ratio = self.signal2noise([xf, yf], [xfn, yfn])
self.logger.info(f"S/N: {signal2noise_ratio}")
# Stopping the analysis when the ratio is too low
if signal2noise_ratio < self.config["min_energy_ratio"]:
self.logger.warning("Signal to noise ratio is too low")
return
# Setting the weights.
weights = np.ones(len(xf))
# If the noise on particular frequencies exceed the threshold value, the weights are equal to 0.
sn = yf / yfn
weights[sn < self.config.get("min_sn_ratio")] = 0
# Setup initial model from J/K integrals. Initial mo is in displacement units.
spectral_level, fo = sp_jk(xf, yf)
spectral_params_sp_jk = SpectralParams(mo=spectral_level, fo=fo, q=400, stress_drop=0, source_size=0)
# Spectral fitting with Simplex. Initial mo is in displacement units.
opt_sim = OptNelderMead(spectral_params_sp_jk, freq_bins=xf, amplitude_spectrum=yf, weights=weights,
travel_time=travel_time, config=self.config, logger=self.logger)
error_sp_fit, solution_sp_fit = opt_sim.run()
# Recalculation of amplitude spectrum level to seismic moment
# for spectral fitting method
solution_sp_fit.mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config,
spectral_level=solution_sp_fit.mo, distance=distance)
solution_sp_fit.mw = mm(solution_sp_fit.mo)
#Calculation of source size and stress drop for spectral fitting method
solution_sp_fit.source_size = calc_source_size(vs, solution_sp_fit.fo, k)
solution_sp_fit.stress_drop = calc_stress_drop(seismic_moment=solution_sp_fit.mo, source_radius=solution_sp_fit.source_size)
# for J-K integrals method
mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config,
spectral_level=spectral_level, distance=distance)
source_radius = calc_source_size(vs, fo, k)
stress_drop_value = calc_stress_drop(seismic_moment=mo, source_radius=source_radius)
spectral_params_sp_jk = SpectralParams(mo=mo, fo=fo, q=400, stress_drop=stress_drop_value, source_size=source_radius)
self.logger.info(spectral_params_sp_jk)
self.logger.info(solution_sp_fit)
# Appending solutions to the lists
if pick.phase_hint in self.config["p_phase_hint"]:
self.solutions_jk_P.append([signal_window[0].stats.station,
spectral_params_sp_jk,
None,
None,
None])
self.solutions_sp_fit_P.append([signal_window[0].stats.station,
solution_sp_fit,
opt_sim.f_norm.freq,
opt_sim.f_norm.th_amp_sp_damp,
opt_sim.f_norm.amp_spectrum])
elif pick.phase_hint in self.config["s_phase_hint"]:
self.solutions_jk_S.append([signal_window[0].stats.station,
spectral_params_sp_jk,
None,
None,
None])
self.solutions_sp_fit_S.append([signal_window[0].stats.station,
solution_sp_fit,
opt_sim.f_norm.freq,
opt_sim.f_norm.th_amp_sp_damp,
opt_sim.f_norm.amp_spectrum])
else:
self.logger.warning(f"Solutions not saved. Phase hint {self.config['p_phase_hint']} nor "
f"{self.config['s_phase_hint']} does not coincide "
f"with phase hint {pick.phase_hint} from the catalog")
@staticmethod
def _spectrum_to_moment(pick_phase, config, spectral_level, distance):
# Calculation of spectral amplitude in [Nm * s]
if pick_phase == "P":
v = config.get("vp")
g = G_P
else:
v = config.get("vs")
g = G_S
return spectrum2moment(spectral_level=spectral_level, density=config.get("density"), velocity=v,
distance=distance, g=g)
def _return_event_spec_params(self, solutions):
"""
Function calculates mean parameters for seismic event based on stations' results and removes outliers
:param solutions:
:return:
"""
if not solutions:
return None
def outliers_detection(M0, F0, Q, STRESS_DROP, SOURCE_SIZE, z_threshold):
"""Function calculates z_scores and removes outliers which has absolute z_score above z_threshold"""
# Calculate z-scores
z_scores_M0 = zscore(np.log(M0))
z_scores_F0 = zscore(F0)
z_scores_Q = zscore(Q)
# Identify outliers
outliers = (np.abs(z_scores_M0) > z_threshold) | (np.abs(z_scores_F0) > z_threshold) | (
np.abs(z_scores_Q) > z_threshold)
# Filter out outliers
M0_filtered = M0[~outliers]
F0_filtered = F0[~outliers]
Q_filtered = Q[~outliers]
STRESS_DROP_filtered = STRESS_DROP[~outliers]
SOURCE_SIZE_filtered = SOURCE_SIZE[~outliers]
return M0_filtered, F0_filtered, Q_filtered, STRESS_DROP_filtered, SOURCE_SIZE_filtered
M0 = np.array([solution[1].mo for solution in solutions])
F0 = np.array([solution[1].fo for solution in solutions])
Q = np.array([solution[1].q for solution in solutions])
STRESS_DROP = np.array([solution[1].stress_drop for solution in solutions])
SOURCE_SIZE = np.array([solution[1].source_size for solution in solutions])
M0, F0, Q, STRESS_DROP, SOURCE_SIZE = outliers_detection(M0=M0, F0=F0, Q=Q, SOURCE_SIZE=SOURCE_SIZE, STRESS_DROP=STRESS_DROP,
z_threshold=self.config.get("z_threshold"))
# Calculate mean values of parameters
mo, mo_e = self.mef(M0)
fo, fo_e = self.mef(F0)
q, q_e = self.mef(Q)
stress_drop, stress_drop_e = self.mef(STRESS_DROP)
source_size, source_size_e = self.mef(SOURCE_SIZE)
return EventSpectralParams(mo=mo, fo=fo, q=q, mo_e=mo_e, fo_e=fo_e, q_e=q_e, source_size=source_size, source_size_e=source_size_e,
stress_drop=stress_drop, stress_drop_e=stress_drop_e)
@staticmethod
def mef(x):
"""
Function for calculating average values and standard deviations of source parameters which are log-normally distributed.
Formulas based on article: https://doi.org/10.1016/j.pepi.2003.08.006
"""
x_mean = 10 ** np.mean(np.log10(x))
if len(x) == 1:
return x_mean, -999
x_std = ((1 / (len(x) - 1)) * np.sum((np.log10(x) - np.log10(x_mean)) ** 2)) ** 0.5
x_e = 10 ** x_std
return x_mean, x_e