diff --git a/climax/main/preprocess_ERA5.py b/climax/main/preprocess_ERA5.py
index 303f1d46ebf41b6d208e6a952e5b3a5cdd784598..074de28d8ab5abe8121ba26edd07b85cb22d2c75 100644
--- a/climax/main/preprocess_ERA5.py
+++ b/climax/main/preprocess_ERA5.py
@@ -3,4 +3,90 @@
 # !/usr/bin/env python
 # -*- coding: utf-8 -*-
+# builtins
+import re
+import sys
+import logging
+from logging.config import dictConfig
+# externals
+import xarray as xr
+
+# locals
+from climax.core.cli import preprocess_era5_parser
+from climax.core.constants import ERA5_VARIABLES
+from pysegcnn.core.logging import log_conf
+from pysegcnn.core.utils import search_files
+from pysegcnn.core.trainer import LogConfig
+
+# module level logger
+LOGGER = logging.getLogger(__name__)
+
+
+if __name__ == '__main__':
+
+    # initialize logging
+    dictConfig(log_conf())
+
+    # define command line argument parser
+    parser = preprocess_era5_parser()
+
+    # parse command line arguments
+    args = sys.argv[1:]
+    if not args:
+        parser.print_help()
+        sys.exit()
+    else:
+        args = parser.parse_args(args)
+
+    # check whether the source directory exists
+    if args.source.exists():
+        # check whether the target grid file exists
+        if not args.grid.exists():
+            LOGGER.info('{} does not exist.'.format(args.grid))
+            sys.exit()
+
+        # check whether a single variable is specified
+        variables = ERA5_VARIABLES
+        if args.variable is not None:
+            variables = args.variable
+
+        # iterate over the variables to preprocess
+        for var in variables:
+            # files of the current variable, sorted by year
+            files = sorted(search_files(
+                args.source, '_'.join(['^ERA5', var, r'[0-9]{4}\.nc$'])))
+            ymin, ymax = (re.search('[0-9]{4}', files[0].name)[0],
+                          re.search('[0-9]{4}', files[-1].name)[0])
+
+            # check whether the aggregated file already exists
+            filename = '_'.join(['ERA5', var, ymin, ymax]) + '.nc'
+            filename = args.target.joinpath(var, filename)
+            if filename.exists() and not args.overwrite:
+                LOGGER.info('{} already exists.'.format(filename))
+                continue
+
+            # aggregate files for different years into a single file using
+            # xarray and dask
+            ds = xr.open_mfdataset(files, parallel=True).compute()
+            LogConfig.init_log('Aggregating ERA5 years: {}'.format(
+                '-'.join([ymin, ymax])))
+            LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                ['{}'.format(file) for file in files]))
+
+            # aggregate hourly data to daily data: resample in case of missing
+            # days
+            ds = ds.resample(time='D').mean(dim='time')
+
+            # set NetCDF file compression for each variable
+            for dvar in ds.data_vars.values():
+                dvar.encoding['zlib'] = True
+                dvar.encoding['complevel'] = 5
+
+            # save aggregated netcdf file
+            LOGGER.info('Compressing NetCDF: {}'.format(filename))
+            ds.to_netcdf(filename, engine='h5netcdf')
+
+    else:
+        LOGGER.info('{} does not exist.'.format(str(args.source)))
+        sys.exit()
 
 
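Note: the core of the pipeline added above (multi-file open, daily-mean resampling, compressed NetCDF output) can be exercised in isolation. Below is a minimal sketch, assuming hourly files named `ERA5_<variable>_<year>.nc` under `./era5/`; the directory, the `2m_temperature` variable, and the output file name are illustrative placeholders, not values from this diff, and the repo-specific helpers (`search_files`, `LogConfig`, the CLI parser) are deliberately left out:

```python
"""Minimal sketch of the aggregation step, independent of climax/pysegcnn."""
from pathlib import Path

import xarray as xr

# illustrative inputs: one hourly NetCDF file per year (placeholder path)
files = sorted(Path('./era5').glob('ERA5_2m_temperature_*.nc'))

# open the yearly files as a single dataset concatenated along time;
# parallel=True reads the files concurrently via dask
ds = xr.open_mfdataset(files, parallel=True)

# resample hourly to daily means; days missing from the input appear
# as NaN, so the resulting time axis is a complete daily index
daily = ds.resample(time='D').mean()

# write a compressed NetCDF, mirroring the per-variable zlib/complevel=5
# encoding set in the script above
encoding = {name: {'zlib': True, 'complevel': 5} for name in daily.data_vars}
daily.to_netcdf('ERA5_2m_temperature_daily.nc', engine='h5netcdf',
                encoding=encoding)
```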
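A quick, equally hypothetical sanity check on the output of the sketch (or on an aggregate produced by the script itself, substituting the real path): the resampled time axis should step in whole days, and the per-variable encoding can be inspected after the round-trip:

```python
import pandas as pd
import xarray as xr

ds = xr.open_dataset('ERA5_2m_temperature_daily.nc', engine='h5netcdf')

# every consecutive time step should be exactly one day
steps = ds.indexes['time'].to_series().diff().dropna()
assert (steps == pd.Timedelta(days=1)).all()

# inspect the encoding the backend reports for each variable
for name, var in ds.data_vars.items():
    print(name, var.encoding)
```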