ISEPOS-2360 Move spectral parameter source code from apps_to_migrate from official-apps-artifact-builder

This commit is contained in:
Mieszko Makuch 2025-03-25 14:38:48 +01:00
parent 8d7613d9e8
commit 537e376d6c
19 changed files with 1304 additions and 1081 deletions

View File

@ -1,552 +0,0 @@
import sys
import numpy as np
import pandas as pd
# import os
import scipy.io
import logging
from geopy.distance import geodesic
from geopy.point import Point
from util.base_logger import getDefaultLogger
def main(Input_catalog, Input_injection_rate, time_win_in_hours, time_step_in_hour, time_win_type,
End_time, ev_limit, Model_index, Mc, Mu, G, ssd, C, b_value_type, cl, mag_name, time_inj=None, time_shut_in=None,
time_big_ev=None, Inpar = ["SER", "dv", "d_num"]):
"""
Main function for application: MAXIMUM_MAGNITUDE_DETERMINISTIC_MODELS
Arguments:
Input catalog: path to input file of type 'catalog'
Input injection rate: path to input file of type 'injection_rate'
**kwargs: Model paramters.
Returns:
'PLOT_Mmax_param': Plot of all results (also check 'application.log' for more info)
'DATA_Mmax_param': Results with 'csv' format
'application.log': Logging file
"""
f_indx = Model_index
b_method = b_value_type
Plot_flag = 0
# Setting up the logfile--------------------------------
logger = getDefaultLogger(__name__)
# Importing utilities ----------------------
logger.info("Import utilities")
from util.Find_idx4Time import Find_idx4Time
from util.CandidateEventsTS import CandidateEventsTS
from util.M_max_models import M_max_models
def latlon_to_enu(lat, lon, alt):
lat0 = np.mean(lat)
lon0 = np.mean(lon)
alt0 = 0
# Reference point (origin)
origin = Point(lat0, lon0, alt0)
east = np.zeros_like(lon)
north = np.zeros_like(lat)
up = np.zeros_like(alt)
for i in range(len(lon)):
# Target point
target = Point(lat[i], lon[i], alt[i])
# Calculate East-North-Up
east[i] = geodesic((lat0, lon0), (lat0, lon[i])).meters
if lon[i] < lon0:
east[i] = -east[i]
north[i] = geodesic((lat0, lon0), (lat[i], lon0)).meters
if lat[i] < lat0:
north[i] = -north[i]
up = alt - alt0
return east, north, up
# Importing data
logger.info("Import data")
mat = scipy.io.loadmat(Input_catalog)
Cat_structure_name = scipy.io.whosmat(Input_catalog)[0][0]
Cat_structure = mat[Cat_structure_name]
Cat_id, Cat_t, Cat_m = [], [], []
Cat_x, Cat_y, Cat_z = [], [], []
Cat_lat, Cat_lon, Cat_elv, Cat_depth = [], [], [], []
for i in range(1,Cat_structure.shape[1]):
if Cat_structure.T[i][0][0][0] == 'X':
Cat_x = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_x)):
raise ValueError("Catalog-X contains infinite value")
if Cat_structure.T[i][0][3][0] == 'km':
Cat_x *= 1000
if Cat_structure.T[i][0][0][0] == 'Lat':
Cat_lat = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_lat)):
raise ValueError("Catalog-Lat contains infinite value")
if Cat_structure.T[i][0][0][0] == 'Y':
Cat_y = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_y)):
raise ValueError("Catalog-Y contains infinite value")
if Cat_structure.T[i][0][3][0] == 'km':
Cat_y *= 1000
if Cat_structure.T[i][0][0][0] == 'Long':
Cat_lon = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_lon)):
raise ValueError("Catalog-Long contains infinite value")
if Cat_structure.T[i][0][0][0] == 'Z':
Cat_z = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_z)):
raise ValueError("Catalog-Z contains infinite value")
if Cat_structure.T[i][0][3][0] == 'km':
Cat_z *= 1000
if Cat_structure.T[i][0][0][0] == 'Depth':
Cat_depth = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_depth)):
raise ValueError("Catalog-Depth contains infinite value")
if Cat_structure.T[i][0][3][0] == 'km':
Cat_depth *= 1000
if Cat_structure.T[i][0][0][0] == 'Elevation':
Cat_elv = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_elv)):
raise ValueError("Catalog-Elevation contains infinite value")
if Cat_structure.T[i][0][3][0] == 'km':
Cat_elv *= 1000
if Cat_structure.T[i][0][0][0] == 'Time':
Cat_t = Cat_structure.T[i][0][2]
if not all(np.isfinite(Cat_t)):
raise ValueError("Catalog-Time contains infinite value")
if Cat_structure.T[i][0][0][0] == mag_name:
Cat_m = Cat_structure.T[i][0][2]
# if not all(np.isfinite(Cat_m)):
if np.argwhere(all(np.isfinite(Cat_m))):
raise ValueError("Catalog-Magnitude contains infinite value")
if any(Cat_x):
Cat_id = np.linspace(0,Cat_x.shape[0],Cat_x.shape[0]).reshape((Cat_x.shape[0],1))
arg = (Cat_id, Cat_x, Cat_y, Cat_z, Cat_t, Cat_m)
Cat = np.concatenate(arg, axis=1)
elif any(Cat_lat):
if any(Cat_elv):
Cat_x, Cat_y, Cat_z = latlon_to_enu(Cat_lat, Cat_lon, Cat_elv)
elif any(Cat_depth):
Cat_x, Cat_y, Cat_z = latlon_to_enu(Cat_lat, Cat_lon, Cat_depth)
else:
raise ValueError("Catalog Depth or Elevation is not available")
Cat_id = np.linspace(0,Cat_x.shape[0],Cat_x.shape[0]).reshape((Cat_x.shape[0],1))
arg = (Cat_id, Cat_x, Cat_y, Cat_z, Cat_t, Cat_m)
Cat = np.concatenate(arg, axis=1)
else:
raise ValueError("Catalog data are not available")
mat = scipy.io.loadmat(Input_injection_rate)
Inj_structure = mat['d']
if 'Date' in Inj_structure.dtype.names:
inj_date = Inj_structure['Date'][0,0]
inj_rate = Inj_structure['Injection_rate'][0,0]
Hyd = np.concatenate((inj_date,inj_rate), axis=1)
else:
raise ValueError("Injection data are not available")
if Cat[0,4]>np.max(Hyd[:,0]) or Hyd[0,0]>np.max(Cat[:,4]):
raise ValueError('Catalog and injection data do not have time coverage!')
if Hyd[0,0] < Cat[0,4]:
same_time_idx = Find_idx4Time(Hyd[:,0], Cat[0,4])
Hyd = Hyd[same_time_idx:,:]
Hyd[:,0] = (Hyd[:,0] - Cat[0,4])*24*3600
Cat[:,4] = (Cat[:,4] - Cat[0,4])*24*3600
logger.info('Start of the computations is based on the time of catalog data.')
# Model dictionary
Feat_dic = {
'indx' :[0 ,1 ,2 ,3 ,4 ,5 ],
'Full_names':['All_M_max' ,'McGarr' ,'Hallo' ,'Li' ,'van-der-Elst','Shapiro' ],
'Short_name':['max_all' , 'max_mcg', 'max_hlo', 'max_li', 'max_vde' , 'max_shp'],
'f_num' :[5 ,4 ,4 ,1 ,6 ,4 ],
'Param' :[{'Mc': 0.8, 'b_method': ['b'], 'cl': [0.37], 'Inpar': ['dv','d_num'], 'Mu':0.6, 'G': 35*10**(9), 'ssd': 3*10**6, 'C': 0.95, 'ev_limit': 20, 'num_bootstraps': 100}]
}
Feat_dic['Param'][0]['Inpar'] = Inpar
Feat_dic['Param'][0]['ev_limit'] = ev_limit
num_bootstraps = 100 # Number of bootstraping for standard error computation
Feat_dic['Param'][0]['num_bootstraps'] = num_bootstraps
if f_indx in [0,1,2,4]:
if Mc < np.min(Cat[:,-1]) or Mc > np.max(Cat[:,-1]):
raise ValueError("Completeness magnitude (Mc) is out of magnitude range")
Feat_dic['Param'][0]['Mc'] = Mc
if f_indx in [0,1,2]:
if Mu < 0.2 or Mu > 0.8:
raise ValueError("Friction coefficient (Mu) must be between [0.2, 0.8]")
Feat_dic['Param'][0]['Mu'] = Mu
if f_indx in [0,1,2,3]:
if G < 1*10**(9) or G > 100*10**(9):
raise ValueError("Shear modulus of reservoir rock (G) must be between [1, 100] GPa")
Feat_dic['Param'][0]['G'] = G
if f_indx in [0,5]:
if ssd < 0.1*10**(6) or ssd > 100*10**(6):
raise ValueError("Static stress drop (ssd) must be between [0.1, 100] MPa")
Feat_dic['Param'][0]['ssd'] = ssd
if f_indx in [0,5]:
if C < 0.5 or C > 5:
raise ValueError("Geometrical constant (C) of Shaprio's model must be between [0.5, 5]")
Feat_dic['Param'][0]['C'] = C
if f_indx in [0,1,2,4]:
if not b_method:
raise ValueError("Please chose an option for b-value")
if f_indx in [0,4]:
for cl_i in cl:
if cl_i < 0 or cl_i > 1:
raise ValueError("Confidence level (cl) of van der Elst model must be between [0, 1]")
Feat_dic['Param'][0]['cl'] = cl
# Setting up based on the config and model dic --------------
ModelClass = M_max_models()
Model_name = Feat_dic['Full_names'][f_indx]
ModelClass.f_name = Feat_dic['Short_name'][f_indx]
f_num = Feat_dic['f_num'][f_indx]
if Feat_dic['Param'][0]['Mc']:
ModelClass.Mc = Mc
else:
Mc = np.min(Cat[:,-1])
ModelClass.Mc = Mc
time_win = time_win_in_hours*3600 # in sec
ModelClass.time_win = time_win
if f_indx == 0:
# Only first b_methods is used
Feat_dic['Param'][0]['b_method'] = b_method[0]
ModelClass.b_method = Feat_dic['Param'][0]['b_method']
logger.info(f"All models are based on b_method: { b_method[0]}")
Feat_dic['Param'][0]['cl'] = [cl[0]]
ModelClass.cl = Feat_dic['Param'][0]['cl']
logger.info(f"All models are based on cl: { cl[0]}")
ModelClass.Mu = Feat_dic['Param'][0]['Mu']
ModelClass.G = Feat_dic['Param'][0]['G']
ModelClass.ssd = Feat_dic['Param'][0]['ssd']
ModelClass.C = Feat_dic['Param'][0]['C']
ModelClass.num_bootstraps = Feat_dic['Param'][0]['num_bootstraps']
if f_indx == 1 or f_indx == 2:
ModelClass.b_method = Feat_dic['Param'][0]['b_method']
ModelClass.Mu = Feat_dic['Param'][0]['Mu']
ModelClass.G = Feat_dic['Param'][0]['G']
ModelClass.num_bootstraps = Feat_dic['Param'][0]['num_bootstraps']
Feat_dic['Param'][0]['cl'] = [None]
if f_indx == 3:
Feat_dic['Param'][0]['b_method'] = [None]
ModelClass.G = Feat_dic['Param'][0]['G']
Feat_dic['Param'][0]['cl'] = [None]
if f_indx == 4:
ModelClass.b_method = Feat_dic['Param'][0]['b_method']
ModelClass.num_bootstraps = Feat_dic['Param'][0]['num_bootstraps']
ModelClass.G = Feat_dic['Param'][0]['G']
ModelClass.cl = Feat_dic['Param'][0]['cl']
if f_indx == 5:
ModelClass.ssd = Feat_dic['Param'][0]['ssd']
ModelClass.C = Feat_dic['Param'][0]['C']
Feat_dic['Param'][0]['b_method'] = [None]
Feat_dic['Param'][0]['cl'] = [None]
# Output dictionary --------------------------------
Output_dict = {
'Type' :['idx', 'Time[day]'],
'label' :[None, None],
'b_method' :[None, None],
'cl' :[None, None],
}
c_out = 2
if any(Feat_dic['Param'][0]['b_method']) and any(Feat_dic['Param'][0]['cl']):
for i in range(len(Feat_dic['Param'][0]['b_method'])):
for j in range(len(Feat_dic['Param'][0]['cl'])):
if f_indx == 0: # f_index == 0
for i in Feat_dic['Full_names'][1:]:
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append(i)
Output_dict['b_method'].append(None)
Output_dict['cl'].append(None)
c_out += 1
elif j == 0: # f_index == 4
Output_dict['Type'].append('b_value')
Output_dict['label'].append('b_value')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Standard Error')
Output_dict['label'].append('b_std_err')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Seismogenic Index')
Output_dict['label'].append('SI')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Standard Error')
Output_dict['label'].append('si_std_err')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append(Model_name)
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(Feat_dic['Param'][0]['cl'][j])
Output_dict['Type'].append('Standard Error')
Output_dict['label'].append('M_std_err')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
c_out += 6
else: # f_index == 4
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append(Model_name)
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(Feat_dic['Param'][0]['cl'][j])
Output_dict['Type'].append('Standard Error')
Output_dict['label'].append('M_std_err')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
c_out += 2
elif any(Feat_dic['Param'][0]['b_method']): # f_index == 1, 2
for i in range(len(Feat_dic['Param'][0]['b_method'])):
Output_dict['Type'].append('b_value')
Output_dict['label'].append('b_value')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Standard Error')
Output_dict['label'].append('b_std_err')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append(Model_name)
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
Output_dict['Type'].append('Standard Error')
Output_dict['label'].append('M_std_err')
Output_dict['b_method'].append(Feat_dic['Param'][0]['b_method'][i])
Output_dict['cl'].append(None)
c_out += 4
elif f_indx == 5: # f_index == 5
# for i in ['L(max)','L(int)','L(min)', 'L(avg)']:
# Output_dict['Type'].append('Length[m]')
# Output_dict['label'].append(i)
# Output_dict['b_method'].append(None)
# Output_dict['cl'].append(None)
# Output_dict['Type'].append('Maximum magnitude')
# Output_dict['label'].append(Model_name)
# Output_dict['b_method'].append(None)
# Output_dict['cl'].append(None)
# c_out += 5
for i in ['Shapiro (Lmax)','Shapiro (Lint)','Shapiro (Lmin)', 'Shapiro (Lavg)']:
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append(i)
Output_dict['b_method'].append(None)
Output_dict['cl'].append(None)
c_out += 4
else: # f_index == 3
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append(Model_name)
Output_dict['b_method'].append(None)
Output_dict['cl'].append(None)
c_out += 1
# Add True maximum magnitude to the outputs
Output_dict['Type'].append('Maximum magnitude')
Output_dict['label'].append('True Max-Mag')
Output_dict['b_method'].append(None)
Output_dict['cl'].append(None)
c_out += 1
# if any(Feat_dic['Param'][0]['Inpar']): # Input parameters
if Inpar:
for i in Feat_dic['Param'][0]['Inpar']:
if i == 'd_num':
Output_dict['Type'].append('Number of events')
Output_dict['label'].append('Ev_Num')
elif i == 'dv':
Output_dict['Type'].append('Volume[m3]')
Output_dict['label'].append('Vol')
elif i == 'Mo':
Output_dict['Type'].append('Seismic moment')
Output_dict['label'].append('Mo')
elif i == 'SER':
Output_dict['Type'].append('Seismic efficiency ratio')
Output_dict['label'].append('SER')
Output_dict['b_method'].append(None)
Output_dict['cl'].append(None)
c_out += 1
# Functions ------------------------------
# Computing in extending time or time window
def Feature_in_Time(In_arr):
SER_l = 0
SER_c = 0
i_row = 0
for i in np.arange(1,Times):
# Defining time window and data
if time_win_type == 0:
ModelClass.time_win = i*time_step
candidate_events = CandidateEventsTS(Cat, i*time_step, None, i*time_step)
else:
candidate_events = CandidateEventsTS(Cat, i*time_step, None, time_win)
data = candidate_events.filter_data()
# USER: Check if cumulative dv is correctly computed !!!
end_time_indx = Find_idx4Time(Hyd[:,0], i*time_step)
# ModelClass.dv = np.sum(Hyd[:end_time_indx,1])
ModelClass.dv = np.trapz(Hyd[:end_time_indx,1], Hyd[:end_time_indx,0])/60
if len(data) > Feat_dic['Param'][0]['ev_limit'] and ModelClass.dv > 0:
ModelClass.Mo = np.sum(10**(1.5*data[:end_time_indx,5]+9.1))
if f_indx != 5: # Parameters have not been assinged for f_index == 5
SER_c = ModelClass.Mo/(2.0*ModelClass.G*ModelClass.dv)
SER_l = np.max((SER_c, SER_l))
ModelClass.SER = SER_l
ModelClass.data = data
In_arr[i_row,0] = i # index
In_arr[i_row,1] = i*time_step # Currecnt time
c = 2
if any(Feat_dic['Param'][0]['b_method']) and any(Feat_dic['Param'][0]['cl']):
for ii in range(len(Feat_dic['Param'][0]['b_method'])):
ModelClass.b_method = Feat_dic['Param'][0]['b_method'][ii]
for jj in range(len(Feat_dic['Param'][0]['cl'])): # f_index == 4
ModelClass.cl = Feat_dic['Param'][0]['cl'][jj]
Out = ModelClass.ComputeModel()
if jj == 0:
In_arr[i_row,c:c+f_num] = Out
c += f_num
else:
In_arr[i_row,c:c+2] = Out[-2:]
c += 2
elif any(Feat_dic['Param'][0]['b_method']): # f_index == 1, 2
for ii in range(len(Feat_dic['Param'][0]['b_method'])):
ModelClass.b_method = Feat_dic['Param'][0]['b_method'][ii]
In_arr[i_row,c:c+f_num] = ModelClass.ComputeModel()
c += f_num
else: # f_index = 0, 3, 5
In_arr[i_row,c:c+f_num] = ModelClass.ComputeModel()
c += f_num
# Compute true maximum magnitude in the data
In_arr[i_row,c] = np.max(data[:end_time_indx,5])
c += 1
if Inpar:
for ii in range(len(Feat_dic['Param'][0]['Inpar'])): # Add input parameters
if Feat_dic['Param'][0]['Inpar'][ii] == 'd_num':
In_arr[i_row,c+ii] = data.shape[0]
elif Feat_dic['Param'][0]['Inpar'][ii] == 'dv':
In_arr[i_row,c+ii] = ModelClass.dv
elif Feat_dic['Param'][0]['Inpar'][ii] == 'Mo':
In_arr[i_row,c+ii] = ModelClass.Mo
elif Feat_dic['Param'][0]['Inpar'][ii] == 'SER':
if ModelClass.SER:
In_arr[i_row,c+ii] = ModelClass.SER
i_row += 1
return In_arr[:i_row,:]
# Run functions based on the configurations -------------------
# Computing model
if time_step_in_hour > time_win_in_hours:
raise ValueError('Time steps should be <= time window.')
if (time_win_in_hours+time_step_in_hour)*3600 > np.max(Cat[:,4]):
raise ValueError('Time window and time steps are like that no more than three computations is possible. Use smaller value for either time window or time step.')
time_step = time_step_in_hour*3600
if End_time:
if End_time*24*3600 > np.max(Cat[:,4])+24*3600:
raise ValueError(f'End_time is longer than the maximum time of catalog, which is {np.ceil(np.max(Cat[:,4])/24/3600)} day!')
elif time_step > 0:
Times = int(np.floor((End_time*24*3600 - Cat[0,4]) / time_step))
else:
Times = 2
time_step = time_win
elif time_step > 0:
Times = int(np.floor((np.max(Cat[:,4]) - Cat[0,4]) / time_step))
else:
Times = 2
time_step = time_win
Model_Param_array = np.zeros((Times,c_out))
logger.info("Computing input parameter(s) and the model(s)")
Model_Param_array = Feature_in_Time(Model_Param_array)
# Plotting
if Plot_flag > 0:
if Model_Param_array.any():
logger.info("Plotting results")
import Mmax_plot # Import locally to ensure Mmax_plot is required only when Plot_flag > 0
Mmax_plot.Plot_feature(Model_Param_array, Output_dict)
else:
logger.info("Model_Param_array is empty or not enough values to plot. Check 'csv' file.")
# Saving
logger.info("Saving results")
# Save models and parameters in
def OneLineHeader(loc):
l = Output_dict['Type'][loc]
if Output_dict['b_method'][loc]:
if Output_dict['label'][loc] != 'b_value' and Output_dict['label'][loc] != 'b_std_err' and Output_dict['label'][loc] != 'M_std_err':
l += '_'+Output_dict['label'][loc]
else:
l = Output_dict['label'][loc]
if Output_dict['cl'][loc]:
l += '(b_method: '+Output_dict['b_method'][loc]+', q='+str(Output_dict['cl'][loc])+')'
else:
l += '(b_method: '+Output_dict['b_method'][loc]+')'
elif Output_dict['label'][loc]:
l += '('+ Output_dict['label'][loc] +')'
return l
db_df = pd.DataFrame(data = Model_Param_array,
columns = [OneLineHeader(i) for i in range(len(Output_dict['Type']))])
db_df = db_df.round(4)
# db_df.to_excel(cwd+'/Results/'+Full_feat_name+'.xlsx')
db_df.to_csv('DATA_Mmax_param.csv', sep=';', index=False)
# np.savez_compressed(cwd+'/Results/'+Full_feat_name+'.npz', Model_Param_array = Model_Param_array) # change the name!
if __name__=='__main__':
# Input_catalog = 'Data/Cooper_Basin_Catalog_HAB_1_2003_Reprocessed.mat'
# Input_injection_rate = 'Data/Cooper_Basin_HAB_1_2003_Injection_Rate.mat'
# Model_index = 4
# b_value_type = 'TGR'
main(Input_catalog, Input_injection_rate, time_win_in_hours, time_step_in_hour, time_win_type,
End_time, ev_limit, Inpar, time_inj, time_shut_in, time_big_ev, Model_index,
Mc, Mu, G, ssd, C, b_value_type, cl, mag_name)

View File

@ -1,206 +0,0 @@
import matplotlib.pyplot as plt
import numpy as np
def Plot_feature(Model_Param_array,
Output_dict,
End_time=None,
time_inj=None,
time_shut_in=None,
time_big_ev=None,
Model_name="",
logger=None):
"""
Plotting function extracted from Mmax.py for plotting models and parameters.
Parameters
----------
Model_Param_array : np.ndarray
Computed matrix of model parameters as rows in time.
Output_dict : dict
Dictionary describing each column in Model_Param_array.
End_time : float, optional
The last time to show in the X-axis (days), if desired.
time_inj : float, optional
Time of injection start (days), if you want a vertical line.
time_shut_in : float, optional
Time of shut-in (days), if you want a vertical line.
time_big_ev : float, optional
Time of large event (days), if you want a vertical line.
Model_name : str, optional
Model name used for the plot title.
logger : logging.Logger, optional
Logger for printing info messages. If None, no logging happens.
"""
myVars = locals()
# Function for findin min-max of all similar parameters
def Extermom4All(Model_Param_array, itr_loc):
Mat1D = np.reshape(Model_Param_array[:,itr_loc], -1)
NotNone = np.isfinite(Mat1D)
if np.min(Mat1D[NotNone])>0:
return [np.min(Mat1D[NotNone])*0.95, np.max(Mat1D[NotNone])*1.05]
elif np.min(Mat1D[NotNone])<0 and np.max(Mat1D[NotNone])>0:
return [np.min(Mat1D[NotNone])*1.05, np.max(Mat1D[NotNone])*1.05]
elif np.max(Mat1D[NotNone])<0:
return [np.min(Mat1D[NotNone])*1.05, np.max(Mat1D[NotNone])*0.95]
# Function for setting relevant lagends in the plot
def Legend_label(loc):
l = Output_dict_c['label'][loc]
if Output_dict_c['b_method'][loc]:
if Output_dict_c['cl'][loc]:
l+='('+Output_dict_c['b_method'][loc]+', cl='+str(Output_dict_c['cl'][loc])+')'
else:
l+='('+Output_dict_c['b_method'][loc]+')'
return l
c_NotNone = [] # Removing all parameters with None or constant value
for i in range(Model_Param_array.shape[1]):
NotNone = np.isfinite(Model_Param_array[:,i])
Eq_value = np.mean(Model_Param_array[:,i])
if any(NotNone) and Eq_value != Model_Param_array[0,i]:
c_NotNone.append(i)
else:
logger.info(f"No-PLOT: All values of {Output_dict['Type'][i]} are {Model_Param_array[0,i]}!")
if len(c_NotNone) > 1:
Model_Param_array = Model_Param_array[:,c_NotNone]
# New output dictionary based on valid parameters for plotting
Output_dict_c = {'Type':[], 'label':[], 'b_method':[], 'cl':[]}
for i in range(len(c_NotNone)):
Output_dict_c['Type'].append(Output_dict['Type'][c_NotNone[i]])
Output_dict_c['label'].append(Output_dict['label'][c_NotNone[i]])
Output_dict_c['b_method'].append(Output_dict['b_method'][c_NotNone[i]])
Output_dict_c['cl'].append(Output_dict['cl'][c_NotNone[i]])
coloring=['blue','g','r','c','m','y',
'brown', 'darkolivegreen', 'teal', 'steelblue', 'slateblue',
'purple', 'darksalmon', '#c5b0d5', '#c49c94',
'#e377c2', '#f7b6d2', '#7f7f7f', '#c7c7c7', '#bcbd22', '#dbdb8d',
'#17becf', '#9edae5',
'brown', 'darkolivegreen', 'teal', 'steelblue', 'slateblue',]
# All parameters to be plotted
All_vars = Output_dict_c['Type'][2:]
Uniqe_var = list(dict.fromkeys([s for s in All_vars if 'Standard Error' not in s])) #list(set(All_vars))
# defining handels and labels to make final legend
All_handels = ['p0']
for i in range(1,len(All_vars)):
All_handels.append('p'+str(i))
handels = []
labels = []
# itr_loc: location of paramteres with similar type
itr_loc = np.where(np.array(All_vars) == Uniqe_var[0])[0]+2
fig, myVars[Output_dict_c['Type'][0]] = plt.subplots(1,1,figsize=(8+int(len(All_vars)/3),6))
fig.subplots_adjust(right=1-len(Uniqe_var)*0.09)
if Output_dict_c['label'][itr_loc[0]] == 'True Max-Mag': # plot with dash-line
myVars[All_handels[itr_loc[0]-2]], = myVars[Output_dict_c['Type'][0]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[0]], c= 'k', ls='--', lw = 2)
else:
myVars[All_handels[itr_loc[0]-2]], = myVars[Output_dict_c['Type'][0]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[0]], c= coloring[itr_loc[0]])
handels.append(All_handels[itr_loc[0]-2])
labels.append(Legend_label(itr_loc[0]))
myVars[Output_dict_c['Type'][0]].set_ylabel(Output_dict_c['Type'][itr_loc[0]])
myVars[Output_dict_c['Type'][0]].set_ylim(Extermom4All(Model_Param_array, itr_loc)[0], Extermom4All(Model_Param_array, itr_loc)[1])
myVars[Output_dict_c['Type'][0]].set_xlabel('Day (From start of the recording)')
if End_time:
myVars[Output_dict_c['Type'][0]].set_xlim(0,End_time)
# Plotting statndard error (if exists)
if itr_loc[0]+1 < len(Output_dict_c['Type']) and Output_dict_c['Type'][itr_loc[0]+1] == 'Standard Error':
myVars[Output_dict_c['Type'][0]].fill_between(Model_Param_array[:,1]/24/3600,
Model_Param_array[:,itr_loc[0]] - Model_Param_array[:,itr_loc[0]+1],
Model_Param_array[:,itr_loc[0]] + Model_Param_array[:,itr_loc[0]+1], color= coloring[itr_loc[0]], alpha=0.1)
# Plotting similar parameters on one axis
for j in range(1,len(itr_loc)):
if Output_dict_c['label'][itr_loc[j]] == 'True Max-Mag': # plot with dash-line
myVars[All_handels[itr_loc[j]-2]], = myVars[Output_dict_c['Type'][0]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[j]], c= 'k', ls='--', lw = 2)
else:
myVars[All_handels[itr_loc[j]-2]], = myVars[Output_dict_c['Type'][0]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[j]], c= coloring[itr_loc[j]])
handels.append(All_handels[itr_loc[j]-2])
labels.append(Legend_label(itr_loc[j]))
# Plotting statndard error (if exists)
if itr_loc[0]+1 < len(Output_dict_c['Type']) and Output_dict_c['Type'][itr_loc[j]+1] == 'Standard Error':
myVars[Output_dict_c['Type'][0]].fill_between(Model_Param_array[:,1]/24/3600,
Model_Param_array[:,itr_loc[j]] - Model_Param_array[:,itr_loc[j]+1],
Model_Param_array[:,itr_loc[j]] + Model_Param_array[:,itr_loc[j]+1], color= coloring[itr_loc[j]], alpha=0.1)
first_itr = 0
# Check if there is any more parameter to be plotted in second axes
# The procedure is similar to last plots.
if len(Uniqe_var) > 1:
for i in range(1,len(Uniqe_var)):
itr_loc = np.where(np.array(All_vars) == Uniqe_var[i])[0]+2
myVars[Uniqe_var[i]] = myVars[Output_dict_c['Type'][0]].twinx()
# if it is third or more axis, make a distance between them
if first_itr == 0:
first_itr += 1
set_right = 1
else:
set_right = 1 + first_itr*0.2
first_itr += 1
myVars[Uniqe_var[i]].spines.right.set_position(("axes", set_right))
if Output_dict_c['label'][itr_loc[0]] == 'True Max-Mag': # plot with dash-line
myVars[All_handels[itr_loc[0]-2]], = myVars[Uniqe_var[i]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[0]], c= 'k', ls='--', lw = 2)
else:
myVars[All_handels[itr_loc[0]-2]], = myVars[Uniqe_var[i]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[0]], c= coloring[itr_loc[0]])
handels.append(All_handels[itr_loc[0]-2])
labels.append(Legend_label(itr_loc[0]))
myVars[Uniqe_var[i]].set_ylabel(Output_dict_c['Type'][itr_loc[0]])
myVars[Uniqe_var[i]].yaxis.label.set_color(coloring[itr_loc[0]])
myVars[Uniqe_var[i]].spines["right"].set_edgecolor(coloring[itr_loc[0]])
myVars[Uniqe_var[i]].tick_params(axis='y', colors= coloring[itr_loc[0]])
myVars[Uniqe_var[i]].set_ylim(Extermom4All(Model_Param_array, itr_loc)[0], Extermom4All(Model_Param_array, itr_loc)[1])
if itr_loc[0]+1 < len(Output_dict_c['Type']) and Output_dict_c['Type'][itr_loc[0]+1] == 'Standard Error':
myVars[Uniqe_var[i]].fill_between(Model_Param_array[:,1]/24/3600,
Model_Param_array[:,itr_loc[0]] - Model_Param_array[:,itr_loc[0]+1],
Model_Param_array[:,itr_loc[0]] + Model_Param_array[:,itr_loc[0]+1], color= coloring[itr_loc[0]], alpha=0.1)
for j in range(1,len(itr_loc)):
if Output_dict_c['label'][itr_loc[j]] == 'True Max-Mag': # plot with dash-line
myVars[All_handels[itr_loc[j]-2]], = myVars[Uniqe_var[i]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[j]], c= 'k', ls = '--', lw = 2)
else:
myVars[All_handels[itr_loc[j]-2]], = myVars[Uniqe_var[i]].plot(Model_Param_array[:,1]/24/3600, Model_Param_array[:,itr_loc[j]], c= coloring[itr_loc[j]])
handels.append(All_handels[itr_loc[j]-2])
labels.append(Legend_label(itr_loc[j]))
if itr_loc[j]+1 < len(Output_dict_c['Type']) and Output_dict_c['Type'][itr_loc[j]+1] == 'Standard Error':
myVars[Uniqe_var[i]].fill_between(Model_Param_array[:,1]/24/3600,
Model_Param_array[:,itr_loc[j]] - Model_Param_array[:,itr_loc[j]+1],
Model_Param_array[:,itr_loc[j]] + Model_Param_array[:,itr_loc[j]+1], color= coloring[itr_loc[j]], alpha=0.1)
# If there are timing, plot them as vertical lines
if time_inj:
myVars['l1'], = plt.plot([time_inj,time_inj], [Extermom4All(Model_Param_array, itr_loc)[0],Extermom4All(Model_Param_array, itr_loc)[1]], ls='--', c='k')
handels.append('l1')
labels.append('Start-inj')
if time_shut_in:
myVars['l2'], = plt.plot([time_shut_in,time_shut_in], [Extermom4All(Model_Param_array, itr_loc)[0],Extermom4All(Model_Param_array, itr_loc)[1]], ls='-.', c='k')
handels.append('l2')
labels.append('Shut-in')
if time_big_ev:
myVars['l3'], = plt.plot([time_big_ev,time_big_ev], [Extermom4All(Model_Param_array, itr_loc)[0],Extermom4All(Model_Param_array, itr_loc)[1]], ls='dotted', c='k')
handels.append('l3')
labels.append('Large-Ev')
box = myVars[Output_dict_c['Type'][0]].get_position()
if len(handels) < 6:
myVars[Output_dict_c['Type'][0]].set_position([box.x0, box.y0 + box.height * 0.1,
box.width, box.height * 0.9])
plt.legend([myVars[ii] for ii in handels], labels, loc='upper center',
bbox_to_anchor=(0.5+0.06*first_itr, -0.15), fancybox=True, shadow=True, ncol=len(handels))
elif len(handels) < 13:
myVars[Output_dict_c['Type'][0]].set_position([box.x0, box.y0 + box.height * 0.04*int(len(handels)/2),
box.width, box.height * (1 - 0.04*int(len(handels)/2))])
plt.legend([myVars[ii] for ii in handels], labels, loc='upper center',
bbox_to_anchor=(0.5+0.1*first_itr, -0.04*int(len(handels)/2)), fancybox=True, shadow=True, ncol=int(len(handels)/2)+1, handleheight=2)
else:
myVars[Output_dict_c['Type'][0]].set_position([box.x0, box.y0 + box.height * 0.04*int(len(handels)/2),
box.width, box.height * (1 - 0.04*int(len(handels)/2))])
plt.legend([myVars[ii] for ii in handels], labels, loc='upper center',
bbox_to_anchor=(0.6+0.1*first_itr, -0.04*int(len(handels)/2)), fancybox=True, shadow=True, ncol=int(len(handels)/2)+1, handleheight=2)
plt.title(Model_name)
# plt.savefig(cwd+'/Results/'+Model_name+'.png', dpi=300)
plt.savefig('PLOT_Mmax_param.png', dpi=300)
# plt.show()

582
src/SpectralAnalysis.py Normal file
View File

@ -0,0 +1,582 @@
import copy
import csv
from typing import List, Optional
from json_writer import JsonWriter
from scipy import integrate
from scipy.stats import zscore
import numpy as np
from obspy import Stream, UTCDateTime
from obspy.core.event import Event, Pick
from obspy.geodetics import gps2dist_azimuth
from tau_p_raytracing import TauPRayTracer
from sp_jk import sp_jk
from opt_algorithms import OptNelderMead
from SpectralParameters import SpectralParams, EventSpectralParams
from amplitude_spectra import calculate_amp_spectra
from spectral_definitions import (K_MADARIAGA_P, K_MADARIAGA_S, K_BRUNE, G_P, G_S, spectrum2moment, calc_stress_drop,
calc_source_size, mm)
class SpectralAnalysis:
"""
Application for spectral analysis for P- or S-waves. It uses JK integrals and spectral fitting with simplex
optimization algorithm.
JK integrals method returns seismic moment, moment magnitude and corner frequency.
Spectral fitting uses these results as input. It returns seismic moment, moment magnitude and corner frequency
and additionally Q parameter (i.e. quality=1/dumping). In fitting procedure theoretical spectrum
(Brune or Boatwright corrected for Q) is matched to calculated amplitude spectrum of seismic signal.
These methods are used for each station separately and finally mean values are calculated. Outliers (estimated with
z-sqore method) are not used for mean value calculations.
Application enables:
- 1D velocity model for travel time calculations. It uses tau_p raytracer from ObsPy
however for spectral parameters estimation it uses constant parameters i.e. constant S wave velocity and
constant gp, gs and density.
- constant velocities for travel time calculations in case of missing 1D velocity model.
Features of the algorithm:
1. Amplitude spectra are calculated as geometric mean of all traces (e.g. three channels) of seismic station.
2. Application requires P wave pick for P-waves analysis and P or S- wave pick for S wave analysis
(it calculates theoretical S wave pick in case it is not available).
3. Applications calculates signal-to-noise spectral ratio and removes either station (based on mean ratio)
or particular frequencies (based on S/N ratio for this frequency).
"""
def __init__(self, stream: Stream, event: Event, inventory, config, logger):
self.stream = stream
self.event = event
self.inventory = inventory
self.config = config
self.logger = logger
self.raytracer = self._initialize_raytracer()
self.stations = self._get_unique_stations()
self.solutions_jk_P = []
self.solutions_sp_fit_P = []
self.solutions_jk_S = []
self.solutions_sp_fit_S = []
# filenames
self.P_WAVE_SP_FITTING_RESULTS_JSON = config.get("P_WAVE_SP_FITTING_RESULTS_JSON")
self.S_WAVE_SP_FITTING_RESULTS_JSON = config.get("S_WAVE_SP_FITTING_RESULTS_JSON")
self.P_WAVE_JK_RESULTS = config.get("P_WAVE_JK_RESULTS")
self.S_WAVE_JK_RESULTS = config.get("S_WAVE_JK_RESULTS")
self.P_WAVE_SP_FITTING_RESULTS = config.get("P_WAVE_SP_FITTING_RESULTS")
self.S_WAVE_SP_FITTING_RESULTS = config.get("S_WAVE_SP_FITTING_RESULTS")
self.EVENT_RESULTS = config.get("EVENT_RESULTS")
def _initialize_raytracer(self) -> Optional[TauPRayTracer]:
if self.config.get("raytrace"):
return TauPRayTracer(self.config.get("velocity_mat_file"))
return None
def run(self):
for station in self.stations:
self.logger.info(80 * "#")
self.logger.info(f"Running spectral analysis for station: {station}")
# Selecting station data
station_stream = self.stream.select(station=station)
if len(station_stream) > 3:
self.logger.warning(f"{station} has more than 3 waveform channels loaded. Picks at each station must be on data from the same sensor. Skipping station {station}...")
continue
try:
station_inv = self.inventory.select(station=station, time=station_stream[0].stats.starttime)[0][0]
except IndexError:
self.logger.warning(f"{station} not in inventory")
continue
# Stream preprocessing
station_stream = self._preprocess_stream(station_stream)
if station_stream:
hor_distance, ver_distance = self._calculate_distance(station_inv)
p_travel_time, s_travel_time = self._calculate_travel_times(horizontal_distance=hor_distance,
vertical_distance=ver_distance,
station_inv=station_inv)
pick_p = self._find_pick(station, self.config.get("p_phase_hint"))
pick_s = self._find_pick(station, self.config.get("s_phase_hint"))
if pick_p:
delta_ps = s_travel_time - p_travel_time
if self.config.get("P_wave_analysis"):
self._perform_p_wave_analysis(station_stream=station_stream, pick_p=pick_p, delta_ps=delta_ps,
p_travel_time=p_travel_time, distance=hor_distance)
self.logger.info(80 * "-")
if self.config.get("S_wave_analysis"):
self._perform_s_wave_analysis(station_stream=station_stream, pick_s=pick_s, pick_p=pick_p,
delta_ps=delta_ps, s_travel_time=s_travel_time,
distance=hor_distance)
self.logger.info(80 * "-")
self._save_results_to_json()
self._save_results_to_csv(filename=self.EVENT_RESULTS)
self._save_results_to_station_csvs()
def _save_results_to_json(self):
# saving solutions to JSON file
solutions_data = [
(self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS_JSON),
(self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS_JSON)
]
for solutions, filename in solutions_data:
if len(solutions)>0: #create a file only if there are results to write
JsonWriter(solutions=solutions, filename=filename, logger=self.logger).save()
else:
self.logger.info(f"No output; nothing to write to {filename}")
def _save_results_to_station_csvs(self):
# Zdefiniowanie danych dla poszczególnych plików
solutions_data = [
(self.solutions_jk_P, self.P_WAVE_JK_RESULTS),
(self.solutions_jk_S, self.S_WAVE_JK_RESULTS),
(self.solutions_sp_fit_P, self.P_WAVE_SP_FITTING_RESULTS),
(self.solutions_sp_fit_S, self.S_WAVE_SP_FITTING_RESULTS)
]
# Dla każdego zestawu wyników zapisujemy osobny plik CSV
for solutions, filename in solutions_data:
# Otwieranie pliku CSV do zapisu
if len(solutions)>0: #create a file only if there are results to write
with open(filename, mode='w', newline='') as csv_file:
writer = csv.writer(csv_file)
# Zapis nagłówków
headers = ["station_name", "mo", "fo", "q", "mw", "source_size", "stress_drop"]
writer.writerow(headers)
# Zapis danych dla każdej stacji
for solution in solutions:
station_name = solution[0] # Zakładam, że nazwa stacji jest w solution[0]
parameters = solution[1] # Zakładam, że parametry są w solution[1]
# Tworzymy wiersz z wartościami parametrów
row = [
station_name,
parameters.mo,
parameters.fo,
parameters.q,
parameters.mw,
parameters.source_size,
parameters.stress_drop
]
# Zapisujemy wiersz do pliku CSV
writer.writerow(row)
else:
self.logger.info(f"No output; nothing to write to {filename}")
def _save_results_to_csv(self, filename):
# Nagłówki kolumn
headers = ["Method", "M0", "E M0", "F0", "E F0", "Q", "E Q", "Mw", "r", "E r", "Stress drop", "E stress drop"]
# Zbieranie danych z czterech źródeł
solutions_jk_P = self._return_event_spec_params(solutions=self.solutions_jk_P)
solutions_jk_S = self._return_event_spec_params(solutions=self.solutions_jk_S)
solutions_sp_fit_P = self._return_event_spec_params(solutions=self.solutions_sp_fit_P)
solutions_sp_fit_S = self._return_event_spec_params(solutions=self.solutions_sp_fit_S)
# Checking for None values
if solutions_jk_P:
data_jk_P = [
"P-waves, JK integrals", solutions_jk_P.mo, solutions_jk_P.mo_e, solutions_jk_P.fo, solutions_jk_P.fo_e,
None, None, solutions_jk_P.mw, solutions_jk_P.source_size,
solutions_jk_P.source_size_e, solutions_jk_P.stress_drop, solutions_jk_P.stress_drop_e
]
else:
data_jk_P = [
"P-waves, JK integrals", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_jk_S:
data_jk_S = [
"S-waves, JK integrals", solutions_jk_S.mo, solutions_jk_S.mo_e, solutions_jk_S.fo, solutions_jk_S.fo_e,
None, None, solutions_jk_S.mw, solutions_jk_S.source_size,
solutions_jk_S.source_size_e, solutions_jk_S.stress_drop, solutions_jk_S.stress_drop_e
]
else:
data_jk_S = [
"S-waves, JK integrals", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_sp_fit_P:
data_sp_fit_P = [
"P-waves, spectrum fitting", solutions_sp_fit_P.mo, solutions_sp_fit_P.mo_e, solutions_sp_fit_P.fo, solutions_sp_fit_P.fo_e,
solutions_sp_fit_P.q, solutions_sp_fit_P.q_e, solutions_sp_fit_P.mw, solutions_sp_fit_P.source_size,
solutions_sp_fit_P.source_size_e, solutions_sp_fit_P.stress_drop, solutions_sp_fit_P.stress_drop_e
]
else:
data_sp_fit_P = [
"P-waves, spectrum fitting", None, None, None, None,
None, None, None, None,
None, None, None
]
if solutions_sp_fit_S:
data_sp_fit_S = [
"S-waves, spectrum fitting", solutions_sp_fit_S.mo, solutions_sp_fit_S.mo_e, solutions_sp_fit_S.fo, solutions_sp_fit_S.fo_e,
solutions_sp_fit_S.q, solutions_sp_fit_S.q_e, solutions_sp_fit_S.mw, solutions_sp_fit_S.source_size,
solutions_sp_fit_S.source_size_e, solutions_sp_fit_S.stress_drop, solutions_sp_fit_S.stress_drop_e
]
else:
data_sp_fit_S = [
"S-waves, spectrum fitting", None, None, None, None,
None, None, None, None,
None, None, None
]
data = [data_jk_P, data_jk_S,data_sp_fit_P, data_sp_fit_S]
# Zapis do pliku CSV
with open(filename, mode='w', newline='') as file:
writer = csv.writer(file)
# Zapis nagłówków
writer.writerow(headers)
# Zapis danych
writer.writerows(data)
def _write_event_parameters_to_file(self):
# Define the content to be written to the file
content = (
f"{20 * '#'} RESULTS {20 * '#'}\n"
f"{80 * '-'}\n"
"Event parameters P_jk:\n"
f"{self._return_event_spec_params(solutions=self.solutions_jk_P)}\n"
"Event parameters P_sp_fit:\n"
f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_P)}\n"
"Event parameters S_jk:\n"
f"{self._return_event_spec_params(solutions=self.solutions_jk_S)}\n"
"Event parameters S_sp_fit:\n"
f"{self._return_event_spec_params(solutions=self.solutions_sp_fit_S)}\n"
f"{80 * '-'}\n"
)
# Write the content to the file
with open(self.EVENT_RESULTS, 'w') as file:
file.write(content)
def _get_unique_stations(self) -> List[str]:
return sorted(set([tr.stats.station for tr in self.stream]))
def _preprocess_stream(self, stream: Stream) -> Stream:
stream.detrend("linear")
try:
stream.remove_response(inventory=self.inventory, output="DISP")
except Exception as e:
self.logger.warning(f"{e} No instrument correction - applied integration to obtain displacement signal")
stream.integrate(method="cumtrapz")
return stream.filter(
type="bandpass",
freqmin=self.config.get("freq_min"),
freqmax=self.config.get("freq_max"),
corners=2,
zerophase=True
)
def _calculate_distance(self, station_inv) -> list[float, float]:
try:
horizontal_distance = gps2dist_azimuth(
self.config.get("latitude"), self.config.get("longitude"), station_inv.latitude,
station_inv.longitude
)[0]
if self.config.get("allow_station_elev"):
receiver_depth_in_m = station_inv.elevation
else:
receiver_depth_in_m = 0
vertical_distance = receiver_depth_in_m + self.config.get("depth")
return horizontal_distance, vertical_distance
except AttributeError as err:
(self.logger.error
("No preferred origin in quakeML file. Cannot calculate distance between station and origin"))
raise AttributeError
def _calculate_travel_times(self, horizontal_distance: float, vertical_distance, station_inv) -> tuple:
if not self.raytracer:
vp = self.config.get("vp")
vs = self.config.get("vs")
distance = (horizontal_distance**2+vertical_distance**2)**0.5
p_travel_time = distance / vp
s_travel_time = distance / vs
else:
if self.config.get("allow_station_elev"):
receiver_depth_in_km = station_inv.elevation/1000
else:
receiver_depth_in_km = 0
try:
p_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000,
self.config.get("depth") / 1000,
receiver_depth_in_km, phase="p").time
s_travel_time = self.raytracer.calculate_arrival(horizontal_distance / 1000,
self.config.get("depth") / 1000,
receiver_depth_in_km, phase="s").time
except IndexError:
raise Exception("Problem with velocity file. ")
return p_travel_time, s_travel_time
def _find_pick(self, station: str, phase_hint:list) -> Optional[Pick]:
for p in self.event.picks:
if p.waveform_id['station_code'] == station and p.phase_hint in phase_hint:
return p
return None
def _perform_p_wave_analysis(self, station_stream: Stream, pick_p: Pick, delta_ps: float, distance: float,
p_travel_time: float):
self.logger.info("P wave spectral analysis")
self.logger.info(station_stream)
if delta_ps < self.config.get("window_len"):
self.logger.warning("P wave window may overlap S waves")
return
signal_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=self.config.get("taper_len"),
post_padding=self.config.get("window_len") -
self.config.get("taper_len"))
signal_window = self._window_preprocessing(signal_window)
noise_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=2 * self.config.get("taper_len") +
self.config.get("window_len"),
post_padding=-2 * self.config.get("taper_len"))
noise_window = self._window_preprocessing(noise_window)
if len(signal_window) > 0 and len(noise_window) > 0 and pick_p:
self._spectral_fitting(signal_window, noise_window, pick_p, distance, p_travel_time)
else:
self.logger.warning("Not enough data for P wave analysis")
def _perform_s_wave_analysis(self, station_stream: Stream, pick_p: Pick, pick_s: Pick, delta_ps, distance: float,
s_travel_time: float):
self.logger.info("S wave spectral analysis")
self.logger.info(station_stream)
if pick_p:
noise_window = self._trim_seismogram(station_stream, pick_p.time,
pre_padding=2 * self.config.get("taper_len") +
self.config.get("window_len"),
post_padding=-2 * self.config.get("taper_len"))
else:
noise_window = self._trim_seismogram(station_stream, station_stream[0].stats.starttime,
pre_padding=-2 * self.config.get("taper_len"),
post_padding=self.config.get("window_len") +
2 * self.config.get("taper_len"))
if not pick_s and pick_p:
pick_s = copy.copy(pick_p)
pick_s.time = pick_s.time + delta_ps
pick_s.phase_hint = "S"
if pick_s:
signal_window = self._trim_seismogram(station_stream, pick_s.time,
pre_padding=self.config.get("taper_len"),
post_padding=self.config.get("window_len")
- self.config.get("taper_len"))
signal_window = self._window_preprocessing(signal_window)
if len(signal_window) > 0 and len(noise_window) > 0:
self._spectral_fitting(signal_window, noise_window, pick_s, distance, s_travel_time)
@staticmethod
def _trim_seismogram(station_stream: Stream, pick_time: UTCDateTime, pre_padding=0.5, post_padding=2.6):
start_time = pick_time - pre_padding
end_time = pick_time + post_padding
return station_stream.slice(start_time, end_time)
def _window_preprocessing(self, signal_window: Stream) -> Stream:
return signal_window.taper(type=self.config.get("taper_type"), max_percentage=0.5,
max_length=self.config.get("taper_len"))
@staticmethod
def signal2noise(signal_amp_spectra, noise_amp_spectra):
# Calculate signal-to-noise ratio
noise_integral = integrate.trapezoid(noise_amp_spectra[1], x=noise_amp_spectra[0])
signal_integral = integrate.trapezoid(signal_amp_spectra[1], x=signal_amp_spectra[0])
signal2noise_ratio = signal_integral / noise_integral
return signal2noise_ratio
def _spectral_fitting(self, signal_window: Stream, noise_window: Stream, pick: Pick, distance: float,
travel_time: float):
# choosing appropriate K value which is related to the source model and type of seismic waves
source_model = self.config.get("sp_source_model")
if source_model == "Brune":
k = K_BRUNE
elif source_model == "Madariaga":
if pick.phase_hint in self.config.get("p_phase_hint"):
k = K_MADARIAGA_P
elif pick.phase_hint in self.config.get("s_phase_hint"):
k = K_MADARIAGA_S
else:
self.logger.warning(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S")
raise Exception(f"{pick.phase_hint} is a wrong value - pick.phase_hint should be P or S")
else:
self.logger.warning(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga")
raise Exception(f"{source_model} is a wrong value - sp_source_model should be Brune or Madariaga")
# S wave velocity value
vs = self.config.get("vs")
# Calculation of amplitude spectra for signal and noise windows
xf, yf = calculate_amp_spectra(station_stream=signal_window)
xfn, yfn = calculate_amp_spectra(station_stream=noise_window)
# Calculation of signal-to-noise ratio
signal2noise_ratio = self.signal2noise([xf, yf], [xfn, yfn])
self.logger.info(f"S/N: {signal2noise_ratio}")
# Stopping the analysis when the ratio is too low
if signal2noise_ratio < self.config["min_energy_ratio"]:
self.logger.warning("Signal to noise ratio is too low")
return
# Setting the weights.
weights = np.ones(len(xf))
# If the noise on particular frequencies exceed the threshold value, the weights are equal to 0.
sn = yf / yfn
weights[sn < self.config.get("min_sn_ratio")] = 0
# Setup initial model from J/K integrals. Initial mo is in displacement units.
spectral_level, fo = sp_jk(xf, yf)
spectral_params_sp_jk = SpectralParams(mo=spectral_level, fo=fo, q=400, stress_drop=0, source_size=0)
# Spectral fitting with Simplex. Initial mo is in displacement units.
opt_sim = OptNelderMead(spectral_params_sp_jk, freq_bins=xf, amplitude_spectrum=yf, weights=weights,
travel_time=travel_time, config=self.config, logger=self.logger)
error_sp_fit, solution_sp_fit = opt_sim.run()
# Recalculation of amplitude spectrum level to seismic moment
# for spectral fitting method
solution_sp_fit.mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config,
spectral_level=solution_sp_fit.mo, distance=distance)
solution_sp_fit.mw = mm(solution_sp_fit.mo)
#Calculation of source size and stress drop for spectral fitting method
solution_sp_fit.source_size = calc_source_size(vs, solution_sp_fit.fo, k)
solution_sp_fit.stress_drop = calc_stress_drop(seismic_moment=solution_sp_fit.mo, source_radius=solution_sp_fit.source_size)
# for J-K integrals method
mo = self._spectrum_to_moment(pick_phase=pick.phase_hint, config=self.config,
spectral_level=spectral_level, distance=distance)
source_radius = calc_source_size(vs, fo, k)
stress_drop_value = calc_stress_drop(seismic_moment=mo, source_radius=source_radius)
spectral_params_sp_jk = SpectralParams(mo=mo, fo=fo, q=400, stress_drop=stress_drop_value, source_size=source_radius)
self.logger.info(spectral_params_sp_jk)
self.logger.info(solution_sp_fit)
# Appending solutions to the lists
if pick.phase_hint in self.config["p_phase_hint"]:
self.solutions_jk_P.append([signal_window[0].stats.station,
spectral_params_sp_jk,
None,
None,
None])
self.solutions_sp_fit_P.append([signal_window[0].stats.station,
solution_sp_fit,
opt_sim.f_norm.freq,
opt_sim.f_norm.th_amp_sp_damp,
opt_sim.f_norm.amp_spectrum])
elif pick.phase_hint in self.config["s_phase_hint"]:
self.solutions_jk_S.append([signal_window[0].stats.station,
spectral_params_sp_jk,
None,
None,
None])
self.solutions_sp_fit_S.append([signal_window[0].stats.station,
solution_sp_fit,
opt_sim.f_norm.freq,
opt_sim.f_norm.th_amp_sp_damp,
opt_sim.f_norm.amp_spectrum])
else:
self.logger.warning(f"Solutions not saved. Phase hint {self.config['p_phase_hint']} nor "
f"{self.config['s_phase_hint']} does not coincide "
f"with phase hint {pick.phase_hint} from the catalog")
@staticmethod
def _spectrum_to_moment(pick_phase, config, spectral_level, distance):
# Calculation of spectral amplitude in [Nm * s]
if pick_phase == "P":
v = config.get("vp")
g = G_P
else:
v = config.get("vs")
g = G_S
return spectrum2moment(spectral_level=spectral_level, density=config.get("density"), velocity=v,
distance=distance, g=g)
def _return_event_spec_params(self, solutions):
"""
Function calculates mean parameters for seismic event based on stations' results and removes outliers
:param solutions:
:return:
"""
if not solutions:
return None
def outliers_detection(M0, F0, Q, STRESS_DROP, SOURCE_SIZE, z_threshold):
"""Function calculates z_scores and removes outliers which has absolute z_score above z_threshold"""
# Calculate z-scores
z_scores_M0 = zscore(np.log(M0))
z_scores_F0 = zscore(F0)
z_scores_Q = zscore(Q)
# Identify outliers
outliers = (np.abs(z_scores_M0) > z_threshold) | (np.abs(z_scores_F0) > z_threshold) | (
np.abs(z_scores_Q) > z_threshold)
# Filter out outliers
M0_filtered = M0[~outliers]
F0_filtered = F0[~outliers]
Q_filtered = Q[~outliers]
STRESS_DROP_filtered = STRESS_DROP[~outliers]
SOURCE_SIZE_filtered = SOURCE_SIZE[~outliers]
return M0_filtered, F0_filtered, Q_filtered, STRESS_DROP_filtered, SOURCE_SIZE_filtered
M0 = np.array([solution[1].mo for solution in solutions])
F0 = np.array([solution[1].fo for solution in solutions])
Q = np.array([solution[1].q for solution in solutions])
STRESS_DROP = np.array([solution[1].stress_drop for solution in solutions])
SOURCE_SIZE = np.array([solution[1].source_size for solution in solutions])
M0, F0, Q, STRESS_DROP, SOURCE_SIZE = outliers_detection(M0=M0, F0=F0, Q=Q, SOURCE_SIZE=SOURCE_SIZE, STRESS_DROP=STRESS_DROP,
z_threshold=self.config.get("z_threshold"))
# Calculate mean values of parameters
mo, mo_e = self.mef(M0)
fo, fo_e = self.mef(F0)
q, q_e = self.mef(Q)
stress_drop, stress_drop_e = self.mef(STRESS_DROP)
source_size, source_size_e = self.mef(SOURCE_SIZE)
return EventSpectralParams(mo=mo, fo=fo, q=q, mo_e=mo_e, fo_e=fo_e, q_e=q_e, source_size=source_size, source_size_e=source_size_e,
stress_drop=stress_drop, stress_drop_e=stress_drop_e)
@staticmethod
def mef(x):
"""
Function for calculating average values and standard deviations of source parameters which are log-normally distributed.
Formulas based on article: https://doi.org/10.1016/j.pepi.2003.08.006
"""
x_mean = 10 ** np.mean(np.log10(x))
if len(x) == 1:
return x_mean, -999
x_std = ((1 / (len(x) - 1)) * np.sum((np.log10(x) - np.log10(x_mean)) ** 2)) ** 0.5
x_e = 10 ** x_std
return x_mean, x_e

162
src/SpectralAnalysisApp.py Normal file
View File

@ -0,0 +1,162 @@
from obspy import read, read_events, read_inventory
from SpectralAnalysis import SpectralAnalysis
from base_logger import getDefaultLogger
def count_matching_stations(stream, inventory):
"""
Count how many stations from the given stream are present in the inventory.
Parameters:
- stream: ObsPy Stream object containing seismic data.
- inventory: ObsPy Inventory object containing network and station metadata.
Returns:
- count: Number of stations from the stream found in the inventory.
"""
# Extract unique station codes from the stream
stream_stations = set([trace.stats.station for trace in stream])
# Extract unique station codes from the inventory
inventory_stations = set([station.code for network in inventory for station in network.stations])
# Count how many stations from the stream are in the inventory
count = len(stream_stations.intersection(inventory_stations))
return count, len(stream_stations)
def filter_stream_by_picks(st, qml):
"""
Select from stream only traces that have an associated pick.
Parameters:
- st: ObsPy Stream object containing seismic data.
- qml: ObsPy Catalog object containing phase picks.
Returns:
- st2: ObsPy Stream object containing seismic data.
"""
st2 = read().clear() #create blank stream object
for pick in qml.picks:
net = pick.waveform_id.network_code
sta = pick.waveform_id.station_code
loc = pick.waveform_id.location_code
cha = pick.waveform_id.channel_code[:2] + "*" #wildcard to obtain all 3 components
NSLC = net + '.' + sta + '.' + loc + '.' + cha
st2 = st2 + st.select(id=NSLC) #add trace to new stream
st2.merge() #remove duplicate traces
return st2
def main(waveforms, network_inventory, picks_qml, event_latitude, event_longitude, event_depth, vp, vs, density, taper_type, taper_len,
window_len, freq_min, freq_max, min_sn_ratio=1, min_energy_ratio=1, p_wave_analysis=True, s_wave_analysis=True,
raytracing=True, allow_station_elevations=False,
source_model="Madariaga", sp_fit_model="FBoatwright", norm="L2", z_threshold=6,
q_min=1, q_max=400, velocity_model=None):
"""
Main function for application: SPECTRALPARAMETERS
Arguments:
waveforms: path to input file of type 'seismogram'
Velocity model: path to input file of type 'velocity_model'
network_inventory: path to input file of type 'inventory'
event_qml: path to input file of type 'quakeml_seismogram_picks'
raytracing: parameter of type 'BOOLEAN'
save plots: parameter of type 'BOOLEAN'
Returns:
File(s) named 'Katalog sejsmiczny' of type 'quakeml_seismogram_picks' and format 'XML' from working directory
:param event_qml:
"""
logger = getDefaultLogger(__name__)
config = {
# filenames
"P_WAVE_SP_FITTING_RESULTS_JSON": "P_wave_analysis_sp_fit_solutions.json",
"S_WAVE_SP_FITTING_RESULTS_JSON": "S_wave_analysis_sp_fit_solutions.json",
"P_WAVE_JK_RESULTS": "P_wave_analysis_JK_solutions.csv",
"S_WAVE_JK_RESULTS": "S_wave_analysis_JK_solutions.csv",
"P_WAVE_SP_FITTING_RESULTS": "P_wave_analysis_sp_fit_solutions.csv",
"S_WAVE_SP_FITTING_RESULTS": "S_wave_analysis_sp_fit_solutions.csv",
"EVENT_RESULTS": "results.csv",
"velocity_mat_file": velocity_model,
# app options
"P_wave_analysis": p_wave_analysis,
"S_wave_analysis": s_wave_analysis,
"raytrace": raytracing,
"allow_station_elev": allow_station_elevations,
#event location
"longitude": event_longitude,
"latitude": event_latitude,
"depth": event_depth, # in meters
# phase hints
"p_phase_hint": ["P", "Pg"],
"s_phase_hint": ["S", "Sg"],
# source parameters
"vp": vp,
"vs": vs,
"density": density,
# window parameters
"taper_len": taper_len,
"window_len": window_len,
"taper_type": taper_type,
# filter parameters
"freq_min": freq_min,
"freq_max": freq_max,
# frequency signal-to-noise ratio
"min_sn_ratio": min_sn_ratio,
# if station signal-to-noise ratio
"min_energy_ratio": min_energy_ratio,
# L1 / L2
"norm": norm,
# Spectrum fitting model: FBrune / FBoatwright
"sp_fit_model": sp_fit_model,
# Source model: Madariaga / Brune
"sp_source_model": source_model,
# outliers detection
"z_threshold": z_threshold,
# q - damping limits
"q_min": q_min,
"q_max": q_max,
}
# reading files
stream = read(waveforms)
inv = read_inventory(network_inventory)
qml = read_events(picks_qml).events[0]
if not stream:
msg = "MSEED file is empty"
logger.error(msg)
raise Exception(msg)
if not inv or len(inv) == 0:
msg = "Inventory file is empty"
logger.error(msg)
raise Exception(msg)
else:
stream = filter_stream_by_picks(stream, qml)
if len(stream) == 0:
msg = "No waveform data was found corresponding to the provided picks"
logger.error(msg)
raise Exception(msg)
else:
count, total = count_matching_stations(stream, inv)
logger.info(f"{count} out of {total} stations in stream are in the inventory")
if count == 0:
msg = "No stations from MSEED in the Network Inventory"
logger.error(msg)
raise Exception(msg)
else:
sa = SpectralAnalysis(stream=stream, event=qml, inventory=inv, config=config, logger=logger)
sa.run()

64
src/SpectralParameters.py Normal file
View File

@ -0,0 +1,64 @@
from spectral_definitions import mm
class SpectralParams:
def __init__(self, mo, fo, q, stress_drop=0, source_size=0):
self.mo = mo
self.fo = fo
self.q = q
self.mw = mm(mo)
self.stress_drop = stress_drop
self.source_size = source_size
def __str__(self):
return (f"SpectralAnalysis parameters: M0={self.mo:.2e}, F0={self.fo:.2f}, "
f"Q={self.q:.2f}, Mw={self.mw:.2f}, "
f"stress drop = {self.stress_drop:.2e}, source size = {self.source_size:.0f}")
class EventSpectralParams(SpectralParams):
def __init__(self, mo, fo, q, stress_drop, source_size, mo_e, fo_e, q_e, source_size_e, stress_drop_e):
super().__init__(mo, fo, q, stress_drop, source_size)
self.mo_e = mo_e
self.fo_e = fo_e
self.q_e = q_e
self.mw = mm(mo)
self.source_size_e = source_size_e
self.stress_drop_e = stress_drop_e
self.mo_1, self.mo_2 = self.calculate_uncertainties(self.mo, self.mo_e)
self.fo_1, self.fo_2 = self.calculate_uncertainties(self.fo, self.fo_e)
self.q_1, self.q_2 = self.calculate_uncertainties(self.q, self.q_e)
self.mw_1, self.mw_2 = self.calculate_uncertainties_mw(self.mo, self.mo_e)
self.source_size_1, self.source_size_2 = self.calculate_uncertainties(self.source_size, self.source_size_e)
self.stress_drop_1, self.stress_drop_2 = self.calculate_uncertainties(self.stress_drop, self.stress_drop_e)
def __str__(self):
return f"SpectralAnalysis parameters: " \
f"Seismic moment: {self.mo:.2e} <{self.mo_1:.2e}, {self.mo_2:.2e}> " \
f"Corner frequency: {self.fo:.2f} <{self.fo_1:.2f}, {self.fo_2:.2f}> " \
f"Q factor: {self.q:.2f} <{self.q_1:.2f}, {self.q_2:.2f}> " \
f"Mw: {self.mw:.2f} <{self.mw_1:.2f}, {self.mw_2:.2f}> " \
f"Stress drop: {self.stress_drop:.0f} <{self.stress_drop_1:.0f}, {self.stress_drop_2:.0f}> " \
f"Source size: {self.source_size:.0f} <{self.source_size_1:.0f}, {self.source_size_2:.0f}> "
@staticmethod
def calculate_uncertainties(x, x_e):
if x_e == -999:
unc1 = 0
unc2 = 0
else:
unc1 = x / x_e
unc2 = x * x_e
return unc1, unc2
@staticmethod
def calculate_uncertainties_mw(mo, mo_e):
if mo_e == -999:
unc1 = 0
unc2 = 0
else:
unc1 = mm(mo / mo_e)
unc2 = mm(mo * mo_e)
return unc1, unc2

31
src/amplitude_spectra.py Normal file
View File

@ -0,0 +1,31 @@
from obspy import Stream
from scipy.fft import rfft, rfftfreq
from scipy.interpolate import interp1d
import numpy as np
def calculate_amp_spectra(station_stream: Stream):
"""
Function returns frequencies and corresponding mean amplitude spectra (for all available traces in stream).
All spectra are interpolated
:param station_stream:
:return:
"""
# Fourier's transformation of signal for each trace (channel) in the stream
freq_bins = None
psd = 0
for tr in station_stream:
fft_signal = rfft(tr.data)
freq_bins = rfftfreq(len(tr.data), d=tr.stats.delta)
amp_spectrum = abs(tr.stats.delta * fft_signal) # Multiplication by sampling step
amp_spectrum = amp_spectrum * np.sqrt(2) # Multiplication by square root to calibrate energy
psd += amp_spectrum ** 2 # for calculation of root-mean-square
# Calculation of root-mean-square for all traces in the stream
mean_amp_spectra = psd ** 0.5
# Interpolation
fi = np.logspace(np.log10(freq_bins[1]), np.log10(freq_bins[-1]), len(freq_bins))
interpolation_function = interp1d(freq_bins, mean_amp_spectra, kind='linear', fill_value='extrapolate')
return fi, interpolation_function(fi)

34
src/f_models.py Normal file
View File

@ -0,0 +1,34 @@
from SpectralParameters import SpectralParams
class FModel:
def __init__(self, freq_bins, spectral_parameters: SpectralParams):
self.freq = freq_bins
self.sp_par = spectral_parameters
def fmodel(self):
"""Function return given model"""
return
class FBrune(FModel):
"""
Generate Brune's model source spectrum.
"""
def __init__(self, freq_bins, spectral_parameters: SpectralParams):
super().__init__(freq_bins, spectral_parameters)
def fmodel(self):
return self.sp_par.mo / (1 + (self.freq / self.sp_par.fo) ** 2)
class FBoatwright(FModel):
"""
Generate Boatwright's model source spectrum.
"""
def __init__(self, freq_bins, spectral_parameters: SpectralParams):
super().__init__(freq_bins, spectral_parameters)
def fmodel(self):
return self.sp_par.mo / (1 + (self.freq / self.sp_par.fo) ** 4) ** 0.5

62
src/f_norms.py Normal file
View File

@ -0,0 +1,62 @@
import numpy as np
from f_models import FModel, FBrune, FBoatwright
from SpectralParameters import SpectralParams
from spectral_definitions import damping
class FNormResults:
def __init__(self, misfit, freq_bins, amplitude_spectrum, amp_theor_spec):
self.misfit = misfit
self.freq = freq_bins
self.amplitude_spectrum = amplitude_spectrum
self.amplitude_theoretical_spectrum = amp_theor_spec
def __str__(self):
return f"Misfit: {self.misfit:.2f}"
class FNorm:
def __init__(self, norm, spectral_params: SpectralParams, freq_bins, amplitude_spectrum, weights, travel_time,
source_model: FModel, logger):
self.norm = norm
self.spectral_par = spectral_params
self.freq = freq_bins
self.amp_spectrum = amplitude_spectrum
self.th_amp_sp_damp = None
self.weights = weights
self.travel_time = travel_time
self.f_model = source_model
self.logger = logger
def calculate(self):
self.f_model.sp_par = self.spectral_par
self.f_model.freq = self.freq
# theoretical spectrum model (Brune or Boatwright)
amp_th = self.f_model.fmodel()
# calculation of quality (damping) values for all frequencies)
damping_val = damping(q_factor=self.spectral_par.q, frequencies=self.freq, travel_time=self.travel_time)
if self.norm == "L1":
misfit = self.f_norm_l1(damping_amp=damping_val, amp_th=amp_th)
elif self.norm == "L2":
misfit = self.f_norm_l2(damping_amp=damping_val, amp_th=amp_th)
else:
self.logger.error(f"{self.norm} is a wrong norm. It should be L1 or L2")
return None
# Correcting theoretical model (Brune or Boatwright) for damping (Q factor)
self.th_amp_sp_damp = amp_th / damping_val
return FNormResults(misfit=misfit, freq_bins=self.freq, amplitude_spectrum=self.amp_spectrum,
amp_theor_spec=self.th_amp_sp_damp)
def f_norm_l1(self, damping_amp, amp_th):
amp_residuals = self.weights * np.abs(np.log10(self.amp_spectrum * damping_amp) - np.log10(amp_th))
return np.sum(amp_residuals) / np.size(amp_residuals)
def f_norm_l2(self, damping_amp, amp_th):
amp_residuals = self.weights * (np.log10(self.amp_spectrum * damping_amp) - np.log10(amp_th)) ** 2
return np.sqrt(np.sum(amp_residuals) / amp_residuals.size)

49
src/json_writer.py Normal file
View File

@ -0,0 +1,49 @@
import json
class JsonWriter:
def __init__(self, solutions, filename, logger):
self.solutions = solutions
self.filename = filename
self.data = self.prepare_file(solutions)
self.logger = logger
def save(self):
with open(self.filename, 'w') as f:
json.dump(self.data, f)
def prepare_file(self, solutions):
# Initialize an empty dictionary to hold the data
stations_dict = {}
# Iterate over the solutions list
for solution in solutions:
station_name = solution[0]
parameters = {"mo": solution[1].mo, "fo": solution[1].fo, "q": solution[1].q, "source_size": solution[1].source_size, "stress_drop": solution[1].stress_drop}
freq = solution[2]
amp_th_sp_q = solution[3]
amp_spectrum = solution[4]
# Round the parameters to two decimal places
parameters = {key: round(value, 2) for key, value in parameters.items()}
# Check if the station already exists in the dictionary
if station_name not in stations_dict:
stations_dict[station_name] = {
"parameters": list(),
"frequency": list(),
"fitted_amplitude_spectrum": list(),
"amplitude_spectrum": list()
}
try:
# Append the data to the respective lists
stations_dict[station_name]["parameters"]=parameters
if freq is not None and amp_spectrum is not None and amp_th_sp_q is not None:
stations_dict[station_name]["frequency"].extend(freq)
stations_dict[station_name]["fitted_amplitude_spectrum"].extend(amp_th_sp_q)
stations_dict[station_name]["amplitude_spectrum"].extend(amp_spectrum)
except AttributeError as ae:
self.logger.error(ae)
return stations_dict

View File

@ -1,49 +0,0 @@
# -*- coding: utf-8 -*-
# -----------------
# Copyright © 2024 ACK Cyfronet AGH, Poland.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This work was partially funded by DT-GEO Project.
# -----------------
import sys
import argparse
from Mmax import main as Mmax
def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument("Input_catalog", help="Input catalog: path to input file of type 'catalog'")
parser.add_argument("Input_injection_rate", help="Input injection rate: path to input file of type 'injection_rate'")
parser.add_argument("--time_win_in_hours", help="Time window length (in hours- backward from the current time).", type=int, default=6)
parser.add_argument("--time_step_in_hour", help="Time interval for computation (in hours).", type=int, default=3)
parser.add_argument("--time_win_type", help="Time window type for computation.", type=int, default=0)
parser.add_argument("--End_time", help="End time of the computations (in day).", type=int, default=None)
parser.add_argument("--ev_limit", help="Minimum events number required for model computation.", type=int, default=20)
parser.add_argument("--Model_index", help="Model index: parameter of type 'INTEGER'", type=int)
parser.add_argument("--Mc", help="Completeness magnitude.", type=float, default=0.8)
parser.add_argument("--Mu", help="Friction coefficient.", type=float, default=0.6, required=False)
parser.add_argument("--G", help="Shear modulus of reservoir (in Pa).", type=float, default=35000000000)
parser.add_argument("--ssd", help="Static stress drop (in Pa).", type=float, default=3000000)
parser.add_argument("--C", help="Geometrical constant.", type=float, default=0.95)
parser.add_argument("--b_value_type", help="b-value type: parameter of type 'TEXT'", action='append')
parser.add_argument("--cl", help="Confidence level in van der Elst model.", type=float, action='append')
parser.add_argument("--mag_name", help="Magnitude column name", type=str)
args = parser.parse_args()
Mmax(**vars(args))
return
if __name__ == '__main__':
main(sys.argv)

75
src/opt_algorithms.py Normal file
View File

@ -0,0 +1,75 @@
import scipy.optimize
import numpy as np
from SpectralParameters import SpectralParams
import f_models
from f_norms import FNorm
class OptAlgorithm:
"""Base class for optimization algorithms.
"""
def __init__(self, initial_model: SpectralParams, freq_bins, amplitude_spectrum, weights, travel_time, config,
logger):
self.initial_model = initial_model
self.config = config
self.travel_time = travel_time
self.f_model = getattr(f_models, config.get("sp_fit_model"))(freq_bins=freq_bins,
spectral_parameters=initial_model)
self.f_norm = FNorm(norm=self.config.get("norm"), spectral_params=initial_model, freq_bins=freq_bins,
amplitude_spectrum=amplitude_spectrum, weights=weights, travel_time=travel_time,
source_model=self.f_model, logger=logger)
self.solution = None
self.error = None
self.name = self.__class__.__name__
def run(self):
"""Run the optimization algorithm and return SpectralParams results
:return:
"""
return self.error, self.solution
def __repr__(self):
if self.solution:
output = f"{self.name} results:\n"
output += f" {self.solution.__str__()} \n"
output += f" Error: {self.error:.4f}"
return output
else:
return f"{self.name}: no solution"
class OptNelderMead(OptAlgorithm):
"""
Minimize a function using the downhill simplex algorithm from scipy.optimize.
"""
def __init__(self, initial_model: SpectralParams, freq_bins, amplitude_spectrum, weights, travel_time, config,
logger):
super().__init__(initial_model, freq_bins, amplitude_spectrum, weights, travel_time, config, logger)
self.initial_q = (self.config.get("q_min")+self.config.get("q_max"))/2
def run(self):
def prepare_fun(x):
self.f_norm.spectral_par = SpectralParams(mo=x[0], fo=x[1], q=x[2], )
return self.f_norm.calculate().misfit
# Initial model parameters
x0 = [self.initial_model.mo, self.initial_model.fo, self.initial_q]
# Optimization bounds
bounds = [(None, None), (1 / self.config.get("window_len"), self.config.get("freq_max")),
(self.config.get("q_min"), self.config.get("q_max"))]
# Perform optimization
xopt = scipy.optimize.minimize(method='Nelder-Mead', fun=prepare_fun, x0=np.array(x0), bounds=bounds)
# Store the results
self.solution = SpectralParams(mo=xopt.x[0], fo=xopt.x[1], q=xopt.x[2])
self.error = xopt.fun
return self.error, self.solution

24
src/sp_jk.py Normal file
View File

@ -0,0 +1,24 @@
import numpy as np
def sp_jk(freq_bins, amp_spectra):
# Calculate source parameters using J and K Snoke's integrals
# with correction for the limited frequency band. The routine
# ignores quality factor in calculations (ideally, the spectrum
# should be corrected for attenuation before).
# For each waveform, calculate the J and K integrals and source parameters.
Av = amp_spectra * 2 * np.pi * freq_bins
jf = (2 * np.trapz(Av ** 2, x=freq_bins) + 2 / 3 * (amp_spectra[0] * 2 * np.pi * freq_bins[0]) ** 2 *
freq_bins[0] + 2 * (amp_spectra[-1] * 2 * np.pi * freq_bins[-1]) ** 2 * freq_bins[-1])
kf = (2 * np.trapz(amp_spectra ** 2, x=freq_bins) + 2 * amp_spectra[0] ** 2 * freq_bins[0] +
2 / 3 * amp_spectra[-1] ** 2 * freq_bins[-1])
# Calculation of spectral level and corner frequency
mo = 2 * (kf ** 3 / jf) ** 0.25 # spectral level from Snoke's integrals
fo = np.sqrt(jf / kf) / (2 * np.pi) # corner frequency
return mo, fo

View File

@ -0,0 +1,42 @@
import numpy as np
# constants depending on source models
K_BRUNE = 0.37
K_MADARIAGA_P = 0.32
K_MADARIAGA_S = 0.21
# averages of radiation coefficients
G_P = 0.52
G_S = 0.63
def spectrum2moment(spectral_level, density, velocity, distance, g):
return spectral_level * 4.0 * np.pi * density * velocity ** 3 * distance / g
def damping(q_factor, frequencies, travel_time):
"""Exponential damping"""
return np.exp(np.pi * frequencies * travel_time / q_factor)
def mm(mo):
"""Calculate moment magnitude from the spectral level (Mo)
:return moment magnitude (float):
"""
return (np.log10(mo) - 9.1) / 1.5
def m0(mw):
"""Calculate the spectral level (Mo) from the moment magnitude (Mw)
:return spectral level (float):
"""
return 10 ** (mw * 1.5 + 9.1)
def calc_source_size(s_vel, corner_freq, k):
"""Calculate source radius"""
return k * s_vel / corner_freq
def calc_stress_drop(seismic_moment, source_radius):
"""Calculate stress drop"""
return 7 / 16 * seismic_moment / source_radius ** 3

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# -----------------
# Copyright © 2024 ACK Cyfronet AGH, Poland.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This work was partially funded by DT-GEO Project.
# -----------------
import sys
import argparse
from SpectralAnalysisApp import main as SpectralAnalysisApp
def main(argv):
def str2bool(v):
if v.lower() in ("True", "TRUE", "yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("False", "FALSE", "no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")
parser = argparse.ArgumentParser()
parser.add_argument("waveforms", help="Path to input file of type miniseed")
parser.add_argument("network_inventory", help="Path to input file of type inventory")
parser.add_argument("picks_qml", help="Path to input file of type phase_association_detections")
parser.add_argument("--velocity_model", help="Path to input file of type velocity_model", required=False)
parser.add_argument("--event_latitude", help="", type=float, default=4000, required=True)
parser.add_argument("--event_longitude", help="", type=float, default=4000, required=True)
parser.add_argument("--event_depth", help="", type=float, default=4000, required=True)
parser.add_argument("--vp", help="P wave velocity in the source",
type=float, default=4000, required=True)
parser.add_argument("--vs", help="S wave velocity in the source",
type=float, default=2500, required=True)
parser.add_argument("--density", help="Rock density in source",
type=float, default=2700, required=True)
parser.add_argument("--taper_type", help="Type of taper",
type=str, default='hann', required=True)
parser.add_argument("--taper_len", help="Maximum length of the taper",
type=float, default=0.1, required=True)
parser.add_argument("--window_len", help="Length of the time window used for spectrum calculation",
type=float, default=1, required=True)
parser.add_argument("--freq_min", help="Minimum frequency for bandpass filtering",
type=float, default=0.1, required=True)
parser.add_argument("--freq_max", help="Maximum frequency for bandpass filtering",
type=float, default=40, required=True)
parser.add_argument("--min_sn_ratio", help="Minimum signal-to-noise ratio for each frequency - for lower S/N values, a given frequency is not taken into account for spectral fitting",
type=float, default=1, required=True)
parser.add_argument("--min_energy_ratio", help="Minimum spectral signal-to-noise ratio for the entire time window for lower S/N values analysis is not performed",
type=float, default=1, required=True)
parser.add_argument("--p_wave_analysis", help="P wave analysis",
type=str2bool, default=True, required=True)
parser.add_argument("--s_wave_analysis", help="S wave analysis",
type=str2bool, default=True, required=True)
parser.add_argument("--raytracing", help="1D model. If False constant velocity values from the source are used for travel time calculations",
type=str2bool, default=True, required=True)
parser.add_argument("--allow_station_elevations", help="Use station elevations (otherwise set them to zero)",
type=str2bool, default=False, required=True)
parser.add_argument("--source_model", help="Source model. List: Madariaga or Brune",
type=str, default="Madariaga", required=True)
parser.add_argument("--sp_fit_model", help="Spectral fitting model. List: FBrune or FBoatwright",
type=str, default="FBoatwright", required=True)
parser.add_argument("--norm", help="Norm for spectral fitting calculations. List: L1 or L2",
type=str, default="L2", required=True)
parser.add_argument("--z_threshold", help="Z threshold for outliers removal (number of standard deviations)",
type=int, default=3, required=True)
parser.add_argument("--q_min", help="Lower bound of Q for spectral fitting",
type=int, default=1, required=True)
parser.add_argument("--q_max", help="Upper bound of Q for spectral fitting",
type=int, default=400, required=True)
args = parser.parse_args()
SpectralAnalysisApp(**vars(args))
return
if __name__ == '__main__':
main(sys.argv)

89
src/tau_p_raytracing.py Normal file
View File

@ -0,0 +1,89 @@
import os
import logging
import pandas as pd
import scipy.io
from obspy.geodetics import kilometer2degrees
from obspy.taup import TauPyModel
from obspy.taup.taup_create import build_taup_model
from obspy.taup.tau import TauModel
from base_logger import getDefaultLogger
class CustomTauPyModel(TauPyModel):
"""
Custom TauPyModel for filepath modification.
"""
def __init__(self, filepath, model="iasp91", verbose=False, planet_flattening=0.0):
super().__init__(model, verbose, planet_flattening)
self.verbose = verbose
self.planet_flattening = planet_flattening
self.model = self.read_model(filepath)
@staticmethod
def read_model(path):
return TauModel.deserialize(path, cache=None)
class TauPRayTracer:
""" Class for raytracing seismic waves using TauP """
def __init__(self, velocity_mat_file):
self.logger = getDefaultLogger('converter')
self.path = "."
self.velocity_model = self._setup_velocity_model(velocity_mat_file)
def _setup_velocity_model(self, velocity_mat_file_name):
velocity_npz_file_path = f"{velocity_mat_file_name[:-3]}npz"
velocity_npz_file_name = velocity_npz_file_path.split('/')[-1]
nd_file = self._mat2nd(velocity_mat_file_name)
build_taup_model(nd_file, output_folder=f"{self.path}")
os.remove(nd_file)
try:
return CustomTauPyModel(filepath=f"{self.path}/{velocity_npz_file_name}")
except FileNotFoundError as er:
self.logger.warning(er)
@staticmethod
def _mat2nd(velocity_mat_file):
# Load the .mat file
mat_contents = scipy.io.loadmat(velocity_mat_file)
data = mat_contents["d"][0][0]
# Create DataFrame from the lists
data_dict = {
'depth': data[0].T[0],
'vp': data[1].T[0],
'vs': data[2].T[0],
'ro': data[3].T[0],
'qp': data[4].T[0],
'qs': data[5].T[0]
}
df = pd.DataFrame(data_dict)
# Adding two new rows to model file - required by TauP
# Get the last row of the DataFrame
last_row = df.iloc[-1].copy()
# Append the last row to the DataFrame
df = pd.concat([df, pd.DataFrame(last_row).T], ignore_index=True)
last_row['depth'] = 6178.1
# Append the modified last row to the DataFrame
df = pd.concat([df, pd.DataFrame(last_row).T], ignore_index=True)
# Specify the name of the text file to save
nd_filename = f'{velocity_mat_file[:-3]}nd'
# Save the DataFrame to a text file
df.to_csv(nd_filename, sep='\t', index=False, header=False)
return nd_filename
def calculate_arrival(self, distance_in_km, source_depth_in_km, receiver_depth_in_km, phase: str):
phase_list = [phase, phase.swapcase()]
return self.velocity_model.get_travel_times(source_depth_in_km=source_depth_in_km,
distance_in_degree=kilometer2degrees(distance_in_km),
receiver_depth_in_km=receiver_depth_in_km,
phase_list=phase_list
)[0]

View File

@ -1,43 +0,0 @@
import numpy as np
class CandidateEventsTS:
def __init__(self, data, current_time, Mc, time_win, space_win=None):
assert time_win > 0, f"Time windows is {time_win}, which should be a positive number"
self.data = data
self.current_time = current_time
self.Mc = Mc
self.time_win = time_win
self.space_win = space_win
def filter_by_time(self):
indx = np.where((self.data[:, 4] > (self.current_time - self.time_win)) & (self.data[:, 4] <= self.current_time))[0]
if len(indx) > 0:
self.data = self.data[indx, :]
else:
self.data = []
def filter_by_magnitude(self):
if self.Mc:
indx = np.where(self.data[:, 5] > self.Mc)[0]
if len(indx) > 0:
self.data = self.data[indx, :]
else:
self.data = []
def filter_by_space(self):
dist = np.sqrt(np.sum((self.data[:, 1:4] - self.data[-1, 1:4]) ** 2, axis=1))
indx = np.where(dist < self.space_win)[0]
if len(indx) > 0:
self.data = self.data[indx, :]
else:
self.data = []
def filter_data(self):
self.filter_by_time()
if len(self.data) > 0:
self.filter_by_magnitude()
if len(self.data) > 0 and self.space_win:
self.filter_by_space()
return self.data

View File

@ -1,14 +0,0 @@
import numpy as np
def Find_idx4Time(In_mat, t):
# In_mat: time array
# t = target time
In_mat = np.array(In_mat)
t = np.array(t)
if len(np.shape(t)) == 0:
return np.where(abs(In_mat - t) <= min(abs(In_mat - t)))[0][0]
else:
In_mat = In_mat.reshape((len(In_mat), 1))
t = t.reshape((1,len(t)))
target_time = np.matmul(np.ones((len(In_mat),1)), t)
diff_mat = target_time - In_mat
return np.where(abs(diff_mat) <= np.min(abs(diff_mat), axis = 0))

View File

@ -1,217 +0,0 @@
import numpy as np
class M_max_models:
def __init__(self, data = None, f_name = None, time_win = None, space_win = None,
Mc = None, b_method = None, num_bootstraps = None,
G = None, Mu = None,
dv = None, Mo = None, SER = None,
cl = None,
ssd = None, C = None,
):
self.data = data # Candidate data table: 2darray n x m, for n events and m clonums: x, y, z, t, mag
self.f_name = f_name # Feature's name to be calculated, check: def ComputeFeaure(self)
self.time_win = time_win # Time window whihc a feature is computed in
self.space_win = space_win # Space window ...
self.Mc = Mc # Magnitude of completeness for computing b-positive
self.b_method = b_method # list of b_methods
self.num_bootstraps = num_bootstraps # Num of bootstraps for standard error estimation of b-value
self.G = G # Shear modulus
self.Mu = Mu # Friction coefficient
self.dv = dv # Injected fluid
self.SER = SER
self.Mo = Mo # Cumulative moment magnitude
self.cl = cl # Confidence level
self.ssd = ssd # Static stress drop (Shapiro et al. 2013)
self.C = C # Geometrical constant (Shapiro et al. 2013)
def b_value(self, b_flag):
if b_flag == '1':
return 1, None
# maximum-likelihood estimate (MLE) of b (Deemer & Votaw 1955; Aki 1965; Kagan 2002):
elif b_flag == 'b':
X = self.data[np.where(self.data[:,-1]>self.Mc)[0],:]
if X.shape[0] > 0:
b = 1/((np.mean(X[:,-1] - self.Mc))*np.log(10))
std_error = b/np.sqrt(X.shape[0])
else:
raise ValueError("All events in the current time window have a magnitude less than 'completeness magnitude'. Use another value either for 'time window', 'minimum number of events' or 'completeness magnitude'. Also check 'time window type'.")
return b, std_error
# B-positive (van der Elst 2021)
elif b_flag == 'bp':
# Function to perform bootstrap estimation
def bootstrap_estimate(data, num_bootstraps):
estimates = []
for _ in range(num_bootstraps):
# Generate bootstrap sample
bootstrap_sample = np.random.choice(data, size=len(data), replace=True)
# Perform maximum likelihood estimation on bootstrap sample
diff_mat = np.diff(bootstrap_sample)
diff_mat = diff_mat[np.where(diff_mat>0)[0]]
estimate = 1/((np.mean(diff_mat - np.min(diff_mat)))*np.log(10))
estimates.append(estimate)
return np.array(estimates)
diff_mat = np.diff(self.data[:,-1])
diff_mat = diff_mat[np.where(diff_mat>0)[0]]
bp = 1/((np.mean(diff_mat - np.min(diff_mat)))*np.log(10))
bootstrap_estimates = bootstrap_estimate(diff_mat, self.num_bootstraps)
std_error = np.std(bootstrap_estimates, axis=0)
return bp, std_error
# Tapered Gutenberg_Richter (TGR) distribution (Kagan 2002)
elif b_flag == 'TGR':
from scipy.optimize import minimize
# The logarithm of the likelihood function for the TGR distribution (Kagan 2002)
def log_likelihood(params, data):
beta, Mcm = params
n = len(data)
Mt = np.min(data)
l = n*beta*np.log(Mt)+1/Mcm*(n*Mt-np.sum(data))-beta*np.sum(np.log(data))+np.sum(np.log([(beta/data[i]+1/Mcm) for i in range(len(data))]))
return -l
X = self.data[np.where(self.data[:,-1]>self.Mc)[0],:]
M = 10**(1.5*X[:,-1]+9.1)
initial_guess = [0.5, np.max(M)]
bounds = [(0.0, None), (np.max(M), None)]
# Minimize the negative likelihood function for beta and maximum moment
result = minimize(log_likelihood, initial_guess, args=(M,), bounds=bounds, method='L-BFGS-B',
options={'gtol': 1e-12, 'disp': False})
beta_opt, Mcm_opt = result.x
eta = 1/Mcm_opt
S = M/np.min(M)
dldb2 = -np.sum([1/(beta_opt-eta*S[i])**2 for i in range(len(S))])
dldbde = -np.sum([S[i]/(beta_opt-eta*S[i])**2 for i in range(len(S))])
dlde2 = -np.sum([S[i]**2/(beta_opt-eta*S[i])**2 for i in range(len(S))])
std_error_beta = 1/np.sqrt(dldb2*dlde2-dldbde**2)*np.sqrt(-dlde2)
return beta_opt*1.5, std_error_beta*1.5
def McGarr(self):
b_value, b_stderr = self.b_value(self.b_method)
B = 2/3*b_value
if B < 1:
sigma_m = ((1-B)/B)*(2*self.Mu)*(5*self.G)/3*self.dv
Mmax = (np.log10(sigma_m)-9.1)/1.5
if b_stderr:
Mmax_stderr = b_stderr/np.abs(np.log(10)*(1.5*b_value-b_value**2))
else:
Mmax_stderr = None
else:
Mmax = None
Mmax_stderr = None
return b_value, b_stderr, Mmax, Mmax_stderr
def Hallo(self):
b_value, b_stderr = self.b_value(self.b_method)
B = 2/3*b_value
if b_value < 1.5:
sigma_m = self.SER*((1-B)/B)*(2*self.Mu)*(5*self.G)/3*self.dv
Mmax = (np.log10(sigma_m)-9.1)/1.5
if b_stderr:
Mmax_stderr = self.SER*b_stderr/np.abs(np.log(10)*(1.5*b_value-b_value**2))
else:
Mmax_stderr = None
else:
Mmax = None
Mmax_stderr = None
return b_value, b_stderr, Mmax, Mmax_stderr
def Li(self):
sigma_m = self.SER*2*self.G*self.dv - self.Mo
Mmax = (np.log10(sigma_m)-9.1)/1.5
if Mmax < 0:
return None
else:
return Mmax
def van_der_Elst(self):
b_value, b_stderr = self.b_value(self.b_method)
# Seismogenic_Index
X = self.data
si = np.log10(X.shape[0]) - np.log10(self.dv) + b_value*self.Mc
if b_stderr:
si_stderr = self.Mc*b_stderr
else:
si_stderr = None
Mmax = (si + np.log10(self.dv))/b_value - np.log10(X.shape[0]*(1-self.cl**(1/X.shape[0])))/b_value
if b_stderr:
Mmax_stderr = (np.log10(X.shape[0]) + np.log10(X.shape[0]*(1-self.cl**(1/X.shape[0]))))*b_stderr
else:
Mmax_stderr = None
return b_value, b_stderr, si, si_stderr, Mmax, Mmax_stderr
def L_Shapiro(self):
from scipy.stats import chi2
X = self.data[np.isfinite(self.data[:,1]),1:4]
# Parameters
STD = 2.0 # 2 standard deviations
conf = 2 * chi2.cdf(STD, 2) - 1 # covers around 95% of population
scalee = chi2.ppf(conf, 2) # inverse chi-squared with dof=#dimensions
# Center the data
Mu = np.mean(X, axis=0)
X0 = X - Mu
# Covariance matrix
Cov = np.cov(X0, rowvar=False) * scalee
# Eigen decomposition
D, V = np.linalg.eigh(Cov)
order = np.argsort(D)[::-1]
D = D[order]
V = V[:, order]
# Compute radii
VV = V * np.sqrt(D)
R1 = np.sqrt(VV[0, 0]**2 + VV[1, 0]**2 + VV[2, 0]**2)
R2 = np.sqrt(VV[0, 1]**2 + VV[1, 1]**2 + VV[2, 1]**2)
R3 = np.sqrt(VV[0, 2]**2 + VV[1, 2]**2 + VV[2, 2]**2)
L = (1/3*(1/R1**3+1/R2**3+1/R3**3))**(-1/3)
return R1, R2, R3, L
def Shapiro(self):
R1, R2, R3, L = self.L_Shapiro()
Sh_lmax = np.log10((2*R1)**2)+(np.log10(self.ssd)-np.log10(self.C)-9.1)/1.5
Sh_lint = np.log10((2*R2)**2)+(np.log10(self.ssd)-np.log10(self.C)-9.1)/1.5
Sh_lmin = np.log10((2*R3)**2)+(np.log10(self.ssd)-np.log10(self.C)-9.1)/1.5
Sh_lavg = np.log10((2*L)**2)+(np.log10(self.ssd)-np.log10(self.C)-9.1)/1.5
return Sh_lmax, Sh_lint, Sh_lmin, Sh_lavg
# return R1, R2, R3, L, np.log10(R3**2)+(np.log10(self.ssd)-np.log10(self.C)-9.1)/1.5
def All_models(self):
return self.McGarr()[2], self.Hallo()[2], self.Li(), self.van_der_Elst()[-2], self.Shapiro()[-1]
def ComputeModel(self):
if self.f_name == 'max_mcg':
return self.McGarr()
if self.f_name == 'max_hlo':
return self.Hallo()
if self.f_name == 'max_li':
return self.Li()
if self.f_name == 'max_vde':
return self.van_der_Elst()
if self.f_name == 'max_shp':
return self.Shapiro()
if self.f_name == 'max_all':
return self.All_models()