From 2ce1f77089cca3dca23f7330fdd974dbf088b205 Mon Sep 17 00:00:00 2001 From: "Daniel.Frisinghelli" <daniel.frisinghelli@eurac.edu> Date: Fri, 21 May 2021 14:58:53 +0200 Subject: [PATCH] Implemented aggregation of simulations over different time periods. --- climax/core/cli.py | 13 +++++++++++++ climax/core/utils.py | 9 ++++++++- climax/main/preprocess.py | 39 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/climax/core/cli.py b/climax/core/cli.py index 1df6e33..cc25401 100644 --- a/climax/core/cli.py +++ b/climax/core/cli.py @@ -93,6 +93,19 @@ def preprocess_parser(): help='Overwrite existing files {}.'.format(default), default=False, nargs='?', const=True, metavar='') + # optional argument: whether to aggregate time periods of simulations + parser.add_argument('-a', '--aggregate', type=bool, + help=('Aggregate time periods of simulations to a ' + 'single NetCDF file {}.'.format(default)), + default=False, nargs='?', const=True, metavar='') + + # optional argument: whether to remove single netcdf files + parser.add_argument('-rm', '--remove', type=bool, + help=('Remove individual NetCDF files of different ' + 'time periods. Only applies when -a is True {}.' + .format(default)), + default=False, nargs='?', const=True, metavar='') + # optional argument: dry run, print files which would be processed parser.add_argument('-d', '--dry-run', type=bool, help=('Print files which would be processed {}.' diff --git a/climax/core/utils.py b/climax/core/utils.py index e8af6a9..3f5bdc5 100644 --- a/climax/core/utils.py +++ b/climax/core/utils.py @@ -36,13 +36,20 @@ def get_inventory(path, pattern='(.*).nc$', return_df=False): return inventory +def _parse_cordex_time_span(filename): + filename = pathlib.Path(filename) + y_min, y_max = filename.stem.rpartition('_')[-1].split('-') + return y_min, y_max + + def reproject_cdo(grid, src_ds, trg_ds, mode='bilinear', overwrite=False): # instanciate the cdo operator = cdo.Cdo() # check if dataset exists - if pathlib.Path(trg_ds).exists() and not overwrite: + trg_ds = pathlib.Path(trg_ds) + if trg_ds.exists() and not overwrite: LOGGER.info('{} already exists. Aborting ...'.format(trg_ds)) return trg_ds diff --git a/climax/main/preprocess.py b/climax/main/preprocess.py index f915e75..dab0ef2 100644 --- a/climax/main/preprocess.py +++ b/climax/main/preprocess.py @@ -10,10 +10,15 @@ import logging from joblib import Parallel, delayed from logging.config import dictConfig +# externals +import numpy as np +import xarray as xr + # locals from pysegcnn.core.logging import log_conf from pysegcnn.core.trainer import LogConfig -from climax.core.utils import get_inventory, reproject_cdo +from climax.core.utils import (get_inventory, reproject_cdo, + _parse_cordex_time_span) from climax.core.cli import preprocess_parser from climax.core.constants import EUROCORDEX_DOMAIN, CORDEX_PARAMETERS @@ -140,6 +145,38 @@ if __name__ == '__main__': args.overwrite) for src, trg in zip(source, target)) + # check whether to aggregate the netcdf files of a simulation covering + # differnt time periods into a single file + if args.aggregate: + # list of unique simulations + simulations = np.unique([file.stem.rpartition('_')[0] for file in + target]) + + # group the list of output files by model simulation + for sim in simulations: + + # chronologically sorted list of current model simulation + group = [file for file in target if file.name.startswith(sim)] + group = sorted(group) + + # read multiple netcdf files using xarray and dask + ds = xr.open_mfdataset(group) + + # create filename for netcdf covering the entire time period of + # the current simulation + y_min, _ = _parse_cordex_time_span(group[0]) # first year + _, y_max = _parse_cordex_time_span(group[-1]) # last year + filename = '_'.join([sim, '-'.join([y_min, y_max])]) + filename = file.parent.joinpath(filename) + + # save aggregated netcdf file + ds.to_netcdf(filename) + + # remove single netcdf files from disk + if args.remove: + for file in group: + file.unlink() + else: LOGGER.info('{} does not exist.'.format(str(args.source))) sys.exit() -- GitLab