import copy import csv from typing import List, Optional from json_writer import JsonWriter from scipy import integrate from scipy.stats import zscore import numpy as np from obspy import Stream, UTCDateTime from obspy.core.event import Event, Pick from obspy.geodetics import gps2dist_azimuth from tau_p_raytracing import TauPRayTracer from sp_jk import sp_jk from opt_algorithms import OptNelderMead from SpectralParameters import SpectralParams, EventSpectralParams from amplitude_spectra import calculate_amp_spectra from spectral_definitions import (K_MADARIAGA_P, K_MADARIAGA_S, K_BRUNE, G_P, G_S, spectrum2moment, calc_stress_drop, calc_source_size, mm) class SpectralAnalysis: """ Application for spectral analysis for P- or S-waves. It uses JK integrals and spectral fitting with simplex optimization algorithm. JK integrals method returns seismic moment, moment magnitude and corner frequency. Spectral fitting uses these results as input. It returns seismic moment, moment magnitude and corner frequency and additionally Q parameter (i.e. quality=1/dumping). In fitting procedure theoretical spectrum (Brune or Boatwright corrected for Q) is matched to calculated amplitude spectrum of seismic signal. These methods are used for each station separately and finally mean values are calculated. Outliers (estimated with z-sqore method) are not used for mean value calculations. Application enables: - 1D velocity model for travel time calculations. It uses tau_p raytracer from ObsPy however for spectral parameters estimation it uses constant parameters i.e. constant S wave velocity and constant gp, gs and density. - constant velocities for travel time calculations in case of missing 1D velocity model. Features of the algorithm: 1. Amplitude spectra are calculated as geometric mean of all traces (e.g. three channels) of seismic station. 2. Application requires P wave pick for P-waves analysis and P or S- wave pick for S wave analysis (it calculates theoretical S wave pick in case it is not available). 3. Applications calculates signal-to-noise spectral ratio and removes either station (based on mean ratio) or particular frequencies (based on S/N ratio for this frequency). """ def __init__(self, stream: Stream, event: Event, inventory, config, logger): self.stream = stream self.event = event self.inventory = inventory self.config = config self.logger = logger self.raytracer = self._initialize_raytracer() self.stations = self._get_unique_stations() self.solutions_jk_P = [] self.solutions_sp_fit_P = [] self.solutions_jk_S = [] self.solutions_sp_fit_S = [] # filenames self.P_WAVE_SP_FITTING_RESULTS_JSON = config.get("P_WAVE_SP_FITTING_RESULTS_JSON") self.S_WAVE_SP_FITTING_RESULTS_JSON = config.get("S_WAVE_SP_FITTING_RESULTS_JSON") self.P_WAVE_JK_RESULTS = config.get("P_WAVE_JK_RESULTS") self.S_WAVE_JK_RESULTS = config.get("S_WAVE_JK_RESULTS") self.P_WAVE_SP_FITTING_RESULTS = config.get("P_WAVE_SP_FITTING_RESULTS") self.S_WAVE_SP_FITTING_RESULTS = config.get("S_WAVE_SP_FITTING_RESULTS") self.EVENT_RESULTS = config.get("EVENT_RESULTS") def _initialize_raytracer(self) -> Optional[TauPRayTracer]: if self.config.get("raytrace"): return TauPRayTracer(self.config.get("velocity_mat_file")) return None def run(self): for station in self.stations: self.logger.info(80 * "#") self.logger.info(f"Running spectral analysis for station: {station}") # Selecting station data station_stream = self.stream.select(station=station) try: station_inv = self.inventory[0].select(station=station, time=station_stream[0].stats.starttime)[0][0] except IndexError: self.logger.warning(f"{station} not in inventory") continue # Stream preprocessing station_stream = self._preprocess_stream(station_stream) if station_stream: hor_distance, ver_distance = self._calculate_distance(station_inv) p_travel_time, s_travel_time = self._calculate_travel_times(horizontal_distance=hor_distance, vertical_distance=ver_distance, station_inv=station_inv) pick_p = self._find_pick(station, self.config.get("p_phase_hint")) pick_s = self._find_pick(station, self.config.get("s_phase_hint")) if pick_p: delta_ps = s_travel_time - p_travel_time if self.config.get("P_wave_analysis"): self._perform_p_wave_analysis(station_stream=station_stream, pick_p=pick_p, delta_ps=delta_ps, p_travel_time=p_travel_time, distance=hor_distance) self.logger.info(80 * "-") if self.config.get("S_wave_analysis"): self._perform_s_wave_analysis(station_stream=station_stream, pick_s=pick_s, pick_p=pick_p, delta_ps=delta_ps, s_travel_time=s_travel_time, distance=hor_distance) self.logger.info(80 * "-") self._save_results_to_json() self._save_results_to_csv(filename=self.EVENT_RESULTS) self._save_results_to_station_csvs() def _save_results_to_json(self): # saving solutions to JSON file solutions_data = [ (self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS_JSON), (self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS_JSON) ] for solutions, filename in solutions_data: JsonWriter(solutions=solutions, filename=filename, logger=self.logger).save() def _save_results_to_station_csvs(self): # Zdefiniowanie danych dla poszczególnych plików solutions_data = [ (self.solutions_jk_P, self.P_WAVE_JK_RESULTS), (self.solutions_jk_S, self.S_WAVE_JK_RESULTS), (self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS), (self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS) ] # Dla każdego zestawu wyników zapisujemy osobny plik CSV for solutions, filename in solutions_data: # Otwieranie pliku CSV do zapisu with open(filename, mode='w', newline='') as csv_file: writer = csv.writer(csv_file) # Zapis nagłówków headers = ["station_name", "mo", "fo", "q", "mw", "source_size", "stress_drop"] writer.writerow(headers) # Zapis danych dla każdej stacji if len(solutions)>0: for solution in solutions: station_name = solution[0] # Zakładam, że nazwa stacji jest w solution[0] parameters = solution[1] # Zakładam, że parametry są w solution[1] # Tworzymy wiersz z wartościami parametrów row = [ station_name, parameters.mo, parameters.fo, parameters.q, parameters.mw, parameters.source_size, parameters.stress_drop ] # Zapisujemy wiersz do pliku CSV writer.writerow(row) def _save_results_to_csv(self, filename): # Nagłówki kolumn headers = ["Method", "M0", "E M0", "F0", "E F0", "Q", "E Q", "Mw", "r", "E r", "Stress drop", "E stress drop"] # Zbieranie danych z czterech źródeł solutions_jk_P = self._return_event_spec_params(solutions=self.solutions_jk_P) solutions_jk_S = self._return_event_spec_params(solutions=self.solutions_jk_S) solutions_sp_fit_P = self._return_event_spec_params(solutions=self.solutions_sp_fit_P) solutions_sp_fit_S = self._return_event_spec_params(solutions=self.solutions_sp_fit_S) # Checking for None values if solutions_jk_P: data_jk_P = [ "P-waves, JK integrals", solutions_jk_P.mo, solutions_jk_P.mo_e, solutions_jk_P.fo, solutions_jk_P.fo_e, solutions_jk_P.q, solutions_jk_P.q_e, solutions_jk_P.mw, solutions_jk_P.source_size, solutions_jk_P.source_size_e, solutions_jk_P.stress_drop, solutions_jk_P.stress_drop_e ] else: data_jk_P = [ "P-waves, JK integrals", None, None, None, None, None, None, None, None, None, None, None ] if solutions_jk_S: data_jk_S = [ "S-waves, JK integrals", solutions_jk_S.mo, solutions_jk_S.mo_e, solutions_jk_S.fo, solutions_jk_S.fo_e, solutions_jk_S.q, solutions_jk_S.q_e, solutions_jk_S.mw, solutions_jk_S.source_size, solutions_jk_S.source_size_e, solutions_jk_S.stress_drop, solutions_jk_S.stress_drop_e ] else: data_jk_S = [ "S-waves, JK integrals", None, None, None, None, None, None, None, None, None, None, None ] if solutions_sp_fit_P: data_sp_fit_P = [ "P-waves, spectrum fitting", solutions_sp_fit_P.mo, solutions_sp_fit_P.mo_e, solutions_sp_fit_P.fo, solutions_sp_fit_P.fo_e, solutions_sp_fit_P.q, solutions_sp_fit_P.q_e, solutions_sp_fit_P.mw, solutions_sp_fit_P.source_size, solutions_sp_fit_P.source_size_e, solutions_sp_fit_P.stress_drop, solutions_sp_fit_P.stress_drop_e ] else: data_sp_fit_P = [ "P-waves, spectrum fitting", None, None, None, None, None, None, None, None, None, None, None ] if solutions_sp_fit_S: data_sp_fit_S = [ "S-waves, spectrum fitting", solutions_sp_fit_S.mo, solutions_sp_fit_S.mo_e, solutions_sp_fit_S.fo, solutions_sp_fit_S.fo_e, solutions_sp_fit_S.q, solutions_sp_fit_S.q_e, solutions_sp_fit_S.mw, solutions_sp_fit_S.source_size, solutions_sp_fit_S.source_size_e, solutions_sp_fit_S.stress_drop, solutions_sp_fit_S.stress_drop_e ] else: data_sp_fit_S = [ "S-waves, spectrum fitting", None, None, None, None, None, None, None, None, None, None, None ] data = [data_jk_P, data_jk_S,data_sp_fit_P, data_sp_fit_S] # Zapis do pliku CSV with open(filename, mode='w', newline='') as file: writer = csv.writer(file) # Zapis nagłówków writer.writerow(headers) # Zapis danych writer.writerows(data) def _write_event_parameters_to_file(self): # Define the content to be written to the file content = ( f"{20 * '#'} RESULTS {20 * '#'}\n" f"{80 * '-'}\n" "Event parameters P_jk:\n" f"{self._return_event_spec_params(solutions=self.solutions_jk_P)}\n" "Event parameters P_sp_fit:\n" f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_P)}\n" "Event parameters S_jk:\n" f"{self._return_event_spec_params(solutions=self.solutions_jk_S)}\n" "Event parameters S_sp_fit:\n" f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_S)}\n" f"{80 * '-'}\n" ) # Write the content to the file with open(self.EVENT_RESULTS, 'w') as file: file.write(content) def _get_unique_stations(self) -> List[str]: return sorted(set([tr.stats.station for tr in self.stream])) def _preprocess_stream(self, stream: Stream) -> Stream: stream.detrend("linear") try: stream.remove_response(inventory=self.inventory, output="DISP") except Exception as e: self.logger.warning(f"{e} No instrument correction - applied integration to obtain displacement signal") stream.integrate(method="cumtrapz") return stream.filter( type="bandpass", freqmin=self.config.get("freq_min"), freqmax=self.config.get("freq_max"), corners=2, zerophase=True ) def _calculate_distance(self, station_inv) -> list[float, float]: try: horizontal_distance = gps2dist_azimuth( self.config.get("latitude"), self.config.get("longitude"), station_inv.latitude, station_inv.longitude )[0] if self.config.get("allow_station_elev"): receiver_depth_in_m = station_inv.elevation else: receiver_depth_in_m = 0 vertical_distance = receiver_depth_in_m + self.config.get("depth") return horizontal_distance, vertical_distance except AttributeError as err: (self.logger.error ("No preferred origin in quakeML file. Cannot calculate distance between station and origin")) raise AttributeError def _calculate_travel_times(self, horizontal_distance: float, vertical_distance, station_inv) -> tuple: if not self.raytracer: vp = self.config.get("vp") vs = self.config.get("vs") distance = (horizontal_distance**2+vertical_distance**2)**0.5 p_travel_time = distance / vp s_travel_time = distance / vs else: if self.config.get("allow_station_elev"): receiver_depth_in_km = station_inv.elevation/1000 else: receiver_depth_in_km = 0 try: p_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000, self.config.get("depth") / 1000, receiver_depth_in_km, phase="p").time s_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000, self.config.get("depth") / 1000, receiver_depth_in_km, phase="s").time except IndexError: raise Exception("Problem with velocity file. ") return p_travel_time, s_travel_time def _find_pick(self, station: str, phase_hint:list) -> Optional[Pick]: for p in self.event.picks: if p.waveform_id['station_code'] == station and p.phase_hint in phase_hint: return p return None def _perform_p_wave_analysis(self, station_stream: Stream, pick_p: Pick, delta_ps: float, distance: float, p_travel_time: float): self.logger.info("P wave spectral analysis") if delta_ps < self.config.get("window_len"): self.logger.warning("P wave window may overlap S waves") return signal_window = self._trim_seismogram(station_stream, pick_p.time, pre_padding=self.config.get("taper_len"), post_padding=self.config.get("window_len") - self.config.get("taper_len")) signal_window = self._window_preprocessing(signal_window) noise_window = self._trim_seismogram(station_stream, pick_p.time, pre_padding=2 * self.config.get("taper_len") + self.config.get("window_len"), post_padding=-2 * self.config.get("taper_len")) noise_window = self._window_preprocessing(noise_window) if len(signal_window) > 0 and len(noise_window) > 0 and pick_p: self._spectral_fitting(signal_window, noise_window, pick_p, distance, p_travel_time) else: self.logger.warning("Not enough data for P wave analysis") def _perform_s_wave_analysis(self, station_stream: Stream, pick_p: Pick, pick_s: Pick, delta_ps, distance: float, s_travel_time: float): self.logger.info("S wave spectral analysis") if pick_p: noise_window = self._trim_seismogram(station_stream, pick_p.time, pre_padding=2 * self.config.get("taper_len") + self.config.get("window_len"), post_padding=-2 * self.config.get("taper_len")) else: noise_window = self._trim_seismogram(station_stream, station_stream[0].stats.starttime, pre_padding=-2 * self.config.get("taper_len"), post_padding=self.config.get("window_len") + 2 * self.config.get("taper_len")) if not pick_s and pick_p: pick_s = copy.copy(pick_p) pick_s.time = pick_s.time + delta_ps pick_s.phase_hint = "S" if pick_s: signal_window = self._trim_seismogram(station_stream, pick_s.time, pre_padding=self.config.get("taper_len"), post_padding=self.config.get("window_len") - self.config.get("taper_len")) signal_window = self._window_preprocessing(signal_window) if len(signal_window) > 0 and len(noise_window) > 0: self._spectral_fitting(signal_window, noise_window, pick_s, distance, s_travel_time) @staticmethod def _trim_seismogram(station_stream: Stream, pick_time: UTCDateTime, pre_padding=0.5, post_padding=2.6): start_time = pick_time - pre_padding end_time = pick_time + post_padding return station_stream.slice(start_time, end_time) def _window_preprocessing(self, signal_window: Stream) -> Stream: return signal_window.taper(type=self.config.get("taper_type"), max_percentage=0.5, max_length=self.config.get("taper_len")) @staticmethod def signal2noise(signal_amp_spectra, noise_amp_spectra): # Calculate signal-to-noise ratio noise_integral = integrate.trapezoid(noise_amp_spectra[1], x=noise_amp_spectra[0]) signal_integral = integrate.trapezoid(signal_amp_spectra[1], x=signal_amp_spectra[0]) signal2noise_ratio = signal_integral / noise_integral return signal2noise_ratio def _spectral_fitting(self, signal_window: Stream, noise_window: Stream, pick: Pick, distance: float, travel_time: float): # choosing appropriate K value which is related to the source model and type of seismic waves source_model = self.config.get("sp_source_model") if source_model == "Brune": k = K_BRUNE elif source_model == "Madariaga": if pick.phase_hint in self.config.get("p_phase_hint"): k = K_MADARIAGA_P elif pick.phase_hint in self.config.get("s_phase_hint"): k = K_MADARIAGA_S else: self.logger.warning(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S") raise Exception(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S") else: self.logger.warning(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga") raise Exception(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga") # S wave velocity value vs = self.config.get("vs") # Calculation of amplitude spectra for signal and noise windows xf, yf = calculate_amp_spectra(station_stream=signal_window) xfn, yfn = calculate_amp_spectra(station_stream=noise_window) # Calculation of signal-to-noise ratio signal2noise_ratio = self.signal2noise([xf, yf], [xfn, yfn]) self.logger.info(f"S/N: {signal2noise_ratio}") # Stopping the analysis when the ratio is too low if signal2noise_ratio < self.config["min_energy_ratio"]: self.logger.warning("Signal to noise ratio is too low") return # Setting the weights. weights = np.ones(len(xf)) # If the noise on particular frequencies exceed the threshold value, the weights are equal to 0. sn = yf / yfn weights[sn < self.config.get("min_sn_ratio")] = 0 # Setup initial model from J/K integrals. Initial mo is in displacement units. spectral_level, fo = sp_jk(xf, yf) spectral_params_sp_jk = SpectralParams(mo=spectral_level, fo=fo, q=400, stress_drop=0, source_size=0) # Spectral fitting with Simplex. Initial mo is in displacement units. opt_sim = OptNelderMead(spectral_params_sp_jk, freq_bins=xf, amplitude_spectrum=yf, weights=weights, travel_time=travel_time, config=self.config, logger=self.logger) error_sp_fit, solution_sp_fit = opt_sim.run() # Recalculation of amplitude spectrum level to seismic moment # for spectral fitting method solution_sp_fit.mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config, spectral_level=solution_sp_fit.mo, distance=distance) solution_sp_fit.mw = mm(solution_sp_fit.mo) #Calculation of source size and stress drop for spectral fitting method solution_sp_fit.source_size = calc_source_size(vs, solution_sp_fit.fo, k) solution_sp_fit.stress_drop = calc_stress_drop(seismic_moment=solution_sp_fit.mo, source_radius=solution_sp_fit.source_size) # for J-K integrals method mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config, spectral_level=spectral_level, distance=distance) source_radius = calc_source_size(vs, fo, k) stress_drop_value = calc_stress_drop(seismic_moment=mo, source_radius=source_radius) spectral_params_sp_jk = SpectralParams(mo=mo, fo=fo, q=400, stress_drop=stress_drop_value, source_size=source_radius) self.logger.info(spectral_params_sp_jk) self.logger.info(solution_sp_fit) # Appending solutions to the lists if pick.phase_hint in self.config["p_phase_hint"]: self.solutions_jk_P.append([signal_window[0].stats.station, spectral_params_sp_jk, None, None, None]) self.solutions_sp_fit_P.append([signal_window[0].stats.station, solution_sp_fit, opt_sim.f_norm.freq, opt_sim.f_norm.th_amp_sp_damp, opt_sim.f_norm.amp_spectrum]) elif pick.phase_hint in self.config["s_phase_hint"]: self.solutions_jk_S.append([signal_window[0].stats.station, spectral_params_sp_jk, None, None, None]) self.solutions_sp_fit_S.append([signal_window[0].stats.station, solution_sp_fit, opt_sim.f_norm.freq, opt_sim.f_norm.th_amp_sp_damp, opt_sim.f_norm.amp_spectrum]) else: self.logger.warning(f"Solutions not saved. Phase hint {self.config['p_phase_hint']} nor " f"{self.config['s_phase_hint']} does not coincide " f"with phase hint {pick.phase_hint} from the catalog") @staticmethod def _spectrum_to_moment(pick_phase, config, spectral_level, distance): # Calculation of spectral amplitude in [Nm * s] if pick_phase == "P": v = config.get("vp") g = G_P else: v = config.get("vs") g = G_S return spectrum2moment(spectral_level=spectral_level, density=config.get("density"), velocity=v, distance=distance, g=g) def _return_event_spec_params(self, solutions): """ Function calculates mean parameters for seismic event based on stations' results and removes outliers :param solutions: :return: """ if not solutions: return None def outliers_detection(M0, F0, Q, STRESS_DROP, SOURCE_SIZE, z_threshold): """Function calculates z_scores and removes outliers which has absolute z_score above z_threshold""" # Calculate z-scores z_scores_M0 = zscore(np.log(M0)) z_scores_F0 = zscore(F0) z_scores_Q = zscore(Q) # Identify outliers outliers = (np.abs(z_scores_M0) > z_threshold) | (np.abs(z_scores_F0) > z_threshold) | ( np.abs(z_scores_Q) > z_threshold) # Filter out outliers M0_filtered = M0[~outliers] F0_filtered = F0[~outliers] Q_filtered = Q[~outliers] STRESS_DROP_filtered = STRESS_DROP[~outliers] SOURCE_SIZE_filtered = SOURCE_SIZE[~outliers] return M0_filtered, F0_filtered, Q_filtered, STRESS_DROP_filtered, SOURCE_SIZE_filtered M0 = np.array([solution[1].mo for solution in solutions]) F0 = np.array([solution[1].fo for solution in solutions]) Q = np.array([solution[1].q for solution in solutions]) STRESS_DROP = np.array([solution[1].stress_drop for solution in solutions]) SOURCE_SIZE = np.array([solution[1].source_size for solution in solutions]) M0, F0, Q, STRESS_DROP, SOURCE_SIZE = outliers_detection(M0=M0, F0=F0, Q=Q, SOURCE_SIZE=SOURCE_SIZE, STRESS_DROP=STRESS_DROP, z_threshold=self.config.get("z_threshold")) # Calculate mean values of parameters mo, mo_e = self.mef(M0) fo, fo_e = self.mef(F0) q, q_e = self.mef(Q) stress_drop, stress_drop_e = self.mef(STRESS_DROP) source_size, source_size_e = self.mef(SOURCE_SIZE) return EventSpectralParams(mo=mo, fo=fo, q=q, mo_e=mo_e, fo_e=fo_e, q_e=q_e, source_size=source_size, source_size_e=source_size_e, stress_drop=stress_drop, stress_drop_e=stress_drop_e) @staticmethod def mef(x): """ Function for calculating average values and standard deviations of source parameters which are log-normally distributed. Formulas based on article: https://doi.org/10.1016/j.pepi.2003.08.006 """ x_mean = 10 ** np.mean(np.log10(x)) if len(x) == 1: return x_mean, -999 x_std = ((1 / (len(x) - 1)) * np.sum((np.log10(x) - np.log10(x_mean)) ** 2)) ** 0.5 x_e = 10 ** x_std return x_mean, x_e