revert Update src/seismic_hazard_forecasting.py
2025-08-22 14:58:49 +02:00
parent a3078353a8
commit 95d703dbf2


@@ -1,154 +1,203 @@
 # -*- coding: utf-8 -*-
-# --- Configuration Constants ---
-DEFAULT_IMT_MIN = 0.01
-DEFAULT_IMT_MAX = 2.0
-DEFAULT_MAGNITUDE = 5.0
-RUPTURE_ARATIO = 1.5
-STRIKE = 0
-DIP = 90
-RAKE = 0
-def _get_lasocki_record_count(gsim_model: Any, imt_name: str) -> int:
-    """
-    Extracts the number of ground motion records for the Lasocki2013 model.
-    """
-    if imt_name == 'PGA':
-        return gsim_model.COEFFS.non_sa_coeffs[PGA()]['N']
-    else:
-        try:
-            # Extract frequency from SA(freq) string
-            freq = float(imt_name[imt_name.find('(') + 1:imt_name.find(')')])
-            coeffs = gsim_model.COEFFS.get_coeffs('N')
-            first_index = np.where(coeffs[0] == freq)[0][0]
-            return coeffs[1][first_index][0]
-        except (ValueError, IndexError):
-            logger.error(f"Could not parse frequency from IMT: {imt_name}")
-            return 0
-def _create_openquake_context(rx_lat: float, rx_lon: float, model: str, imt: str) -> tuple:
+import numpy as np
+from scipy.stats import t, norm
+from scipy.optimize import root_scalar
+from timeit import default_timer as timer
+import logging
+logger = logging.getLogger(__name__)
+from openquake.hazardlib.geo import Point #This class represents a geographical point in terms of longitude, latitude, and depth (with respect to the Earth surface).
+from openquake.hazardlib.geo.surface.planar import PlanarSurface
+from openquake.hazardlib.source.characteristic import CharacteristicFaultSource
+from openquake.hazardlib.mfd import ArbitraryMFD
+from openquake.hazardlib.tom import PoissonTOM
+from openquake.hazardlib.scalerel import WC1994 #Wells and Coppersmith magnitude rupture area relationships
+from openquake.hazardlib.site import Site, SiteCollection
+from openquake.hazardlib.contexts import ContextMaker
+from openquake.hazardlib.valid import gsim
+from openquake.hazardlib.imt import PGA
+import sys
+def my_trivial_task(x):
     """
-    Creates and returns the OpenQuake GSIM, Source, and Context objects.
+    A function that does a trivial task
     """
+    return x * x
+def compute_IMT_exceedance(rx_lat, rx_lon, r, fr, p, lambdas, D, percentages_D, magnitudes, magnitude_pdf, magnitude_cdf, model, imt='PGA', IMT_min=0.01, IMT_max=2.0, rx_label=None, rtol=0.1, use_cython=False, **kwargs):
+    n_events = len(r)
     try:
-        gsim_model = gsim(model)
-    except Exception:
-        msg = f"{model} was not found in the openquake gsim directory."
+        gmpes = [gsim(model)]
+    except:
+        msg = f"{model} was not found in the openquake gsim directory"
         logger.error(msg)
         raise Exception(msg)
-    # Use a dummy hypocenter since it's a placeholder
-    receiver_point = Point(rx_lon, rx_lat, 0.0)
-    # Create the planar surface based on the WC1994 magnitude-scaling relationship
+    if model == 'Lasocki2013': #this model requires the number of earthquake records
+        if imt=='PGA': #extract number of records for PGA
+            num_ground_motion_records = gmpes[0].COEFFS.non_sa_coeffs[PGA()]['N']
+        else: #extract number of records for SA()
+            freq = float(imt[imt.find('(')+1:imt.find(')')]) # get the desired frequency of SA
+            first_index = np.where(gmpes[0].COEFFS.get_coeffs('N')[0]==freq)[0][0]
+            num_ground_motion_records = gmpes[0].COEFFS.get_coeffs('N')[1][first_index][0]
+    else:
+        num_ground_motion_records = 0
+    #placeholder values that do not have any effect
+    Mag = 5.0 #placeholder mag, must be valid for that context; will be overwritten in loop
+    rupture_aratio = 1.5
+    Strike = 0
+    Dip = 90
+    Rake = 0
+    Hypocenter = Point(rx_lon, rx_lat, 0.0) #does not matter in our case; just set eq location to be same as receiver
+    #according to the magnitude and MSR calculate planar surface
     planar_surface = PlanarSurface.from_hypocenter(
-        hypoc=receiver_point,
+        hypoc=Hypocenter,
         msr=WC1994(),
-        mag=DEFAULT_MAGNITUDE,
-        aratio=RUPTURE_ARATIO,
-        strike=STRIKE,
-        dip=DIP,
-        rake=RAKE,
+        mag=Mag,
+        aratio=rupture_aratio,
+        strike=Strike,
+        dip=Dip,
+        rake=Rake,
     )
-    site_collection = SiteCollection([Site(location=receiver_point)])
-    # Required for context maker
-    imtls = {SA(float(imt[imt.find('(')+1:imt.find(')')])) if 'SA' in imt else PGA(): [0]}
-    context_maker = ContextMaker('Induced', [gsim_model], {'imtls': imtls, 'mags': [DEFAULT_MAGNITUDE]})
-    # Placeholder source, since it's overwritten by the cython core function
-    source = CharacteristicFaultSource(
-        source_id=1,
-        name='rup',
-        tectonic_region_type='Induced',
-        mfd=ArbitraryMFD([DEFAULT_MAGNITUDE], [0.01]),
-        temporal_occurrence_model=PoissonTOM(50.),
-        surface=planar_surface,
-        rake=RAKE
-    )
-    context = context_maker.from_srcs([source], site_collection)[0]
-    return gsim_model, context_maker, context
-def compute_IMT_exceedance(
-    rx_lat: float,
-    rx_lon: float,
-    r: List[float],
-    fr: List[float],
-    p: float,
-    lambdas: List[float],
-    D: List[float],
-    percentages_D: List[float],
-    magnitudes: List[float],
-    magnitude_pdf: List[float],
-    magnitude_cdf: List[float],
-    model: str,
-    imt: str = 'PGA',
-    IMT_min: float = DEFAULT_IMT_MIN,
-    IMT_max: float = DEFAULT_IMT_MAX,
-    rx_label: Optional[str] = None,
-    rtol: float = 0.1,
-    use_cython: bool = True,
-    **kwargs
-) -> float:
-    """
-    Computes the IMT value for a given exceedance probability using a root-finding algorithm.
-    """
-    if not use_cython:
-        raise NotImplementedError("The non-cython version is not implemented in this refactor.")
-    try:
+    # site for which we compute (receiver location)
+    site_collection = SiteCollection([Site(location=Point(rx_lon, rx_lat, 0))])
+    imtls = {s: [0] for s in [imt]} #required for context maker, M = 2 IMTs
+    context_maker = ContextMaker('Induced', gmpes, {'imtls': imtls, 'mags': [Mag]}) #necessary contexts builder
+    src = CharacteristicFaultSource(source_id = 1,
+                                    name = 'rup',
+                                    tectonic_region_type = 'Induced',
+                                    mfd = ArbitraryMFD([Mag], [0.01]), #this does not have any effect
+                                    temporal_occurrence_model = PoissonTOM(50.), #this is also not really used
+                                    surface = planar_surface,
+                                    rake = Rake)
+    ctx = context_maker.from_srcs([src], site_collection)[0] #returns one context from the source for one rupture
+    if use_cython:
         from cython_exceedance import exceedance_core
-    except ImportError:
-        logger.error("Could not import cython_exceedance. Make sure the module is compiled and available.")
-        return np.nan
-    gsim_model, context_maker, context = _create_openquake_context(rx_lat, rx_lon, model, imt)
-    num_ground_motion_records = _get_lasocki_record_count(gsim_model, imt) if model == 'Lasocki2013' else 0
-    def exceedance_root_function(imt_value: float) -> float:
-        """The function whose root we are trying to find."""
-        return exceedance_core(
-            imt_value, r, fr, lambdas, D, percentages_D, magnitudes,
-            magnitude_pdf, magnitude_cdf, context_maker, context,
-            model, num_ground_motion_records
-        ) - p
-    # Check function values at the interval endpoints
+        def exceedance_root_function(a):
+            return exceedance_core(a, r, fr, lambdas, D, percentages_D, magnitudes,
+                                   magnitude_pdf, magnitude_cdf, context_maker, ctx,
+                                   model, num_ground_motion_records) - p
+    else:
+        def exceedance_root_function(a):
+            exceedance_prob_sum = 0
+            log_a = np.log(a) # Precompute log(a)
+            for j in range(len(lambdas)): #loop through all lambdas
+                lambda_j = lambdas[j]
+                D_j_val = percentages_D[j] * D # Use a different name to avoid shadowing D
+                lambda_D_j = lambda_j * D_j_val
+                denom_j = (1 - np.exp(-lambda_D_j))
+                if denom_j == 0: # Avoid division by zero if lambda_D_j is very small or zero
+                    continue
+                for i in range(n_events): #loop through all events
+                    ri = r[i] # Epicentral distance
+                    fr_i = fr[i] # Location probability f(r)
+                    ctx.repi = ri
+                    # Precompute terms only dependent on lambda_j, D_j, m
+                    lambda_D_j_f_m = lambda_D_j * magnitude_pdf
+                    exp_term_m = np.exp(-lambda_D_j * (1 - magnitude_cdf))
+                    f_conditional_base_m = (lambda_D_j_f_m * exp_term_m) / denom_j
+                    for k in range(len(magnitudes)): #loop through all values of magnitude pdf and cdf
+                        m = magnitudes[k]
+                        ctx.mag = m # update context magnitude
+                        # Calculate f_conditional (simpler now)
+                        f_conditional = f_conditional_base_m[k]
+                        mean, sig, _, _ = context_maker.get_mean_stds(ctx)
+                        log_gm_predicted = mean[0][0][0]
+                        variance_term = sig[0][0][0]
+                        residual = log_a - log_gm_predicted # Use precomputed log_a
+                        if residual <= 0:
+                            exceedance_probability = 1.0
+                        else:
+                            # Avoid division by zero or very small numbers if variance_term is ~0
+                            if variance_term < 1e-15: # Adjust threshold as needed
+                                exceedance_probability = 0.0
+                            else:
+                                t_value = residual / variance_term
+                                if model == 'Lasocki2013':
+                                    exceedance_probability = t.sf(t_value, num_ground_motion_records - 3) # student t distribution, degrees of freedom: n-3; sf = 1 - cdf
+                                else:
+                                    exceedance_probability = norm.sf(t_value) # equivalent to 1.0 - norm.cdf(t_value)
+                        location_exceedance_prob = exceedance_probability * f_conditional * fr_i
+                        exceedance_prob_sum += location_exceedance_prob
+            return exceedance_prob_sum - p
+    # Check function values at different test points
+    IMT_mid = (IMT_max-IMT_min)/2
     lower_bound_value = exceedance_root_function(IMT_min)
+    mid_point_value = exceedance_root_function(IMT_mid)
     upper_bound_value = exceedance_root_function(IMT_max)
logger.info(f"Receiver: {rx_label or 'N/A'}")
logger.info(f"Function value at {imt} = {IMT_min:.2f}: {lower_bound_value:.2f}")
logger.info(f"Function value at {imt} = {IMT_max:.2f}: {upper_bound_value:.2f}")
logger.info(f"Receiver: {str(rx_label)}")
logger.info(f"Function value at {imt} = {str(IMT_min)} : {lower_bound_value}")
logger.info(f"Function value at {imt} = {str(IMT_mid)} : {mid_point_value}")
logger.info(f"Function value at {imt} = {str(IMT_max)} : {upper_bound_value}")
if np.sign(lower_bound_value) == np.sign(upper_bound_value): if np.sign(lower_bound_value) == np.sign(upper_bound_value):
msg = ("Function values at the interval endpoints must differ in sign " msg = "Function values at the interval endpoints must differ in sign for fsolve to work. Expand the interval or use a different model."
"for root finding to work. Try expanding the interval or check the model.")
logger.error(msg) logger.error(msg)
return np.nan gm_est = np.nan
return gm_est
# raise ValueError(msg)
# Find the root of the function using Brent's method
start_time = timer()
gm_est = np.nan # Find root of function
try: start = timer()
output = root_scalar(
exceedance_root_function, try:
bracket=[IMT_min, IMT_max], method='brenth'
rtol=rtol, logger.debug("Now trying Scipy " + method)
method='brenth' #output = root_scalar(exceedance_root_function, bracket=[IMT_min, IMT_max], rtol=rtol, method=method)
) #gm_est = output.root
gm_est = output.root gm_est = np.nan
logger.debug(f"Root finding converged: {output.converged}")
except Exception as error: except Exception as error:
logger.error(f"An exception occurred during root finding: {error}") logger.error(f"An exception occurred: {error}")
logger.info("Set ground motion value to nan")
end_time = timer() gm_est = np.nan
logger.info(f"Ground motion estimation computation time: {end_time - start_time:.1f} seconds")
logger.info(f"Estimated {imt}: {gm_est:.4f}") end = timer()
logger.info(f"Ground motion estimation computation time: {round(end - start,1)} seconds")
logger.info(f"Estimated {imt}: {gm_est}")
return gm_est return gm_est
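
Note on the restored pure-Python branch: for each activity rate lambda_j the code builds the conditional magnitude weight f(m | at least one event in the sub-period D_j) = lambda_j*D_j*f(m)*exp(-lambda_j*D_j*(1 - F(m))) / (1 - exp(-lambda_j*D_j)), multiplies it by the GMPE exceedance probability and by the event-location probability fr_i, and sums over sub-periods, events and magnitudes; a bracketing root finder then looks for the ground-motion level at which that sum equals the target probability p. The sketch below mirrors only that loop structure so it runs without OpenQuake: the GMPE mean and sigma normally returned by ContextMaker.get_mean_stds are replaced by a made-up log-linear attenuation with a fixed sigma, and every input number is invented, so it illustrates the algorithm and the brenth bracketing, not the project's actual hazard model.

import numpy as np
from scipy.stats import norm
from scipy.optimize import root_scalar

def toy_exceedance_sum(a, r, fr, lambdas, D, percentages_D,
                       magnitudes, magnitude_pdf, magnitude_cdf):
    """Same loop structure as the restored exceedance_root_function, with the
    OpenQuake GMPE call swapped for a toy log-linear attenuation (assumption)."""
    log_a = np.log(a)
    total = 0.0
    for lam, pct in zip(lambdas, percentages_D):
        lam_D = lam * pct * D
        denom = 1.0 - np.exp(-lam_D)
        if denom == 0.0:
            continue
        # conditional magnitude weight given at least one event in the sub-period
        f_cond = lam_D * magnitude_pdf * np.exp(-lam_D * (1.0 - magnitude_cdf)) / denom
        for ri, fr_i in zip(r, fr):
            for m, w in zip(magnitudes, f_cond):
                # toy GMPE (invented numbers): ln(GM) = -1.5 + 0.9*m - 1.2*ln(repi + 10)
                mean_ln = -1.5 + 0.9 * m - 1.2 * np.log(ri + 10.0)
                sigma_ln = 0.6
                residual = log_a - mean_ln
                p_exc = 1.0 if residual <= 0 else norm.sf(residual / sigma_ln)
                total += p_exc * w * fr_i
    return total

# invented inputs, purely to exercise the function
mags = np.array([2.0, 3.0, 4.0])
pdf = np.array([0.6, 0.3, 0.1])
cdf = np.cumsum(pdf)
args = dict(r=[3.0, 8.0], fr=[0.5, 0.5], lambdas=[0.2], D=30.0, percentages_D=[1.0],
            magnitudes=mags, magnitude_pdf=pdf, magnitude_cdf=cdf)
# same bracketing idea as the commented-out root_scalar call in the diff above
sol = root_scalar(lambda a: toy_exceedance_sum(a, **args) - 0.1,
                  bracket=[1e-4, 5.0], method='brenth')
print(f"toy ground-motion level with 0.1 exceedance probability: {sol.root:.4f}")

For the Lasocki2013 model the restored code replaces norm.sf(t_value) with t.sf(t_value, N - 3), a Student-t survival function whose degrees of freedom come from the number N of ground-motion records behind the GMPE coefficients.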