diff --git a/climax/core/cli.py b/climax/core/cli.py
index 1df6e331119618d97b8cc53e7ee70ad1a45fca47..cc254014094963f250f3a63b3a4d45490b7bda54 100644
--- a/climax/core/cli.py
+++ b/climax/core/cli.py
@@ -93,6 +93,19 @@ def preprocess_parser():
                         help='Overwrite existing files {}.'.format(default),
                         default=False, nargs='?', const=True, metavar='')
 
+    # optional argument: whether to aggregate time periods of simulations
+    parser.add_argument('-a', '--aggregate', type=bool,
+                        help=('Aggregate time periods of simulations to a '
+                              'single NetCDF file {}.'.format(default)),
+                        default=False, nargs='?', const=True, metavar='')
+
+    # optional argument: whether to remove single netcdf files
+    parser.add_argument('-rm', '--remove', type=bool,
+                        help=('Remove individual NetCDF files of different '
+                              'time periods. Only applies when -a is True {}.'
+                              .format(default)),
+                        default=False, nargs='?', const=True, metavar='')
+
     # optional argument: dry run, print files which would be processed
     parser.add_argument('-d', '--dry-run', type=bool,
                         help=('Print files which would be processed {}.'
diff --git a/climax/core/utils.py b/climax/core/utils.py
index e8af6a9f51fb92048d6d2f4ebdf3875b116f6a0f..3f5bdc54f6e8068c22c5d63be38c57526a52eefc 100644
--- a/climax/core/utils.py
+++ b/climax/core/utils.py
@@ -36,13 +36,20 @@ def get_inventory(path, pattern='(.*).nc$', return_df=False):
 
     return inventory
 
+def _parse_cordex_time_span(filename):
+    filename = pathlib.Path(filename)
+    y_min, y_max = filename.stem.rpartition('_')[-1].split('-')
+    return y_min, y_max
+
+
 def reproject_cdo(grid, src_ds, trg_ds, mode='bilinear', overwrite=False):
 
     # instanciate the cdo
     operator = cdo.Cdo()
 
     # check if dataset exists
-    if pathlib.Path(trg_ds).exists() and not overwrite:
+    trg_ds = pathlib.Path(trg_ds)
+    if trg_ds.exists() and not overwrite:
         LOGGER.info('{} already exists. Aborting ...'.format(trg_ds))
         return trg_ds
 
diff --git a/climax/main/preprocess.py b/climax/main/preprocess.py
index f915e75e496374a70ce62cf04df654431a5cddfb..dab0ef237570f8c6a43c15d9f6a43d01b692c910 100644
--- a/climax/main/preprocess.py
+++ b/climax/main/preprocess.py
@@ -10,10 +10,15 @@ import logging
 from joblib import Parallel, delayed
 from logging.config import dictConfig
 
+# externals
+import numpy as np
+import xarray as xr
+
 # locals
 from pysegcnn.core.logging import log_conf
 from pysegcnn.core.trainer import LogConfig
-from climax.core.utils import get_inventory, reproject_cdo
+from climax.core.utils import (get_inventory, reproject_cdo,
+                               _parse_cordex_time_span)
 from climax.core.cli import preprocess_parser
 from climax.core.constants import EUROCORDEX_DOMAIN, CORDEX_PARAMETERS
 
@@ -140,6 +145,38 @@ if __name__ == '__main__':
                 args.overwrite) for src, trg in zip(source, target))
 
+        # check whether to aggregate the netcdf files of a simulation covering
+        # different time periods into a single file
+        if args.aggregate:
+            # list of unique simulations
+            simulations = np.unique([file.stem.rpartition('_')[0] for file in
+                                     target])
+
+            # group the list of output files by model simulation
+            for sim in simulations:
+
+                # chronologically sorted list of current model simulation
+                group = [file for file in target if file.name.startswith(sim)]
+                group = sorted(group)
+
+                # read multiple netcdf files using xarray and dask
+                ds = xr.open_mfdataset(group)
+
+                # create filename for netcdf covering the entire time period
+                # of the current simulation, preserving the .nc extension
+                y_min, _ = _parse_cordex_time_span(group[0])  # first year
+                _, y_max = _parse_cordex_time_span(group[-1])  # last year
+                filename = '_'.join([sim, '-'.join([y_min, y_max])]) + '.nc'
+                filename = group[0].parent.joinpath(filename)
+
+                # save aggregated netcdf file
+                ds.to_netcdf(filename)
+
+                # remove single netcdf files from disk
+                if args.remove:
+                    for file in group:
+                        file.unlink()
+
     else:
         LOGGER.info('{} does not exist.'.format(str(args.source)))
         sys.exit()
 