# -*- coding: utf-8 -*- from eqdist.rate import datenum_to_datetime import Rbeast as rb; from scipy.stats import bootstrap from matplotlib.dates import DateFormatter, AutoDateLocator from matplotlib.ticker import MultipleLocator import matplotlib.pyplot as plt import numpy as np global ncp_choice, tcp_max, torder_min, torder_max ncp_choice = 'default' tcp_max = 5 torder_min = 0 torder_max = 1 #AOI_lat = np.array([51.48, 51.54]) #AOI_lon = np.array([16.15, 16.24]) #AOI_lat = np.array([None, None]) #AOI_lon = np.array([None, None]) def plot_results(act_rate, bin_edges, bin_edges_dt, rt, boundaries, bin_dur, unit, multiplicator, rate_forecast, rate_unc_high, rate_unc_low, datenum_data, mag_data): end_date = bin_edges[-1] fig, ax = plt.subplots(figsize=(14, 6)) ax.plot(bin_edges_dt[1:], act_rate, '-o', linewidth=2.5, markersize=6.5, label='Activity rate') if rate_forecast is not None: next_date = end_date + (bin_dur / multiplicator) ax.plot(datenum_to_datetime(next_date), rate_forecast, 'ro', label='Forecasted Rate', markersize=6.5) ax.plot([bin_edges_dt[-1], datenum_to_datetime(next_date)], [act_rate[-1], rate_forecast], 'r-', linewidth=2.5) ax.vlines(datenum_to_datetime(next_date), rate_unc_low, rate_unc_high, colors='r', linewidth=2, label='Bootstrap uncertainty') ax.xaxis.set_major_locator(AutoDateLocator()) ax.xaxis.set_major_formatter(DateFormatter('%d-%b-%Y')) plt.xticks(rotation=45) plt.title(f'Activity rate (Time Unit: {unit}, Bin Duration: {bin_dur} {unit})',fontsize=18) # plt.title(f'Activity rate (Bin Duration: {bin_dur} {unit})',fontsize=18) plt.xlabel('Time (Bin Center Date)', fontsize=16) ax.set_ylabel('Activity rate per selected time period',fontsize=16) plt.grid(True) if len(rt) > 0: for i in range(len(rt)): ax.plot(bin_edges_dt[1:][boundaries[i]:boundaries[i+1]], [rt[i]] * (boundaries[i+1] - boundaries[i]), linewidth=2, label=f'Rate period {i+1}') # ---- Magnitude scatter on right y-axis ---- ax2 = ax.twinx() event_dates = [datenum_to_datetime(d) for d in datenum_data] #-------------extract magnitude bins from data--------------------- mags = np.array(mag_data) min_mag = mags.min() max_mag = mags.max() low_thresh = int(np.floor(min_mag)) high_thresh = int(np.floor(max_mag)) thresholds = list(range(low_thresh, high_thresh + 1)) base_size = 15 size_step = 35 bins_def = [] for idx, t in enumerate(thresholds): low = t if idx < len(thresholds) - 1: high = thresholds[idx + 1] label = f'{low:.1f} \u2264 M < {high:.1f}' else: high = np.inf label = f'M \u2265 {low:.1f}' size = base_size + idx * size_step bins_def.append((low, high, size, label)) for low, high, size, label in bins_def: mask = (mags >= low) & (mags < high) if np.any(mask): sel_dates = [d for d, m in zip(event_dates, mask) if m] sel_mags = mags[mask] ax2.scatter(sel_dates, sel_mags, s=size, facecolor='purple', edgecolor='black', alpha=0.15, linewidth=1, label=label) ax2.set_ylabel('Magnitude', color='purple',fontsize=16) ax2.yaxis.set_major_locator(MultipleLocator(0.5)) ax2.yaxis.set_minor_locator(MultipleLocator(0.1)) ax2.spines['right'].set_color('purple') ax2.tick_params(axis='y', colors='purple') h1, l1 = ax.get_legend_handles_labels() h2, l2 = ax2.get_legend_handles_labels() handles = h1 + h2 labels = l1 + l2 n_legend = len(handles) ncols = max(1, int(np.ceil(n_legend / 5))) # ~5 entries per column #-------add 20% headroom above the data to make space for legend------ ymin, ymax = ax.get_ylim() ax.set_ylim(ymin, ymax * 1.20) ax.legend(handles, labels, loc='best', ncol=ncols, borderaxespad=0,framealpha=0.7) ax.set_zorder(ax2.get_zorder() + 1) # put scatter plot behind the line plot ax.patch.set_visible(False) fig.tight_layout() plt.savefig("activity_rate.png", dpi=600) plt.show() def bootstrap_forecast(data): window_data=data if len(window_data) >= 5: res95 = bootstrap((window_data,), np.mean, confidence_level=0.95, method='BCa', n_resamples=1000) else: res95 = bootstrap((window_data,), np.mean, confidence_level=0.95, method='BCa', n_resamples=int(len(window_data) ** len(window_data))) forecast = np.mean(res95.bootstrap_distribution) bca_conf95 = res95.confidence_interval return forecast, bca_conf95 def calc_rates(act_rate, cps): """ Calculates mean activity rates between changepoints. cps : sorted array of changepoint indices into act_rate Returns rt (list of rates) and segment boundaries """ boundaries = [0] + list(cps.astype(int)) + [len(act_rate)] rt = [np.mean(act_rate[boundaries[i]:boundaries[i+1]]) for i in range(len(boundaries)-1)] return rt, boundaries def apply_beast(act_rate): """ Applies BEAST to the smmothed rate data using different smoothing windows. Input act_rate : The activity rate data array to smooth and apply BEAST. Output out : A list of BEAST results for each smoothed rate array. prob : A list of probabilities and change points extracted from BEAST results. """ mirror_len = int(np.ceil(0.20 * len(act_rate))) left_mirror = act_rate[:mirror_len][::-1] right_mirror = act_rate[-mirror_len:][::-1] act_rate_mirrored = np.concatenate([left_mirror, act_rate, right_mirror]) mcmc_th = int(np.clip(np.ceil(len(act_rate) / 100), 2, 15)) beast_result = rb.beast(act_rate_mirrored, period=0, tcp_minmax=[0, tcp_max], torder_minmax=[torder_min, torder_max], tseg_minlength=2, mcmc_chains=10, mcmc_thin=mcmc_th, mcmc_seed=10) # User-driven ncp selection if ncp_choice == 'median': ncp = beast_result.trend.ncp_median if np.isnan(ncp) or ncp == 0: return beast_result, np.array([]) elif ncp_choice == 'mode': ncp = beast_result.trend.ncp_mode if np.isnan(ncp) or ncp == 0: return beast_result, np.array([]) elif ncp_choice == 'pct90': ncp = beast_result.trend.ncp_pct90 if np.isnan(ncp) or ncp == 0: return beast_result, np.array([]) else: # default: median with mode and pct90 fallback ncp = beast_result.trend.ncp_median if np.isnan(ncp) or ncp == 0: ncp = beast_result.trend.mode if np.isnan(ncp) or ncp == 0: ncp = beast_result.trend.ncp_pct90 if np.isnan(ncp) or ncp == 0: return beast_result, np.array([]) ncp = int(ncp) cps = beast_result.trend.cp[:ncp] # Discard mirrored zone changepoints and correct indices valid_mask = (cps > mirror_len) & (cps <= mirror_len + len(act_rate)) cps = cps[valid_mask] - mirror_len return beast_result, np.sort(cps) def bins_and_beast(dates, unit, bin_dur, multiplicator): start_date = dates.min() end_date = dates.max() valid_units = ['hours', 'days'] if unit not in valid_units: unit = 'days' bin_dur = 15 if (end_date - start_date) < 15 and unit == 'days': unit = 'hours' bin_dur = 12 bin_edges = [end_date] while bin_edges[-1] > start_date: bin_edges.append(bin_edges[-1] - (bin_dur / multiplicator)) bin_edges = bin_edges[::-1] #-------Drop first bin or keep it if >80% of set duration------ first_width_days = bin_edges[1] - start_date first_width_units = first_width_days * multiplicator if first_width_units >= 0.8 * bin_dur: bin_edges[0] = start_date # edge of first bin is at data start else: bin_edges = bin_edges[1:] # drop bin 0 (and its events) #------------Error if remaining bins are fewer than 2------------ if len(bin_edges) < 2: raise ValueError( f"Not enough data to form at least one full bin of duration " f"{bin_dur} {unit}(s) after dropping the partial first bin " f"({first_width_units:.2f} {unit}(s), below the 80% threshold). " f"Try a shorter bin_dur or check your input data range." ) bin_edges_dt = [datenum_to_datetime(d) for d in bin_edges] bin_counts, _ = np.histogram(dates, bins=bin_edges) act_rate = [count / ((bin_edges[i + 1] - bin_edges[i]) * multiplicator / bin_dur) for i, count in enumerate(bin_counts)] out, cps = apply_beast(act_rate) if len(cps) > 0: rt, boundaries = calc_rates(act_rate, cps) print(f'Changepoints detected at bins: {cps}') else: rt = [] boundaries = [] print('-----------------------------------------------------') print('No changepoints detected by BEAST (Zhao et al., 2019)') print('-----------------------------------------------------') return act_rate, bin_counts, bin_edges, bin_edges_dt, out, cps, rt, boundaries, bin_dur, unit def main(catalog_file, mc_file, pdf_file, m_file, m_select, mag_label, mc, m_max, m_kde_method, xy_select, grid_dim, xy_win_method, rate_select, time_win_duration, # forecast_select, custom_rate, forecast_len, time_unit, model, products_string, verbose): forecast_select, custom_rate, forecast_len, time_unit, AOI_extent, model, products_string, verbose): """ Python application that reads an earthquake catalog and performs seismic hazard forecasting. Arguments: catalog_file: Path to input file of type 'catalog' mc_file: Path to input file of type 'mccalc_output' pdf_file: Path to input file of type 'PDF' m_file: Path to input file of type 'm' m_select: If True, perform an estimation of the magnitude distribution using KDE with the chosen KDE method mag_label: The name of the column corresponding to magnitude in the catalog. Different catalogs can have different magnitude labels and sometimes more than one magnitude column. If no value is provided, then the program looks for a label of 'Mw' for magnitude in the catalog mc: The magnitude of completeness (Mc) of the catalog m_max:M_max. The magnitude distribution is estimated for the range from Mc to M_max. If no value is provided, then the program sets M_max to be 1 magnitude units above the maximum magnitude value in the catalog. m_kde_method: The kernel density estimator to use. xy_select: If True, perform an estimation of the magnitude distribution using KDE with the chosen KDE method grid_dim: The grid cell size (in metres) of the final ground motion product map. A smaller cell size will allow for a higher resolution at the cost of computation time. xy_win_method: If True, use an exponential time weighting function to give more weight to recent earthquakes when performing the location distribution estimation. rate_select: If True, estimate the activity rate of the catalog using the Rbeast package. time_win_duration: The time window to use in estimating the activity rate. The units are specified in the 'Time units' input further down. forecast_select: If True, forecast the seismic hazard for the time period of Forecast length and produce a seismic hazard map of the selected Ground Motion Products custom_rate: The user has the option of providing a single value activity rate of the catalog to be used in forecasting. forecast_len: Length of the forecast for seismic hazard assessment. time_unit: Times units for the inputs Time Window Duration, Custom Activity Rate, and Forecast Length. AOI_extent: The forecast geographical area of interest specified as a latitude and longitude range in decimal degrees in the form [lat_min, lat_max, lon_min, lon_max]. model: Select from the following ground motion models available. Other models in the Openquake library are available but have not yet been tested. products_string: The ground motion intensity types to output. Use a space between names to select more than one. E.g. “PGA SA(1.0) SA(10)” verbose: If True, will increase the amount of details in the log file. ... Returns: SVG file image overlays for display in Google Maps. ... """ import sys from importlib.metadata import version import logging from base_logger import getDefaultLogger from timeit import default_timer as timer from math import ceil, floor, isnan import numpy as np import dask import kalepy as kale import utm from skimage.transform import resize import igfash from igfash.io import read_mat_cat, read_mat_m, read_mat_mc, read_mat_pdf, read_csv from igfash.window import win_CTL, win_CNE import igfash.kde as kde from igfash.gm import compute_IMT_exceedance from igfash.compute import get_cdf, hellinger_dist, cols_to_rows from igfash.rate import lambda_probs, calc_bins, bootstrap_forecast_rolling from igfash.mc import estimate_mc import matplotlib.pyplot as plt from matplotlib.ticker import MultipleLocator from matplotlib.contour import ContourSet import xml.etree.ElementTree as ET import json import multiprocessing as mp logger = getDefaultLogger('igfash') if pdf_file is None: # magnitude PDF file provided m_pdf = [None] else: m_pdf = read_mat_pdf(pdf_file) if m_file is None: # magnitude range file provided m_range = [None] else: m_range = read_mat_m(m_file) m_max = m_range[-1] # take m_max from the m_file if verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) exclude_low_fxy = False # skip low probability areas of the map thresh_fxy = 1e-3 # minimum fxy value (location PDF) needed to do PGA estimation (to skip low probability areas); also should scale according to number of grid points AOI_lat = np.array(AOI_extent[:2]) AOI_lon = np.array(AOI_extent[2:]) # log user selections logger.debug(f"User input files\n Catalog: {catalog_file}\n Mc: {mc_file}\n Mag_PDF: {pdf_file}\n Mag: {m_file}") logger.debug( f"User options\n m_select: {m_select}\n mag_label: {mag_label}\n mc: {mc}\n m_max:{m_max}\n m_kde_method: {m_kde_method}\n \ xy_select: {xy_select}\n grid_dim: {grid_dim}\n xy_win_method: {xy_win_method}\n rate_select: {rate_select}\n time_win_duration: {time_win_duration}\n \ forecast_select: {forecast_select}\n custom_rate: {custom_rate}\n forecast_len: {forecast_len}\n time_unit: {time_unit}\n model: {model}\n products: {products_string}\n \ verbose: {verbose}") # print key package version numbers logger.debug(f"Python version {sys.version}") logger.debug(f"Numpy version {version('numpy')}") logger.debug(f"Scipy version {version('scipy')}") logger.debug(f"Obspy version {version('obspy')}") logger.debug(f"Openquake version {version('openquake.engine')}") logger.debug(f"Igfash version {version('igfash')}") logger.debug(f"Rbeast version {version('rbeast')}") logger.debug(f"Dask version {version('dask')}") dask.config.set(scheduler='processes') # run magnitude distribution modeling if selected by user and no magnitude pdf file provided if m_select and m_range[0] == None and m_pdf[0] == None: logger.info("Magnitude distribution modeling selected") if m_kde_method == None: logger.info("No KDE method specified, therefore use diffKDE by default") m_kde_method = 'diffKDE' if mag_label == None: logger.info("No magnitude label of catalog specified, therefore try Mw by default") mag_label = 'Mw' time, mag, lat, lon, depth = read_mat_cat(catalog_file, mag_label=mag_label, catalog_label='Catalog') # check for null magnitude values m_null_idx = np.where(np.isnan(mag))[0] if len(m_null_idx) > 0: msg = f"There are null values in the magnitude column of the catalog at indices {m_null_idx}" logger.error(msg) raise Exception(msg) if mc != None: logger.info("Mc value provided by user") trim_to_mc = True elif mc_file != None: logger.info("Mc estimation output file provided; selecting largest Mc from the list") mc = read_mat_mc(mc_file) trim_to_mc = True else: logger.info("No Mc provided; using all magnitudes from the catalog") trim_to_mc = False mc = mag.min() # remove events below mc if trim_to_mc: logger.info(f"Removing all magnitudes below {mc}") indices = np.argwhere(mag < mc) mag = np.delete(mag, indices) time = np.delete(time, indices) lat = np.delete(lat, indices) lon = np.delete(lon, indices) # if user does not provide a m_max, set m_max to 1 magnitude unit above max magnitude in catalog if m_max == None: m_max = mag.max() + 1.0 logger.info(f"No m_max was given. Therefore m_max is automatically set to: {m_max}") start = timer() t_windowed, r_windowed = win_CNE(time, [lon, lat, mag], win_size=len(mag), win_overlap=0, min_events=1) m_windowed = [r[2] for r in r_windowed] # extract only magnitude windows if m_kde_method[:5] == 'KDEpy': pdf = kde.compute_kde(m_windowed, xmin=mc, xmax=m_max, bw_method=m_kde_method[6:], pp=False) elif m_kde_method == 'adaptiveKDE': pdf = kde.compute_adaptivekde(m_windowed, bw_method="adaptive-local", xmin=mc, xmax=m_max, pp=False) elif m_kde_method == 'diffKDE': pdf = kde.compute_diffkde(m_windowed, xmin=mc, xmax=m_max, pp=False) elif m_kde_method[:5] == 'arviz': pdf = kde.compute_arvizkde(m_windowed, xmin=mc, xmax=m_max, bw_method=m_kde_method[6:], pp=False) end = timer() logger.info(f"Magnitude KDE Computation time: {round(end - start, 1)} seconds") m_pdf = 2 * pdf[-1] # select last window's pdf as the one to use for forecasting m_range = np.linspace(mc, m_max, 256) bin_width = 0.1 bins = np.arange(min(m_windowed[-1]), max(m_windowed[-1]) + bin_width, bin_width) # plot only the last window fig = plt.figure(dpi=300, figsize=(8, 6)) plt.hist(m_windowed[-1], bins=bins, color='blue', edgecolor='black', alpha=0.6, density=True, label='Magnitude bins') plt.plot(m_range, m_pdf, color='red', linewidth=2.5, label='KDE') plt.legend() # configure ticks ax = plt.gca() ax.xaxis.set_major_locator(MultipleLocator(0.2)) # Major ticks every 0.2 ax.xaxis.set_minor_locator(MultipleLocator(0.1)) # Minor ticks every 0.1 ax.tick_params(axis='x', which='major', labelsize=10) # Labels only for major ticks ax.tick_params(axis='x', which='minor', length=5, labelsize=0) # No labels for minor ticks plt.xticks(rotation=45) # Rotate ticks by 45 degrees plt.ylabel("f(M)") plt.xlabel("Magnitude") plt.savefig('KDE_magnitude_PDF.png') np.savetxt('KDE_magnitude_PDF.csv', np.c_[m_range, m_pdf]) # run location distribution modeling if xy_select: logger.info("Event location distribution modeling selected") time, mag, lat, lon, depth = read_mat_cat(catalog_file) # convert to UTM u = utm.from_latlon(lat, lon) x = u[0] y = u[1] utm_zone_number = u[2] utm_zone_letter = u[3] logger.debug(f"Latitude / Longitude coordinates correspond to UTM zone {utm_zone_number}{utm_zone_letter}") if (None not in AOI_lat) and (None not in AOI_lon): use_AOI = True logger.info(f"Area of Interest (AOI) selected with latitutde range {AOI_lat} and longitude range {AOI_lon}") #convert AOI to UTM u_AOI = utm.from_latlon(AOI_lat, AOI_lon) x_AOI = u_AOI[0] y_AOI = u_AOI[1] # make sure grid contains the user's AOI x_min = np.concatenate((x, x_AOI)).min() y_min = np.concatenate((y, y_AOI)).min() x_max = np.concatenate((x, x_AOI)).max() y_max = np.concatenate((y, y_AOI)).max() exclude_low_fxy = False # don't exclude any points because we need to analyze all grid points in the AOI else: use_AOI = False # define corners of grid based on global dataset x_min = x.min() y_min = y.min() x_max = x.max() y_max = y.max() grid_x_max = int(ceil(x_max / grid_dim) * grid_dim) grid_x_min = int(floor(x_min / grid_dim) * grid_dim) grid_y_max = int(ceil(y_max / grid_dim) * grid_dim) grid_y_min = int(floor(y_min / grid_dim) * grid_dim) # rectangular grid nx = int((grid_x_max - grid_x_min) / grid_dim) + 1 ny = int((grid_y_max - grid_y_min) / grid_dim) + 1 # ensure a square grid is used if nx > ny: # enlarge y dimension to match x ny = nx grid_y_max = int(grid_y_min + (ny - 1) * grid_dim) else: # enlarge x dimension to match y nx = ny grid_x_max = int(grid_x_min + (nx - 1) * grid_dim) # update grid extent in lat/lon grid_lat_max, grid_lon_max = utm.to_latlon(grid_x_max, grid_y_max, utm_zone_number, utm_zone_letter) grid_lat_min, grid_lon_min = utm.to_latlon(grid_x_min, grid_y_min, utm_zone_number, utm_zone_letter) # new x and y range x_range = np.linspace(grid_x_min, grid_x_max, nx) y_range = np.linspace(grid_y_min, grid_y_max, ny) logger.debug(f"Grid X range: {x_range}, Y range: {y_range}") t_windowed = time r_windowed = [[x, y]] # %% compute KDE and extract PDF start = timer() if xy_win_method: logger.info("Time weighting function selected") x_weights = np.linspace(0, 15, len(t_windowed)) weighting_fcn = np.exp(x_weights) # exponential weighting output_kale, output_kde = kde.compute_2d_kde([grid_x_min, grid_x_max, grid_y_min, grid_y_max], r_windowed, n_kde=nx, weighting_fcn=weighting_fcn) else: output_kale, output_kde = kde.compute_2d_kde([grid_x_min, grid_x_max, grid_y_min, grid_y_max], r_windowed, n_kde=nx) end = timer() logger.info(f"Location KDE Computation time: {round(end - start, 1)} seconds") xy_kale = output_kale[0] xy_kde = output_kde[0] # plot location PDF xy_kale_km = type(xy_kale)(xy_kale.dataset / 1000) corner = kale.corner(xy_kale_km, quantiles=[0.025, 0.16, 0.50, 0.84, 0.975], cmap='hot') # modify bottom left plot ax00 = corner[0].axes[0][0] ax00.set_ylabel("Probability Density") # Remove the PDF ticks below zero yticks = ax00.get_yticks() new_yticks = yticks[yticks >= 0] ax00.set_yticks(new_yticks) # ax01 = corner[0].axes[0][1] # bottom right plot # modify top left plot ax10 = corner[0].axes[1][0] ax10.set_xlabel("UTM X (km)") ax10.set_ylabel("UTM Y (km)") ax10.ticklabel_format(style='plain') for coll in ax10.collections: if isinstance(coll, ContourSet): # Check if it's a contour plot ax10.clabel(coll, inline=True, fontsize='smaller', fmt="%.1f") # modify top right plot ax11 = corner[0].axes[1][1] ax11.set_xlabel("Probability Density") ax11.ticklabel_format(style='plain') # Remove the PDF ticks below zero xticks = ax11.get_xticks() new_xticks = xticks[xticks >= 0] ax11.set_xticks(new_xticks) ax11.set_yticklabels([]) # This removes the ytick labels # Rotate x-axis tick labels for all bottom-row plots for ax in corner[0].axes[-1, :]: # Last row corresponds to the bottom x-axes for label in ax.get_xticklabels(): label.set_rotation(46) corner[0].fig.savefig('KDE_location_PDF', bbox_inches="tight", dpi=300) np.savetxt('KDE_location_PDF.csv', np.array(output_kde[0][0])) # run activity rate modeling lambdas = [None] if custom_rate != None and forecast_select and not rate_select: logger.info(f"Using activity rate specified by user: {custom_rate} per {time_unit}") lambdas = np.array([custom_rate], dtype='d') lambdas_perc = np.array([1], dtype='d') elif rate_select: logger.info(f"Activity rate modeling selected") datenum_data, mag_data, lat_dummy, lon_dummy, depth_dummy = read_mat_cat(catalog_file, mag_label=mag_label, output_datenum=True) if trim_to_mc: indices = np.argwhere(mag_data < mc) mag_data = np.delete(mag_data, indices) datenum_data = np.delete(datenum_data, indices) if time_unit == 'hours': multiplicator = 24 elif time_unit == 'days': multiplicator = 1 elif time_unit == 'weeks': multiplicator = 1 / 7 elif time_unit == 'years': multiplicator = 1 / 365 # Raise an exception when time_win_duration from the user is too large relative to the catalog if time_win_duration/multiplicator > 0.5*(time[-1] - time[0]): msg = "Activity rate estimation time window must be less than half the catalog length. Use a shorter time window." logger.error(msg) raise Exception(msg) #-----------data are sorted in case they were not----------------- sorted_pairs = sorted(zip(datenum_data, mag_data), key=lambda x: x[0]) datenum_data, mag_data = map(list, zip(*sorted_pairs)) #-------split the data into bins and apply BEAST for changepoint detection-------------------- act_rate, bin_counts, bin_edges, bin_edges_dt, out, cps, rt, boundaries, bin_dur, time_unit = bins_and_beast( np.array(datenum_data), time_unit, time_win_duration, multiplicator) #------Forecasted rate is taken from BEAST or is equal to last value if no changepoints detected----- if len(cps) > 0: rate_forecast = rt[-1] last_cp_bin = int(cps[-1]) else: rate_forecast = act_rate[-1] last_cp_bin = len(act_rate) - 1 last_cp_datenum = bin_edges[last_cp_bin] dates_calc = [date for date in datenum_data if last_cp_datenum <= date <= datenum_data[-1]] interevent_times = np.diff(dates_calc) #------------Use BCa for uncertainty intervals----------------- forecast, bca_conf95 = bootstrap_forecast(interevent_times) rate_unc_high = bin_dur / (bca_conf95.low * multiplicator) rate_unc_low = bin_dur / (bca_conf95.high * multiplicator) #----------------------Plot------------------------------------ plot_results(act_rate, bin_edges, bin_edges_dt, rt, boundaries, bin_dur, time_unit, multiplicator, rate_forecast, rate_unc_high, rate_unc_low, datenum_data, mag_data) logger.info("\n----------------- Forecast Summary -----------------") logger.info(f"Forecasted activity rate (next {bin_dur} {time_unit}(s)): {rate_forecast:.4f}") logger.info(f"95% BCa confidence interval: [{rate_unc_low:.4f}, {rate_unc_high:.4f}]") logger.info("------------------------------------------------------") lambdas = np.array([rate_forecast/bin_dur], dtype='d') lambdas_perc = np.array([1], dtype='d') np.savetxt('activity_rate.csv', lambdas, header=f"Activity Rate (Events per {time_unit[:-1]})", delimiter=',', fmt='%1.4f') if forecast_select: products = products_string.split() logger.info(f"Ground motion forecasting selected with ground motion model {model} and IMT products {products_string}") # validate m_max against the grond motion model models_anthro_limited = ['Lasocki2013', 'Atkinson2015', 'ConvertitoEtAl2012Geysers'] # these models require that m_max<=4.5 if m_max > 4.5 and model in models_anthro_limited: if m_file is None: msg = f"The selected ground motion model {model} is only valid for magnitudes up to 4.5. Please select a lower maximum magnitude." else: msg = f"The selected ground motion model {model} is only valid for magnitudes up to 4.5, but the provided magnitude file includes values up to {m_max}. Please adjust the magnitude range in the file accordingly." logger.error(msg) raise Exception(msg) if not xy_select: msg = "Event location distribution modeling was not selected; cannot continue..." logger.error(msg) raise Exception(msg) if m_pdf[0] == None: msg = "Magnitude distribution modeling was not selected and magnitude PDF file was not provided; cannot continue..." logger.error(msg) raise Exception(msg) if lambdas == None: msg = "Activity rate modeling was not selected and custom activity rate was not provided; cannot continue..." logger.error(msg) raise Exception(msg) Mag = 5.0 # placeholder mag, must be valid for that context; will be overwritten during function call rupture_aratio = 1.5 Strike = 0 Dip = 90 Rake = 0 p = 0.05 # Probability of exceedance m_cdf = get_cdf(m_pdf) fxy = xy_kde[0] logger.debug(f"Normalization check; sum of all f(x,y) values = {np.sum(fxy)}") xx, yy = np.meshgrid(x_range, y_range) # grid points # set every grid point to be a receiver grid_shape = xx.shape x_rx = xx.flatten() y_rx = yy.flatten() num_points = x_rx.size distances = np.zeros(shape=(num_points, grid_shape[0], grid_shape[1])) # compute distance matrix for each receiver #distances = np.zeros(shape=(nx * ny, nx, ny)) rx_lat = np.zeros(nx * ny) rx_lon = np.zeros(nx * ny) for i in range(num_points): # Compute the squared distances directly using NumPy's vectorized operations squared_distances = (xx - x_rx[i]) ** 2 + (yy - y_rx[i]) ** 2 distances[i] = np.sqrt(squared_distances) # create context object for receiver and append to list rx_lat[i], rx_lon[i] = utm.to_latlon(x_rx[i], y_rx[i], utm_zone_number, utm_zone_letter) # get receiver location as lat,lon # convert distances from m to km because openquake ground motion models take input distances in kilometres #distances = distances/1000.0 # compute ground motion only at grid points that have minimum probability density of thresh_fxy if exclude_low_fxy: indices = list(np.where(fxy.flatten() > thresh_fxy)[0]) else: indices = np.arange(num_points) if use_AOI: # Filter out receivers outside the AOI; Find indices where values are OUTSIDE the AOI indices_outside_x = np.where((x_rx < x_AOI[0]) | (x_rx > x_AOI[1]))[0] indices_outside_y = np.where((y_rx < y_AOI[0]) | (y_rx > y_AOI[1]))[0] indices_outside_AOI = np.unique(np.concatenate((indices_outside_x, indices_outside_y))) indices_filtered = np.setdiff1d(indices, indices_outside_AOI) else: indices_filtered = indices fr = fxy.flatten() # For each receiver compute estimated ground motion values logger.info(f"Estimating ground motion intensity at {len(indices_filtered)} grid points...") start = timer() use_pp = True if use_pp: # use dask parallel computing mp.set_start_method("fork", force=True) iter = indices_filtered iml_grid_raw = [] # raw ground motion grids for imt in products: logger.info(f"Estimating {imt}") if imt == "PGV": IMT_max = 200 # search interval max for velocity (cm/s) else: IMT_max = 2.0 # search interval max for acceleration (g) imls = [dask.delayed(compute_IMT_exceedance)(rx_lat[i], rx_lon[i], distances[i].flatten(), fr, p, lambdas, forecast_len, lambdas_perc, m_range, m_pdf, m_cdf, model, log_level=logging.DEBUG, imt=imt, IMT_min=0.0, IMT_max=IMT_max, rx_label=i, rtol=0.1, use_cython=True) for i in iter] iml = dask.compute(*imls) iml_grid_raw.append(list(iml)) else: iml_grid_raw = [] iter = indices_filtered for imt in products: if imt == "PGV": IMT_max = 200 # search interval max for velocity (cm/s) else: IMT_max = 2.0 # search interval max for acceleration (g) iml = [] for i in iter: iml_i = compute_IMT_exceedance(rx_lat[i], rx_lon[i], distances[i].flatten(), fr, p, lambdas, forecast_len, lambdas_perc, m_range, m_pdf, m_cdf, model, imt=imt, IMT_min = 0.0, IMT_max = IMT_max, rx_label = i, rtol = 0.1, use_cython=True) iml.append(iml_i) logger.info(f"Estimated {imt} at rx {i} is {iml_i}") iml_grid_raw.append(iml) end = timer() logger.info(f"Ground motion exceedance computation time: {round(end - start, 1)} seconds") logger.debug(f"IMT values: {iml_grid_raw[0]}") if np.isnan(iml_grid_raw).all(): msg = "No valid ground motion intensity measures were forecasted. Try a different ground motion model." logger.error(msg) raise Exception(msg) # create list of one empty list for each imt iml_grid = [[] for _ in range(len(products))] # final ground motion grids iml_grid_prep = iml_grid.copy() # temp ground motion grids #if use_AOI or exclude_low_fxy: for j in range(0, len(products)): # Reassemble the grid cleanly using the original shape # Initialize a flat array filled entirely with NaNs iml_grid_flat = np.full(num_points, np.nan, dtype=np.float64) # Assign the computed values to their exact original 1D index positions iml_grid_flat[indices_filtered] = iml_grid_raw[j] # Reshape back using the exact shape of your original xx/yy grids iml_grid_prep[j] = iml_grid_flat.reshape(grid_shape) #for i in indices: # if i in indices_filtered: # for j in range(0, len(products)): # iml_grid_prep[j].append(iml_grid_raw[j].pop(0)) # else: # list(map(lambda lst: lst.append(np.nan), # iml_grid_prep)) # use np.nan to indicate grid point excluded #elif exclude_low_fxy: # for i in range(0, len(distances)): # if i in indices: # for j in range(0, len(products)): # iml_grid_prep[j].append(iml_grid_raw[j].pop(0)) # else: # list(map(lambda lst: lst.append(np.nan), # iml_grid_prep)) # use np.nan to indicate grid point excluded #else: # iml_grid_prep = iml_grid_raw for j in range(0, len(products)): vmin = np.nanmin(iml_grid_prep[j]) vmax = np.nanmax(iml_grid_prep[j]) #iml_grid[j] = np.reshape(iml_grid_prep[j], (nx, ny)).astype(dtype=np.float64) # this reduces values to 8 decimal places #iml_grid_tmp = np.nan_to_num(iml_grid[j]) # change nans to zeroes # upscale the grid, trim, and interpolate if there are at least 10 grid values with range greater than 0.1 #if np.count_nonzero(iml_grid_tmp) >= 10 and vmax-vmin > 0.1: # up_factor = 1 # iml_grid_hd = resize(iml_grid_tmp, (up_factor * len(iml_grid_tmp), up_factor * len(iml_grid_tmp)), # mode='reflect', anti_aliasing=False) # trim_thresh = vmin # iml_grid_hd[iml_grid_hd < trim_thresh] = np.nan #else: #iml_grid_hd = iml_grid_tmp #iml_grid_hd[iml_grid_hd == 0.0] = np.nan # change zeroes back to nan iml_grid_hd = iml_grid_prep[j] #vmin_hd = min(x for x in iml_grid_hd.flatten() if not isnan(x)) vmax_hd = np.nanmax(iml_grid_hd) # generate image overlay north, south = grid_lat_max, grid_lat_min # Latitude range east, west = grid_lon_max, grid_lon_min # Longitude range bounds = [[south, west], [north, east]] map_center = [np.mean([north, south]), np.mean([east, west])] # Create an image from the grid cmap_name = 'YlOrRd' cmap = plt.get_cmap(cmap_name) fig, ax = plt.subplots(figsize=(6, 6)) fig.add_axes([0, 0, 1, 1]) ax.imshow(iml_grid_hd, origin='lower', cmap=cmap, vmin=vmin, vmax=vmax, interpolation='bilinear', aspect='auto') ax.axis('off') # Save the figure fig.canvas.draw() overlay_filename = f"overlay_{j}.svg" plt.savefig(overlay_filename, pad_inches=0, transparent=True) plt.close(fig) # Embed geographic bounding box into the SVG map_bounds = dict(zip(("south", "west", "north", "east"), map(float, (grid_lat_min, grid_lon_min, grid_lat_max, grid_lon_max)))) tree = ET.parse(overlay_filename) tree.getroot().set("data-map-bounds", json.dumps(map_bounds)) tree.write(overlay_filename, encoding="utf-8", xml_declaration=True) logger.info(f"Saved geographic bounds to SVG metadata (data-map-bounds): {overlay_filename} → {map_bounds}") # Make the color bar width = 50 height = 500 gradient = np.linspace(0, 1, height) gradient = np.vstack((gradient, gradient)).T gradient = np.tile(gradient, (1, width)) colorbar_title = products[j] if "PGA" in colorbar_title or "SA" in colorbar_title: colorbar_title = colorbar_title + " (g)" fig, ax = plt.subplots(figsize=((width + 40) / 100.0, (height + 20) / 100.0), dpi=100) # Increase fig size for labels ax.imshow(gradient, aspect='auto', cmap=cmap.reversed(), extent=[0, 1, vmin, vmax_hd]) # Note: extent order is different for vertical ax.set_xticks([]) # Remove x-ticks for vertical colorbar num_ticks = 11 # Show more ticks tick_positions = np.linspace(vmin, vmax_hd, num_ticks) ax.set_yticks(tick_positions) ax.set_yticklabels([f"{tick:.2f}" for tick in tick_positions]) # format tick labels ax.set_title(colorbar_title, loc='right', pad=15) fig.subplots_adjust(left=0.25, right=0.75, bottom=0.05, top=0.95) # Adjust Layout fig.savefig("colorbar_" + str(j) + ".svg", bbox_inches='tight') plt.close(fig)