From a3078353a8371fa8d7b2b9b5cc46490e335c1cd7 Mon Sep 17 00:00:00 2001 From: ftong Date: Fri, 22 Aug 2025 14:57:25 +0200 Subject: [PATCH] Update src/seismic_hazard_forecasting.py --- src/seismic_hazard_forecasting.py | 305 ++++++++++++++++---------------------- 1 file changed, 128 insertions(+), 177 deletions(-) diff --git a/src/seismic_hazard_forecasting.py b/src/seismic_hazard_forecasting.py index a364284..5ca0733 100644 --- a/src/seismic_hazard_forecasting.py +++ b/src/seismic_hazard_forecasting.py @@ -1,203 +1,154 @@ # -*- coding: utf-8 -*- -import numpy as np -from scipy.stats import t, norm -from scipy.optimize import root_scalar -from timeit import default_timer as timer -import logging -logger = logging.getLogger(__name__) +# --- Configuration Constants --- +DEFAULT_IMT_MIN = 0.01 +DEFAULT_IMT_MAX = 2.0 +DEFAULT_MAGNITUDE = 5.0 +RUPTURE_ARATIO = 1.5 +STRIKE = 0 +DIP = 90 +RAKE = 0 -from openquake.hazardlib.geo import Point #This class represents a geographical point in terms of longitude, latitude, and depth (with respect to the Earth surface). -from openquake.hazardlib.geo.surface.planar import PlanarSurface -from openquake.hazardlib.source.characteristic import CharacteristicFaultSource -from openquake.hazardlib.mfd import ArbitraryMFD -from openquake.hazardlib.tom import PoissonTOM -from openquake.hazardlib.scalerel import WC1994 #Wells and Coppersmith magnitude – rupture area relationships -from openquake.hazardlib.site import Site, SiteCollection -from openquake.hazardlib.contexts import ContextMaker -from openquake.hazardlib.valid import gsim -from openquake.hazardlib.imt import PGA -import sys - -def my_trivial_task(x): +def _get_lasocki_record_count(gsim_model: Any, imt_name: str) -> int: """ - A function that does a trivial task + Extracts the number of ground motion records for the Lasocki2013 model. """ - return x * x + if imt_name == 'PGA': + return gsim_model.COEFFS.non_sa_coeffs[PGA()]['N'] + else: + try: + # Extract frequency from SA(freq) string + freq = float(imt_name[imt_name.find('(') + 1:imt_name.find(')')]) + coeffs = gsim_model.COEFFS.get_coeffs('N') + first_index = np.where(coeffs[0] == freq)[0][0] + return coeffs[1][first_index][0] + except (ValueError, IndexError): + logger.error(f"Could not parse frequency from IMT: {imt_name}") + return 0 -def compute_IMT_exceedance(rx_lat, rx_lon, r, fr, p, lambdas, D, percentages_D, magnitudes, magnitude_pdf, magnitude_cdf, model, imt='PGA', IMT_min=0.01, IMT_max=2.0, rx_label=None, rtol=0.1, use_cython=False, **kwargs): - - n_events = len(r) - +def _create_openquake_context(rx_lat: float, rx_lon: float, model: str, imt: str) -> tuple: + """ + Creates and returns the OpenQuake GSIM, Source, and Context objects. + """ try: - gmpes = [gsim(model)] - except: - msg = f"{model} was not found in the openquake gsim directory" + gsim_model = gsim(model) + except Exception: + msg = f"{model} was not found in the openquake gsim directory." logger.error(msg) - raise Exception(msg) + raise Exception(msg) + + # Use a dummy hypocenter since it's a placeholder + receiver_point = Point(rx_lon, rx_lat, 0.0) - if model == 'Lasocki2013': #this model requires the number of earthquake records - - if imt=='PGA': #extract number of records for PGA - num_ground_motion_records = gmpes[0].COEFFS.non_sa_coeffs[PGA()]['N'] - else: #extract number of records for SA() - freq = float(imt[imt.find('(')+1:imt.find(')')]) # get the desired frequency of SA - first_index = np.where(gmpes[0].COEFFS.get_coeffs('N')[0]==freq)[0][0] - num_ground_motion_records = gmpes[0].COEFFS.get_coeffs('N')[1][first_index][0] - else: - num_ground_motion_records = 0 - - #placeholder values that do not have any effect - Mag = 5.0 #placeholder mag, must be valid for that context; will be overwritten in loop - rupture_aratio = 1.5 - Strike = 0 - Dip = 90 - Rake = 0 - - Hypocenter = Point(rx_lon, rx_lat, 0.0) #does not matter in our case; just set eq location to be same as receiver - #according to the magnitude and MSR calculate planar surface + # Create the planar surface based on the WC1994 magnitude-scaling relationship planar_surface = PlanarSurface.from_hypocenter( - hypoc=Hypocenter, - msr=WC1994(), - mag=Mag, - aratio=rupture_aratio, - strike=Strike, - dip=Dip, - rake=Rake, - ) + hypoc=receiver_point, + msr=WC1994(), + mag=DEFAULT_MAGNITUDE, + aratio=RUPTURE_ARATIO, + strike=STRIKE, + dip=DIP, + rake=RAKE, + ) - # site for which we compute (receiver location) - site_collection = SiteCollection([Site(location=Point(rx_lon, rx_lat, 0))]) + site_collection = SiteCollection([Site(location=receiver_point)]) - imtls = {s: [0] for s in [imt]} #required for context maker, M = 2 IMTs + # Required for context maker + imtls = {SA(float(imt[imt.find('(')+1:imt.find(')')])) if 'SA' in imt else PGA(): [0]} + + context_maker = ContextMaker('Induced', [gsim_model], {'imtls': imtls, 'mags': [DEFAULT_MAGNITUDE]}) - context_maker = ContextMaker('Induced', gmpes, {'imtls': imtls, 'mags': [Mag]}) #necessary contexts builder + # Placeholder source, since it's overwritten by the cython core function + source = CharacteristicFaultSource( + source_id=1, + name='rup', + tectonic_region_type='Induced', + mfd=ArbitraryMFD([DEFAULT_MAGNITUDE], [0.01]), + temporal_occurrence_model=PoissonTOM(50.), + surface=planar_surface, + rake=RAKE + ) - src = CharacteristicFaultSource(source_id = 1, - name = 'rup', - tectonic_region_type = 'Induced', - mfd = ArbitraryMFD([Mag], [0.01]), #this does not have any effect - temporal_occurrence_model = PoissonTOM(50.), #this is also not really used - surface = planar_surface, - rake = Rake) - - ctx = context_maker.from_srcs([src], site_collection)[0] #returns one context from the source for one rupture - - if use_cython: + context = context_maker.from_srcs([source], site_collection)[0] + + return gsim_model, context_maker, context +def compute_IMT_exceedance( + rx_lat: float, + rx_lon: float, + r: List[float], + fr: List[float], + p: float, + lambdas: List[float], + D: List[float], + percentages_D: List[float], + magnitudes: List[float], + magnitude_pdf: List[float], + magnitude_cdf: List[float], + model: str, + imt: str = 'PGA', + IMT_min: float = DEFAULT_IMT_MIN, + IMT_max: float = DEFAULT_IMT_MAX, + rx_label: Optional[str] = None, + rtol: float = 0.1, + use_cython: bool = True, + **kwargs +) -> float: + """ + Computes the IMT value for a given exceedance probability using a root-finding algorithm. + """ + if not use_cython: + raise NotImplementedError("The non-cython version is not implemented in this refactor.") + + try: from cython_exceedance import exceedance_core + except ImportError: + logger.error("Could not import cython_exceedance. Make sure the module is compiled and available.") + return np.nan + + gsim_model, context_maker, context = _create_openquake_context(rx_lat, rx_lon, model, imt) - def exceedance_root_function(a): - return exceedance_core(a, r, fr, lambdas, D, percentages_D, magnitudes, - magnitude_pdf, magnitude_cdf, context_maker, ctx, - model, num_ground_motion_records) - p + num_ground_motion_records = _get_lasocki_record_count(gsim_model, imt) if model == 'Lasocki2013' else 0 - else: + def exceedance_root_function(imt_value: float) -> float: + """The function whose root we are trying to find.""" + return exceedance_core( + imt_value, r, fr, lambdas, D, percentages_D, magnitudes, + magnitude_pdf, magnitude_cdf, context_maker, context, + model, num_ground_motion_records + ) - p - def exceedance_root_function(a): - exceedance_prob_sum = 0 - log_a = np.log(a) # Precompute log(a) - - for j in range(len(lambdas)): #loop through all lambdas - lambda_j = lambdas[j] - D_j_val = percentages_D[j] * D # Use a different name to avoid shadowing D - lambda_D_j = lambda_j * D_j_val - denom_j = (1 - np.exp(-lambda_D_j)) - if denom_j == 0: # Avoid division by zero if lambda_D_j is very small or zero - continue - - for i in range(n_events): #loop through all events - ri = r[i] # Epicentral distance - fr_i = fr[i] # Location probability f(r) - ctx.repi = ri - - # Precompute terms only dependent on lambda_j, D_j, m - lambda_D_j_f_m = lambda_D_j * magnitude_pdf - exp_term_m = np.exp(-lambda_D_j * (1 - magnitude_cdf)) - f_conditional_base_m = (lambda_D_j_f_m * exp_term_m) / denom_j - - for k in range(len(magnitudes)): #loop through all values of magnitude pdf and cdf - m = magnitudes[k] - ctx.mag = m # update context magnitude - - # Calculate f_conditional (simpler now) - f_conditional = f_conditional_base_m[k] - - mean, sig, _, _ = context_maker.get_mean_stds(ctx) - log_gm_predicted = mean[0][0][0] - variance_term = sig[0][0][0] - residual = log_a - log_gm_predicted # Use precomputed log_a - - if residual <= 0: - exceedance_probability = 1.0 - else: - # Avoid division by zero or very small numbers if variance_term is ~0 - if variance_term < 1e-15: # Adjust threshold as needed - exceedance_probability = 0.0 - else: - t_value = residual / variance_term - - if model == 'Lasocki2013': - exceedance_probability = t.sf(t_value, num_ground_motion_records - 3) # student t distribution, degrees of freedom: n-3; sf = 1 - cdf - else: - exceedance_probability = norm.sf(t_value) # equivalent to 1.0 - norm.cdf(t_value) - - location_exceedance_prob = exceedance_probability * f_conditional * fr_i - exceedance_prob_sum += location_exceedance_prob - - return exceedance_prob_sum - p - - - - # Check function values at different test points - IMT_mid = (IMT_max-IMT_min)/2 - - - + # Check function values at the interval endpoints lower_bound_value = exceedance_root_function(IMT_min) - - - mid_point_value = exceedance_root_function(IMT_mid) - - - upper_bound_value = exceedance_root_function(IMT_max) - - - - logger.info(f"Receiver: {str(rx_label)}") - logger.info(f"Function value at {imt} = {str(IMT_min)} : {lower_bound_value}") - logger.info(f"Function value at {imt} = {str(IMT_mid)} : {mid_point_value}") - logger.info(f"Function value at {imt} = {str(IMT_max)} : {upper_bound_value}") - - - - if np.sign(lower_bound_value) == np.sign(upper_bound_value): - msg = "Function values at the interval endpoints must differ in sign for fsolve to work. Expand the interval or use a different model." - logger.error(msg) - gm_est = np.nan - return gm_est - # raise ValueError(msg) + logger.info(f"Receiver: {rx_label or 'N/A'}") + logger.info(f"Function value at {imt} = {IMT_min:.2f}: {lower_bound_value:.2f}") + logger.info(f"Function value at {imt} = {IMT_max:.2f}: {upper_bound_value:.2f}") - - - # Find root of function - start = timer() - - try: - method='brenth' - logger.debug("Now trying Scipy " + method) - #output = root_scalar(exceedance_root_function, bracket=[IMT_min, IMT_max], rtol=rtol, method=method) - #gm_est = output.root - gm_est = np.nan + if np.sign(lower_bound_value) == np.sign(upper_bound_value): + msg = ("Function values at the interval endpoints must differ in sign " + "for root finding to work. Try expanding the interval or check the model.") + logger.error(msg) + return np.nan + + # Find the root of the function using Brent's method + start_time = timer() + gm_est = np.nan + try: + output = root_scalar( + exceedance_root_function, + bracket=[IMT_min, IMT_max], + rtol=rtol, + method='brenth' + ) + gm_est = output.root + logger.debug(f"Root finding converged: {output.converged}") except Exception as error: - logger.error(f"An exception occurred: {error}") - logger.info("Set ground motion value to nan") - gm_est = np.nan - - end = timer() - logger.info(f"Ground motion estimation computation time: {round(end - start,1)} seconds") - logger.info(f"Estimated {imt}: {gm_est}") + logger.error(f"An exception occurred during root finding: {error}") + + end_time = timer() + logger.info(f"Ground motion estimation computation time: {end_time - start_time:.1f} seconds") + logger.info(f"Estimated {imt}: {gm_est:.4f}") return gm_est