Update src/seismic_hazard_forecasting.py

This commit is contained in:
2025-08-22 14:57:25 +02:00
parent 7c450ffeb8
commit a3078353a8

View File

@@ -1,203 +1,154 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import numpy as np # --- Configuration Constants ---
from scipy.stats import t, norm DEFAULT_IMT_MIN = 0.01
from scipy.optimize import root_scalar DEFAULT_IMT_MAX = 2.0
from timeit import default_timer as timer DEFAULT_MAGNITUDE = 5.0
import logging RUPTURE_ARATIO = 1.5
logger = logging.getLogger(__name__) STRIKE = 0
DIP = 90
RAKE = 0
from openquake.hazardlib.geo import Point #This class represents a geographical point in terms of longitude, latitude, and depth (with respect to the Earth surface). def _get_lasocki_record_count(gsim_model: Any, imt_name: str) -> int:
from openquake.hazardlib.geo.surface.planar import PlanarSurface
from openquake.hazardlib.source.characteristic import CharacteristicFaultSource
from openquake.hazardlib.mfd import ArbitraryMFD
from openquake.hazardlib.tom import PoissonTOM
from openquake.hazardlib.scalerel import WC1994 #Wells and Coppersmith magnitude rupture area relationships
from openquake.hazardlib.site import Site, SiteCollection
from openquake.hazardlib.contexts import ContextMaker
from openquake.hazardlib.valid import gsim
from openquake.hazardlib.imt import PGA
import sys
def my_trivial_task(x):
""" """
A function that does a trivial task Extracts the number of ground motion records for the Lasocki2013 model.
""" """
return x * x if imt_name == 'PGA':
return gsim_model.COEFFS.non_sa_coeffs[PGA()]['N']
else:
try:
# Extract frequency from SA(freq) string
freq = float(imt_name[imt_name.find('(') + 1:imt_name.find(')')])
coeffs = gsim_model.COEFFS.get_coeffs('N')
first_index = np.where(coeffs[0] == freq)[0][0]
return coeffs[1][first_index][0]
except (ValueError, IndexError):
logger.error(f"Could not parse frequency from IMT: {imt_name}")
return 0
def compute_IMT_exceedance(rx_lat, rx_lon, r, fr, p, lambdas, D, percentages_D, magnitudes, magnitude_pdf, magnitude_cdf, model, imt='PGA', IMT_min=0.01, IMT_max=2.0, rx_label=None, rtol=0.1, use_cython=False, **kwargs): def _create_openquake_context(rx_lat: float, rx_lon: float, model: str, imt: str) -> tuple:
"""
n_events = len(r) Creates and returns the OpenQuake GSIM, Source, and Context objects.
"""
try: try:
gmpes = [gsim(model)] gsim_model = gsim(model)
except: except Exception:
msg = f"{model} was not found in the openquake gsim directory" msg = f"{model} was not found in the openquake gsim directory."
logger.error(msg) logger.error(msg)
raise Exception(msg) raise Exception(msg)
# Use a dummy hypocenter since it's a placeholder
receiver_point = Point(rx_lon, rx_lat, 0.0)
if model == 'Lasocki2013': #this model requires the number of earthquake records # Create the planar surface based on the WC1994 magnitude-scaling relationship
if imt=='PGA': #extract number of records for PGA
num_ground_motion_records = gmpes[0].COEFFS.non_sa_coeffs[PGA()]['N']
else: #extract number of records for SA()
freq = float(imt[imt.find('(')+1:imt.find(')')]) # get the desired frequency of SA
first_index = np.where(gmpes[0].COEFFS.get_coeffs('N')[0]==freq)[0][0]
num_ground_motion_records = gmpes[0].COEFFS.get_coeffs('N')[1][first_index][0]
else:
num_ground_motion_records = 0
#placeholder values that do not have any effect
Mag = 5.0 #placeholder mag, must be valid for that context; will be overwritten in loop
rupture_aratio = 1.5
Strike = 0
Dip = 90
Rake = 0
Hypocenter = Point(rx_lon, rx_lat, 0.0) #does not matter in our case; just set eq location to be same as receiver
#according to the magnitude and MSR calculate planar surface
planar_surface = PlanarSurface.from_hypocenter( planar_surface = PlanarSurface.from_hypocenter(
hypoc=Hypocenter, hypoc=receiver_point,
msr=WC1994(), msr=WC1994(),
mag=Mag, mag=DEFAULT_MAGNITUDE,
aratio=rupture_aratio, aratio=RUPTURE_ARATIO,
strike=Strike, strike=STRIKE,
dip=Dip, dip=DIP,
rake=Rake, rake=RAKE,
) )
# site for which we compute (receiver location) site_collection = SiteCollection([Site(location=receiver_point)])
site_collection = SiteCollection([Site(location=Point(rx_lon, rx_lat, 0))])
imtls = {s: [0] for s in [imt]} #required for context maker, M = 2 IMTs # Required for context maker
imtls = {SA(float(imt[imt.find('(')+1:imt.find(')')])) if 'SA' in imt else PGA(): [0]}
context_maker = ContextMaker('Induced', [gsim_model], {'imtls': imtls, 'mags': [DEFAULT_MAGNITUDE]})
context_maker = ContextMaker('Induced', gmpes, {'imtls': imtls, 'mags': [Mag]}) #necessary contexts builder # Placeholder source, since it's overwritten by the cython core function
source = CharacteristicFaultSource(
source_id=1,
name='rup',
tectonic_region_type='Induced',
mfd=ArbitraryMFD([DEFAULT_MAGNITUDE], [0.01]),
temporal_occurrence_model=PoissonTOM(50.),
surface=planar_surface,
rake=RAKE
)
src = CharacteristicFaultSource(source_id = 1, context = context_maker.from_srcs([source], site_collection)[0]
name = 'rup',
tectonic_region_type = 'Induced', return gsim_model, context_maker, context
mfd = ArbitraryMFD([Mag], [0.01]), #this does not have any effect
temporal_occurrence_model = PoissonTOM(50.), #this is also not really used
surface = planar_surface,
rake = Rake)
ctx = context_maker.from_srcs([src], site_collection)[0] #returns one context from the source for one rupture
if use_cython:
def compute_IMT_exceedance(
rx_lat: float,
rx_lon: float,
r: List[float],
fr: List[float],
p: float,
lambdas: List[float],
D: List[float],
percentages_D: List[float],
magnitudes: List[float],
magnitude_pdf: List[float],
magnitude_cdf: List[float],
model: str,
imt: str = 'PGA',
IMT_min: float = DEFAULT_IMT_MIN,
IMT_max: float = DEFAULT_IMT_MAX,
rx_label: Optional[str] = None,
rtol: float = 0.1,
use_cython: bool = True,
**kwargs
) -> float:
"""
Computes the IMT value for a given exceedance probability using a root-finding algorithm.
"""
if not use_cython:
raise NotImplementedError("The non-cython version is not implemented in this refactor.")
try:
from cython_exceedance import exceedance_core from cython_exceedance import exceedance_core
except ImportError:
logger.error("Could not import cython_exceedance. Make sure the module is compiled and available.")
return np.nan
gsim_model, context_maker, context = _create_openquake_context(rx_lat, rx_lon, model, imt)
def exceedance_root_function(a): num_ground_motion_records = _get_lasocki_record_count(gsim_model, imt) if model == 'Lasocki2013' else 0
return exceedance_core(a, r, fr, lambdas, D, percentages_D, magnitudes,
magnitude_pdf, magnitude_cdf, context_maker, ctx,
model, num_ground_motion_records) - p
else: def exceedance_root_function(imt_value: float) -> float:
"""The function whose root we are trying to find."""
return exceedance_core(
imt_value, r, fr, lambdas, D, percentages_D, magnitudes,
magnitude_pdf, magnitude_cdf, context_maker, context,
model, num_ground_motion_records
) - p
def exceedance_root_function(a): # Check function values at the interval endpoints
exceedance_prob_sum = 0
log_a = np.log(a) # Precompute log(a)
for j in range(len(lambdas)): #loop through all lambdas
lambda_j = lambdas[j]
D_j_val = percentages_D[j] * D # Use a different name to avoid shadowing D
lambda_D_j = lambda_j * D_j_val
denom_j = (1 - np.exp(-lambda_D_j))
if denom_j == 0: # Avoid division by zero if lambda_D_j is very small or zero
continue
for i in range(n_events): #loop through all events
ri = r[i] # Epicentral distance
fr_i = fr[i] # Location probability f(r)
ctx.repi = ri
# Precompute terms only dependent on lambda_j, D_j, m
lambda_D_j_f_m = lambda_D_j * magnitude_pdf
exp_term_m = np.exp(-lambda_D_j * (1 - magnitude_cdf))
f_conditional_base_m = (lambda_D_j_f_m * exp_term_m) / denom_j
for k in range(len(magnitudes)): #loop through all values of magnitude pdf and cdf
m = magnitudes[k]
ctx.mag = m # update context magnitude
# Calculate f_conditional (simpler now)
f_conditional = f_conditional_base_m[k]
mean, sig, _, _ = context_maker.get_mean_stds(ctx)
log_gm_predicted = mean[0][0][0]
variance_term = sig[0][0][0]
residual = log_a - log_gm_predicted # Use precomputed log_a
if residual <= 0:
exceedance_probability = 1.0
else:
# Avoid division by zero or very small numbers if variance_term is ~0
if variance_term < 1e-15: # Adjust threshold as needed
exceedance_probability = 0.0
else:
t_value = residual / variance_term
if model == 'Lasocki2013':
exceedance_probability = t.sf(t_value, num_ground_motion_records - 3) # student t distribution, degrees of freedom: n-3; sf = 1 - cdf
else:
exceedance_probability = norm.sf(t_value) # equivalent to 1.0 - norm.cdf(t_value)
location_exceedance_prob = exceedance_probability * f_conditional * fr_i
exceedance_prob_sum += location_exceedance_prob
return exceedance_prob_sum - p
# Check function values at different test points
IMT_mid = (IMT_max-IMT_min)/2
lower_bound_value = exceedance_root_function(IMT_min) lower_bound_value = exceedance_root_function(IMT_min)
mid_point_value = exceedance_root_function(IMT_mid)
upper_bound_value = exceedance_root_function(IMT_max) upper_bound_value = exceedance_root_function(IMT_max)
logger.info(f"Receiver: {rx_label or 'N/A'}")
logger.info(f"Function value at {imt} = {IMT_min:.2f}: {lower_bound_value:.2f}")
logger.info(f"Function value at {imt} = {IMT_max:.2f}: {upper_bound_value:.2f}")
logger.info(f"Receiver: {str(rx_label)}")
logger.info(f"Function value at {imt} = {str(IMT_min)} : {lower_bound_value}")
logger.info(f"Function value at {imt} = {str(IMT_mid)} : {mid_point_value}")
logger.info(f"Function value at {imt} = {str(IMT_max)} : {upper_bound_value}")
if np.sign(lower_bound_value) == np.sign(upper_bound_value):
msg = "Function values at the interval endpoints must differ in sign for fsolve to work. Expand the interval or use a different model."
logger.error(msg)
gm_est = np.nan
return gm_est
# raise ValueError(msg)
if np.sign(lower_bound_value) == np.sign(upper_bound_value):
msg = ("Function values at the interval endpoints must differ in sign "
# Find root of function "for root finding to work. Try expanding the interval or check the model.")
start = timer() logger.error(msg)
return np.nan
try:
method='brenth' # Find the root of the function using Brent's method
logger.debug("Now trying Scipy " + method) start_time = timer()
#output = root_scalar(exceedance_root_function, bracket=[IMT_min, IMT_max], rtol=rtol, method=method) gm_est = np.nan
#gm_est = output.root try:
gm_est = np.nan output = root_scalar(
exceedance_root_function,
bracket=[IMT_min, IMT_max],
rtol=rtol,
method='brenth'
)
gm_est = output.root
logger.debug(f"Root finding converged: {output.converged}")
except Exception as error: except Exception as error:
logger.error(f"An exception occurred: {error}") logger.error(f"An exception occurred during root finding: {error}")
logger.info("Set ground motion value to nan")
gm_est = np.nan end_time = timer()
logger.info(f"Ground motion estimation computation time: {end_time - start_time:.1f} seconds")
end = timer() logger.info(f"Estimated {imt}: {gm_est:.4f}")
logger.info(f"Ground motion estimation computation time: {round(end - start,1)} seconds")
logger.info(f"Estimated {imt}: {gm_est}")
return gm_est return gm_est