# -*- coding: utf-8 -*-
import numpy as np
from scipy.stats import t, norm
from scipy.optimize import root_scalar
from timeit import default_timer as timer
import logging
logger = logging.getLogger(__name__)
from openquake.hazardlib.geo import Point #This class represents a geographical point in terms of longitude, latitude, and depth (with respect to the Earth surface).
from openquake.hazardlib.geo.surface.planar import PlanarSurface
from openquake.hazardlib.source.characteristic import CharacteristicFaultSource
from openquake.hazardlib.mfd import ArbitraryMFD
from openquake.hazardlib.tom import PoissonTOM
from openquake.hazardlib.scalerel import WC1994 #Wells and Coppersmith magnitude rupture area relationships
from openquake.hazardlib.site import Site, SiteCollection
from openquake.hazardlib.contexts import ContextMaker
from openquake.hazardlib.valid import gsim
from openquake.hazardlib.imt import PGA
import sys
def my_trivial_task(x):
"""
A trivial placeholder task, kept for testing the Dask parallel pipeline (see the commented-out dask.delayed call in main()).
"""
return x * x
def compute_IMT_exceedance(rx_lat, rx_lon, r, fr, p, lambdas, D, percentages_D, magnitudes, magnitude_pdf, magnitude_cdf, model, imt='PGA', IMT_min=0.01, IMT_max=2.0, rx_label=None, rtol=0.1, use_cython=False, **kwargs):
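"""
Estimate the ground motion intensity (for the given IMT) whose probability of exceedance over the
forecast period equals p, by root-finding on the total exceedance probability.
Argument summary (inferred from how the function is used in this module):
rx_lat, rx_lon: receiver (site) coordinates in degrees.
r: epicentral distances (km) from the receiver to every considered grid point; fr: the location
probability f(x,y) associated with each entry of r.
p: target probability of exceedance.
lambdas: forecasted activity rates; percentages_D: fraction of the forecast window each rate applies to;
D: forecast length in the chosen time unit.
magnitudes, magnitude_pdf, magnitude_cdf: discretized magnitude distribution from Mc to M_max.
model: ground motion model name resolvable by openquake.hazardlib.valid.gsim; imt: e.g. 'PGA' or 'SA(1.0)'.
IMT_min, IMT_max: bracketing interval for the root search; rtol: relative tolerance of the root finder.
use_cython: if True, use the compiled exceedance kernel instead of the pure-Python loop.
Returns the estimated ground motion value, or np.nan if no root is bracketed or the solver fails.
"""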
n_events = len(r)
try:
gmpes = [gsim(model)]
except Exception:
msg = f"{model} was not found in the openquake gsim directory"
logger.error(msg)
raise Exception(msg)
if model == 'Lasocki2013': #this model requires the number of earthquake records
if imt=='PGA': #extract number of records for PGA
num_ground_motion_records = gmpes[0].COEFFS.non_sa_coeffs[PGA()]['N']
else: #extract number of records for SA()
freq = float(imt[imt.find('(')+1:imt.find(')')]) # parse the numeric argument of SA(...) used to look up the matching coefficient row
first_index = np.where(gmpes[0].COEFFS.get_coeffs('N')[0]==freq)[0][0]
num_ground_motion_records = gmpes[0].COEFFS.get_coeffs('N')[1][first_index][0]
else:
num_ground_motion_records = 0
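# Lasocki2013 evaluates exceedance with a Student's t distribution whose degrees of freedom depend on
# the number of ground motion records behind the coefficients (n - 3, see below); the other models use
# a normal distribution, so the record count is not needed and is set to 0.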
# placeholder source parameters used only to build a rupture context; the magnitude and distance are overwritten for every grid point and magnitude in the loop below
Mag = 5.0 #placeholder mag, must be valid for that context; will be overwritten in loop
rupture_aratio = 1.5
Strike = 0
Dip = 90
Rake = 0
Hypocenter = Point(rx_lon, rx_lat, 0.0) #does not matter in our case; just set eq location to be same as receiver
#according to the magnitude and MSR calculate planar surface
planar_surface = PlanarSurface.from_hypocenter(
hypoc=Hypocenter,
msr=WC1994(),
mag=Mag,
aratio=rupture_aratio,
strike=Strike,
dip=Dip,
rake=Rake,
)
# site for which we compute (receiver location)
site_collection = SiteCollection([Site(location=Point(rx_lon, rx_lat, 0))])
imtls = {s: [0] for s in [imt]} # intensity measure levels dict required by the ContextMaker (single IMT here)
context_maker = ContextMaker('Induced', gmpes, {'imtls': imtls, 'mags': [Mag]}) #necessary contexts builder
src = CharacteristicFaultSource(source_id = 1,
name = 'rup',
tectonic_region_type = 'Induced',
mfd = ArbitraryMFD([Mag], [0.01]), #this does not have any effect
temporal_occurrence_model = PoissonTOM(50.), #this is also not really used
surface = planar_surface,
rake = Rake)
ctx = context_maker.from_srcs([src], site_collection)[0] #returns one context from the source for one rupture
if use_cython:
from cython_exceedance import exceedance_core
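# cython_exceedance is assumed to be a locally built extension module exposing exceedance_core()
# with the same semantics as the pure-Python loop in the else branch below; it is imported lazily
# so that the pure-Python path carries no build dependency.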
def exceedance_root_function(a):
return exceedance_core(a, r, fr, lambdas, D, percentages_D, magnitudes,
magnitude_pdf, magnitude_cdf, context_maker, ctx,
model, num_ground_motion_records) - p
else:
def exceedance_root_function(a):
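# The loops below evaluate the total probability that ground motion level 'a' is exceeded during
# the forecast window, summed over activity-rate scenarios j, source locations i and magnitudes k:
#   P_exc(a) = sum_j sum_i sum_k P(GM > a | m_k, r_i) * f_cond(m_k; lambda_j) * f(r_i)
# where the magnitude term, conditioned on at least one event occurring in D_j = percentages_D[j] * D, is
#   f_cond(m) = lambda_j * D_j * f(m) * exp(-lambda_j * D_j * (1 - F(m))) / (1 - exp(-lambda_j * D_j)).
# The function returns P_exc(a) - p, so its root is the ground motion level with exceedance probability p.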
exceedance_prob_sum = 0
log_a = np.log(a) # Precompute log(a)
for j in range(len(lambdas)): #loop through all lambdas
lambda_j = lambdas[j]
D_j_val = percentages_D[j] * D # Use a different name to avoid shadowing D
lambda_D_j = lambda_j * D_j_val
denom_j = (1 - np.exp(-lambda_D_j))
if denom_j == 0: # Avoid division by zero if lambda_D_j is very small or zero
continue
for i in range(n_events): #loop through all events
ri = r[i] # Epicentral distance
fr_i = fr[i] # Location probability f(r)
ctx.repi = ri
# Precompute terms only dependent on lambda_j, D_j, m
lambda_D_j_f_m = lambda_D_j * magnitude_pdf
exp_term_m = np.exp(-lambda_D_j * (1 - magnitude_cdf))
f_conditional_base_m = (lambda_D_j_f_m * exp_term_m) / denom_j
for k in range(len(magnitudes)): #loop through all values of magnitude pdf and cdf
m = magnitudes[k]
ctx.mag = m # update context magnitude
# Calculate f_conditional (simpler now)
f_conditional = f_conditional_base_m[k]
mean, sig, _, _ = context_maker.get_mean_stds(ctx)
log_gm_predicted = mean[0][0][0]
variance_term = sig[0][0][0]
residual = log_a - log_gm_predicted # Use precomputed log_a
if residual <= 0:
exceedance_probability = 1.0
else:
# Avoid division by zero or very small numbers if variance_term is ~0
if variance_term < 1e-15: # Adjust threshold as needed
exceedance_probability = 0.0
else:
t_value = residual / variance_term
if model == 'Lasocki2013':
exceedance_probability = t.sf(t_value, num_ground_motion_records - 3) # student t distribution, degrees of freedom: n-3; sf = 1 - cdf
else:
exceedance_probability = norm.sf(t_value) # equivalent to 1.0 - norm.cdf(t_value)
location_exceedance_prob = exceedance_probability * f_conditional * fr_i
exceedance_prob_sum += location_exceedance_prob
return exceedance_prob_sum - p
# Check function values at different test points
IMT_mid = (IMT_max + IMT_min) / 2 # midpoint of the search interval
lower_bound_value = exceedance_root_function(IMT_min)
mid_point_value = exceedance_root_function(IMT_mid)
upper_bound_value = exceedance_root_function(IMT_max)
logger.info(f"Receiver: {str(rx_label)}")
logger.info(f"Function value at {imt} = {str(IMT_min)} : {lower_bound_value}")
logger.info(f"Function value at {imt} = {str(IMT_mid)} : {mid_point_value}")
logger.info(f"Function value at {imt} = {str(IMT_max)} : {upper_bound_value}")
if np.sign(lower_bound_value) == np.sign(upper_bound_value):
msg = "Function values at the interval endpoints must differ in sign for fsolve to work. Expand the interval or use a different model."
logger.error(msg)
gm_est = np.nan
return gm_est
# raise ValueError(msg)
# Find root of function
start = timer()
try:
method='brenth'
logger.debug("Now trying Scipy " + method)
output = root_scalar(exceedance_root_function, bracket=[IMT_min, IMT_max], rtol=rtol, method=method)
gm_est = output.root
except Exception as error:
logger.error(f"An exception occurred: {error}")
logger.info("Set ground motion value to nan")
gm_est = np.nan
end = timer()
logger.info(f"Ground motion estimation computation time: {round(end - start,1)} seconds")
logger.info(f"Estimated {imt}: {gm_est}")
return gm_est
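# Illustrative stand-alone call (hypothetical values; in this module the function is driven from
# main() through dask.delayed over a grid of receivers):
# gm = compute_IMT_exceedance(rx_lat=50.2, rx_lon=18.6, r=dist_km.flatten(), fr=fxy.flatten(),
#                             p=0.05, lambdas=np.array([1.2]), D=7.0, percentages_D=np.array([1.0]),
#                             magnitudes=m_range, magnitude_pdf=m_pdf, magnitude_cdf=m_cdf,
#                             model='Lasocki2013', imt='PGA', IMT_min=0.01, IMT_max=2.0)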
def main(catalog_file, mc_file, pdf_file, m_file, m_select, mag_label, mc, m_max,
m_kde_method, xy_select, grid_dim, xy_win_method, rate_select, time_win_duration,
forecast_select, custom_rate, forecast_len, time_unit, model, products_string, verbose):
"""
Python application that reads an earthquake catalog and performs seismic hazard forecasting.
Arguments:
catalog_file: Path to input file of type 'catalog'
mc_file: Path to input file of type 'mccalc_output'
pdf_file: Path to input file of type 'PDF'
m_file: Path to input file of type 'm'
m_select: If True, perform an estimation of the magnitude distribution using KDE with the chosen KDE method
mag_label: The name of the column corresponding to magnitude in the catalog. Different catalogs can have
different magnitude labels and sometimes more than one magnitude column. If no value is provided,
then the program looks for a label of 'Mw' for magnitude in the catalog
mc: The magnitude of completeness (Mc) of the catalog
m_max: M_max. The magnitude distribution is estimated for the range from Mc to M_max. If no value is provided,
then the program sets M_max to be 1 magnitude unit above the maximum magnitude value in the catalog.
m_kde_method: The kernel density estimator to use.
xy_select: If True, perform an estimation of the event location distribution using KDE.
grid_dim: The grid cell size (in metres) of the final ground motion product map. A smaller cell size will
allow for a higher resolution at the cost of computation time.
xy_win_method: If set to 'TW', use an exponential time weighting function to give more weight to recent
earthquakes when performing the location distribution estimation.
rate_select: If True, estimate the activity rate of the catalog using the Rbeast package.
time_win_duration: The time window to use in estimating the activity rate. The units are specified in the
'Time units' input further down.
forecast_select: If True, forecast the seismic hazard over the period given by Forecast length and produce
a seismic hazard map of the selected Ground Motion Products.
custom_rate: The user has the option of providing a single value activity rate of the catalog to be used in
forecasting.
forecast_len: Length of the forecast for seismic hazard assessment.
time_unit: Times units for the inputs Time Window Duration, Custom Activity Rate, and Forecast Length.
model: The ground motion model to use for the forecast. Other models in the OpenQuake library are
available but have not yet been tested.
products_string: The ground motion intensity types to output. Use a space between names to select more than
one. E.g. PGA SA(1.0) SA(10)
verbose: If True, will increase the amount of details in the log file.
...
Returns:
SVG image overlay files (and colorbars) written to disk for display in Google Maps.
...
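Example (hypothetical file names and parameter values, shown in keyword form for readability):
main(catalog_file='catalog.mat', mc_file=None, pdf_file=None, m_file=None, m_select=True,
mag_label='Mw', mc=None, m_max=None, m_kde_method='diffKDE', xy_select=True, grid_dim=200,
xy_win_method='TW', rate_select=True, time_win_duration=30, forecast_select=True,
custom_rate=None, forecast_len=7, time_unit='days', model='Lasocki2013',
products_string='PGA SA(1.0)', verbose=False)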
"""
import sys
from importlib.metadata import version
import logging
from base_logger import getDefaultLogger
from timeit import default_timer as timer
from math import ceil, floor, isnan
import numpy as np
import dask
from dask.diagnostics import ProgressBar # use Dask progress bar
import kalepy as kale
import utm
from skimage.transform import resize
import igfash
from igfash.io import read_mat_cat, read_mat_m, read_mat_mc, read_mat_pdf, read_csv
from igfash.window import win_CTL, win_CNE
import igfash.kde as kde
#from igfash.gm import compute_IMT_exceedance
from igfash.compute import get_cdf, hellinger_dist, cols_to_rows
from igfash.rate import lambda_probs, calc_bins, bootstrap_forecast_rolling
from igfash.mc import estimate_mc
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
from matplotlib.contour import ContourSet
import xml.etree.ElementTree as ET
import json
logger = getDefaultLogger('igfash')
if pdf_file is None: # no magnitude PDF file provided
m_pdf = [None]
else:
m_pdf = read_mat_pdf(pdf_file)
if m_file is None: # no magnitude range file provided
m_range = [None]
else:
m_range = read_mat_m(m_file)
m_max = m_range[-1] # take m_max from the m_file
if verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
# temporary hard-coded configuration
# exclude_low_fxy = False
exclude_low_fxy = True
thresh_fxy = 1e-3 # minimum fxy value (location PDF) needed to do PGA estimation (to skip low probability areas); also should scale according to number of grid points
# log user selections
logger.debug(f"User input files\n Catalog: {catalog_file}\n Mc: {mc_file}\n Mag_PDF: {pdf_file}\n Mag: {m_file}")
logger.debug(
f"User options\n m_select: {m_select}\n mag_label: {mag_label}\n mc: {mc}\n m_max:{m_max}\n m_kde_method: {m_kde_method}\n \
xy_select: {xy_select}\n grid_dim: {grid_dim}\n xy_win_method: {xy_win_method}\n rate_select: {rate_select}\n time_win_duration: {time_win_duration}\n \
forecast_select: {forecast_select}\n custom_rate: {custom_rate}\n forecast_len: {forecast_len}\n time_unit: {time_unit}\n model: {model}\n products: {products_string}\n \
verbose: {verbose}")
# print key package version numbers
logger.debug(f"Python version {sys.version}")
logger.debug(f"Numpy version {version('numpy')}")
logger.debug(f"Scipy version {version('scipy')}")
logger.debug(f"Obspy version {version('obspy')}")
logger.debug(f"Openquake version {version('openquake.engine')}")
logger.debug(f"Igfash version {version('igfash')}")
logger.debug(f"Rbeast version {version('rbeast')}")
logger.debug(f"Dask version {version('dask')}")
dask.config.set(scheduler='processes')
# run magnitude distribution modeling if selected by user and no magnitude pdf file provided
if m_select and m_range[0] is None and m_pdf[0] is None:
logger.info("Magnitude distribution modeling selected")
if m_kde_method is None:
logger.info("No KDE method specified, therefore use diffKDE by default")
m_kde_method = 'diffKDE'
if mag_label is None:
logger.info("No magnitude label of catalog specified, therefore try Mw by default")
mag_label = 'Mw'
# if cat_label == None:
# print("No magnitude label of catalog specified, therefore try 'Catalog' by default")
# cat_label='Catalog'
time, mag, lat, lon, depth = read_mat_cat(catalog_file, mag_label=mag_label, catalog_label='Catalog')
# check for null magnitude values
m_null_idx = np.where(np.isnan(mag))[0]
if len(m_null_idx) > 0:
msg = f"There are null values in the magnitude column of the catalog at indices {m_null_idx}"
logger.error(msg)
raise Exception(msg)
if mc is not None:
logger.info("Mc value provided by user")
trim_to_mc = True
elif mc_file is not None:
logger.info("Mc estimation output file provided; selecting largest Mc from the list")
mc = read_mat_mc(mc_file)
trim_to_mc = True
else:
logger.info("No Mc provided; using all magnitudes from the catalog")
trim_to_mc = False
mc = mag.min()
# remove events below mc
if trim_to_mc:
logger.info(f"Removing all magnitudes below {mc}")
indices = np.argwhere(mag < mc)
mag = np.delete(mag, indices)
time = np.delete(time, indices)
lat = np.delete(lat, indices)
lon = np.delete(lon, indices)
# if user does not provide a m_max, set m_max to 1 magnitude unit above max magnitude in catalog
if m_max is None:
m_max = mag.max() + 1.0
logger.info(f"No m_max was given. Therefore m_max is automatically set to: {m_max}")
start = timer()
t_windowed, r_windowed = win_CNE(time, [lon, lat, mag], win_size=len(mag), win_overlap=0, min_events=1)
m_windowed = [r[2] for r in r_windowed] # extract only magnitude windows
if m_kde_method[:5] == 'KDEpy':
pdf = kde.compute_kde(m_windowed, xmin=mc, xmax=m_max, bw_method=m_kde_method[6:], pp=False)
elif m_kde_method == 'adaptiveKDE':
pdf = kde.compute_adaptivekde(m_windowed, bw_method="adaptive-local", xmin=mc, xmax=m_max, pp=False)
elif m_kde_method == 'diffKDE':
pdf = kde.compute_diffkde(m_windowed, xmin=mc, xmax=m_max, pp=False)
elif m_kde_method[:5] == 'arviz':
pdf = kde.compute_arvizkde(m_windowed, xmin=mc, xmax=m_max, bw_method=m_kde_method[6:], pp=False)
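else:
# Unrecognized method: fail with a clear message instead of a NameError on 'pdf' further below
# (defensive fallback; the options handled above are KDEpy*, adaptiveKDE, diffKDE and arviz*)
msg = f"Unknown m_kde_method: {m_kde_method}"
logger.error(msg)
raise Exception(msg)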
end = timer()
logger.info(f"Magnitude KDE Computation time: {round(end - start, 1)} seconds")
m_pdf = 2 * pdf[-1] # select last window's pdf as the one to use for forecasting
m_range = np.linspace(mc, m_max, 256)
bin_width = 0.1
bins = np.arange(min(m_windowed[-1]), max(m_windowed[-1]) + bin_width, bin_width)
# plot only the last window
fig = plt.figure(dpi=300, figsize=(8, 6))
plt.hist(m_windowed[-1], bins=bins, color='blue', edgecolor='black', alpha=0.6, density=True,
label='Magnitude bins')
plt.plot(m_range, m_pdf, color='red', linewidth=2.5, label='KDE')
plt.legend()
# configure ticks
ax = plt.gca()
ax.xaxis.set_major_locator(MultipleLocator(0.2)) # Major ticks every 0.2
ax.xaxis.set_minor_locator(MultipleLocator(0.1)) # Minor ticks every 0.1
ax.tick_params(axis='x', which='major', labelsize=10) # Labels only for major ticks
ax.tick_params(axis='x', which='minor', length=5, labelsize=0) # No labels for minor ticks
plt.xticks(rotation=45) # Rotate ticks by 45 degrees
plt.ylabel("f(M)")
plt.xlabel("Magnitude")
plt.savefig('KDE_magnitude_PDF.png')
np.savetxt('KDE_magnitude_PDF.csv', np.c_[m_range, m_pdf])
# run location distribution modeling
if xy_select:
logger.info("Event location distribution modeling selected")
time, mag, lat, lon, depth = read_mat_cat(catalog_file)
# convert to UTM
u = utm.from_latlon(lat, lon)
x = u[0]
y = u[1]
utm_zone_number = u[2]
utm_zone_letter = u[3]
logger.debug(f"Latitude / Longitude coordinates correspond to UTM zone {utm_zone_number}{utm_zone_letter}")
# define corners of grid based on global dataset
x_min = x.min()
y_min = y.min()
x_max = x.max()
y_max = y.max()
grid_x_max = int(ceil(x_max / grid_dim) * grid_dim)
grid_x_min = int(floor(x_min / grid_dim) * grid_dim)
grid_y_max = int(ceil(y_max / grid_dim) * grid_dim)
grid_y_min = int(floor(y_min / grid_dim) * grid_dim)
grid_lat_max, grid_lon_max = utm.to_latlon(grid_x_max, grid_y_max, utm_zone_number, utm_zone_letter)
grid_lat_min, grid_lon_min = utm.to_latlon(grid_x_min, grid_y_min, utm_zone_number, utm_zone_letter)
# rectangular grid
nx = int((grid_x_max - grid_x_min) / grid_dim) + 1
ny = int((grid_y_max - grid_y_min) / grid_dim) + 1
# ensure a square grid is used
if nx > ny: # enlarge y dimension to match x
ny = nx
grid_y_max = int(grid_y_min + (ny - 1) * grid_dim)
else: # enlarge x dimension to match y
nx = ny
grid_x_max = int(grid_x_min + (nx - 1) * grid_dim)
# new x and y range
x_range = np.linspace(grid_x_min, grid_x_max, nx)
y_range = np.linspace(grid_y_min, grid_y_max, ny)
t_windowed = time
r_windowed = [[x, y]]
# %% compute KDE and extract PDF
start = timer()
if xy_win_method == "TW":
logger.info("Time weighting function selected")
x_weights = np.linspace(0, 15, len(t_windowed))
weighting_fcn = np.exp(x_weights) # exponential weighting
output_kale, output_kde = kde.compute_2d_kde([grid_x_min, grid_x_max, grid_y_min, grid_y_max], r_windowed,
n_kde=nx, weighting_fcn=weighting_fcn)
else:
output_kale, output_kde = kde.compute_2d_kde([grid_x_min, grid_x_max, grid_y_min, grid_y_max], r_windowed,
n_kde=nx)
end = timer()
logger.info(f"Location KDE Computation time: {round(end - start, 1)} seconds")
xy_kale = output_kale[0]
xy_kde = output_kde[0]
# plot location PDF
xy_kale_km = type(xy_kale)(xy_kale.dataset / 1000)
corner = kale.corner(xy_kale_km, quantiles=[0.025, 0.16, 0.50, 0.84, 0.975], cmap='hot')
# modify bottom left plot
ax00 = corner[0].axes[0][0]
ax00.set_ylabel("Probability Density")
# Remove the PDF ticks below zero
yticks = ax00.get_yticks()
new_yticks = yticks[yticks >= 0]
ax00.set_yticks(new_yticks)
# ax01 = corner[0].axes[0][1] # bottom right plot
# modify top left plot
ax10 = corner[0].axes[1][0]
ax10.set_xlabel("UTM X (km)")
ax10.set_ylabel("UTM Y (km)")
ax10.ticklabel_format(style='plain')
for coll in ax10.collections:
if isinstance(coll, ContourSet): # Check if it's a contour plot
ax10.clabel(coll, inline=True, fontsize='smaller', fmt="%.1f")
# modify top right plot
ax11 = corner[0].axes[1][1]
ax11.set_xlabel("Probability Density")
ax11.ticklabel_format(style='plain')
# Remove the PDF ticks below zero
xticks = ax11.get_xticks()
new_xticks = xticks[xticks >= 0]
ax11.set_xticks(new_xticks)
ax11.set_yticklabels([]) # This removes the ytick labels
# Rotate x-axis tick labels for all bottom-row plots
for ax in corner[0].axes[-1, :]: # Last row corresponds to the bottom x-axes
for label in ax.get_xticklabels():
label.set_rotation(45)
corner[0].fig.savefig('KDE_location_PDF', bbox_inches="tight", dpi=300)
np.savetxt('KDE_location_PDF.csv', np.array(output_kde[0][0]))
# run activity rate modeling
lambdas = [None]
if custom_rate is not None and forecast_select:
logger.info(f"Using activity rate specified by user: {custom_rate} per {time_unit}")
lambdas = np.array([custom_rate], dtype='d')
lambdas_perc = np.array([1], dtype='d')
elif rate_select:
logger.info(f"Activity rate modeling selected")
time, mag_dummy, lat_dummy, lon_dummy, depth_dummy = read_mat_cat(catalog_file, output_datenum=True)
datenum_data = time # MATLAB datenum values: the unit is days, with fractions of a day after the decimal point
if time_unit == 'hours':
multiplicator = 24
elif time_unit == 'days':
multiplicator = 1
elif time_unit == 'weeks':
multiplicator = 1 / 7
elif time_unit == 'years':
multiplicator = 1 / 365
# Raise an exception when time_win_duration from the user is too large relative to the catalog
if time_win_duration/multiplicator > 0.5*(time[-1] - time[0]):
msg = "Activity rate estimation time window must be less than half the catalog length. Use a shorter time window."
logger.error(msg)
raise Exception(msg)
# Select dates in datenum format and proceed to the forecast value
start_date = datenum_data[-1] - (2 * time_win_duration / multiplicator)
dates_calc = [date for date in datenum_data if start_date <= date <= datenum_data[-1]]
forecasts, bca_conf95, rate_mean = bootstrap_forecast_rolling(dates_calc, multiplicator)
# Final values of the rate and its uncertainty in the 5-95 percentile range
unc_bca05 = [ci.low for ci in bca_conf95]
unc_bca95 = [ci.high for ci in bca_conf95]
rate_unc_high = multiplicator / np.array(unc_bca05)
rate_unc_low = multiplicator / np.array(unc_bca95)
rate_forecast = multiplicator / np.median(forecasts) # [per time unit]
# Plot of forecasted activity rate with previous binned activity rate
act_rate, bin_counts, bin_edges, out, pprs, rt, idx, u_e = calc_bins(np.array(datenum_data), time_unit,
time_win_duration, dates_calc,
rate_forecast, rate_unc_high, rate_unc_low,
multiplicator, quiet=True, figsize=(14,9))
# Assign probabilities
lambdas, lambdas_perc = lambda_probs(act_rate, dates_calc, bin_edges)
lambdas = np.array(lambdas, dtype='d')
lambdas_perc = np.array(lambdas_perc, dtype='d')
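# lambdas holds the forecasted activity-rate levels and lambdas_perc the fraction of the forecast
# window each level applies to; they are passed to compute_IMT_exceedance as 'lambdas' and
# 'percentages_D' (so that percentages_D[j] * forecast_len gives the sub-window D_j).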
# print("Forecasted activity rates: ", lambdas, "events per", time_unit[:-1])
logger.info(f"Forecasted activity rates: {lambdas} events per {time_unit} with percentages {lambdas_perc}")
np.savetxt('activity_rate.csv', np.vstack((lambdas, lambdas_perc)).T, header="lambda, percentage",
delimiter=',', fmt='%1.4f')
if forecast_select:
products = products_string.split()
logger.info(f"Ground motion forecasting selected with ground motion model {model} and IMT products {products_string}")
# validate m_max against the ground motion model
models_anthro_limited = ['Lasocki2013', 'Atkinson2015', 'ConvertitoEtAl2012Geysers'] # these models require that m_max<=4.5
if m_max > 4.5 and model in models_anthro_limited:
if m_file is None:
msg = f"The selected ground motion model {model} is only valid for magnitudes up to 4.5. Please select a lower maximum magnitude."
else:
msg = f"The selected ground motion model {model} is only valid for magnitudes up to 4.5, but the provided magnitude file includes values up to {m_max}. Please adjust the magnitude range in the file accordingly."
logger.error(msg)
raise Exception(msg)
if not xy_select:
msg = "Event location distribution modeling was not selected; cannot continue..."
logger.error(msg)
raise Exception(msg)
if m_pdf[0] is None:
msg = "Magnitude distribution modeling was not selected and magnitude PDF file was not provided; cannot continue..."
logger.error(msg)
raise Exception(msg)
if lambdas[0] is None:
msg = "Activity rate modeling was not selected and custom activity rate was not provided; cannot continue..."
logger.error(msg)
raise Exception(msg)
Mag = 5.0 # placeholder mag, must be valid for that context; will be overwritten during function call
rupture_aratio = 1.5
Strike = 0
Dip = 90
Rake = 0
p = 0.05 # Probability of exceedance
m_cdf = get_cdf(m_pdf)
fxy = xy_kde[0]
logger.debug(f"Normalization check; sum of all f(x,y) values = {np.sum(fxy)}")
xx, yy = np.meshgrid(x_range, y_range, indexing='ij') # grid points
# set every grid point to be a receiver
x_rx = xx.flatten()
y_rx = yy.flatten()
# compute distance matrix for each receiver
distances = np.zeros(shape=(nx * ny, nx, ny))
rx_lat = np.zeros(nx * ny)
rx_lon = np.zeros(nx * ny)
for i in range(nx * ny):
# Compute the squared distances directly using NumPy's vectorized operations
squared_distances = (xx - x_rx[i]) ** 2 + (yy - y_rx[i]) ** 2
distances[i] = np.sqrt(squared_distances)
# create context object for receiver and append to list
rx_lat[i], rx_lon[i] = utm.to_latlon(x_rx[i], y_rx[i], utm_zone_number,
utm_zone_letter) # get receiver location as lat,lon
# convert distances from m to km because openquake ground motion models take input distances in kilometres
distances = distances/1000.0
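# distances[i] is the nx-by-ny grid of epicentral distances (km) from every grid point to receiver i;
# it is passed flattened to compute_IMT_exceedance, matching fr = fxy.flatten() below.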
# compute ground motion only at grid points that have minimum probability density of thresh_fxy
if exclude_low_fxy:
indices = list(np.where(fxy.flatten() > thresh_fxy)[0])
else:
indices = range(0, len(distances))
fr = fxy.flatten()
# For each receiver compute estimated ground motion values
logger.info(f"Estimating ground motion intensity at {len(indices)} grid points...")
PGA = np.zeros(shape=(nx * ny))
start = timer()
use_pp = True
if use_pp: # use dask parallel computing
pbar = ProgressBar()
pbar.register()
# iter = range(0,len(distances))
iter = indices
iml_grid_raw = [] # raw ground motion grids
for imt in products:
logger.info(f"Estimating {imt}")
imls = [dask.delayed(compute_IMT_exceedance)(rx_lat[i], rx_lon[i], distances[i].flatten(), fr, p, lambdas, forecast_len,
lambdas_perc, m_range, m_pdf, m_cdf, model, imt=imt, IMT_min=0.0,
IMT_max=2.0, rx_label=i, rtol=0.1, use_cython=True) for i in iter]
#imls = [dask.delayed(my_trivial_task)(i) for i in iter]
iml = dask.compute(*imls)
iml_grid_raw.append(list(iml))
else:
iml_grid_raw = []
iter = indices
for imt in products:
iml = []
for i in iter:
iml_i = compute_IMT_exceedance(rx_lat[i], rx_lon[i], distances[i].flatten(), fr, p, lambdas, forecast_len,
lambdas_perc, m_range, m_pdf, m_cdf, model, imt=imt, IMT_min = 0.0,
IMT_max = 2.0, rx_label = i, rtol = 0.1, use_cython=True)
iml.append(iml_i)
logger.info(f"Estimated {imt} at rx {i} is {iml_i}")
iml_grid_raw.append(iml)
end = timer()
logger.info(f"Ground motion exceedance computation time: {round(end - start, 1)} seconds")
logger.info("Test3 run finished")
sys.exit()
if np.isnan(iml_grid_raw).all():
msg = "No valid ground motion intensity measures were forecasted. Try a different ground motion model."
logger.error(msg)
raise Exception(msg)
# create list of one empty list for each imt
iml_grid = [[] for _ in range(len(products))] # final ground motion grids
iml_grid_prep = iml_grid.copy() # temp ground motion grids
if exclude_low_fxy:
for i in range(0, len(distances)):
if i in indices:
for j in range(0, len(products)):
iml_grid_prep[j].append(iml_grid_raw[j].pop(0))
else:
list(map(lambda lst: lst.append(np.nan),
iml_grid_prep)) # use np.nan to indicate grid point excluded
else:
iml_grid_prep = iml_grid_raw
for j in range(0, len(products)):
vmin = min(x for x in iml_grid_prep[j] if not isnan(x))
vmax = max(x for x in iml_grid_prep[j] if not isnan(x))
iml_grid[j] = np.reshape(iml_grid_prep[j], (nx, ny)).astype(dtype=np.float64) # reshape into an nx-by-ny float64 grid
iml_grid_tmp = np.nan_to_num(iml_grid[j]) # change nans to zeroes
# upscale the grid
up_factor = 4
iml_grid_hd = resize(iml_grid_tmp, (up_factor * len(iml_grid_tmp), up_factor * len(iml_grid_tmp)),
mode='reflect', anti_aliasing=False)
iml_grid_hd[iml_grid_hd == 0.0] = np.nan # change zeroes back to nan
# trim edges so the grid is not so blocky
vmin_hd = min(x for x in iml_grid_hd.flatten() if not isnan(x))
vmax_hd = max(x for x in iml_grid_hd.flatten() if not isnan(x))
trim_thresh = vmin
iml_grid_hd[iml_grid_hd < trim_thresh] = np.nan
# generate image overlay
north, south = lat.max(), lat.min() # Latitude range
east, west = lon.max(), lon.min() # Longitude range
bounds = [[south, west], [north, east]]
map_center = [np.mean([north, south]), np.mean([east, west])]
# Create an image from the grid
cmap_name = 'YlOrRd'
cmap = plt.get_cmap(cmap_name)
fig, ax = plt.subplots(figsize=(6, 6))
ax.imshow(iml_grid_hd, origin='lower', cmap=cmap, vmin=vmin, vmax=vmax)
ax.axis('off')
# Save the figure
fig.canvas.draw()
overlay_filename = f"overlay_{j}.svg"
plt.savefig(overlay_filename, bbox_inches="tight", pad_inches=0, transparent=True)
plt.close(fig)
# Embed geographic bounding box into the SVG
map_bounds = dict(zip(("south", "west", "north", "east"),
map(float, (grid_lat_min, grid_lon_min, grid_lat_max, grid_lon_max))))
tree = ET.parse(overlay_filename)
tree.getroot().set("data-map-bounds", json.dumps(map_bounds))
tree.write(overlay_filename, encoding="utf-8", xml_declaration=True)
logger.info(f"Saved geographic bounds to SVG metadata (data-map-bounds): {overlay_filename}{map_bounds}")
# Make the color bar
width = 50
height = 500
gradient = np.linspace(0, 1, height)
gradient = np.vstack((gradient, gradient)).T
gradient = np.tile(gradient, (1, width))
colorbar_title = products[j]
if "PGA" in colorbar_title or "SA" in colorbar_title:
colorbar_title = colorbar_title + " (g)"
fig, ax = plt.subplots(figsize=((width + 40) / 100.0, (height + 20) / 100.0),
dpi=100) # Increase fig size for labels
ax.imshow(gradient, aspect='auto', cmap=cmap.reversed(),
extent=[0, 1, vmin, vmax_hd]) # Note: extent order is different for vertical
ax.set_xticks([]) # Remove x-ticks for vertical colorbar
num_ticks = 11 # Show more ticks
tick_positions = np.linspace(vmin, vmax_hd, num_ticks)
ax.set_yticks(tick_positions)
ax.set_yticklabels([f"{tick:.2f}" for tick in tick_positions]) # format tick labels
ax.set_title(colorbar_title, loc='right', pad=15)
fig.subplots_adjust(left=0.25, right=0.75, bottom=0.05, top=0.95) # Adjust Layout
fig.savefig("colorbar_" + str(j) + ".svg", bbox_inches='tight')
plt.close(fig)