Commit 1eeec5fe authored by Frisinghelli Daniel

Implementing preprocessing based on an input CSV file.

parent 64e3e90d
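
Note on the change: the new code path reads the simulations to preprocess from a semicolon-delimited input file, one row per CORDEX simulation (pd.read_csv(args.file, delimiter=';') in the diff below). A minimal sketch of such a file; the column names follow the row attributes accessed in the diff, while the example rows are purely illustrative:

variable;domain;gcm;experiment;ensemble;institute_rcm;downscale_realisation
tas;EUR-11;MPI-M-MPI-ESM-LR;rcp85;r1i1p1;CLMcom-CCLM4-8-17;v1
pr;EUR-11;ICHEC-EC-EARTH;historical;r12i1p1;SMHI-RCA4;v1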
@@ -13,6 +13,7 @@ from logging.config import dictConfig
 # externals
 import numpy as np
 import xarray as xr
+import pandas as pd
 
 # locals
 from pysegcnn.core.logging import log_conf
@@ -63,57 +64,18 @@ if __name__ == '__main__':
         LOGGER.info('{} does not exist.'.format(args.grid))
         sys.exit()
-    # create target directory: check that it is different from the source
-    # directory
-    if args.target == args.source:
-        LOGGER.info('Source and target paths cannot be equal.')
-        sys.exit()
-    if not args.target.exists():
-        LOGGER.info('mkdir {}'.format(args.target))
-        args.target.mkdir(parents=True, exist_ok=True)
+    def preprocess_cordex_simulation(pattern):
+        # get all the files matching the defined pattern in the source
+        # directory
+        source = sorted(get_inventory(args.source, pattern))
-    # check ensemble identifier
-    ensemble = _check_str_arg(args.ensemble, 'r[0-9]i[0-9]p[0-9]')
+        # log list of input files
+        LogConfig.init_log('Files matching "{}":'.format(pattern))
+        LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+            ['{}'.format(file) for file in source]))
-    # check regional climate model version
-    version = _check_str_arg(args.version, 'v[0-9]')
-    # construct file pattern to match from input parameters
-    pattern = '(.*).nc$'
-    pattern = '_'.join([param if param is not None else '(.*)' for param in
-                        [args.variable, EUROCORDEX_DOMAIN, args.gcm,
-                         args.scenario, ensemble, args.rcm, version,
-                         pattern]])
-    # get all the files matching the defined pattern in the source
-    # directory
-    source = sorted(get_inventory(args.source, pattern, return_df=False))
-    # log list of input files
-    LogConfig.init_log('Files matching "{}" for variable "{}":'.format(
-        pattern, args.variable))
-    LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-        ['{}'.format(file) for file in source]))
-    # generate target filenames
-    # output path: target/var/scenario
-    if (args.variable is not None) and (args.scenario is not None):
-        # if variable and scenarios are specified, directly build
-        # output path
-        output_path = args.target.joinpath(args.variable).joinpath(
-            args.scenario)
-        # check if output path exists
-        if not output_path.exists():
-            LOGGER.info('mkdir {}'.format(output_path))
-            output_path.mkdir(parents=True, exist_ok=True)
-        # list of output files
-        target = [output_path.joinpath(file.name) for file in source]
-    else:
-        # if variable and scenarios are not specified, infer output
-        # path from file names
+        # generate target filenames: infer from source file names
+        # output path: target/var/scenario
         target = []
         for file in source:
             # parts: dictionary of file name components
@@ -132,68 +94,115 @@ if __name__ == '__main__':
             # output file name
             target.append(output_path.joinpath(file.name))
-    # log list of output files
-    LogConfig.init_log('Output file names')
-    LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-        ['{}'.format(file) for file in target]))
-    # check whether to only print which files would be processed
-    if not args.dry_run:
-        # run reprojection in parallel
-        target = Parallel(n_jobs=-1, verbose=51)(
-            delayed(reproject_cdo)(args.grid, src, trg, args.mode,
-                                   args.overwrite)
-            for src, trg in zip(source, target))
-        # check whether to aggregate the netcdf files of a simulation covering
-        # different time periods into a single file
-        if args.aggregate:
-            LogConfig.init_log('Aggregating time periods of simulations.')
-            # list of unique simulations
-            simulations = np.unique([file.stem.rpartition('_')[0] for file in
-                                     target])
-            # group the list of output files by model simulation
-            for sim in simulations:
-                # chronologically sorted list of current model simulation
-                group = [file for file in target if file.name.startswith(sim)]
-                group = sorted(group)
-                # create filename for netcdf covering the entire time period of
-                # the current simulation
-                y_min, _ = _parse_cordex_time_span(group[0])  # first year
-                _, y_max = _parse_cordex_time_span(group[-1])  # last year
-                filename = '_'.join([sim, '-'.join([y_min, y_max])]) + '.nc'
-                filename = group[0].parent.joinpath(filename)
-                # log simulation name, time span and files
-                LogConfig.init_log('Aggregating simulation: {}, Time span: {}'
-                                   .format(sim, '-'.join([y_min, y_max])))
-                LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
-                    ['{}'.format(file) for file in group]))
-                # read multiple netcdf files using xarray and dask
-                ds = xr.open_mfdataset(group, parallel=True).compute()
-                # set encoding of time: calendar
-                ds.time.encoding = ds.time_bnds.encoding
-                # set NetCDF file compression for each variable
-                for _, var in ds.data_vars.items():
-                    var.encoding['zlib'] = True
-                    var.encoding['complevel'] = 5
-                # save aggregated netcdf file
-                LOGGER.info('Compressing NetCDF: {}'.format(filename))
-                ds.to_netcdf(filename, engine='h5netcdf')
-                # remove single netcdf files from disk
-                if args.remove:
-                    LOGGER.info('Removing individual NetCDF files ...')
-                    for file in group:
-                        file.unlink()
-                        LOGGER.info('rm {}'.format(file))
+        # log list of output files
+        LogConfig.init_log('Output file names')
+        LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+            ['{}'.format(file) for file in target]))
+        # check whether to only print which files would be processed
+        if not args.dry_run:
+            # run reprojection in parallel
+            target = Parallel(n_jobs=-1, verbose=51)(
+                delayed(reproject_cdo)(args.grid, src, trg, args.mode,
+                                       args.overwrite)
+                for src, trg in zip(source, target))
+            # check whether to aggregate the netcdf files of a simulation
+            # covering different time periods into a single file
+            if args.aggregate:
+                LogConfig.init_log('Aggregating time periods of simulations.')
+                # list of unique simulations
+                simulations = np.unique([file.stem.rpartition('_')[0] for file
+                                         in target])
+                # group the list of output files by model simulation
+                for sim in simulations:
+                    # chronologically sorted list of current model simulation
+                    group = [file for file in target if
+                             file.name.startswith(sim)]
+                    group = sorted(group)
+                    # create filename for netcdf covering the entire time
+                    # period of the current simulation
+                    y_min, _ = _parse_cordex_time_span(group[0])  # first year
+                    _, y_max = _parse_cordex_time_span(group[-1])  # last year
+                    filename = '_'.join([sim, '-'.join([y_min, y_max])])
+                    filename = group[0].parent.joinpath(filename + '.nc')
+                    # log simulation name, time span and files
+                    LogConfig.init_log('Aggregating simulation: {}, '
+                                       'Time span: {}'
+                                       .format(sim, '-'.join([y_min, y_max])))
+                    LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
+                        ['{}'.format(file) for file in group]))
+                    # read multiple netcdf files using xarray and dask
+                    ds = xr.open_mfdataset(group, parallel=True).compute()
+                    # set encoding of time: calendar
+                    ds.time.encoding = ds.time_bnds.encoding
+                    # set NetCDF file compression for each variable
+                    for _, var in ds.data_vars.items():
+                        var.encoding['zlib'] = True
+                        var.encoding['complevel'] = 5
+                    # save aggregated netcdf file
+                    LOGGER.info('Compressing NetCDF: {}'.format(filename))
+                    ds.to_netcdf(filename, engine='h5netcdf')
+                    # remove single netcdf files from disk
+                    if args.remove:
+                        LOGGER.info('Removing individual NetCDF files ...')
+                        for file in group:
+                            file.unlink()
+                            LOGGER.info('rm {}'.format(file))
+    # create target directory: check that it is different from the source
+    # directory
+    if args.target == args.source:
+        LOGGER.info('Source and target paths cannot be equal.')
+        sys.exit()
+    if not args.target.exists():
+        LOGGER.info('mkdir {}'.format(args.target))
+        args.target.mkdir(parents=True, exist_ok=True)
+    # check if an input file is specified
+    nc_pattern = '(.*).nc$'  # search for NetCDF files
+    if args.file is not None:
+        # read input file to dataframe
+        df = pd.read_csv(args.file, delimiter=';')
+        # iterate over cordex simulations
+        for index, row in df.iterrows():
+            # construct pattern to search for cordex files
+            pattern = '_'.join([row.variable, row.domain, row.gcm,
+                                row.experiment, row.ensemble,
+                                row.institute_rcm,
+                                row.downscale_realisation, nc_pattern])
+            # TODO: filter pattern by scenario and variable
+            # run cordex preprocessing
+            preprocess_cordex_simulation(pattern)
+    else:
+        # check ensemble identifier
+        ensemble = _check_str_arg(args.ensemble, 'r[0-9]i[0-9]p[0-9]')
+        # check regional climate model version
+        version = _check_str_arg(args.version, 'v[0-9]')
+        # construct file pattern to match from input parameters
+        pattern = '_'.join([param if param is not None else '(.*)' for
+                            param in [args.variable, EUROCORDEX_DOMAIN,
                                      args.gcm, args.scenario, ensemble,
+                                      args.rcm, version, nc_pattern]])
+        # run cordex preprocessing
+        preprocess_cordex_simulation(pattern)
     else:
         LOGGER.info('{} does not exist.'.format(str(args.source)))
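
Note on the search pattern: assuming the illustrative CSV row above, and assuming that EUROCORDEX_DOMAIN resolves to 'EUR-11' (the constant is defined elsewhere in the package), the '_'.join(...) calls above assemble a regular expression following the CORDEX filename convention, e.g.:

>>> nc_pattern = '(.*).nc$'  # trailing pattern matching any NetCDF file
>>> '_'.join(['tas', 'EUR-11', 'MPI-M-MPI-ESM-LR', 'rcp85', 'r1i1p1',
...           'CLMcom-CCLM4-8-17', 'v1', nc_pattern])
'tas_EUR-11_MPI-M-MPI-ESM-LR_rcp85_r1i1p1_CLMcom-CCLM4-8-17_v1_(.*).nc$'

In the command-line branch, any parameter left unspecified is replaced by the wildcard group '(.*)', so the pattern matches every simulation for that filename component.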
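
Note on the aggregation step: the simulation name is derived by stripping the time-span suffix from the file stem, and the aggregated file name joins the first and last year; _parse_cordex_time_span is assumed here to return the start and end year of a file. A short walk-through with hypothetical file names:

>>> from pathlib import Path
>>> group = sorted([
...     Path('tas_EUR-11_MPI-ESM-LR_rcp85_r1i1p1_CCLM4-8-17_v1_20060101-20101231.nc'),
...     Path('tas_EUR-11_MPI-ESM-LR_rcp85_r1i1p1_CCLM4-8-17_v1_20110101-20151231.nc')])
>>> sim = group[0].stem.rpartition('_')[0]  # simulation name without time span
>>> sim
'tas_EUR-11_MPI-ESM-LR_rcp85_r1i1p1_CCLM4-8-17_v1'
>>> y_min, y_max = '2006', '2015'  # assumed return values of _parse_cordex_time_span
>>> '_'.join([sim, '-'.join([y_min, y_max])]) + '.nc'
'tas_EUR-11_MPI-ESM-LR_rcp85_r1i1p1_CCLM4-8-17_v1_2006-2015.nc'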