Skip to content
Snippets Groups Projects
Commit 3f51854f authored by Frisinghelli Daniel's avatar Frisinghelli Daniel
Browse files

Added new utility function to restructure a dataset

parent 9b058a2c
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,11 @@ License
# builtins
import os
import re
import shutil
import logging
import pathlib
import tarfile
import zipfile
import datetime
# externals
......@@ -703,3 +707,263 @@ def item_in_enum(name, enum):
enum.__members__)))
else:
return enum.__members__[name].value
def destack_tiff(image, outpath=None, overwrite=False, remove=False,
suffix=''):
"""Destack a TIFF with more than one band into a TIFF file for each band.
Each band in ``image`` is saved to ``outpath`` as distinct TIFF file.
The default filenames are: "filename(``image``) + _B(i).tif", where i is
the respective number of each band in ``image``.
Parameters
----------
image : `str` or `pathlib.Path`
The TIFF to destack.
outpath : `str`, optional
Path to save the output TIFF files. The default is None. If None,
``outpath`` is the path to ``image``.
remove : `bool`, optional
Whether to remove ``image`` from disk after destacking. The default is
False.
overwrite : `bool`, optional
Whether to overwrite existing TIFF files.
suffix : `str`, optional
String to append to the filename of ``image``. If specified, the TIFF
filenames for each band in ``image`` are, "filename(``image``) +
+ _B(i)_ + ``suffix``.tif". The default is ''.
Returns
-------
None.
"""
# stop gdal printing warnings and errors to STDERR
gdal.PushErrorHandler('CPLQuietErrorHandler')
# raise Python exceptions for gdal errors
gdal.UseExceptions()
# convert to pathlib.Path
image = pathlib.Path(image)
if not image.exists():
raise FileNotFoundError('{} does not exist.'.format(image))
# name of the TIFF
imgname = image.stem
# default: output directory is equal to the input directory
if outpath is None:
outpath = image.parent
else:
outpath = pathlib.Path(outpath)
# check if output directory exists
if not outpath.exists():
outpath.mkdir(parents=True, exist_ok=True)
# open the raster
img = gdal.Open(str(image))
# check whether the file was already processed
processed = list(outpath.glob(imgname + '_B*'))
if len(processed) == img.RasterCount and not overwrite:
LOGGER.info('{} already processed.'.format(imgname))
del img
return
# image driver
driver = gdal.GetDriverByName('GTiff')
driver.Register()
# image size and tiles
cols = img.RasterXSize
rows = img.RasterYSize
bands = img.RasterCount
# print progress
LOGGER.info('Processing: {}'.format(imgname))
# iterate the bands of the raster
for b in range(1, bands + 1):
# read the data of band b
band = img.GetRasterBand(b)
data = band.ReadAsArray()
# output file: replace for band name
fname = str(outpath.joinpath(imgname + '_B{:d}.tif'.format(b)))
if suffix:
fname = fname.replace('.tif', '_{}.tif'.format(suffix))
outDs = driver.Create(fname, cols, rows, 1, band.DataType)
# define output band
outband = outDs.GetRasterBand(1)
# write array to output band
outband.WriteArray(data)
outband.FlushCache()
# Set the geographic information
outDs.SetProjection(img.GetProjection())
outDs.SetGeoTransform(img.GetGeoTransform())
# clear memory
del outband, band, data, outDs
# remove old stacked GeoTIFF
del img
if remove:
os.unlink(image)
def standard_eo_structure(source_path, target_path, overwrite=False,
move=False, parser=parse_landsat_scene):
"""Modify the directory structure of a remote sensing dataset.
This function assumes that ``source_path`` points to a directory containing
remote sensing data, where each file in ``source_path`` and its sub-folders
should contain a scene identifier in its filename. The scene identifier is
used to restructure the dataset.
Currently, Landsat and Sentinel-2 datasets are supported.
The directory tree in ``source_path`` is modified to the following
structure in ``target_path``:
target_path/
scene_id_1/
files matching scene_id_1
scene_id_2/
files matching scene_id_2
.
.
.
scene_id_n/
files matching scene_id_n
Parameters
----------
source_path : `str` or `pathlib.Path`
Path to the remote sensing dataset.
target_path : `str` or `pathlib.Path`
Path to save the restructured dataset.
overwrite : `bool`, optional
Whether to overwrite existing files in ``target_path``.
The default is True.
move : `bool`, optional
Whether to move the files from ``source_path`` to ``target_path``. If
True, files in ``source_path`` are moved to ``target_path``, if False,
files in ``source_path`` are copied to ``target_path``. The default is
False.
parser : `function`, optional
The scene identifier parsing function. Depends on the sensor of the
dataset. See e.g., `pysegcnn.core.utils.parse_landsat_scene`.
Returns
-------
None.
"""
# create a directory for each scene
for dirpath, dirnames, filenames in os.walk(source_path):
# check if there are files in the current folder
if not filenames:
continue
# iterate over the files to modify
for file in filenames:
# get the path to the file
old_path = os.path.join(dirpath, file)
# get name of the scene
scene = parser(old_path)
if scene is None:
# path to copy files not matching a scene identifier
new_path = pathlib.Path(target_path).joinpath('misc', file)
# file a warning if the file does not match a scene identifier
LOGGER.warning('{} does not match a scene identifier. Copying '
'to {}.'.format(os.path.basename(old_path),
new_path.parent))
else:
# the name of the scene
scene_name = scene['id']
# define the new path to the file
new_path = pathlib.Path(target_path).joinpath(scene_name, file)
# move files to new directory
if new_path.is_file() and not overwrite:
LOGGER.info('{} already exists.'.format(new_path))
continue
else:
new_path.parent.mkdir(parents=True, exist_ok=True)
if move:
shutil.move(old_path, new_path)
LOGGER.info('mv {}'.format(new_path))
else:
LOGGER.info('cp {}'.format(new_path))
shutil.copy(old_path, new_path)
# remove old file location
if move:
shutil.rmtree(source_path)
def extract_archive(inpath, outpath, overwrite=False):
"""Extract files from an archive.
Parameters
----------
inpath : `str` or `pathlib.Path`
Path to an archive.
outpath : `str` or `pathlib.Path`
Path to save extracted files.
overwrite : `bool`, optional
Whether to overwrite existing extracted files.
Returns
-------
subdir : str
path to the extracted files
"""
inpath = pathlib.Path(inpath)
# create the output directory
outpath = pathlib.Path(outpath)
if not outpath.exists():
outpath.mkdir(parents=True)
# name of the archive
archive = inpath.stem
# path to the extracted files
target = outpath.joinpath(archive)
if target.exists():
if overwrite:
LOGGER.info('Overwriting: {}'.format(target))
shutil.rmtree(target)
else:
LOGGER.info('Extracted files are located in: {}'.format(target))
return target
# create output directory
target.mkdir(parents=True)
# read the archive
if inpath.name.endswith('.tgz') or inpath.name.endswith('.tar.gz'):
tar = tarfile.open(inpath, "r:gz")
elif inpath.name.endswith('.zip'):
tar = zipfile.ZipFile(inpath, 'r')
# extract files to the output directory
LOGGER.info('Extracting: {}'.format(archive))
tar.extractall(target)
return target
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment