Commit 23100a2b authored by Frisinghelli Daniel

Reproject and resample: on single files.

parent 13f853d2
@@ -8,6 +8,7 @@ import re
 import sys
 import logging
 from logging.config import dictConfig
+from joblib import Parallel, delayed
 
 # externals
 import xarray as xr
@@ -55,14 +56,14 @@ if __name__ == '__main__':
         # iterate over the variables to preprocess
         for var in variables:
             # path to files of the current variable
-            files = sorted(search_files(
+            source = sorted(search_files(
                 args.source, '_'.join(['^ERA5', var, '[0-9]{4}.nc$'])))
-            ymin, ymax = (re.search('[0-9]{4}', files[0].name)[0],
-                          re.search('[0-9]{4}', files[-1].name)[0])
+            ymin, ymax = (re.search('[0-9]{4}', source[0].name)[0],
+                          re.search('[0-9]{4}', source[-1].name)[0])
             LogConfig.init_log('Aggregating ERA5 years: {}'.format(
                 '-'.join([ymin, ymax])))
             LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-                ['{}'.format(file) for file in files]))
+                ['{}'.format(file) for file in source]))
 
             # check for dry-run
             if args.dry_run:
@@ -79,10 +80,19 @@ if __name__ == '__main__':
                 LOGGER.info('{} already exists.'.format(filename))
                 continue
 
+            # create filenames for reprojected and resampled files
+            target = [args.target.joinpath(f.name) for f in source]
+
+            # reproject and resample to target grid in parallel
+            target = Parallel(n_jobs=-1, verbose=51)(
+                delayed(reproject_cdo)(args.grid, src, trg, args.mode,
+                                       args.overwrite)
+                for src, trg in zip(source, target))
+
             # aggregate files for different years into a single file using
             # xarray and dask
             LOGGER.info('Reading ERA5 data ...')
-            ds = xr.open_mfdataset(files, parallel=True).compute()
+            ds = xr.open_mfdataset(target, parallel=True).compute()
 
             # aggregate hourly data to daily data: resample in case of missing
             # days
@@ -95,15 +105,8 @@ if __name__ == '__main__':
                 var.encoding['complevel'] = 5
 
             # save aggregated netcdf file
-            tmp = filename.parent.joinpath(filename.name.replace('.nc',
-                                                                 '_tmp.nc'))
             LOGGER.info('Compressing NetCDF: {}'.format(filename))
-            ds.to_netcdf(tmp, engine='h5netcdf')
-
-            # reproject and resample to target grid
-            reproject_cdo(args.grid, tmp, filename, mode=args.mode,
-                          overwrite=args.overwrite)
-            tmp.unlink()
+            ds.to_netcdf(filename, engine='h5netcdf')
 
     else:
         LOGGER.info('{} does not exist.'.format(str(args.source)))
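
For readers skimming the diff: the commit moves the CDO-based reprojection from the end of the pipeline (on the large aggregated file, via a temporary NetCDF) to the beginning, where each yearly source file is remapped to the target grid independently and in parallel with joblib. The sketch below illustrates that pattern outside the script; it assumes the repository helper reproject_cdo(grid, src, trg, mode, overwrite) remaps a single NetCDF file onto the target grid and returns the output path, and the stand-in implementation calling the cdo command line tool, as well as all paths, are hypothetical placeholders.

from pathlib import Path
from subprocess import run

from joblib import Parallel, delayed


def reproject_cdo(grid, src, trg, mode='bilinear', overwrite=False):
    # hypothetical stand-in for the repository helper: remap a single
    # NetCDF file onto the target grid with the cdo command line tool
    if Path(trg).exists() and not overwrite:
        return Path(trg)
    operator = 'remapbil' if mode == 'bilinear' else 'remapcon'
    run(['cdo', '{},{}'.format(operator, grid), str(src), str(trg)],
        check=True)
    return Path(trg)


# placeholder inputs: yearly ERA5 files and a directory for the remapped copies
source = sorted(Path('/path/to/ERA5').glob('ERA5_t2m_*.nc'))
target = [Path('/path/to/remapped').joinpath(f.name) for f in source]

# one reprojection job per file, spread over all available cores
target = Parallel(n_jobs=-1, verbose=51)(
    delayed(reproject_cdo)('/path/to/grid.txt', src, trg)
    for src, trg in zip(source, target))

Reprojecting the small yearly files up front is what removes the need for the temporary file and the final single reproject_cdo call in the old version.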
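The aggregation that follows is unchanged in spirit: the remapped yearly files are opened as one dataset with xarray and dask, aggregated from hourly to daily values, compressed, and written directly to the final NetCDF, so the _tmp.nc detour disappears. A minimal sketch of that step, continuing from the target list above and assuming a daily-mean aggregation and a placeholder output name:

import xarray as xr

# open all remapped yearly files lazily and load them into memory
ds = xr.open_mfdataset(target, parallel=True).compute()

# aggregate hourly data to daily data; days without data become NaN
ds = ds.resample(time='1D').mean()

# enable zlib compression for every data variable before writing
for var in ds.data_vars.values():
    var.encoding['zlib'] = True
    var.encoding['complevel'] = 5

# write the aggregated, compressed NetCDF once, directly under its final name
ds.to_netcdf('/path/to/remapped/ERA5_t2m_aggregated.nc', engine='h5netcdf')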