From 1eeec5fe2f0dbaffc368adcd7ef14d1597f82fd6 Mon Sep 17 00:00:00 2001
From: "Daniel.Frisinghelli" <daniel.frisinghelli@eurac.edu>
Date: Wed, 16 Jun 2021 17:24:42 +0200
Subject: [PATCH] Implement preprocessing based on input CSV file.

---
 climax/main/preprocess.py | 231 ++++++++++++++++++++------------
 1 file changed, 120 insertions(+), 111 deletions(-)

diff --git a/climax/main/preprocess.py b/climax/main/preprocess.py
index e1b6944..1111903 100644
--- a/climax/main/preprocess.py
+++ b/climax/main/preprocess.py
@@ -13,6 +13,7 @@ from logging.config import dictConfig
 # externals
 import numpy as np
 import xarray as xr
+import pandas as pd
 
 # locals
 from pysegcnn.core.logging import log_conf
@@ -63,57 +64,18 @@ if __name__ == '__main__':
             LOGGER.info('{} does not exist.'.format(args.grid))
             sys.exit()
 
-        # create target directory: check that it is differen from the source
-        # directory
-        if args.target == args.source:
-            LOGGER.info('Source and target paths cannot be equal.')
-            sys.exit()
-
-        if not args.target.exists():
-            LOGGER.info('mkdir {}'.format(args.target))
-            args.target.mkdir(parents=True, exist_ok=True)
+        def preprocess_cordex_simulation(pattern):
+            # get all the files matching the defined pattern in the source
+            # directory
+            source = sorted(get_inventory(args.source, pattern))
 
-        # check ensemble identifier
-        ensemble = _check_str_arg(args.ensemble, 'r[0-9]i[0-9]p[0-9]')
+            # log list of input files
+            LogConfig.init_log('Files matching "{}":'.format(pattern))
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in source]))
 
-        # check regional climate model version
-        version = _check_str_arg(args.version, 'v[0-9]')
-
-        # construct file pattern to match from input parameters
-        pattern = '(.*).nc$'
-        pattern = '_'.join([param if param is not None else '(.*)' for param in
-                            [args.variable, EUROCORDEX_DOMAIN, args.gcm,
-                             args.scenario, ensemble, args.rcm, version,
-                             pattern]])
-
-        # get all the files matching the defined pattern in the source
-        # directory
-        source = sorted(get_inventory(args.source, pattern, return_df=False))
-
-        # log list of input files
-        LogConfig.init_log('Files matching "{}" for variable "{}":'.format(
-            pattern, args.variable))
-        LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-            ['{}'.format(file) for file in source]))
-
-        # generate target filenames
-        # output path: target/var/scenario
-        if (args.variable is not None) and (args.scenario is not None):
-            # if variable and scenarios are specified, directly build
-            # output path
-            output_path = args.target.joinpath(args.variable).joinpath(
-                args.scenario)
-
-            # check if output path exists
-            if not output_path.exists():
-                LOGGER.info('mkdir {}'.format(output_path))
-                output_path.mkdir(parents=True, exist_ok=True)
-
-            # list of output files
-            target = [output_path.joinpath(file.name) for file in source]
-        else:
-            # if variable and scenarios are not specified, infer output
-            # path from file names
+            # generate target filenames: infer from source file names
+            # output path: target/var/scenario
             target = []
             for file in source:
                 # parts: dictionary of file name components
@@ -132,68 +94,115 @@ if __name__ == '__main__':
                 # output file name
                 target.append(output_path.joinpath(file.name))
 
-        # log list of output files
-        LogConfig.init_log('Output file names')
-        LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-            ['{}'.format(file) for file in target]))
-
-        # check whether to only print which files would be processed
-        if not args.dry_run:
-            # run reprojection in parallel
-            target = Parallel(n_jobs=-1, verbose=51)(
-                delayed(reproject_cdo)(args.grid, src, trg, args.mode,
-                                       args.overwrite)
-                for src, trg in zip(source, target))
-
-        # check whether to aggregate the netcdf files of a simulation covering
-        # differnt time periods into a single file
-        if args.aggregate:
-            LogConfig.init_log('Aggregating time periods of simulations.')
-            # list of unique simulations
-            simulations = np.unique([file.stem.rpartition('_')[0] for file in
-                                     target])
-
-            # group the list of output files by model simulation
-            for sim in simulations:
-
-                # chronologically sorted list of current model simulation
-                group = [file for file in target if file.name.startswith(sim)]
-                group = sorted(group)
-
-                # create filename for netcdf covering the entire time period of
-                # the current simulation
-                y_min, _ = _parse_cordex_time_span(group[0])  # first year
-                _, y_max = _parse_cordex_time_span(group[-1])  # last year
-                filename = '_'.join([sim, '-'.join([y_min, y_max])]) + '.nc'
-                filename = group[0].parent.joinpath(filename)
-
-                # log simulation name, time span and files
-                LogConfig.init_log('Aggregating simulation: {}, Time span: {}'
-                                   .format(sim, '-'.join([y_min, y_max])))
-                LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-                    ['{}'.format(file) for file in group]))
-
-                # read multiple netcdf files using xarray and dask
-                ds = xr.open_mfdataset(group, parallel=True).compute()
-
-                # set encoding of time: calendar
-                ds.time.encoding = ds.time_bnds.encoding
-
-                # set NetCDF file compression for each variable
-                for _, var in ds.data_vars.items():
-                    var.encoding['zlib'] = True
-                    var.encoding['complevel'] = 5
-
-                # save aggregated netcdf file
-                LOGGER.info('Compressing NetCDF: {}'.format(filename))
-                ds.to_netcdf(filename, engine='h5netcdf')
-
-                # remove single netcdf files from disk
-                if args.remove:
-                    LOGGER.info('Removing individual NetCDF files ...')
-                    for file in group:
-                        file.unlink()
-                        LOGGER.info('rm {}'.format(file))
+            # log list of output files
+            LogConfig.init_log('Output file names')
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in target]))
+
+            # check whether to only print which files would be processed
+            if not args.dry_run:
+                # run reprojection in parallel
+                target = Parallel(n_jobs=-1, verbose=51)(
+                    delayed(reproject_cdo)(args.grid, src, trg, args.mode,
+                                           args.overwrite)
+                    for src, trg in zip(source, target))
+
+            # check whether to aggregate the netcdf files of a simulation
+            # covering different time periods into a single file
+            if args.aggregate:
+                LogConfig.init_log('Aggregating time periods of simulations.')
+                # list of unique simulations
+                simulations = np.unique([file.stem.rpartition('_')[0] for file
+                                         in target])
+
+                # group the list of output files by model simulation
+                for sim in simulations:
+
+                    # chronologically sorted list of current model simulation
+                    group = [file for file in target if
+                             file.name.startswith(sim)]
+                    group = sorted(group)
+
+                    # create filename for netcdf covering the entire time
+                    # period of the current simulation
+                    y_min, _ = _parse_cordex_time_span(group[0])  # first year
+                    _, y_max = _parse_cordex_time_span(group[-1])  # last year
+                    filename = '_'.join([sim, '-'.join([y_min, y_max])])
+                    filename = group[0].parent.joinpath(filename + '.nc')
+
+                    # log simulation name, time span and files
+                    LogConfig.init_log('Aggregating simulation: {}, '
+                                       'Time span: {}'
+                                       .format(sim, '-'.join([y_min, y_max])))
+                    LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                        ['{}'.format(file) for file in group]))
+
+                    # read multiple netcdf files using xarray and dask
+                    ds = xr.open_mfdataset(group, parallel=True).compute()
+
+                    # set encoding of time: calendar
+                    ds.time.encoding = ds.time_bnds.encoding
+
+                    # set NetCDF file compression for each variable
+                    for _, var in ds.data_vars.items():
+                        var.encoding['zlib'] = True
+                        var.encoding['complevel'] = 5
+
+                    # save aggregated netcdf file
+                    LOGGER.info('Compressing NetCDF: {}'.format(filename))
+                    ds.to_netcdf(filename, engine='h5netcdf')
+
+                    # remove single netcdf files from disk
+                    if args.remove:
+                        LOGGER.info('Removing individual NetCDF files ...')
+                        for file in group:
+                            file.unlink()
+                            LOGGER.info('rm {}'.format(file))
+
+        # create target directory: check that it is different from the source
+        # directory
+        if args.target == args.source:
+            LOGGER.info('Source and target paths cannot be equal.')
+            sys.exit()
+
+        if not args.target.exists():
+            LOGGER.info('mkdir {}'.format(args.target))
+            args.target.mkdir(parents=True, exist_ok=True)
+
+        # check if an input file is specified
+        nc_pattern = '(.*).nc$'  # search for NetCDF files
+        if args.file is not None:
+            # read input file to dataframe
+            df = pd.read_csv(args.file, delimiter=';')
+
+            # iterate over cordex simulations
+            for index, row in df.iterrows():
+                # construct pattern to search for cordex files
+                pattern = '_'.join([row.variable, row.domain, row.gcm,
+                                    row.experiment, row.ensemble,
+                                    row.institute_rcm,
+                                    row.downscale_realisation, nc_pattern])
+
+                # TODO: filter pattern by scenario and variable
+
+                # run cordex preprocessing
+                preprocess_cordex_simulation(pattern)
+
+        else:
+            # check ensemble identifier
+            ensemble = _check_str_arg(args.ensemble, 'r[0-9]i[0-9]p[0-9]')
+
+            # check regional climate model version
+            version = _check_str_arg(args.version, 'v[0-9]')
+
+            # construct file pattern to match from input parameters
+            pattern = '_'.join([param if param is not None else '(.*)' for
+                                param in [args.variable, EUROCORDEX_DOMAIN,
+                                          args.gcm, args.scenario, ensemble,
+                                          args.rcm, version, nc_pattern]])
+
+            # run cordex preprocessing
+            preprocess_cordex_simulation(pattern)
 
     else:
         LOGGER.info('{} does not exist.'.format(str(args.source)))
-- 
GitLab
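
Note on the input CSV file: pd.read_csv(args.file, delimiter=';') together with the row attributes accessed above implies a semicolon-separated file with the columns variable, domain, gcm, experiment, ensemble, institute_rcm and downscale_realisation, one row per CORDEX simulation. A minimal sketch of such a file (the header follows from the pattern construction in the patch; the identifier values are illustrative EURO-CORDEX names, not taken from the patch):

    variable;domain;gcm;experiment;ensemble;institute_rcm;downscale_realisation
    tas;EUR-11;MPI-M-MPI-ESM-LR;rcp85;r1i1p1;CLMcom-CCLM4-8-17;v1
    pr;EUR-11;ICHEC-EC-EARTH;historical;r12i1p1;SMHI-RCA4;v1

The first row, for example, expands to the search pattern tas_EUR-11_MPI-M-MPI-ESM-LR_rcp85_r1i1p1_CLMcom-CCLM4-8-17_v1_(.*).nc$, where the trailing (.*) matches the remaining filename components, i.e. frequency and time period.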