diff --git a/climax/main/download_ERA5.py b/climax/main/download_ERA5.py
index 755303aff5b21268de47a6769b9bb6b9986cecd4..4835c9759c775bf1e089b26d1240889730de175f 100644
--- a/climax/main/download_ERA5.py
+++ b/climax/main/download_ERA5.py
@@ -6,11 +6,17 @@
 # builtins
 import os
 import pathlib
+import logging
+from logging.config import dictConfig
 
 from joblib import Parallel, delayed
 
 # externals
 import cdsapi
 import numpy as np
+import xarray as xr
+
+# locals
+from pysegcnn.core.logging import log_conf
 
 # ERA-5 product
 product = 'reanalysis-era5-pressure-levels'
@@ -46,9 +52,15 @@ CONFIG = {
 # output path
 target = pathlib.Path('/mnt/CEPH_PROJECTS/FACT_CLIMAX/REANALYSIS/ERA5/')
 
+# module level Logger
+LOGGER = logging.getLogger(__name__)
+
 
 if __name__ == '__main__':
 
+    # initialize logging
+    dictConfig(log_conf())
+
     # initialize client
     c = cdsapi.Client()
 
@@ -59,11 +71,33 @@ if __name__ == '__main__':
         if not output.exists():
             output.mkdir(parents=True, exist_ok=True)
 
+        # create output files, one per year
+        files = [output.joinpath('_'.join(['ERA5', var, year]) + '.nc') for
+                 year in years]
+
         # split the download to the different years: CDS API cannot handle
         # requests over the whole time period
         Parallel(n_jobs=min(len(years), os.cpu_count()), verbose=51)(
             delayed(c.retrieve)(
-                product,
-                {**CONFIG, **{'variable': var, 'year': year}},
-                output.joinpath('_'.join(['ERA5', var, year]) + '.nc'))
-            for year in years)
+                product, {**CONFIG, **{'variable': var, 'year': year}}, file)
+            for file, year in zip(files, years) if not file.exists())
+
+        # aggregate files for different years into a single file using xarray
+        # and dask
+        ds = xr.open_mfdataset(files, parallel=True).compute()
+        filename = output.joinpath('_'.join(['ERA5', var, years[0], years[-1]]) + '.nc')
+
+        # set NetCDF file compression for each data variable
+        for _, da in ds.data_vars.items():
+            da.encoding['zlib'] = True
+            da.encoding['complevel'] = 5
+
+        # save aggregated netcdf file
+        LOGGER.info('Compressing NetCDF: {}'.format(filename))
+        ds.to_netcdf(filename, engine='h5netcdf')
+
+        # remove single netcdf files from disk
+        # LOGGER.info('Removing individual NetCDF files ...')
+        # for file in files:
+        #     file.unlink()
+        #     LOGGER.info('rm {}'.format(file))
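
Note: the aggregate-and-compress step added in the last hunk reduces to the standalone pattern below. This is a minimal sketch, not part of the patch; it assumes dask and h5netcdf are installed, that the per-year files share coordinates so xarray can concatenate them along time, and the file and variable names are purely illustrative.

    import xarray as xr

    # per-year files as produced by the download step (illustrative names)
    files = ['ERA5_temperature_1981.nc', 'ERA5_temperature_1982.nc']

    # open the per-year files lazily and combine them along the time axis
    ds = xr.open_mfdataset(files, parallel=True)

    # request zlib compression (level 5) for each data variable; the
    # encoding is applied when the dataset is written to disk
    for _, da in ds.data_vars.items():
        da.encoding['zlib'] = True
        da.encoding['complevel'] = 5

    # write a single compressed NetCDF file
    ds.to_netcdf('ERA5_temperature_1981_1982.nc', engine='h5netcdf')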