From 23100a2bb59d65fc242bfe032e2b89f45db6d0a6 Mon Sep 17 00:00:00 2001
From: "Daniel.Frisinghelli" <daniel.frisinghelli@eurac.edu>
Date: Thu, 24 Jun 2021 10:45:12 +0200
Subject: [PATCH] Reproject and resample ERA5 data on single files.

---
 climax/main/preprocess_ERA5.py | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/climax/main/preprocess_ERA5.py b/climax/main/preprocess_ERA5.py
index 0a71d97..f71eb9a 100644
--- a/climax/main/preprocess_ERA5.py
+++ b/climax/main/preprocess_ERA5.py
@@ -8,6 +8,7 @@ import re
 import sys
 import logging
 from logging.config import dictConfig
+from joblib import Parallel, delayed
 
 # externals
 import xarray as xr
@@ -55,14 +56,14 @@ if __name__ == '__main__':
         # iterate over the variables to preprocess
         for var in variables:
             # path to files of the current variable
-            files = sorted(search_files(
+            source = sorted(search_files(
                 args.source, '_'.join(['^ERA5', var, '[0-9]{4}.nc$'])))
-            ymin, ymax = (re.search('[0-9]{4}', files[0].name)[0],
-                          re.search('[0-9]{4}', files[-1].name)[0])
+            ymin, ymax = (re.search('[0-9]{4}', source[0].name)[0],
+                          re.search('[0-9]{4}', source[-1].name)[0])
             LogConfig.init_log('Aggregating ERA5 years: {}'.format(
                 '-'.join([ymin, ymax])))
             LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-                        ['{}'.format(file) for file in files]))
+                        ['{}'.format(file) for file in source]))
 
             # check for dry-run
             if args.dry_run:
@@ -79,10 +80,19 @@ if __name__ == '__main__':
                 LOGGER.info('{} already exists.'.format(filename))
                 continue
 
+            # create filenames for reprojected and resampled files
+            target = [args.target.joinpath(f.name) for f in source]
+
+            # reproject and resample to target grid in parallel
+            target = Parallel(n_jobs=-1, verbose=51)(
+                    delayed(reproject_cdo)(args.grid, src, trg, args.mode,
+                                           args.overwrite)
+                    for src, trg in zip(source, target))
+
             # aggregate files for different years into a single file using
             # xarray and dask
             LOGGER.info('Reading ERA5 data ...')
-            ds = xr.open_mfdataset(files, parallel=True).compute()
+            ds = xr.open_mfdataset(target, parallel=True).compute()
 
             # aggregate hourly data to daily data: resample in case of missing
             # days
@@ -95,15 +105,8 @@ if __name__ == '__main__':
                 var.encoding['complevel'] = 5
 
             # save aggregated netcdf file
-            tmp = filename.parent.joinpath(filename.name.replace('.nc',
-                                                                 '_tmp.nc'))
             LOGGER.info('Compressing NetCDF: {}'.format(filename))
-            ds.to_netcdf(tmp, engine='h5netcdf')
-
-            # reproject and resample to target grid
-            reproject_cdo(args.grid, tmp, filename, mode=args.mode,
-                          overwrite=args.overwrite)
-            tmp.unlink()
+            ds.to_netcdf(filename, engine='h5netcdf')
 
     else:
         LOGGER.info('{} does not exist.'.format(str(args.source)))
-- 
GitLab