# -*- coding: utf-8 -*- import numpy as np from scipy.stats import t, norm from scipy.optimize import root_scalar from timeit import default_timer as timer import logging logger = logging.getLogger(__name__) from openquake.hazardlib.geo import Point #This class represents a geographical point in terms of longitude, latitude, and depth (with respect to the Earth surface). from openquake.hazardlib.geo.surface.planar import PlanarSurface from openquake.hazardlib.source.characteristic import CharacteristicFaultSource from openquake.hazardlib.mfd import ArbitraryMFD from openquake.hazardlib.tom import PoissonTOM from openquake.hazardlib.scalerel import WC1994 #Wells and Coppersmith magnitude – rupture area relationships from openquake.hazardlib.site import Site, SiteCollection from openquake.hazardlib.contexts import ContextMaker from openquake.hazardlib.valid import gsim from openquake.hazardlib.imt import PGA import sys def my_trivial_task(x): """ A function that does a trivial task """ return x * x def compute_IMT_exceedance(rx_lat, rx_lon, r, fr, p, lambdas, D, percentages_D, magnitudes, magnitude_pdf, magnitude_cdf, model, imt='PGA', IMT_min=0.01, IMT_max=2.0, rx_label=None, rtol=0.1, use_cython=False, **kwargs): n_events = len(r) try: gmpes = [gsim(model)] except: msg = f"{model} was not found in the openquake gsim directory" logger.error(msg) raise Exception(msg) if model == 'Lasocki2013': #this model requires the number of earthquake records if imt=='PGA': #extract number of records for PGA num_ground_motion_records = gmpes[0].COEFFS.non_sa_coeffs[PGA()]['N'] else: #extract number of records for SA() freq = float(imt[imt.find('(')+1:imt.find(')')]) # get the desired frequency of SA first_index = np.where(gmpes[0].COEFFS.get_coeffs('N')[0]==freq)[0][0] num_ground_motion_records = gmpes[0].COEFFS.get_coeffs('N')[1][first_index][0] else: num_ground_motion_records = 0 #placeholder values that do not have any effect Mag = 5.0 #placeholder mag, must be valid for that context; will be overwritten in loop rupture_aratio = 1.5 Strike = 0 Dip = 90 Rake = 0 Hypocenter = Point(rx_lon, rx_lat, 0.0) #does not matter in our case; just set eq location to be same as receiver #according to the magnitude and MSR calculate planar surface planar_surface = PlanarSurface.from_hypocenter( hypoc=Hypocenter, msr=WC1994(), mag=Mag, aratio=rupture_aratio, strike=Strike, dip=Dip, rake=Rake, ) # site for which we compute (receiver location) site_collection = SiteCollection([Site(location=Point(rx_lon, rx_lat, 0))]) imtls = {s: [0] for s in [imt]} #required for context maker, M = 2 IMTs context_maker = ContextMaker('Induced', gmpes, {'imtls': imtls, 'mags': [Mag]}) #necessary contexts builder src = CharacteristicFaultSource(source_id = 1, name = 'rup', tectonic_region_type = 'Induced', mfd = ArbitraryMFD([Mag], [0.01]), #this does not have any effect temporal_occurrence_model = PoissonTOM(50.), #this is also not really used surface = planar_surface, rake = Rake) ctx = context_maker.from_srcs([src], site_collection)[0] #returns one context from the source for one rupture if use_cython: from cython_exceedance import exceedance_core def exceedance_root_function(a): return exceedance_core(a, r, fr, lambdas, D, percentages_D, magnitudes, magnitude_pdf, magnitude_cdf, context_maker, ctx, model, num_ground_motion_records) - p else: def exceedance_root_function(a): exceedance_prob_sum = 0 log_a = np.log(a) # Precompute log(a) for j in range(len(lambdas)): #loop through all lambdas lambda_j = lambdas[j] D_j_val = percentages_D[j] * D # Use a different name to avoid shadowing D lambda_D_j = lambda_j * D_j_val denom_j = (1 - np.exp(-lambda_D_j)) if denom_j == 0: # Avoid division by zero if lambda_D_j is very small or zero continue for i in range(n_events): #loop through all events ri = r[i] # Epicentral distance fr_i = fr[i] # Location probability f(r) ctx.repi = ri # Precompute terms only dependent on lambda_j, D_j, m lambda_D_j_f_m = lambda_D_j * magnitude_pdf exp_term_m = np.exp(-lambda_D_j * (1 - magnitude_cdf)) f_conditional_base_m = (lambda_D_j_f_m * exp_term_m) / denom_j for k in range(len(magnitudes)): #loop through all values of magnitude pdf and cdf m = magnitudes[k] ctx.mag = m # update context magnitude # Calculate f_conditional (simpler now) f_conditional = f_conditional_base_m[k] mean, sig, _, _ = context_maker.get_mean_stds(ctx) log_gm_predicted = mean[0][0][0] variance_term = sig[0][0][0] residual = log_a - log_gm_predicted # Use precomputed log_a if residual <= 0: exceedance_probability = 1.0 else: # Avoid division by zero or very small numbers if variance_term is ~0 if variance_term < 1e-15: # Adjust threshold as needed exceedance_probability = 0.0 else: t_value = residual / variance_term if model == 'Lasocki2013': exceedance_probability = t.sf(t_value, num_ground_motion_records - 3) # student t distribution, degrees of freedom: n-3; sf = 1 - cdf else: exceedance_probability = norm.sf(t_value) # equivalent to 1.0 - norm.cdf(t_value) location_exceedance_prob = exceedance_probability * f_conditional * fr_i exceedance_prob_sum += location_exceedance_prob return exceedance_prob_sum - p # Check function values at different test points IMT_mid = (IMT_max-IMT_min)/2 lower_bound_value = exceedance_root_function(IMT_min) mid_point_value = exceedance_root_function(IMT_mid) upper_bound_value = exceedance_root_function(IMT_max) #logger.info(f"Receiver: {str(rx_label)}") #logger.info(f"Function value at {imt} = {str(IMT_min)} : {lower_bound_value}") #logger.info(f"Function value at {imt} = {str(IMT_mid)} : {mid_point_value}") #logger.info(f"Function value at {imt} = {str(IMT_max)} : {upper_bound_value}") if np.sign(lower_bound_value) == np.sign(upper_bound_value): msg = "Function values at the interval endpoints must differ in sign for fsolve to work. Expand the interval or use a different model." #logger.error(msg) gm_est = np.nan return gm_est # raise ValueError(msg) # Find root of function start = timer() try: method='brenth' #logger.debug("Now trying Scipy " + method) output = root_scalar(exceedance_root_function, bracket=[IMT_min, IMT_max], rtol=rtol, method=method) gm_est = output.root except Exception as error: #logger.error(f"An exception occurred: {error}") #logger.info("Set ground motion value to nan") gm_est = np.nan end = timer() #logger.info(f"Ground motion estimation computation time: {round(end - start,1)} seconds") #logger.info(f"Estimated {imt}: {gm_est}") return gm_est def main(catalog_file, mc_file, pdf_file, m_file, m_select, mag_label, mc, m_max, m_kde_method, xy_select, grid_dim, xy_win_method, rate_select, time_win_duration, forecast_select, custom_rate, forecast_len, time_unit, model, products_string, verbose): """ Python application that reads an earthquake catalog and performs seismic hazard forecasting. Arguments: catalog_file: Path to input file of type 'catalog' mc_file: Path to input file of type 'mccalc_output' pdf_file: Path to input file of type 'PDF' m_file: Path to input file of type 'm' m_select: If True, perform an estimation of the magnitude distribution using KDE with the chosen KDE method mag_label: The name of the column corresponding to magnitude in the catalog. Different catalogs can have different magnitude labels and sometimes more than one magnitude column. If no value is provided, then the program looks for a label of 'Mw' for magnitude in the catalog mc: The magnitude of completeness (Mc) of the catalog m_max:M_max. The magnitude distribution is estimated for the range from Mc to M_max. If no value is provided, then the program sets M_max to be 1 magnitude units above the maximum magnitude value in the catalog. m_kde_method: The kernel density estimator to use. xy_select: If True, perform an estimation of the magnitude distribution using KDE with the chosen KDE method grid_dim: The grid cell size (in metres) of the final ground motion product map. A smaller cell size will allow for a higher resolution at the cost of computation time. xy_win_method: If True, use an exponential time weighting function to give more weight to recent earthquakes when performing the location distribution estimation. rate_select: If True, estimate the activity rate of the catalog using the Rbeast package. time_win_duration: The time window to use in estimating the activity rate. The units are specified in the 'Time units' input further down. forecast_select: If True, forecast the seismic hazard for the time period of Forecast length and produce a seismic hazard map of the selected Ground Motion Products custom_rate: The user has the option of providing a single value activity rate of the catalog to be used in forecasting. forecast_len: Length of the forecast for seismic hazard assessment. time_unit: Times units for the inputs Time Window Duration, Custom Activity Rate, and Forecast Length. model: Select from the following ground motion models available. Other models in the Openquake library are available but have not yet been tested. products_string: The ground motion intensity types to output. Use a space between names to select more than one. E.g. “PGA SA(1.0) SA(10)” verbose: If True, will increase the amount of details in the log file. ... Returns: SVG file image overlays for display in Google Maps. ... """ import sys from importlib.metadata import version import logging from base_logger import getDefaultLogger from timeit import default_timer as timer from math import ceil, floor, isnan import numpy as np import dask from dask.diagnostics import ProgressBar # use Dask progress bar import kalepy as kale import utm from skimage.transform import resize import igfash from igfash.io import read_mat_cat, read_mat_m, read_mat_mc, read_mat_pdf, read_csv from igfash.window import win_CTL, win_CNE import igfash.kde as kde #from igfash.gm import compute_IMT_exceedance from igfash.compute import get_cdf, hellinger_dist, cols_to_rows from igfash.rate import lambda_probs, calc_bins, bootstrap_forecast_rolling from igfash.mc import estimate_mc import matplotlib.pyplot as plt from matplotlib.ticker import MultipleLocator from matplotlib.contour import ContourSet import xml.etree.ElementTree as ET import json logger = getDefaultLogger('igfash') if pdf_file is None: # magnitude PDF file provided m_pdf = [None] else: m_pdf = read_mat_pdf(pdf_file) if m_file is None: # magnitude range file provided m_range = [None] else: m_range = read_mat_m(m_file) m_max = m_range[-1] # take m_max from the m_file if verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) # temporary hard-coded configuration # exclude_low_fxy = False exclude_low_fxy = True thresh_fxy = 1e-3 # minimum fxy value (location PDF) needed to do PGA estimation (to skip low probability areas); also should scale according to number of grid points # log user selections logger.debug(f"User input files\n Catalog: {catalog_file}\n Mc: {mc_file}\n Mag_PDF: {pdf_file}\n Mag: {m_file}") logger.debug( f"User options\n m_select: {m_select}\n mag_label: {mag_label}\n mc: {mc}\n m_max:{m_max}\n m_kde_method: {m_kde_method}\n \ xy_select: {xy_select}\n grid_dim: {grid_dim}\n xy_win_method: {xy_win_method}\n rate_select: {rate_select}\n time_win_duration: {time_win_duration}\n \ forecast_select: {forecast_select}\n custom_rate: {custom_rate}\n forecast_len: {forecast_len}\n time_unit: {time_unit}\n model: {model}\n products: {products_string}\n \ verbose: {verbose}") # print key package version numbers logger.debug(f"Python version {sys.version}") logger.debug(f"Numpy version {version('numpy')}") logger.debug(f"Scipy version {version('scipy')}") logger.debug(f"Obspy version {version('obspy')}") logger.debug(f"Openquake version {version('openquake.engine')}") logger.debug(f"Igfash version {version('igfash')}") logger.debug(f"Rbeast version {version('rbeast')}") logger.debug(f"Dask version {version('dask')}") logger.debug(f"Logging version {logging.__version__}") dask.config.set(scheduler='processes') # run magnitude distribution modeling if selected by user and no magnitude pdf file provided if m_select and m_range[0] == None and m_pdf[0] == None: logger.info("Magnitude distribution modeling selected") if m_kde_method == None: logger.info("No KDE method specified, therefore use diffKDE by default") m_kde_method = 'diffKDE' if mag_label == None: logger.info("No magnitude label of catalog specified, therefore try Mw by default") mag_label = 'Mw' # if cat_label == None: # print("No magnitude label of catalog specified, therefore try 'Catalog' by default") # cat_label='Catalog' time, mag, lat, lon, depth = read_mat_cat(catalog_file, mag_label=mag_label, catalog_label='Catalog') # check for null magnitude values m_null_idx = np.where(np.isnan(mag))[0] if len(m_null_idx) > 0: msg = f"There are null values in the magnitude column of the catalog at indices {m_null_idx}" logger.error(msg) raise Exception(msg) if mc != None: logger.info("Mc value provided by user") trim_to_mc = True elif mc_file != None: logger.info("Mc estimation output file provided; selecting largest Mc from the list") mc = read_mat_mc(mc_file) trim_to_mc = True else: logger.info("No Mc provided; using all magnitudes from the catalog") trim_to_mc = False mc = mag.min() # remove events below mc if trim_to_mc: logger.info(f"Removing all magnitudes below {mc}") indices = np.argwhere(mag < mc) mag = np.delete(mag, indices) time = np.delete(time, indices) lat = np.delete(lat, indices) lon = np.delete(lon, indices) # if user does not provide a m_max, set m_max to 1 magnitude unit above max magnitude in catalog if m_max == None: m_max = mag.max() + 1.0 logger.info(f"No m_max was given. Therefore m_max is automatically set to: {m_max}") start = timer() t_windowed, r_windowed = win_CNE(time, [lon, lat, mag], win_size=len(mag), win_overlap=0, min_events=1) m_windowed = [r[2] for r in r_windowed] # extract only magnitude windows if m_kde_method[:5] == 'KDEpy': pdf = kde.compute_kde(m_windowed, xmin=mc, xmax=m_max, bw_method=m_kde_method[6:], pp=False) elif m_kde_method == 'adaptiveKDE': pdf = kde.compute_adaptivekde(m_windowed, bw_method="adaptive-local", xmin=mc, xmax=m_max, pp=False) elif m_kde_method == 'diffKDE': pdf = kde.compute_diffkde(m_windowed, xmin=mc, xmax=m_max, pp=False) elif m_kde_method[:5] == 'arviz': pdf = kde.compute_arvizkde(m_windowed, xmin=mc, xmax=m_max, bw_method=m_kde_method[6:], pp=False) end = timer() logger.info(f"Magnitude KDE Computation time: {round(end - start, 1)} seconds") m_pdf = 2 * pdf[-1] # select last window's pdf as the one to use for forecasting m_range = np.linspace(mc, m_max, 256) bin_width = 0.1 bins = np.arange(min(m_windowed[-1]), max(m_windowed[-1]) + bin_width, bin_width) # plot only the last window fig = plt.figure(dpi=300, figsize=(8, 6)) plt.hist(m_windowed[-1], bins=bins, color='blue', edgecolor='black', alpha=0.6, density=True, label='Magnitude bins') plt.plot(m_range, m_pdf, color='red', linewidth=2.5, label='KDE') plt.legend() # configure ticks ax = plt.gca() ax.xaxis.set_major_locator(MultipleLocator(0.2)) # Major ticks every 0.2 ax.xaxis.set_minor_locator(MultipleLocator(0.1)) # Minor ticks every 0.1 ax.tick_params(axis='x', which='major', labelsize=10) # Labels only for major ticks ax.tick_params(axis='x', which='minor', length=5, labelsize=0) # No labels for minor ticks plt.xticks(rotation=45) # Rotate ticks by 45 degrees plt.ylabel("f(M)") plt.xlabel("Magnitude") plt.savefig('KDE_magnitude_PDF.png') np.savetxt('KDE_magnitude_PDF.csv', np.c_[m_range, m_pdf]) # run location distribution modeling if xy_select: logger.info("Event location distribution modeling selected") time, mag, lat, lon, depth = read_mat_cat(catalog_file) # convert to UTM u = utm.from_latlon(lat, lon) x = u[0] y = u[1] utm_zone_number = u[2] utm_zone_letter = u[3] logger.debug(f"Latitude / Longitude coordinates correspond to UTM zone {utm_zone_number}{utm_zone_letter}") # define corners of grid based on global dataset x_min = x.min() y_min = y.min() x_max = x.max() y_max = y.max() grid_x_max = int(ceil(x_max / grid_dim) * grid_dim) grid_x_min = int(floor(x_min / grid_dim) * grid_dim) grid_y_max = int(ceil(y_max / grid_dim) * grid_dim) grid_y_min = int(floor(y_min / grid_dim) * grid_dim) grid_lat_max, grid_lon_max = utm.to_latlon(grid_x_max, grid_y_max, utm_zone_number, utm_zone_letter) grid_lat_min, grid_lon_min = utm.to_latlon(grid_x_min, grid_y_min, utm_zone_number, utm_zone_letter) # rectangular grid nx = int((grid_x_max - grid_x_min) / grid_dim) + 1 ny = int((grid_y_max - grid_y_min) / grid_dim) + 1 # ensure a square grid is used if nx > ny: # enlarge y dimension to match x ny = nx grid_y_max = int(grid_y_min + (ny - 1) * grid_dim) else: # enlarge x dimension to match y nx = ny grid_x_max = int(grid_x_min + (nx - 1) * grid_dim) # new x and y range x_range = np.linspace(grid_x_min, grid_x_max, nx) y_range = np.linspace(grid_y_min, grid_y_max, ny) t_windowed = time r_windowed = [[x, y]] # %% compute KDE and extract PDF start = timer() if xy_win_method == "TW": logger.info("Time weighting function selected") x_weights = np.linspace(0, 15, len(t_windowed)) weighting_fcn = np.exp(x_weights) # exponential weighting output_kale, output_kde = kde.compute_2d_kde([grid_x_min, grid_x_max, grid_y_min, grid_y_max], r_windowed, n_kde=nx, weighting_fcn=weighting_fcn) else: output_kale, output_kde = kde.compute_2d_kde([grid_x_min, grid_x_max, grid_y_min, grid_y_max], r_windowed, n_kde=nx) end = timer() logger.info(f"Location KDE Computation time: {round(end - start, 1)} seconds") xy_kale = output_kale[0] xy_kde = output_kde[0] # plot location PDF xy_kale_km = type(xy_kale)(xy_kale.dataset / 1000) corner = kale.corner(xy_kale_km, quantiles=[0.025, 0.16, 0.50, 0.84, 0.975], cmap='hot') # modify bottom left plot ax00 = corner[0].axes[0][0] ax00.set_ylabel("Probability Density") # Remove the PDF ticks below zero yticks = ax00.get_yticks() new_yticks = yticks[yticks >= 0] ax00.set_yticks(new_yticks) # ax01 = corner[0].axes[0][1] # bottom right plot # modify top left plot ax10 = corner[0].axes[1][0] ax10.set_xlabel("UTM X (km)") ax10.set_ylabel("UTM Y (km)") ax10.ticklabel_format(style='plain') for coll in ax10.collections: if isinstance(coll, ContourSet): # Check if it's a contour plot ax10.clabel(coll, inline=True, fontsize='smaller', fmt="%.1f") # modify top right plot ax11 = corner[0].axes[1][1] ax11.set_xlabel("Probability Density") ax11.ticklabel_format(style='plain') # Remove the PDF ticks below zero xticks = ax11.get_xticks() new_xticks = xticks[xticks >= 0] ax11.set_xticks(new_xticks) ax11.set_yticklabels([]) # This removes the ytick labels # Rotate x-axis tick labels for all bottom-row plots for ax in corner[0].axes[-1, :]: # Last row corresponds to the bottom x-axes for label in ax.get_xticklabels(): label.set_rotation(46) corner[0].fig.savefig('KDE_location_PDF', bbox_inches="tight", dpi=300) np.savetxt('KDE_location_PDF.csv', np.array(output_kde[0][0])) # run activity rate modeling lambdas = [None] if custom_rate != None and forecast_select: logger.info(f"Using activity rate specified by user: {custom_rate} per {time_unit}") lambdas = np.array([custom_rate], dtype='d') lambdas_perc = np.array([1], dtype='d') elif rate_select: logger.info(f"Activity rate modeling selected") time, mag_dummy, lat_dummy, lon_dummy, depth_dummy = read_mat_cat(catalog_file, output_datenum=True) datenum_data = time # REMEMBER THE DECIMAL DENOTES DAYS if time_unit == 'hours': multiplicator = 24 elif time_unit == 'days': multiplicator = 1 elif time_unit == 'weeks': multiplicator = 1 / 7 elif time_unit == 'years': multiplicator = 1 / 365 # Raise an exception when time_win_duration from the user is too large relative to the catalog if time_win_duration/multiplicator > 0.5*(time[-1] - time[0]): msg = "Activity rate estimation time window must be less than half the catalog length. Use a shorter time window." logger.error(msg) raise Exception(msg) # Selects dates in datenum format and procceeds to forecast value start_date = datenum_data[-1] - (2 * time_win_duration / multiplicator) dates_calc = [date for date in datenum_data if start_date <= date <= datenum_data[-1]] forecasts, bca_conf95, rate_mean = bootstrap_forecast_rolling(dates_calc, multiplicator) # FINAL VALUES OF RATE AND ITS UNCERTAINTY IN THE 5-95 PERCENTILE unc_bca05 = [ci.low for ci in bca_conf95]; unc_bca95 = [ci.high for ci in bca_conf95] rate_unc_high = multiplicator / np.array(unc_bca05); rate_unc_low = multiplicator / np.array(unc_bca95); rate_forecast = multiplicator / np.median(forecasts) # [per time unit] # Plot of forecasted activity rate with previous binned activity rate act_rate, bin_counts, bin_edges, out, pprs, rt, idx, u_e = calc_bins(np.array(datenum_data), time_unit, time_win_duration, dates_calc, rate_forecast, rate_unc_high, rate_unc_low, multiplicator, quiet=True, figsize=(14,9)) # Assign probabilities lambdas, lambdas_perc = lambda_probs(act_rate, dates_calc, bin_edges) lambdas = np.array(lambdas, dtype='d') lambdas_perc = np.array(lambdas_perc, dtype='d') # print("Forecasted activity rates: ", lambdas, "events per", time_unit[:-1]) logger.info(f"Forecasted activity rates: {lambdas} events per {time_unit} with percentages {lambdas_perc}") np.savetxt('activity_rate.csv', np.vstack((lambdas, lambdas_perc)).T, header="lambda, percentage", delimiter=',', fmt='%1.4f') if forecast_select: products = products_string.split() logger.info(f"Ground motion forecasting selected with ground motion model {model} and IMT products {products_string}") # validate m_max against the grond motion model models_anthro_limited = ['Lasocki2013', 'Atkinson2015', 'ConvertitoEtAl2012Geysers'] # these models require that m_max<=4.5 if m_max > 4.5 and model in models_anthro_limited: if m_file is None: msg = f"The selected ground motion model {model} is only valid for magnitudes up to 4.5. Please select a lower maximum magnitude." else: msg = f"The selected ground motion model {model} is only valid for magnitudes up to 4.5, but the provided magnitude file includes values up to {m_max}. Please adjust the magnitude range in the file accordingly." logger.error(msg) raise Exception(msg) if not xy_select: msg = "Event location distribution modeling was not selected; cannot continue..." logger.error(msg) raise Exception(msg) if m_pdf[0] == None: msg = "Magnitude distribution modeling was not selected and magnitude PDF file was not provided; cannot continue..." logger.error(msg) raise Exception(msg) if lambdas[0] == None: msg = "Activity rate modeling was not selected and custom activity rate was not provided; cannot continue..." logger.error(msg) raise Exception(msg) Mag = 5.0 # placeholder mag, must be valid for that context; will be overwritten during function call rupture_aratio = 1.5 Strike = 0 Dip = 90 Rake = 0 p = 0.05 # Probability of exceedance m_cdf = get_cdf(m_pdf) fxy = xy_kde[0] logger.debug(f"Normalization check; sum of all f(x,y) values = {np.sum(fxy)}") xx, yy = np.meshgrid(x_range, y_range, indexing='ij') # grid points # set every grid point to be a receiver x_rx = xx.flatten() y_rx = yy.flatten() # compute distance matrix for each receiver distances = np.zeros(shape=(nx * ny, nx, ny)) rx_lat = np.zeros(nx * ny) rx_lon = np.zeros(nx * ny) for i in range(nx * ny): # Compute the squared distances directly using NumPy's vectorized operations squared_distances = (xx - x_rx[i]) ** 2 + (yy - y_rx[i]) ** 2 distances[i] = np.sqrt(squared_distances) # create context object for receiver and append to list rx_lat[i], rx_lon[i] = utm.to_latlon(x_rx[i], y_rx[i], utm_zone_number, utm_zone_letter) # get receiver location as lat,lon # convert distances from m to km because openquake ground motion models take input distances in kilometres distances = distances/1000.0 # compute ground motion only at grid points that have minimum probability density of thresh_fxy if exclude_low_fxy: indices = list(np.where(fxy.flatten() > thresh_fxy)[0]) else: indices = range(0, len(distances)) fr = fxy.flatten() # For each receiver compute estimated ground motion values logger.info(f"Estimating ground motion intensity at {len(indices)} grid points...") PGA = np.zeros(shape=(nx * ny)) start = timer() use_pp = True if use_pp: # use dask parallel computing pbar = ProgressBar() pbar.register() # iter = range(0,len(distances)) iter = indices iml_grid_raw = [] # raw ground motion grids for imt in products: logger.info(f"Estimating {imt}") imls = [dask.delayed(compute_IMT_exceedance)(rx_lat[i], rx_lon[i], distances[i].flatten(), fr, p, lambdas, forecast_len, lambdas_perc, m_range, m_pdf, m_cdf, model, imt=imt, IMT_min=0.0, IMT_max=2.0, rx_label=i, rtol=0.1, use_cython=True) for i in iter[:2]] #imls = [dask.delayed(my_trivial_task)(i) for i in iter] iml = dask.compute(*imls) iml_grid_raw.append(list(iml)) else: iml_grid_raw = [] iter = indices for imt in products: iml = [] for i in iter: iml_i = compute_IMT_exceedance(rx_lat[i], rx_lon[i], distances[i].flatten(), fr, p, lambdas, forecast_len, lambdas_perc, m_range, m_pdf, m_cdf, model, imt=imt, IMT_min = 0.0, IMT_max = 2.0, rx_label = i, rtol = 0.1, use_cython=True) iml.append(iml_i) logger.info(f"Estimated {imt} at rx {i} is {iml_i}") iml_grid_raw.append(iml) end = timer() logger.info(f"Ground motion exceedance computation time: {round(end - start, 1)} seconds") logger.info("Test3 run finished") sys.exit() if np.isnan(iml_grid_raw).all(): msg = "No valid ground motion intensity measures were forecasted. Try a different ground motion model." logger.error(msg) raise Exception(msg) # create list of one empty list for each imt iml_grid = [[] for _ in range(len(products))] # final ground motion grids iml_grid_prep = iml_grid.copy() # temp ground motion grids if exclude_low_fxy: for i in range(0, len(distances)): if i in indices: for j in range(0, len(products)): iml_grid_prep[j].append(iml_grid_raw[j].pop(0)) else: list(map(lambda lst: lst.append(np.nan), iml_grid_prep)) # use np.nan to indicate grid point excluded else: iml_grid_prep = iml_grid_raw for j in range(0, len(products)): vmin = min(x for x in iml_grid_prep[j] if x is not np.nan) vmax = max(x for x in iml_grid_prep[j] if x is not np.nan) iml_grid[j] = np.reshape(iml_grid_prep[j], (nx, ny)).astype( dtype=np.float64) # this reduces values to 8 decimal places iml_grid_tmp = np.nan_to_num(iml_grid[j]) # change nans to zeroes # upscale the grid up_factor = 4 iml_grid_hd = resize(iml_grid_tmp, (up_factor * len(iml_grid_tmp), up_factor * len(iml_grid_tmp)), mode='reflect', anti_aliasing=False) iml_grid_hd[iml_grid_hd == 0.0] = np.nan # change zeroes back to nan # trim edges so the grid is not so blocky vmin_hd = min(x for x in iml_grid_hd.flatten() if not isnan(x)) vmax_hd = max(x for x in iml_grid_hd.flatten() if not isnan(x)) trim_thresh = vmin iml_grid_hd[iml_grid_hd < trim_thresh] = np.nan # generate image overlay north, south = lat.max(), lat.min() # Latitude range east, west = lon.max(), lon.min() # Longitude range bounds = [[south, west], [north, east]] map_center = [np.mean([north, south]), np.mean([east, west])] # Create an image from the grid cmap_name = 'YlOrRd' cmap = plt.get_cmap(cmap_name) fig, ax = plt.subplots(figsize=(6, 6)) ax.imshow(iml_grid_hd, origin='lower', cmap=cmap, vmin=vmin, vmax=vmax) ax.axis('off') # Save the figure fig.canvas.draw() overlay_filename = f"overlay_{j}.svg" plt.savefig(overlay_filename, bbox_inches="tight", pad_inches=0, transparent=True) plt.close(fig) # Embed geographic bounding box into the SVG map_bounds = dict(zip(("south", "west", "north", "east"), map(float, (grid_lat_min, grid_lon_min, grid_lat_max, grid_lon_max)))) tree = ET.parse(overlay_filename) tree.getroot().set("data-map-bounds", json.dumps(map_bounds)) tree.write(overlay_filename, encoding="utf-8", xml_declaration=True) logger.info(f"Saved geographic bounds to SVG metadata (data-map-bounds): {overlay_filename} → {map_bounds}") # Make the color bar width = 50 height = 500 gradient = np.linspace(0, 1, height) gradient = np.vstack((gradient, gradient)).T gradient = np.tile(gradient, (1, width)) colorbar_title = products[j] if "PGA" in colorbar_title or "SA" in colorbar_title: colorbar_title = colorbar_title + " (g)" fig, ax = plt.subplots(figsize=((width + 40) / 100.0, (height + 20) / 100.0), dpi=100) # Increase fig size for labels ax.imshow(gradient, aspect='auto', cmap=cmap.reversed(), extent=[0, 1, vmin, vmax_hd]) # Note: extent order is different for vertical ax.set_xticks([]) # Remove x-ticks for vertical colorbar num_ticks = 11 # Show more ticks tick_positions = np.linspace(vmin, vmax_hd, num_ticks) ax.set_yticks(tick_positions) ax.set_yticklabels([f"{tick:.2f}" for tick in tick_positions]) # format tick labels ax.set_title(colorbar_title, loc='right', pad=15) fig.subplots_adjust(left=0.25, right=0.75, bottom=0.05, top=0.95) # Adjust Layout fig.savefig("colorbar_" + str(j) + ".svg", bbox_inches='tight') plt.close(fig)