From 3cd35843b7b7b1587ac61759699bc84603391219 Mon Sep 17 00:00:00 2001
From: "Daniel.Frisinghelli" <daniel.frisinghelli@eurac.edu>
Date: Thu, 24 Jun 2021 14:06:24 +0200
Subject: [PATCH] Add modules to preprocess CORDEX, ERA5 and observation data.

---
 climax/main/preprocess/download_ERA5.py     |  67 ++++++
 climax/main/preprocess/preprocess_CORDEX.py | 220 ++++++++++++++++++++
 climax/main/preprocess/preprocess_ERA5.py   | 132 ++++++++++++
 climax/main/preprocess/preprocess_OBS.py    |  94 +++++++++
 4 files changed, 513 insertions(+)
 create mode 100644 climax/main/preprocess/download_ERA5.py
 create mode 100644 climax/main/preprocess/preprocess_CORDEX.py
 create mode 100644 climax/main/preprocess/preprocess_ERA5.py
 create mode 100644 climax/main/preprocess/preprocess_OBS.py

diff --git a/climax/main/preprocess/download_ERA5.py b/climax/main/preprocess/download_ERA5.py
new file mode 100644
index 0000000..2dfd77b
--- /dev/null
+++ b/climax/main/preprocess/download_ERA5.py
@@ -0,0 +1,67 @@
+"""Download ERA-5 data from the Copernicus Climate Data Store."""
+
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# builtins
+import os
+from joblib import Parallel, delayed
+
+# externals
+import cdsapi
+import numpy as np
+
+# locals
+from climax.core.constants import ERA5_VARIABLES
+from climax.main.config import ERA5_PATH
+
+# ERA-5 product
+product = 'reanalysis-era5-pressure-levels'
+product_type = 'reanalysis'
+
+# pressure levels
+pressure_levels = ['850', '500']
+
+# time period
+years = [str(y) for y in np.arange(1981, 2011)]
+months = [str(m) for m in np.arange(1, 13)]
+days = [str(d) for d in np.arange(1, 32)]
+time = ["{:02d}:00".format(t) for t in np.arange(0, 24)]
+
+# area of interest (Alps): North, West, South, East
+area = [52, 2, 40, 20]
+
+# ERA5 download configuration dictionary
+CONFIG = {
+    'product_type': product_type,
+    'pressure_level': pressure_levels,
+    'month': months,
+    'day': days,
+    'time': time,
+    'format': 'netcdf',
+    'area': area
+}
+
+
+if __name__ == '__main__':
+
+    # initialize client
+    c = cdsapi.Client()
+
+    # download data for the different variables
+    for var in ERA5_VARIABLES:
+        # create output directory
+        output = ERA5_PATH.joinpath('Downloads', var)
+        if not output.exists():
+            output.mkdir(parents=True, exist_ok=True)
+
+        # create output files
+        files = [output.joinpath('_'.join(['ERA5', var, year]) + '.nc') for
+                 year in years]
+
+        # split the download into single years: the CDS API cannot handle
+        # requests over the whole time period at once
+        Parallel(n_jobs=min(len(years), os.cpu_count()), verbose=51)(
+            delayed(c.retrieve)(
+                product, {**CONFIG, **{'variable': var, 'year': year}}, file)
+            for file, year in zip(files, years) if not file.exists())
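Note: each job submitted above issues one CDS request per variable and year.
Spelled out, a single request looks roughly like this ('geopotential' is an
assumed placeholder for an entry of ERA5_VARIABLES, which is defined elsewhere
in the package; credentials are read from ~/.cdsapirc):

    import cdsapi

    c = cdsapi.Client()

    # CONFIG merged with one variable and one year, as in the loop above;
    # 'geopotential' stands in for an entry of ERA5_VARIABLES (assumption)
    c.retrieve(
        'reanalysis-era5-pressure-levels',
        {
            'product_type': 'reanalysis',
            'variable': 'geopotential',
            'pressure_level': ['850', '500'],
            'year': '1981',
            'month': [str(m) for m in range(1, 13)],
            'day': [str(d) for d in range(1, 32)],
            'time': ['{:02d}:00'.format(t) for t in range(24)],
            'format': 'netcdf',
            'area': [52, 2, 40, 20],
        },
        'ERA5_geopotential_1981.nc')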
diff --git a/climax/main/preprocess/preprocess_CORDEX.py b/climax/main/preprocess/preprocess_CORDEX.py
new file mode 100644
index 0000000..446fb68
--- /dev/null
+++ b/climax/main/preprocess/preprocess_CORDEX.py
@@ -0,0 +1,220 @@
+"""Reproject and resample CORDEX data to the target grid."""
+
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# builtins
+import re
+import sys
+import logging
+from joblib import Parallel, delayed
+from logging.config import dictConfig
+
+# externals
+import numpy as np
+import xarray as xr
+import pandas as pd
+
+# locals
+from pysegcnn.core.logging import log_conf
+from pysegcnn.core.trainer import LogConfig
+from climax.core.utils import (get_inventory, reproject_cdo,
+                               _parse_cordex_time_span)
+from climax.core.cli import preprocess_cordex_parser
+from climax.core.constants import EUROCORDEX_DOMAIN, CORDEX_PARAMETERS
+
+# module level logger
+LOGGER = logging.getLogger(__name__)
+
+
+def _check_str_arg(arg, pattern):
+    # check if input argument matches a defined pattern
+    if arg is not None:
+        arg_id = re.search(pattern, arg)
+        if arg_id is None:
+            LOGGER.info('Argument "{}" invalid, form "{}" required.'
+                        .format(arg, pattern))
+            sys.exit()
+        else:
+            arg = arg_id[0]
+
+    return arg
+
+
+if __name__ == '__main__':
+
+    # configure logging
+    dictConfig(log_conf())
+
+    # define command line argument parser
+    parser = preprocess_cordex_parser()
+
+    # parse command line arguments
+    args = sys.argv[1:]
+    if not args:
+        parser.print_help()
+        sys.exit()
+    else:
+        args = parser.parse_args(args)
+
+    # check whether the source directory exists
+    if args.source.exists():
+        # check whether the target grid file exists
+        if not args.grid.exists():
+            LOGGER.info('{} does not exist.'.format(args.grid))
+            sys.exit()
+
+        def preprocess_cordex_simulation(pattern):
+            # get all the files matching the defined pattern in the source
+            # directory
+            source = sorted(get_inventory(args.source, pattern))
+
+            # log list of input files
+            LogConfig.init_log('Files matching "{}":'.format(pattern))
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in source]))
+
+            # generate target filenames: infer from source file names
+            # output path: target/var/scenario
+            target = []
+            for file in source:
+                # parts: dictionary of file name components
+                parts = {k: v for k, v in zip(CORDEX_PARAMETERS,
+                                              file.stem.split('_'))}
+
+                # construct output path for current file
+                output_path = args.target.joinpath(
+                    parts['Variable']).joinpath(parts['Scenario'])
+
+                # check if output path exists
+                if not output_path.exists():
+                    LOGGER.info('mkdir {}'.format(output_path))
+                    output_path.mkdir(parents=True, exist_ok=True)
+
+                # output file name
+                target.append(output_path.joinpath(file.name))
+
+            # log list of output files
+            LogConfig.init_log('Output file names')
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in target]))
+
+            # check whether to only print which files would be processed
+            if args.dry_run:
+                LogConfig.init_log('Dry run. No files processed.')
+                return
+
+            # run reprojection in parallel
+            target = Parallel(n_jobs=-1, verbose=51)(
+                delayed(reproject_cdo)(args.grid, src, trg, args.mode,
+                                       args.overwrite)
+                for src, trg in zip(source, target))
+
+            # check whether to aggregate the netcdf files of a simulation
+            # covering different time periods into a single file
+            if args.aggregate:
+                LogConfig.init_log('Aggregating time periods of simulations.')
+                # list of unique simulations
+                simulations = np.unique([file.stem.rpartition('_')[0] for file
+                                         in target])
+
+                # group the list of output files by model simulation
+                for sim in simulations:
+
+                    # chronologically sorted list of current model simulation
+                    group = [file for file in target if
+                             file.name.startswith(sim)]
+                    group = sorted(group)
+
+                    # create filename for netcdf covering the entire time
+                    # period of the current simulation
+                    y_min, _ = _parse_cordex_time_span(group[0])  # first year
+                    _, y_max = _parse_cordex_time_span(group[-1])  # last year
+                    filename = '_'.join([sim, '-'.join([y_min, y_max])])
+                    filename = group[0].parent.joinpath(filename + '.nc')
+
+                    # log simulation name, time span and files
+                    LogConfig.init_log('Aggregating simulation: {}, '
+                                       'Time span: {}'
+                                       .format(sim, '-'.join([y_min, y_max])))
+                    LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                        ['{}'.format(file) for file in group]))
+
+                    # read multiple netcdf files using xarray and dask
+                    ds = xr.open_mfdataset(group, parallel=True).compute()
+
+                    # set encoding of time: calendar
+                    ds.time.encoding = ds.time_bnds.encoding
+
+                    # set NetCDF file compression for each variable
+                    for _, da in ds.data_vars.items():
+                        da.encoding['zlib'] = True
+                        da.encoding['complevel'] = 5
+
+                    # save aggregated netcdf file
+                    LOGGER.info('Compressing NetCDF: {}'.format(filename))
+                    ds.to_netcdf(filename, engine='h5netcdf')
+
+                    # remove single netcdf files from disk
+                    if args.remove:
+                        LOGGER.info('Removing individual NetCDF files ...')
+                        for file in group:
+                            file.unlink()
+                            LOGGER.info('rm {}'.format(file))
+
+        # create target directory: check that it is different from the source
+        # directory
+        if args.target == args.source:
+            LOGGER.info('Source and target paths cannot be equal.')
+            sys.exit()
+
+        if not args.target.exists():
+            LOGGER.info('mkdir {}'.format(args.target))
+            args.target.mkdir(parents=True, exist_ok=True)
+
+        # check if an input file is specified
+        nc_pattern = '(.*).nc$'  # search for NetCDF files
+        if args.file is not None:
+            # read input file to dataframe
+            df = pd.read_csv(args.file, delimiter=';')
+
+            # iterate over cordex simulations
+            for _, row in df.iterrows():
+                # construct pattern to search for cordex files
+                pattern = '_'.join([row.variable, row.domain, row.gcm,
+                                    row.experiment, row.ensemble,
+                                    row.institute_rcm,
+                                    row.downscale_realisation, nc_pattern])
+
+                # filter by variable
+                if args.variable is not None:
+                    if not pattern.startswith(args.variable):
+                        continue
+
+                # filter by experiment
+                if args.scenario is not None:
+                    if args.scenario not in pattern:
+                        continue
+
+                # run cordex preprocessing
+                preprocess_cordex_simulation(pattern)
+
+        else:
+            # check ensemble identifier
+            ensemble = _check_str_arg(args.ensemble, 'r[0-9]i[0-9]p[0-9]')
+
+            # check regional climate model version
+            version = _check_str_arg(args.version, 'v[0-9]')
+
+            # construct file pattern to match from input parameters
+            pattern = '_'.join([param if param is not None else '(.*)' for
+                                param in [args.variable, EUROCORDEX_DOMAIN,
+                                          args.gcm, args.scenario, ensemble,
+                                          args.rcm, version, nc_pattern]])
+
+            # run cordex preprocessing
+            preprocess_cordex_simulation(pattern)
+
+    else:
+        LOGGER.info('{} does not exist.'.format(str(args.source)))
+        sys.exit()
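Note: the pattern matching in preprocess_CORDEX.py relies on the CORDEX file
naming convention, in which the underscore-separated components of a file name
map onto CORDEX_PARAMETERS. A minimal sketch of that mapping (the file name
and the parameter names here are assumed examples; the canonical list lives in
climax.core.constants, which is not part of this patch):

    from pathlib import Path

    # assumed parameter names; the script above requires at least the
    # 'Variable' and 'Scenario' keys
    CORDEX_PARAMETERS = ['Variable', 'Domain', 'GCM', 'Scenario',
                         'Ensemble', 'RCM', 'Version', 'Frequency',
                         'Period']

    # a typical EURO-CORDEX file name (assumed example)
    file = Path('tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_'
                'CLMcom-CCLM4-8-17_v1_day_19710101-19751231.nc')

    parts = {k: v for k, v in zip(CORDEX_PARAMETERS, file.stem.split('_'))}
    print(parts['Variable'], parts['Scenario'])  # tas historical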
diff --git a/climax/main/preprocess/preprocess_ERA5.py b/climax/main/preprocess/preprocess_ERA5.py
new file mode 100644
index 0000000..66eaa5d
--- /dev/null
+++ b/climax/main/preprocess/preprocess_ERA5.py
@@ -0,0 +1,132 @@
+"""Preprocess ERA-5 data: aggregate to daily data."""
+
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# builtins
+import re
+import sys
+import logging
+from logging.config import dictConfig
+from joblib import Parallel, delayed
+
+# externals
+import xarray as xr
+
+# locals
+from climax.core.cli import preprocess_era5_parser
+from climax.core.constants import ERA5_VARIABLES
+from climax.core.utils import reproject_cdo
+from pysegcnn.core.logging import log_conf
+from pysegcnn.core.utils import search_files
+from pysegcnn.core.trainer import LogConfig
+
+# module level logger
+LOGGER = logging.getLogger(__name__)
+
+
+if __name__ == '__main__':
+
+    # initialize logging
+    dictConfig(log_conf())
+
+    # define command line argument parser
+    parser = preprocess_era5_parser()
+
+    # parse command line arguments
+    args = sys.argv[1:]
+    if not args:
+        parser.print_help()
+        sys.exit()
+    else:
+        args = parser.parse_args(args)
+
+    # check whether the source directory exists
+    if args.source.exists():
+        # check whether the target grid file exists
+        if not args.grid.exists():
+            LOGGER.info('{} does not exist.'.format(args.grid))
+            sys.exit()
+
+        # check whether a single variable is specified
+        variables = ERA5_VARIABLES
+        if args.variable is not None:
+            variables = args.variable
+
+        # iterate over the variables to preprocess
+        for var in variables:
+            # path to files of the current variable
+            source = sorted(search_files(
+                args.source, '_'.join(['^ERA5', var, '[0-9]{4}.nc$'])))
+            ymin, ymax = (re.search('[0-9]{4}', source[0].name)[0],
+                          re.search('[0-9]{4}', source[-1].name)[0])
+            LogConfig.init_log('Aggregating ERA5 years: {}'.format(
+                '-'.join([ymin, ymax])))
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in source]))
+
+            # check for dry-run
+            if args.dry_run:
+                LOGGER.info('Dry run: No output produced.')
+                continue
+
+            # check if aggregated file exists
+            filename = '_'.join(['ERA5', var, ymin, ymax]) + '.nc'
+            filename = args.target.joinpath(var, filename)
+            if not filename.parent.exists():
+                LOGGER.info('mkdir {}'.format(filename.parent))
+                filename.parent.mkdir(parents=True, exist_ok=True)
+            if filename.exists() and not args.overwrite:
+                LOGGER.info('{} already exists.'.format(filename))
+                continue
+
+            # create filenames for reprojected and resampled files
+            target = [args.target.joinpath(var, f.name) for f in source]
+            dlyavg = [args.target.joinpath(var, f.name.replace('.nc', '_d.nc'))
+                      for f in source]
+
+            # aggregate hourly data to daily data: resample in case of missing
+            # days
+            LOGGER.info('Computing daily averages ...')
+            for src, tmp in zip(source, dlyavg):
+                ds = xr.open_dataset(src)
+
+                # compute daily averages
+                ds = ds.resample(time='D').mean(dim='time')
+
+                # save intermediate file for resampling and reprojection to
+                # target grid
+                ds.to_netcdf(tmp, engine='h5netcdf')
+
+            # reproject and resample to target grid in parallel
+            LOGGER.info('Reprojecting and resampling to target grid ...')
+            target = Parallel(n_jobs=-1, verbose=51)(
+                delayed(reproject_cdo)(args.grid, tmp, trg, args.mode,
+                                       args.overwrite)
+                for tmp, trg in zip(dlyavg, target))
+
+            # remove temporary daily averages
+            for avg in dlyavg:
+                avg.unlink()
+
+            # aggregate files for different years into a single file using
+            # xarray and dask
+            LOGGER.info('Aggregating different years into single file ...')
+            ds = xr.open_mfdataset(target, parallel=True).compute()
+
+            # set NetCDF file compression for each variable
+            for _, da in ds.data_vars.items():
+                da.encoding['zlib'] = True
+                da.encoding['complevel'] = 5
+
+            # save aggregated netcdf file
+            LOGGER.info('Compressing NetCDF: {}'.format(filename))
+            ds.to_netcdf(filename, engine='h5netcdf')
+
+            # remove single-year files
+            for trg in target:
+                trg.unlink()
+
+    else:
+        LOGGER.info('{} does not exist.'.format(str(args.source)))
+        sys.exit()
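Note: the daily aggregation in preprocess_ERA5.py leans on xarray's
resampling. A toy example of its semantics on synthetic hourly data (not part
of the patch):

    import numpy as np
    import pandas as pd
    import xarray as xr

    # two days of synthetic hourly values: 0, 1, ..., 47
    hours = pd.date_range('1981-01-01', periods=48, freq='H')
    ds = xr.Dataset({'t': ('time', np.arange(48.0))},
                    coords={'time': hours})

    # daily means, exactly as in the script above; days with missing
    # hours are averaged over the hours that are present
    daily = ds.resample(time='D').mean(dim='time')
    print(daily['t'].values)  # [11.5 35.5]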
diff --git a/climax/main/preprocess/preprocess_OBS.py b/climax/main/preprocess/preprocess_OBS.py
new file mode 100644
index 0000000..55748a3
--- /dev/null
+++ b/climax/main/preprocess/preprocess_OBS.py
@@ -0,0 +1,94 @@
+"""Preprocess in-situ observation data."""
+
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# builtins
+import re
+import sys
+import logging
+from logging.config import dictConfig
+
+# externals
+import xarray as xr
+
+# locals
+from climax.core.cli import preprocess_obs_parser
+from climax.core.constants import CORDEX_VARIABLES
+from pysegcnn.core.logging import log_conf
+from pysegcnn.core.utils import search_files
+from pysegcnn.core.trainer import LogConfig
+
+# module level logger
+LOGGER = logging.getLogger(__name__)
+
+
+if __name__ == '__main__':
+
+    # initialize logging
+    dictConfig(log_conf())
+
+    # define command line argument parser
+    parser = preprocess_obs_parser()
+
+    # parse command line arguments
+    args = sys.argv[1:]
+    if not args:
+        parser.print_help()
+        sys.exit()
+    else:
+        args = parser.parse_args(args)
+
+    # check whether the source directory exists
+    if args.source.exists():
+
+        # check whether a single variable is specified
+        variables = CORDEX_VARIABLES
+        if args.variable is not None:
+            variables = args.variable
+
+        # iterate over the variables to preprocess
+        for var in variables:
+            # path to files of the current variable
+            source = sorted(search_files(args.source.joinpath(var), '(.*).nc$'))
+
+            # get observation time period
+            years = [re.search('[0-9]{4}', str(f))[0] for f in source]
+            y_min, y_max = min(years), max(years)
+            LogConfig.init_log('Aggregating files for years: {}'
+                               .format('-'.join([y_min, y_max])))
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in source]))
+
+            # check for dry-run
+            if args.dry_run:
+                LOGGER.info('Dry run: No output produced.')
+                continue
+
+            # check if aggregated file exists
+            filename = '_'.join(['OBS', var, y_min, y_max]) + '.nc'
+            filename = args.target.joinpath(var, filename)
+            if not filename.parent.exists():
+                LOGGER.info('mkdir {}'.format(filename.parent))
+                filename.parent.mkdir(parents=True, exist_ok=True)
+            if filename.exists() and not args.overwrite:
+                LOGGER.info('{} already exists.'.format(filename))
+                continue
+
+            # aggregate files for different years into a single file using
+            # xarray and dask
+            LOGGER.info('Aggregating different years into single file ...')
+            ds = xr.open_mfdataset(source, parallel=True).compute()
+
+            # set NetCDF file compression for each variable
+            for _, da in ds.data_vars.items():
+                da.encoding['zlib'] = True
+                da.encoding['complevel'] = 5
+
+            # save aggregated netcdf file
+            LOGGER.info('Compressing NetCDF: {}'.format(filename))
+            ds.to_netcdf(filename, engine='h5netcdf')
+
+    else:
+        LOGGER.info('{} does not exist.'.format(str(args.source)))
+        sys.exit()
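Note: both preprocess_CORDEX.py and preprocess_ERA5.py delegate the regridding
to reproject_cdo from climax.core.utils, which is not part of this patch.
Judging from the call sites, it presumably wraps one of CDO's remapping
operators; a minimal sketch under that assumption (the mode-to-operator
mapping and the function body are illustrative, not the actual
implementation):

    import subprocess
    from pathlib import Path

    def reproject_cdo(grid, src, trg, mode='bilinear', overwrite=False):
        # assumed mapping of resampling modes to cdo remapping operators
        operator = {'bilinear': 'remapbil',
                    'conservative': 'remapcon',
                    'nearest': 'remapnn'}[mode]

        # skip existing files unless overwriting is requested
        trg = Path(trg)
        if trg.exists() and not overwrite:
            return trg

        # cdo <operator>,<target grid file> <infile> <outfile>
        subprocess.run(['cdo', '{},{}'.format(operator, grid), str(src),
                        str(trg)], check=True)
        return trg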