Skip to content
Snippets Groups Projects
Commit 2ce1f770 authored by Frisinghelli Daniel's avatar Frisinghelli Daniel
Browse files

Implemented aggregation of simulations over different time periods.

parent 05f81a31
No related branches found
No related tags found
No related merge requests found
......@@ -93,6 +93,19 @@ def preprocess_parser():
help='Overwrite existing files {}.'.format(default),
default=False, nargs='?', const=True, metavar='')
# optional argument: whether to aggregate time periods of simulations
parser.add_argument('-a', '--aggregate', type=bool,
help=('Aggregate time periods of simulations to a '
'single NetCDF file {}.'.format(default)),
default=False, nargs='?', const=True, metavar='')
# optional argument: whether to remove single netcdf files
parser.add_argument('-rm', '--remove', type=bool,
help=('Remove individual NetCDF files of different '
'time periods. Only applies when -a is True {}.'
.format(default)),
default=False, nargs='?', const=True, metavar='')
# optional argument: dry run, print files which would be processed
parser.add_argument('-d', '--dry-run', type=bool,
help=('Print files which would be processed {}.'
......
......@@ -36,13 +36,20 @@ def get_inventory(path, pattern='(.*).nc$', return_df=False):
return inventory
def _parse_cordex_time_span(filename):
filename = pathlib.Path(filename)
y_min, y_max = filename.stem.rpartition('_')[-1].split('-')
return y_min, y_max
def reproject_cdo(grid, src_ds, trg_ds, mode='bilinear', overwrite=False):
# instanciate the cdo
operator = cdo.Cdo()
# check if dataset exists
if pathlib.Path(trg_ds).exists() and not overwrite:
trg_ds = pathlib.Path(trg_ds)
if trg_ds.exists() and not overwrite:
LOGGER.info('{} already exists. Aborting ...'.format(trg_ds))
return trg_ds
......
......@@ -10,10 +10,15 @@ import logging
from joblib import Parallel, delayed
from logging.config import dictConfig
# externals
import numpy as np
import xarray as xr
# locals
from pysegcnn.core.logging import log_conf
from pysegcnn.core.trainer import LogConfig
from climax.core.utils import get_inventory, reproject_cdo
from climax.core.utils import (get_inventory, reproject_cdo,
_parse_cordex_time_span)
from climax.core.cli import preprocess_parser
from climax.core.constants import EUROCORDEX_DOMAIN, CORDEX_PARAMETERS
......@@ -140,6 +145,38 @@ if __name__ == '__main__':
args.overwrite)
for src, trg in zip(source, target))
# check whether to aggregate the netcdf files of a simulation covering
# differnt time periods into a single file
if args.aggregate:
# list of unique simulations
simulations = np.unique([file.stem.rpartition('_')[0] for file in
target])
# group the list of output files by model simulation
for sim in simulations:
# chronologically sorted list of current model simulation
group = [file for file in target if file.name.startswith(sim)]
group = sorted(group)
# read multiple netcdf files using xarray and dask
ds = xr.open_mfdataset(group)
# create filename for netcdf covering the entire time period of
# the current simulation
y_min, _ = _parse_cordex_time_span(group[0]) # first year
_, y_max = _parse_cordex_time_span(group[-1]) # last year
filename = '_'.join([sim, '-'.join([y_min, y_max])])
filename = file.parent.joinpath(filename)
# save aggregated netcdf file
ds.to_netcdf(filename)
# remove single netcdf files from disk
if args.remove:
for file in group:
file.unlink()
else:
LOGGER.info('{} does not exist.'.format(str(args.source)))
sys.exit()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment