From 4c2679a0052730b4e96b287b22d2fe396e0a2369 Mon Sep 17 00:00:00 2001 From: Krystyna Milian Date: Thu, 12 Oct 2023 14:27:53 +0200 Subject: [PATCH] Added logging MAE for the best runs, option to run a pipeline on a specific dataset, template bash scripts, GPLv3 license. Modified behavior of generating eval targets, it is skipped if targets already exist. --- .gitignore | 3 +- README.md | 80 ++++--- config.json | 28 +-- experiments/sweep_basicphase_ae.yaml | 19 ++ experiments/sweep_eqtransformer.yaml | 20 ++ experiments/sweep_gpd.yaml | 6 +- experiments/sweep_phasenet.yaml | 2 +- ... mseeds from Bogdanka to Seisbench format.ipynb | 6 +- ...mseeds from Lumineos to SeisBench dataset.ipynb | 31 +++ scripts/convert_data_template.sh | 19 ++ scripts/generate_eval_targets.py | 22 +- scripts/hyperparameter_sweep.py | 33 ++- {utils => scripts}/mseeds_to_seisbench.py | 0 scripts/pipeline.py | 43 +++- scripts/run_pipeline_template.sh | 19 ++ scripts/util.py | 98 ++++++--- utils/convert_data.sh | 19 -- utils/utils.py | 230 --------------------- 18 files changed, 310 insertions(+), 368 deletions(-) create mode 100644 experiments/sweep_basicphase_ae.yaml create mode 100644 experiments/sweep_eqtransformer.yaml rename {utils => notebooks}/Transforming mseeds from Bogdanka to Seisbench format.ipynb (99%) rename {utils => notebooks}/Transforming mseeds from Lumineos to SeisBench dataset.ipynb (99%) create mode 100644 scripts/convert_data_template.sh rename {utils => scripts}/mseeds_to_seisbench.py (100%) create mode 100644 scripts/run_pipeline_template.sh delete mode 100644 utils/convert_data.sh delete mode 100644 utils/utils.py diff --git a/.gitignore b/.gitignore index 0b16eca..38b2193 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__/ */.ipynb_checkpoints/ .ipynb_checkpoints/ .env +*.out weights/ datasets/ wip @@ -10,4 +11,4 @@ artifacts/ wandb/ scripts/pred/ scripts/pred_resampled/ -scripts/lightning_logs/ \ No newline at end of file +scripts/lightning_logs/ diff --git a/README.md b/README.md index ffd10db..b2cd7ea 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,13 @@ This repo contains notebooks and scripts demonstrating how to: -- Prepare data for training a seisbench model detecting P and S waves (i.e. transform mseeds into [SeisBench data format](https://seisbench.readthedocs.io/en/stable/pages/data_format.html)), check the [notebook](utils/Transforming%20mseeds%20from%20Bogdanka%20to%20Seisbench%20format.ipynb) and the [script](utils/mseeds_to_seisbench.py) -- [to update] Explore available data, check the [notebook](notebooks/Explore%20igf%20data.ipynb) +- Prepare data for training a seisbench model detecting P and S waves (i.e. transform mseeds into [SeisBench data format](https://seisbench.readthedocs.io/en/stable/pages/data_format.html)), check the [notebook](notebooks/Transforming%20mseeds%20from%20Bogdanka%20to%20Seisbench%20format.ipynb) and the [script](scripts/mseeds_to_seisbench.py) + +[//]: # (- [to update] Explore available data, check the [notebook](notebooks/Explore%20igf%20data.ipynb)) - Train various cnn models available in seisbench library and compare their performance of detecting P and S waves, check the [script](scripts/pipeline.py) -- [to update] Validate model performance, check the [notebook](notebooks/Check%20model%20performance%20depending%20on%20station-random%20window.ipynb) -- [to update] Use model for detecting P phase, check the [notebook](notebooks/Present%20model%20predictions.ipynb) +[//]: # (- [to update] Validate model performance, check the [notebook](notebooks/Check%20model%20performance%20depending%20on%20station-random%20window.ipynb)) +[//]: # (- [to update] Use model for detecting P phase, check the [notebook](notebooks/Present%20model%20predictions.ipynb)) ### Acknowledgments @@ -69,10 +70,13 @@ poetry shell WANDB_USER="your user" WANDB_PROJECT="training_seisbench_models" BENCHMARK_DEFAULT_WORKER=2 + ``` 2. Transform data into seisbench format. - To utilize functionality of Seisbench library, data need to be transformed to [SeisBench data format](https://seisbench.readthedocs.io/en/stable/pages/data_format.html)). If your data is in the MSEED format, you can use the prepared script `mseeds_to_seisbench.py` to perform the transformation. Please make sure that your data has the same structure as the data used in this project. + To utilize functionality of Seisbench library, data need to be transformed to [SeisBench data format](https://seisbench.readthedocs.io/en/stable/pages/data_format.html)). + + If your data is stored in the MSEED format and catalog in the QuakeML format, you can use the prepared script `mseeds_to_seisbench.py` to perform the transformation. Please make sure that your data has the same structure as the data used in this project. The script assumes that: * the data is stored in the following directory structure: `input_path/year/station_network_code/station_code/trace_channel.D` e.g. @@ -80,24 +84,20 @@ poetry shell * the file names follow the pattern: `station_network_code.station_code..trace_channel.D.year.day_of_year` e.g. `PL.ALBE..EHE.D.2018.282` - * events catalog is stored in quakeML format - - Run the script `mseeds_to_seisbench` located in the `utils` directory + Run the `mseeds_to_seisbench.py` script with the following arguments: ``` - cd utils python mseeds_to_seisbench.py --input_path $input_path --catalog_path $catalog_path --output_path $output_path ``` - If you want to run the script on a cluster, you can use the script `convert_data.sh` as a template (adjust the grant name, computing name and paths) and send the job to queue using sbatch command on login node of e.g. Ares: - - ``` - cd utils - sbatch convert_data.sh + If you want to run the script on a cluster, you can use the template script `convert_data_template.sh`. +After adjusting the grant name, the paths to conda env and the paths to data send the job to queue using sbatch command on a login node of e.g. Ares: + ``` + sbatch convert_data_template.sh ``` - If your data has a different structure or format, use the notebooks to gain an understanding of the Seisbench format and what needs to be done to transform your data: + If your data has a different structure or format, check the notebooks to gain an understanding of the Seisbench format and what needs to be done to transform your data: * [Seisbench example](https://colab.research.google.com/github/seisbench/seisbench/blob/main/examples/01a_dataset_basics.ipynb) or - * [Transforming mseeds from Bogdanka to Seisbench format](utils/Transforming mseeds from Bogdanka to Seisbench format.ipynb) notebook + * [Transforming mseeds from Bogdanka to Seisbench format](notebooks/Transforming mseeds from Bogdanka to Seisbench format.ipynb) notebook 3. Adjust the `config.json` and specify: @@ -110,34 +110,48 @@ poetry shell `python pipeline.py` The script performs the following steps: - * Generates evaluation targets in `datasets//targets` directory. - * Trains multiple versions of GPD, PhaseNet and ... models to find the best hyperparameters, producing the lowest validation loss. + 1. Generates evaluation targets in `datasets//targets` directory. + 1. Trains multiple versions of GPD, PhaseNet and ... models to find the best hyperparameters, producing the lowest validation loss. - This step utilizes the Weights & Biases platform to perform the hyperparameters search (called sweeping) and track the training process and store the results. - The results are available at - `https://epos-ai.grid.cyfronet.pl//` - Weights and training logs can be downloaded from the platform. - Additionally, the most important data are saved locally in `weights/_/ ` directory: - * Weights of the best checkpoint of each model are saved as `__sweep=-run=-epoch=-val_loss=.ckpt` - * Metrics and hyperparams are saved in folders + This step utilizes the Weights & Biases platform to perform the hyperparameters search (called sweeping) and track the training process and store the results. + The results are available at + `https://epos-ai.grid.cyfronet.pl//` + Weights and training logs can be downloaded from the platform. + Additionally, the most important data are saved locally in `weights/_/ ` directory: + * Weights of the best checkpoint of each model are saved as `__sweep=-run=-epoch=-val_loss=.ckpt` + * Metrics and hyperparams are saved in folders - * Uses the best performing model of each type to generate predictions. The predictons are saved in the `scripts/pred/_/` directory. - * Evaluates the performance of each model by comparing the predictions with the evaluation targets. - The results are saved in the `scripts/pred/results.csv` file. + 1. Uses the best performing model of each type to generate predictions. The predictons are saved in the `scripts/pred/_/` directory. + 1. Evaluates the performance of each model by comparing the predictions with the evaluation targets and calculating MAE metrics. + The results are saved in the `scripts/pred/results.csv` file. They are additionally logged in Weights & Biases platform as summary metrics of corresponding runs. + +
+ The default settings are saved in config.json file. To change the settings, edit the config.json file or pass the new settings as arguments to the script. For example, to change the sweep configuration file for GPD model, run: - The default settings are saved in config.json file. To change the settings, edit the config.json file or pass the new settings as arguments to the script. - For example, to change the sweep configuration file for GPD model, run: - `python pipeline.py --gpd_config ` - The new config file should be placed in the `experiments` folder or as specified in the `configs_path` parameter in the config.json file. + ```python pipeline.py --gpd_config ``` + + The new config file should be placed in the `experiments` folder or as specified in the `configs_path` parameter in the config.json file. + + If you have multiple datasets, you can run the pipeline for each dataset separately by specifying the dataset name as an argument: + + ```python pipeline.py --dataset ``` ### Troubleshooting +* Problem with reading the catalog file: please make sure that your quakeML xml file has the following opening and closing tags: +``` + + + .... + +``` + * `wandb: ERROR Run .. errored: OSError(24, 'Too many open files')` -> https://github.com/wandb/wandb/issues/2825 ### Licence -TODO +The code is licenced under the GNU General Public License v3.0. See the [LICENSE](LICENSE.txt) file for details. ### Copyright diff --git a/config.json b/config.json index 5e32110..acd11b9 100644 --- a/config.json +++ b/config.json @@ -1,15 +1,17 @@ { - "dataset_name": "bogdanka", - "data_path": "datasets/bogdanka/seisbench_format/", - "targets_path": "datasets/targets", - "models_path": "weights", - "configs_path": "experiments", - "sampling_rate": 100, - "num_workers": 1, - "seed": 10, - "sweep_files": { - "GPD": "sweep_gpd.yaml", - "PhaseNet": "sweep_phasenet.yaml" - }, - "experiment_count": 20 + "dataset_name": "bogdanka", + "data_path": "datasets/bogdanka/seisbench_format/", + "targets_path": "datasets/targets", + "models_path": "weights", + "configs_path": "experiments", + "sampling_rate": 100, + "num_workers": 1, + "seed": 10, + "sweep_files": { + "GPD": "sweep_gpd.yaml", + "PhaseNet": "sweep_phasenet.yaml", + "BasicPhaseAE": "sweep_basicphase_ae.yaml", + "EQTransformer": "sweep_eqtransformer.yaml" + }, + "experiment_count": 20 } \ No newline at end of file diff --git a/experiments/sweep_basicphase_ae.yaml b/experiments/sweep_basicphase_ae.yaml new file mode 100644 index 0000000..ba45c47 --- /dev/null +++ b/experiments/sweep_basicphase_ae.yaml @@ -0,0 +1,19 @@ +method: bayes +metric: + goal: minimize + name: val_loss +parameters: + model_name: + value: + - BasicPhaseAE + batch_size: + distribution: int_uniform + max: 1024 + min: 256 + max_epochs: + value: + - 20 + learning_rate: + distribution: uniform + max: 0.02 + min: 0.001 diff --git a/experiments/sweep_eqtransformer.yaml b/experiments/sweep_eqtransformer.yaml new file mode 100644 index 0000000..21735f4 --- /dev/null +++ b/experiments/sweep_eqtransformer.yaml @@ -0,0 +1,20 @@ +name: EQTransformer +method: bayes +metric: + goal: minimize + name: val_loss +parameters: + model_name: + value: + - EQTransformer + batch_size: + distribution: int_uniform + max: 1024 + min: 256 + max_epochs: + value: + - 30 + learning_rate: + distribution: uniform + max: 0.02 + min: 0.005 diff --git a/experiments/sweep_gpd.yaml b/experiments/sweep_gpd.yaml index db0f435..58f3674 100644 --- a/experiments/sweep_gpd.yaml +++ b/experiments/sweep_gpd.yaml @@ -13,14 +13,14 @@ parameters: min: 256 max_epochs: value: - - 3 + - 30 learning_rate: distribution: uniform max: 0.02 min: 0.005 highpass: value: - - 2 + - 1 lowpass: value: - - 10 \ No newline at end of file + - 10 diff --git a/experiments/sweep_phasenet.yaml b/experiments/sweep_phasenet.yaml index 702b69b..ab3fea4 100644 --- a/experiments/sweep_phasenet.yaml +++ b/experiments/sweep_phasenet.yaml @@ -13,7 +13,7 @@ parameters: min: 256 max_epochs: value: - - 15 + - 30 learning_rate: distribution: uniform max: 0.02 diff --git a/utils/Transforming mseeds from Bogdanka to Seisbench format.ipynb b/notebooks/Transforming mseeds from Bogdanka to Seisbench format.ipynb similarity index 99% rename from utils/Transforming mseeds from Bogdanka to Seisbench format.ipynb rename to notebooks/Transforming mseeds from Bogdanka to Seisbench format.ipynb index a6a9794..0e169d2 100644 --- a/utils/Transforming mseeds from Bogdanka to Seisbench format.ipynb +++ b/notebooks/Transforming mseeds from Bogdanka to Seisbench format.ipynb @@ -18,9 +18,7 @@ "import seisbench.data as sbd\n", "import seisbench.util as sbu\n", "import numpy as np\n", - "\n", - "\n", - "import utils\n" + "\n" ] }, { @@ -1126,7 +1124,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.5" + "version": "3.10.6" } }, "nbformat": 4, diff --git a/utils/Transforming mseeds from Lumineos to SeisBench dataset.ipynb b/notebooks/Transforming mseeds from Lumineos to SeisBench dataset.ipynb similarity index 99% rename from utils/Transforming mseeds from Lumineos to SeisBench dataset.ipynb rename to notebooks/Transforming mseeds from Lumineos to SeisBench dataset.ipynb index 3f1fec0..37551b2 100644 --- a/utils/Transforming mseeds from Lumineos to SeisBench dataset.ipynb +++ b/notebooks/Transforming mseeds from Lumineos to SeisBench dataset.ipynb @@ -36,6 +36,16 @@ "\n" ] }, + { + "cell_type": "markdown", + "id": "70c64dc6-e4dd-4c01-939d-a28914866f5d", + "metadata": {}, + "source": [ + "##### The catalog has a custom format with the following properties: \n", + "###### 'Datetime', 'X', 'Y', 'Depth', 'Mw', 'Phases', 'mseed_name'\n", + "###### Phases is a string with detected phases seperated by comma: e.g. \"Pg BRDW 2020-01-01 10:09:44.400, Sg BRDW 2020-01-01 10:09:45.696\"" + ] + }, { "cell_type": "code", "execution_count": 2, @@ -106,6 +116,27 @@ "catalog.head(1)" ] }, + { + "cell_type": "code", + "execution_count": 4, + "id": "03257d45-299d-4ed1-bc64-03303d2a9873", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'Pg BRDW 2020-01-01 10:09:44.400, Sg BRDW 2020-01-01 10:09:45.696, Pg GROD 2020-01-01 10:09:45.206, Sg GROD 2020-01-01 10:09:46.655, Pg GUZI 2020-01-01 10:09:45.116, Sg GUZI 2020-01-01 10:09:46.561, Pg JEDR 2020-01-01 10:09:44.920, Sg JEDR 2020-01-01 10:09:46.285, Pg MOSK2 2020-01-01 10:09:45.417, Sg MOSK2 2020-01-01 10:09:46.921, Pg NWLU 2020-01-01 10:09:45.686, Sg NWLU 2020-01-01 10:09:47.175, Pg PCHB 2020-01-01 10:09:45.213, Sg PCHB 2020-01-01 10:09:46.565, Pg PPOL 2020-01-01 10:09:44.755, Sg PPOL 2020-01-01 10:09:46.069, Pg RUDN 2020-01-01 10:09:44.502, Sg RUDN 2020-01-01 10:09:45.756, Pg RYNR 2020-01-01 10:09:43.442, Sg RYNR 2020-01-01 10:09:44.394, Pg RZEC 2020-01-01 10:09:46.075, Sg RZEC 2020-01-01 10:09:47.587, Pg SGOR 2020-01-01 10:09:45.817, Sg SGOR 2020-01-01 10:09:47.284, Pg TRBC2 2020-01-01 10:09:44.833, Sg TRBC2 2020-01-01 10:09:46.095, Pg TRN2 2020-01-01 10:09:44.488, Sg TRN2 2020-01-01 10:09:45.698, Pg TRZS 2020-01-01 10:09:46.232, Sg TRZS 2020-01-01 10:09:47.727, Pg ZMST 2020-01-01 10:09:43.592, Sg ZMST 2020-01-01 10:09:44.553, Pg LUBW 2020-01-01 10:09:43.119, Sg LUBW 2020-01-01 10:09:43.929'" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "catalog.Phases[0]" + ] + }, { "cell_type": "markdown", "id": "fe0627b1-6fa0-4b5a-8a60-d80626b5c9be", diff --git a/scripts/convert_data_template.sh b/scripts/convert_data_template.sh new file mode 100644 index 0000000..507625a --- /dev/null +++ b/scripts/convert_data_template.sh @@ -0,0 +1,19 @@ +#!/bin/bash +#SBATCH --job-name=mseeds_to_seisbench +#SBATCH --time=1:00:00 +#SBATCH --account= ### to fill +#SBATCH --partition plgrid +#SBATCH --cpus-per-task=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --mem=24gb + + +## activate conda environment +source /path/to/mambaforge/bin/activate ### to adjust +conda activate epos-ai-train + +input_path="/path/to/folder/with/mseed/files" +catalog_path="/path/to/catolog.xml" +output_path="/path/to/output/in/seisbench_format" + +python mseeds_to_seisbench.py --input_path $input_path --catalog_path $catalog_path --output_path $output_path diff --git a/scripts/generate_eval_targets.py b/scripts/generate_eval_targets.py index 9cc7c11..5ac0a09 100644 --- a/scripts/generate_eval_targets.py +++ b/scripts/generate_eval_targets.py @@ -39,10 +39,15 @@ from pathlib import Path import pandas as pd import numpy as np from tqdm import tqdm - +import logging from models import phase_dict + +logging.root.setLevel(logging.INFO) +logger = logging.getLogger('targets generator') + + def main(dataset_name, output, tasks, sampling_rate, noise_before_events): np.random.seed(42) tasks = [str(i) in tasks.split(",") for i in range(1, 4)] @@ -64,17 +69,24 @@ def main(dataset_name, output, tasks, sampling_rate, noise_before_events): dataset = sbd.WaveformDataset(dataset_name, **dataset_args) output = Path(output) - output.mkdir(parents=True, exist_ok=False) + output.mkdir(parents=True, exist_ok=True) if "split" in dataset.metadata.columns: dataset.filter(dataset["split"].isin(["dev", "test"]), inplace=True) dataset.preload_waveforms(pbar=True) - + if tasks[0]: - generate_task1(dataset, output, sampling_rate, noise_before_events) + if not Path.exists(output / "task1.csv"): + generate_task1(dataset, output, sampling_rate, noise_before_events) + else: + logger.info(f"{output}/task1.csv already exists. Skipping generation of targets.") if tasks[1] or tasks[2]: - generate_task23(dataset, output, sampling_rate) + if not Path.exists(output / "task23.csv"): + generate_task23(dataset, output, sampling_rate) + else: + logger.info(f"{output}/task23.csv already exists. Skipping generation of targets.") + def generate_task1(dataset, output, sampling_rate, noise_before_events): diff --git a/scripts/hyperparameter_sweep.py b/scripts/hyperparameter_sweep.py index 2f3ca63..a0a0e22 100644 --- a/scripts/hyperparameter_sweep.py +++ b/scripts/hyperparameter_sweep.py @@ -18,9 +18,7 @@ from dotenv import load_dotenv import models import train import util -from config_loader import config as common_config -from config_loader import models_path, dataset_name, seed, experiment_count - +import config_loader torch.multiprocessing.set_sharing_strategy('file_system') os.system("ulimit -n unlimited") @@ -35,8 +33,6 @@ if host is None: wandb.login(key=wandb_api_key, host=host) -# wandb.login(key=wandb_api_key) - wandb_project_name = os.environ.get("WANDB_PROJECT") wandb_user_name = os.environ.get("WANDB_USER") @@ -68,11 +64,9 @@ class HyperparameterSweep: # Create the sweep self.sweep_id = wandb.sweep(self.sweep_config, project=self.project_name) - logger.info("Created sweep with ID: " + self.sweep_id) - # Run the sweep - wandb.agent(self.sweep_id, function=self.run_experiment, count=experiment_count) + wandb.agent(self.sweep_id, function=self.run_experiment, count=config_loader.experiment_count) def all_runs_finished(self): @@ -96,13 +90,14 @@ class HyperparameterSweep: logger.debug("Starting a new run...") run = wandb.init( project=self.project_name, - config=common_config, - ) - - wandb.run.log_code( - ".", - include_fn=lambda path: path.endswith(os.path.basename(__file__)) + config=config_loader.config, + save_code=True ) + run.log_code( + root=".", + include_fn=lambda path: path.endswith(".py") or path.endswith(".sh"), + exclude_fn=lambda path: path.endswith("template.sh") + ) # not working as expected model_name = wandb.config.model_name[0] model_args = models.get_model_specific_args(wandb.config) @@ -116,8 +111,8 @@ class HyperparameterSweep: wandb_logger.watch(model) # CSV logger - also used for saving configuration as yaml - experiment_name = f"{dataset_name}_{model_name}" - csv_logger = CSVLogger(models_path, experiment_name, version=run.id) + experiment_name = f"{config_loader.dataset_name}_{model_name}" + csv_logger = CSVLogger(config_loader.models_path, experiment_name, version=run.id) csv_logger.log_hyperparams(wandb.config) loggers = [wandb_logger, csv_logger] @@ -131,7 +126,7 @@ class HyperparameterSweep: filename=experiment_signature + "-{epoch}-{val_loss:.3f}", monitor="val_loss", mode="min", - dirpath=f"{models_path}/{experiment_name}/", + dirpath=f"{config_loader.models_path}/{experiment_name}/", ) # save_top_k=1, monitor="val_loss", mode="min": save the best model in terms of validation loss checkpoint_callback.STARTING_VERSION = 1 @@ -143,7 +138,7 @@ class HyperparameterSweep: callbacks = [checkpoint_callback, early_stopping_callback] trainer = pl.Trainer( - default_root_dir=models_path, + default_root_dir=config_loader.models_path, logger=loggers, callbacks=callbacks, **get_trainer_args(wandb.config) @@ -162,7 +157,7 @@ class HyperparameterSweep: def start_sweep(sweep_config): logger.info("Starting sweep with config: " + str(sweep_config)) - set_random_seed(seed) + set_random_seed(config_loader.seed) sweep_runner = HyperparameterSweep(project_name=wandb_project_name, sweep_config=sweep_config) sweep_runner.run_sweep() diff --git a/utils/mseeds_to_seisbench.py b/scripts/mseeds_to_seisbench.py similarity index 100% rename from utils/mseeds_to_seisbench.py rename to scripts/mseeds_to_seisbench.py diff --git a/scripts/pipeline.py b/scripts/pipeline.py index 26714fc..6b637a5 100644 --- a/scripts/pipeline.py +++ b/scripts/pipeline.py @@ -15,21 +15,25 @@ import generate_eval_targets import hyperparameter_sweep import eval import collect_results -from config_loader import data_path, targets_path, sampling_rate, dataset_name, sweep_files +import importlib +import config_loader logging.root.setLevel(logging.INFO) logger = logging.getLogger('pipeline') def load_sweep_config(model_name, args): - if model_name == "PhaseNet" and args.phasenet_config is not None: sweep_fname = args.phasenet_config elif model_name == "GPD" and args.gpd_config is not None: sweep_fname = args.gpd_config + elif model_name == "BasicPhaseAE" and args.basic_phase_ae_config is not None: + sweep_fname = args.basic_phase_ae_config + elif model_name == "EQTransformer" and args.eqtransformer_config is not None: + sweep_fname = args.eqtransformer_config else: # use the default sweep config for the model - sweep_fname = sweep_files[model_name] + sweep_fname = config_loader.sweep_files[model_name] logger.info(f"Loading sweep config: {sweep_fname}") @@ -37,7 +41,6 @@ def load_sweep_config(model_name, args): def find_the_best_params(model_name, args): - # find the best hyperparams for the model_name logger.info(f"Starting searching for the best hyperparams for the model: {model_name}") @@ -58,9 +61,9 @@ def find_the_best_params(model_name, args): def generate_predictions(sweep_id, model_name): - experiment_name = f"{dataset_name}_{model_name}" + experiment_name = f"{config_loader.dataset_name}_{model_name}" eval.main(weights=experiment_name, - targets=targets_path, + targets=config_loader.targets_path, sets='dev,test', batchsize=128, num_workers=4, @@ -73,22 +76,42 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("--phasenet_config", type=str, required=False) parser.add_argument("--gpd_config", type=str, required=False) + parser.add_argument("--basic_phase_ae_config", type=str, required=False) + parser.add_argument("--eqtransformer_config", type=str, required=False) + parser.add_argument("--dataset", type=str, required=False) args = parser.parse_args() + if args.dataset is not None: + util.set_dataset(args.dataset) + importlib.reload(config_loader) + + logger.info(f"Started pipeline for the {config_loader.dataset_name} dataset.") + # generate labels logger.info("Started generating labels for the dataset.") - generate_eval_targets.main(data_path, targets_path, "2,3", sampling_rate, None) + generate_eval_targets.main(config_loader.data_path, config_loader.targets_path, "2,3", config_loader.sampling_rate, + None) # find the best hyperparams for the models logger.info("Started training the models.") - for model_name in ["GPD", "PhaseNet"]: + for model_name in ["GPD", "PhaseNet", "BasicPhaseAE", "EQTransformer"]: + if config_loader.dataset_name == "lumineos" and model_name == "EQTransformer": + break sweep_id = find_the_best_params(model_name, args) generate_predictions(sweep_id, model_name) # collect results logger.info("Collecting results.") - collect_results.traverse_path("pred", "pred/results.csv") - logger.info("Results saved in pred/results.csv") + results_path = "pred/results.csv" + collect_results.traverse_path("pred", results_path) + logger.info(f"Results saved in {results_path}") + + # log calculated metrics (MAE) on w&b + logger.info("Logging MAE metrics on w&b.") + util.log_metrics(results_path) + + logger.info("Pipeline finished") + if __name__ == "__main__": main() diff --git a/scripts/run_pipeline_template.sh b/scripts/run_pipeline_template.sh new file mode 100644 index 0000000..2bbf22e --- /dev/null +++ b/scripts/run_pipeline_template.sh @@ -0,0 +1,19 @@ +#!/bin/bash +#SBATCH --job-name=job_name +#SBATCH --time=10:00:00 +#SBATCH --account= ### to fill +#SBATCH --partition=plgrid-gpu-v100 +#SBATCH --cpus-per-task=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:1 + +source path/to/mambaforge/bin/activate ### to change +conda activate epos-ai-train + + +python -c "import torch; print('CUDA available:', torch.cuda.is_available())" +python -c "import torch; print('Number of CUDA devices:', torch.cuda.device_count())" +python -c "import torch; print('Name of GPU:', torch.cuda.get_device_name(torch.cuda.current_device()))" + + +python pipeline.py --dataset "bogdanka" diff --git a/scripts/util.py b/scripts/util.py index f982321..5830f6c 100644 --- a/scripts/util.py +++ b/scripts/util.py @@ -1,5 +1,10 @@ """ -This script offers general functionality required in multiple places. +----------------- +Copyright © 2023 ACK Cyfronet AGH, Poland. +This work was partially funded by EPOS Project funded in frame of PL-POIR4.2 +----------------- + +This script runs the pipeline for the training and evaluation of the models. """ import numpy as np @@ -7,13 +12,15 @@ import pandas as pd import os import logging import glob +import json import wandb + from dotenv import load_dotenv import sys -from config_loader import models_path, configs_path +from config_loader import models_path, configs_path, config_path import yaml -load_dotenv() +load_dotenv() logging.basicConfig() logging.getLogger().setLevel(logging.INFO) @@ -38,8 +45,16 @@ def load_best_model_data(sweep_id, weights): # Get best run parameters best_run = sweep.best_run() run_id = best_run.id - matching_models = glob.glob(f"{models_path}/{weights}/*run={run_id}*ckpt") - if len(matching_models)!=1: + + run = api.run(f"{wandb_user}/{wandb_project_name}/runs/{run_id}") + dataset = run.config["dataset_name"] + model = run.config["model_name"][0] + experiment = f"{dataset}_{model}" + + checkpoints_path = f"{models_path}/{experiment}/*run={run_id}*ckpt" + logging.debug(f"Searching for checkpoints in dir: {checkpoints_path}") + matching_models = glob.glob(checkpoints_path) + if len(matching_models) != 1: raise ValueError("Unable to determine the best checkpoint for run_id: " + run_id) best_checkpoint_path = matching_models[0] @@ -62,31 +77,6 @@ def load_best_model_data(sweep_id, weights): return best_checkpoint_path, run_id -def load_best_model(model_cls, weights, version): - """ - Determines the model with lowest validation loss from the csv logs and loads it - - :param model_cls: Class of the lightning module to load - :param weights: Path to weights as in cmd arguments - :param version: String of version file - :return: Instance of lightning module that was loaded from the best checkpoint - """ - metrics = pd.read_csv(weights / version / "metrics.csv") - - idx = np.nanargmin(metrics["val_loss"]) - min_row = metrics.iloc[idx] - - # For default checkpoint filename, see https://github.com/Lightning-AI/lightning/pull/11805 - # and https://github.com/Lightning-AI/lightning/issues/16636. - # For example, 'epoch=0-step=1.ckpt' means the 1st step has finish, but the 1st epoch hasn't - checkpoint = f"epoch={min_row['epoch']:.0f}-step={min_row['step']+1:.0f}.ckpt" - - # For default save path of checkpoints, see https://github.com/Lightning-AI/lightning/pull/12372 - checkpoint_path = weights / version / "checkpoints" / checkpoint - - return model_cls.load_from_checkpoint(checkpoint_path) - - default_workers = os.getenv("BENCHMARK_DEFAULT_WORKERS", None) if default_workers is None: logging.warning( @@ -117,3 +107,51 @@ def load_sweep_config(sweep_fname): sys.exit(1) return sweep_config + + +def log_metrics(results_file): + """ + + :param results_file: csv file with calculated metrics + :return: + """ + + api = wandb.Api() + wandb_project_name = os.environ.get("WANDB_PROJECT") + wandb_user = os.environ.get("WANDB_USER") + + results = pd.read_csv(results_file) + for run_id in results["version"].unique(): + try: + run = api.run(f"{wandb_user}/{wandb_project_name}/{run_id}") + metrics_to_log = {} + run_results = results[results["version"] == run_id] + for col in run_results.columns: + if 'mae' in col: + metrics_to_log[col] = run_results[col].values[0] + run.summary[col] = run_results[col].values[0] + + run.summary.update() + logging.info(f"Logged metrics for run: {run_id}, {metrics_to_log}") + + except Exception as e: + print(f"An error occurred: {e}, {type(e).__name__}, {e.args}") + + +def set_dataset(dataset_name): + """ + Sets the dataset name in the config file + :param dataset_name: + :return: + """ + + with open(config_path, "r+") as f: + config = json.load(f) + config["dataset_name"] = dataset_name + config["data_path"] = f"datasets/{dataset_name}/seisbench_format/" + + f.seek(0) # rewind + json.dump(config, f, indent=4) + f.truncate() + + diff --git a/utils/convert_data.sh b/utils/convert_data.sh deleted file mode 100644 index e96d0b9..0000000 --- a/utils/convert_data.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=mseeds_to_seisbench -#SBATCH --time=1:00:00 -#SBATCH --account=plgeposai22gpu-gpu -#SBATCH --partition plgrid -#SBATCH --cpus-per-task=1 -#SBATCH --ntasks-per-node=1 -#SBATCH --mem=24gb - - -## activate conda environment -source /net/pr2/projects/plgrid/plggeposai/kmilian/mambaforge/bin/activate -conda activate epos-ai-train - -input_path="/net/pr2/projects/plgrid/plggeposai/datasets/bogdanka" -catalog_path="/net/pr2/projects/plgrid/plggeposai/datasets/bogdanka/BOIS_all.xml" -output_path="/net/pr2/projects/plgrid/plggeposai/kmilian/platform-demo-scripts/datasets/bogdanka/seisbench_format" - -python mseeds_to_seisbench.py --input_path $input_path --catalog_path $catalog_path --output_path $output_path diff --git a/utils/utils.py b/utils/utils.py deleted file mode 100644 index 57f1295..0000000 --- a/utils/utils.py +++ /dev/null @@ -1,230 +0,0 @@ -import os -import pandas as pd -import glob -from pathlib import Path - -import obspy -from obspy.core.event import read_events - -import seisbench.data as sbd -import seisbench.util as sbu -import sys -import logging - -logging.basicConfig(filename="output.out", - filemode='a', - format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', - datefmt='%H:%M:%S', - level=logging.DEBUG) - -logger = logging.getLogger('converter') - -def create_traces_catalog(directory, years): - for year in years: - directory = f"{directory}/{year}" - files = glob.glob(directory) - traces = [] - for i, f in enumerate(files): - st = obspy.read(f) - - for tr in st.traces: - # trace_id = tr.id - # start = tr.meta.starttime - # end = tr.meta.endtime - - trs = pd.Series({ - 'trace_id': tr.id, - 'trace_st': tr.meta.starttime, - 'trace_end': tr.meta.endtime, - 'stream_fname': f - }) - traces.append(trs) - - traces_catalog = pd.DataFrame(pd.concat(traces)).transpose() - traces_catalog.to_csv("data/bogdanka/traces_catalog.csv", append=True, index=False) - - -def split_events(events, input_path): - - logger.info("Splitting available events into train, dev and test sets ...") - events_stats = pd.DataFrame() - events_stats.index.name = "event" - - for i, event in enumerate(events): - #check if mseed exists - actual_picks = 0 - for pick in event.picks: - trace_params = get_trace_params(pick) - trace_path = get_trace_path(input_path, trace_params) - if os.path.isfile(trace_path): - actual_picks += 1 - - events_stats.loc[i, "pick_count"] = actual_picks - - events_stats['pick_count_cumsum'] = events_stats.pick_count.cumsum() - - train_th = 0.7 * events_stats.pick_count_cumsum.values[-1] - dev_th = 0.85 * events_stats.pick_count_cumsum.values[-1] - - events_stats['split'] = 'test' - for i, event in events_stats.iterrows(): - if event['pick_count_cumsum'] < train_th: - events_stats.loc[i, 'split'] = 'train' - elif event['pick_count_cumsum'] < dev_th: - events_stats.loc[i, 'split'] = 'dev' - else: - break - - return events_stats - - -def get_event_params(event): - origin = event.preferred_origin() - if origin is None: - return {} - # print(origin) - - mag = event.preferred_magnitude() - - source_id = str(event.resource_id) - - event_params = { - "source_id": source_id, - "source_origin_uncertainty_sec": origin.time_errors["uncertainty"], - "source_latitude_deg": origin.latitude, - "source_latitude_uncertainty_km": origin.latitude_errors["uncertainty"], - "source_longitude_deg": origin.longitude, - "source_longitude_uncertainty_km": origin.longitude_errors["uncertainty"], - "source_depth_km": origin.depth / 1e3, - "source_depth_uncertainty_km": origin.depth_errors["uncertainty"] / 1e3 if origin.depth_errors[ - "uncertainty"] is not None else None, - } - - if mag is not None: - event_params["source_magnitude"] = mag.mag - event_params["source_magnitude_uncertainty"] = mag.mag_errors["uncertainty"] - event_params["source_magnitude_type"] = mag.magnitude_type - event_params["source_magnitude_author"] = mag.creation_info.agency_id if mag.creation_info is not None else None - - return event_params - - -def get_trace_params(pick): - net = pick.waveform_id.network_code - sta = pick.waveform_id.station_code - - trace_params = { - "station_network_code": net, - "station_code": sta, - "trace_channel": pick.waveform_id.channel_code, - "station_location_code": pick.waveform_id.location_code, - "time": pick.time - } - - return trace_params - - -def find_trace(pick_time, traces): - for tr in traces: - if pick_time > tr.stats.endtime: - continue - if pick_time >= tr.stats.starttime: - # print(pick_time, " - selected trace: ", tr) - return tr - - logger.warning(f"no matching trace for peak: {pick_time}") - return None - - -def get_trace_path(input_path, trace_params): - year = trace_params["time"].year - day_of_year = pd.Timestamp(str(trace_params["time"])).day_of_year - net = trace_params["station_network_code"] - station = trace_params["station_code"] - tr_channel = trace_params["trace_channel"] - - path = f"{input_path}/{year}/{net}/{station}/{tr_channel}.D/{net}.{station}..{tr_channel}.D.{year}.{day_of_year}" - return path - - -def load_trace(input_path, trace_params): - trace_path = get_trace_path(input_path, trace_params) - trace = None - - if not os.path.isfile(trace_path): - logger.w(trace_path + " not found") - else: - stream = obspy.read(trace_path) - if len(stream.traces) > 1: - trace = find_trace(trace_params["time"], stream.traces) - elif len(stream.traces) == 0: - logger.warning(f"no data in: {trace_path}") - else: - trace = stream.traces[0] - - return trace - - -def load_stream(input_path, trace_params, time_before=60, time_after=60): - trace_path = get_trace_path(input_path, trace_params) - sampling_rate, stream = None, None - pick_time = trace_params["time"] - - if not os.path.isfile(trace_path): - print(trace_path + " not found") - else: - stream = obspy.read(trace_path) - stream = stream.slice(pick_time - time_before, pick_time + time_after) - if len(stream.traces) == 0: - print(f"no data in: {trace_path}") - else: - sampling_rate = stream.traces[0].stats.sampling_rate - - return sampling_rate, stream - - -def convert_mseed_to_seisbench_format(): - input_path = "/net/pr2/projects/plgrid/plggeposai" - logger.info("Loading events catalog ...") - events = read_events(input_path + "/BOIS_all.xml") - events_stats = split_events(events) - output_path = input_path + "/seisbench_format" - metadata_path = output_path + "/metadata.csv" - waveforms_path = output_path + "/waveforms.hdf5" - - with sbd.WaveformDataWriter(metadata_path, waveforms_path) as writer: - writer.data_format = { - "dimension_order": "CW", - "component_order": "ZNE", - } - for i, event in enumerate(events): - logger.debug(f"Converting {i} event") - event_params = get_event_params(event) - event_params["split"] = events_stats.loc[i, "split"] - # b = False - - for pick in event.picks: - trace_params = get_trace_params(pick) - sampling_rate, stream = load_stream(input_path, trace_params) - if stream is None: - continue - - actual_t_start, data, _ = sbu.stream_to_array( - stream, - component_order=writer.data_format["component_order"], - ) - - trace_params["trace_sampling_rate_hz"] = sampling_rate - trace_params["trace_start_time"] = str(actual_t_start) - - pick_time = obspy.core.utcdatetime.UTCDateTime(trace_params["time"]) - pick_idx = (pick_time - actual_t_start) * sampling_rate - - trace_params[f"trace_{pick.phase_hint}_arrival_sample"] = int(pick_idx) - - writer.add_trace({**event_params, **trace_params}, data) - - -if __name__ == "__main__": - convert_mseed_to_seisbench_format() - # create_traces_catalog("/net/pr2/projects/plgrid/plggeposai/", ["2018", "2019"])