Populated app repository with code and licence
This commit is contained in:
43
util/CandidateEventsTS.py
Normal file
43
util/CandidateEventsTS.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import numpy as np
|
||||
|
||||
class CandidateEventsTS:
|
||||
def __init__(self, data, current_time, Mc, time_win, space_win=None):
|
||||
assert time_win > 0, f"Time windows is {time_win}, which should be a positive number"
|
||||
|
||||
self.data = data
|
||||
self.current_time = current_time
|
||||
self.Mc = Mc
|
||||
self.time_win = time_win
|
||||
self.space_win = space_win
|
||||
|
||||
def filter_by_time(self):
|
||||
indx = np.where((self.data[:, 4] > (self.current_time - self.time_win)) & (self.data[:, 4] <= self.current_time))[0]
|
||||
if len(indx) > 0:
|
||||
self.data = self.data[indx, :]
|
||||
else:
|
||||
self.data = []
|
||||
|
||||
def filter_by_magnitude(self):
|
||||
if self.Mc:
|
||||
indx = np.where(self.data[:, 5] > self.Mc)[0]
|
||||
if len(indx) > 0:
|
||||
self.data = self.data[indx, :]
|
||||
else:
|
||||
self.data = []
|
||||
|
||||
def filter_by_space(self):
|
||||
dist = np.sqrt(np.sum((self.data[:, 1:4] - self.data[-1, 1:4]) ** 2, axis=1))
|
||||
indx = np.where(dist < self.space_win)[0]
|
||||
if len(indx) > 0:
|
||||
self.data = self.data[indx, :]
|
||||
else:
|
||||
self.data = []
|
||||
|
||||
def filter_data(self):
|
||||
self.filter_by_time()
|
||||
if len(self.data) > 0:
|
||||
self.filter_by_magnitude()
|
||||
if len(self.data) > 0 and self.space_win:
|
||||
self.filter_by_space()
|
||||
|
||||
return self.data
|
14
util/Find_idx4Time.py
Normal file
14
util/Find_idx4Time.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import numpy as np
|
||||
def Find_idx4Time(In_mat, t):
|
||||
# In_mat: time array
|
||||
# t = target time
|
||||
In_mat = np.array(In_mat)
|
||||
t = np.array(t)
|
||||
if len(np.shape(t)) == 0:
|
||||
return np.where(abs(In_mat - t) <= min(abs(In_mat - t)))[0][0]
|
||||
else:
|
||||
In_mat = In_mat.reshape((len(In_mat), 1))
|
||||
t = t.reshape((1,len(t)))
|
||||
target_time = np.matmul(np.ones((len(In_mat),1)), t)
|
||||
diff_mat = target_time - In_mat
|
||||
return np.where(abs(diff_mat) <= np.min(abs(diff_mat), axis = 0))
|
216
util/M_max_models.py
Normal file
216
util/M_max_models.py
Normal file
@@ -0,0 +1,216 @@
|
||||
# -----------------
|
||||
# Copyright © 2024 ACK Cyfronet AGH, Poland.
|
||||
# -----------------
|
||||
|
||||
import numpy as np
|
||||
|
||||
class M_max_models:
|
||||
def __init__(self, data = None, f_name = None, time_win = None, space_win = None,
|
||||
Mc = None, b_method = None, num_bootstraps = None,
|
||||
G = None, Mu = None,
|
||||
dv = None, Mo = None, SER = None,
|
||||
cl = None,
|
||||
ssd = None, C = None,
|
||||
):
|
||||
|
||||
self.data = data # Candidate data table: 2darray n x m, for n events and m clonums: x, y, z, t, mag
|
||||
self.f_name = f_name # Feature's name to be calculated, check: def ComputeFeaure(self)
|
||||
self.time_win = time_win # Time window whihc a feature is computed in
|
||||
self.space_win = space_win # Space window ...
|
||||
|
||||
self.Mc = Mc # Magnitude of completeness for computing b-positive
|
||||
self.b_method = b_method # list of b_methods
|
||||
self.num_bootstraps = num_bootstraps # Num of bootstraps for standard error estimation of b-value
|
||||
|
||||
self.G = G # Shear modulus
|
||||
self.Mu = Mu # Friction coefficient
|
||||
self.dv = dv # Injected fluid
|
||||
self.SER = SER
|
||||
|
||||
self.Mo = Mo # Cumulative moment magnitude
|
||||
self.cl = cl # Confidence level
|
||||
|
||||
self.ssd = ssd # Static stress drop (Shapiro et al. 2013)
|
||||
self.C = C # Geometrical constant (Shapiro et al. 2013)
|
||||
|
||||
|
||||
|
||||
|
||||
def b_value(self, b_flag):
|
||||
if b_flag == '1':
|
||||
return 1, None
|
||||
|
||||
# maximum-likelihood estimate (MLE) of b (Deemer & Votaw 1955; Aki 1965; Kagan 2002):
|
||||
elif b_flag == 'b':
|
||||
X = self.data[np.where(self.data[:,-1]>self.Mc)[0],:]
|
||||
if X.shape[0] > 0:
|
||||
b = 1/((np.mean(X[:,-1] - self.Mc))*np.log(10))
|
||||
std_error = b/np.sqrt(X.shape[0])
|
||||
else:
|
||||
raise ValueError("All events in the current time window have a magnitude less than 'completeness magnitude'. Use another value either for 'time window', 'minimum number of events' or 'completeness magnitude'.")
|
||||
return b, std_error
|
||||
|
||||
# B-positive (van der Elst 2021)
|
||||
elif b_flag == 'bp':
|
||||
|
||||
# Function to perform bootstrap estimation
|
||||
def bootstrap_estimate(data, num_bootstraps):
|
||||
estimates = []
|
||||
for _ in range(num_bootstraps):
|
||||
# Generate bootstrap sample
|
||||
bootstrap_sample = np.random.choice(data, size=len(data), replace=True)
|
||||
# Perform maximum likelihood estimation on bootstrap sample
|
||||
diff_mat = np.diff(bootstrap_sample)
|
||||
diff_mat = diff_mat[np.where(diff_mat>0)[0]]
|
||||
estimate = 1/((np.mean(diff_mat - np.min(diff_mat)))*np.log(10))
|
||||
estimates.append(estimate)
|
||||
return np.array(estimates)
|
||||
|
||||
diff_mat = np.diff(self.data[:,-1])
|
||||
diff_mat = diff_mat[np.where(diff_mat>0)[0]]
|
||||
bp = 1/((np.mean(diff_mat - np.min(diff_mat)))*np.log(10))
|
||||
bootstrap_estimates = bootstrap_estimate(diff_mat, self.num_bootstraps)
|
||||
std_error = np.std(bootstrap_estimates, axis=0)
|
||||
return bp, std_error
|
||||
|
||||
# Tapered Gutenberg_Richter (TGR) distribution (Kagan 2002)
|
||||
elif b_flag == 'TGR':
|
||||
from scipy.optimize import minimize
|
||||
|
||||
# The logarithm of the likelihood function for the TGR distribution (Kagan 2002)
|
||||
def log_likelihood(params, data):
|
||||
beta, Mcm = params
|
||||
n = len(data)
|
||||
Mt = np.min(data)
|
||||
l = n*beta*np.log(Mt)+1/Mcm*(n*Mt-np.sum(data))-beta*np.sum(np.log(data))+np.sum(np.log([(beta/data[i]+1/Mcm) for i in range(len(data))]))
|
||||
return -l
|
||||
X = self.data[np.where(self.data[:,-1]>self.Mc)[0],:]
|
||||
M = 10**(1.5*X[:,-1]+9.1)
|
||||
initial_guess = [0.5, np.max(M)]
|
||||
bounds = [(0.0, None), (np.max(M), None)]
|
||||
|
||||
# Minimize the negative likelihood function for beta and maximum moment
|
||||
result = minimize(log_likelihood, initial_guess, args=(M,), bounds=bounds, method='L-BFGS-B',
|
||||
options={'gtol': 1e-12, 'disp': False})
|
||||
beta_opt, Mcm_opt = result.x
|
||||
eta = 1/Mcm_opt
|
||||
S = M/np.min(M)
|
||||
dldb2 = -np.sum([1/(beta_opt-eta*S[i])**2 for i in range(len(S))])
|
||||
dldbde = -np.sum([S[i]/(beta_opt-eta*S[i])**2 for i in range(len(S))])
|
||||
dlde2 = -np.sum([S[i]**2/(beta_opt-eta*S[i])**2 for i in range(len(S))])
|
||||
std_error_beta = 1/np.sqrt(dldb2*dlde2-dldbde**2)*np.sqrt(-dlde2)
|
||||
return beta_opt*1.5, std_error_beta*1.5
|
||||
|
||||
def McGarr(self):
|
||||
b_value, b_stderr = self.b_value(self.b_method)
|
||||
B = 2/3*b_value
|
||||
if B < 1:
|
||||
sigma_m = ((1-B)/B)*(2*self.Mu)*(5*self.G)/3*self.dv
|
||||
Mmax = (np.log10(sigma_m)-9.1)/1.5
|
||||
if b_stderr:
|
||||
Mmax_stderr = b_stderr/np.abs(np.log(10)*(1.5*b_value-b_value**2))
|
||||
else:
|
||||
Mmax_stderr = None
|
||||
else:
|
||||
Mmax = None
|
||||
Mmax_stderr = None
|
||||
|
||||
return b_value, b_stderr, Mmax, Mmax_stderr
|
||||
|
||||
def Hallo(self):
|
||||
b_value, b_stderr = self.b_value(self.b_method)
|
||||
B = 2/3*b_value
|
||||
if b_value < 1.5:
|
||||
sigma_m = self.SER*((1-B)/B)*(2*self.Mu)*(5*self.G)/3*self.dv
|
||||
Mmax = (np.log10(sigma_m)-9.1)/1.5
|
||||
if b_stderr:
|
||||
Mmax_stderr = self.SER*b_stderr/np.abs(np.log(10)*(1.5*b_value-b_value**2))
|
||||
else:
|
||||
Mmax_stderr = None
|
||||
else:
|
||||
Mmax = None
|
||||
Mmax_stderr = None
|
||||
|
||||
return b_value, b_stderr, Mmax, Mmax_stderr
|
||||
|
||||
def Li(self):
|
||||
sigma_m = self.SER*2*self.G*self.dv - self.Mo
|
||||
Mmax = (np.log10(sigma_m)-9.1)/1.5
|
||||
if Mmax < 0:
|
||||
return None
|
||||
else:
|
||||
return Mmax
|
||||
|
||||
def van_der_Elst(self):
|
||||
b_value, b_stderr = self.b_value(self.b_method)
|
||||
# Seismogenic_Index
|
||||
X = self.data
|
||||
si = np.log10(X.shape[0]) - np.log10(self.dv) + b_value*self.Mc
|
||||
if b_stderr:
|
||||
si_stderr = self.Mc*b_stderr
|
||||
else:
|
||||
si_stderr = None
|
||||
|
||||
Mmax = (si + np.log10(self.dv))/b_value - np.log10(X.shape[0]*(1-self.cl**(1/X.shape[0])))/b_value
|
||||
if b_stderr:
|
||||
Mmax_stderr = (np.log10(X.shape[0]) + np.log10(X.shape[0]*(1-self.cl**(1/X.shape[0]))))*b_stderr
|
||||
else:
|
||||
Mmax_stderr = None
|
||||
|
||||
return b_value, b_stderr, si, si_stderr, Mmax, Mmax_stderr
|
||||
|
||||
def L_Shapiro(self):
|
||||
from scipy.stats import chi2
|
||||
|
||||
X = self.data[np.isfinite(self.data[:,1]),1:4]
|
||||
# Parameters
|
||||
STD = 2.0 # 2 standard deviations
|
||||
conf = 2 * chi2.cdf(STD, 2) - 1 # covers around 95% of population
|
||||
scalee = chi2.ppf(conf, 2) # inverse chi-squared with dof=#dimensions
|
||||
|
||||
# Center the data
|
||||
Mu = np.mean(X, axis=0)
|
||||
X0 = X - Mu
|
||||
|
||||
# Covariance matrix
|
||||
Cov = np.cov(X0, rowvar=False) * scalee
|
||||
|
||||
# Eigen decomposition
|
||||
D, V = np.linalg.eigh(Cov)
|
||||
order = np.argsort(D)[::-1]
|
||||
D = D[order]
|
||||
V = V[:, order]
|
||||
|
||||
# Compute radii
|
||||
VV = V * np.sqrt(D)
|
||||
R1 = np.sqrt(VV[0, 0]**2 + VV[1, 0]**2 + VV[2, 0]**2)
|
||||
R2 = np.sqrt(VV[0, 1]**2 + VV[1, 1]**2 + VV[2, 1]**2)
|
||||
R3 = np.sqrt(VV[0, 2]**2 + VV[1, 2]**2 + VV[2, 2]**2)
|
||||
|
||||
L = (1/3*(1/R1**3+1/R2**3+1/R3**3))**(-1/3)
|
||||
|
||||
return R1, R2, R3, L
|
||||
|
||||
def Shapiro(self):
|
||||
R1, R2, R3, L = self.L_Shapiro()
|
||||
|
||||
return R1, R2, R3, L, np.log10(R3**2)+(np.log10(self.ssd)-np.log10(self.C)-9.1)/1.5
|
||||
|
||||
def All_models(self):
|
||||
|
||||
return self.McGarr()[2], self.Hallo()[2], self.Li(), self.van_der_Elst()[-2], self.Shapiro()[-1]
|
||||
|
||||
|
||||
def ComputeModel(self):
|
||||
if self.f_name == 'max_mcg':
|
||||
return self.McGarr()
|
||||
if self.f_name == 'max_hlo':
|
||||
return self.Hallo()
|
||||
if self.f_name == 'max_li':
|
||||
return self.Li()
|
||||
if self.f_name == 'max_vde':
|
||||
return self.van_der_Elst()
|
||||
if self.f_name == 'max_shp':
|
||||
return self.Shapiro()
|
||||
if self.f_name == 'max_all':
|
||||
return self.All_models()
|
Reference in New Issue
Block a user