"""Utility functions mainly for image IO and reshaping.
License
-------
Copyright (c) 2020 Daniel Frisinghelli
This source code is licensed under the GNU General Public License v3.
See the LICENSE file in the repository's root directory.
"""
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# builtins
import os
import re
import shutil
import logging
import pathlib
import tarfile
import zipfile
import datetime
import warnings
# externals
import gdal
import torch
import numpy as np
# module level logger
LOGGER = logging.getLogger(__name__)
# suffixes for radiometrically calibrated scenes
SUFFIXES = ['toa_ref', 'toa_rad', 'toa_brt']
def img2np(path, tile_size=None, tile=None, pad=False, cval=0):
    """Read an image to a `numpy.ndarray`.

    If ``tile_size`` is not `None`, the input image is divided into square
    tiles of size (``tile_size``, ``tile_size``). If the image is not evenly
    divisible and ``pad`` = False, a `ValueError` is raised. However, if
    ``pad`` = True, center padding with constant value ``cval`` is applied.

    The tiling works as follows:

        (Padded) Input image:

        ------------------------------------------------
        |           |           |          |           |
        |  tile_00  |  tile_01  |    ...   |  tile_0n  |
        |           |           |          |           |
        |----------------------------------------------|
        |           |           |          |           |
        |  tile_10  |  tile_11  |    ...   |  tile_1n  |
        |           |           |          |           |
        |----------------------------------------------|
        |           |           |          |           |
        |    ...    |    ...    |    ...   |    ...    |
        |           |           |          |           |
        |----------------------------------------------|
        |           |           |          |           |
        |  tile_m0  |  tile_m1  |    ...   |  tile_mn  |
        |           |           |          |           |
        ------------------------------------------------

    where m = n. Each tile has its id, which starts at 0 in the topleft corner
    of the input image, i.e. tile_00 has id=0, and increases along the width
    axis, i.e. tile_0n has id=n, tile_10 has id=n+1, ..., tile_mn has
    id=(m * n) - 1.

    If ``tile`` is an integer, only the tile with id = ``tile`` is returned.

    Parameters
    ----------
    path : `str` or `None` or `numpy.ndarray`
        The image to read. A `numpy.ndarray` is expected in band-first layout,
        shape=(bands, height, width).
    tile_size : `None` or `int`, optional
        The size of a tile. The default is None.
    tile : `int`, optional
        The tile id. The default is None.
    pad : `bool`, optional
        Whether to center pad the input image. The default is False.
    cval : `float`, optional
        The constant padding value. The default is 0.

    Raises
    ------
    FileNotFoundError
        Raised if ``path`` is a path that does not exist.
    TypeError
        Raised if ``path`` is not `str` or `None` or `numpy.ndarray`.

    Returns
    -------
    image : `numpy.ndarray`
        The image array. The output shape is:
            if ``tile_size`` is not `None`:
                shape=(tiles, bands, tile_size, tile_size)
                if the image does only have one band:
                    shape=(tiles, tile_size, tile_size)
            else:
                shape=(bands, height, width)
                if the image does only have one band:
                    shape=(height, width)
    """
    # check the type of path
    if isinstance(path, str):
        if not os.path.exists(path):
            raise FileNotFoundError('{} does not exist.'.format(path))
        # the image dataset as returned by gdal
        img = gdal.Open(path)

        # number of bands
        bands = img.RasterCount

        # spatial size
        height = img.RasterYSize
        width = img.RasterXSize
    elif path is None:
        LOGGER.warning('Path is of NoneType, returning.')
        return
    # accept numpy arrays as input
    elif isinstance(path, np.ndarray):
        img = path
        bands = img.shape[0]
        height = img.shape[1]
        width = img.shape[2]
    else:
        # BUGFIX: the error message referenced "img", which is unbound in this
        # branch and raised UnboundLocalError instead of the intended TypeError
        raise TypeError('Input of type {} not supported'.format(type(path)))

    # check whether to read the image in tiles
    if tile_size is None:
        # number of tiles
        ntiles = 1

        # create empty numpy array to store whole image
        image = np.empty(shape=(ntiles, bands, height, width))

        # iterate over the bands of the image
        for b in range(bands):
            # read the data of band b
            if isinstance(img, np.ndarray):
                data = img[b, ...]
            else:
                band = img.GetRasterBand(b+1)
                data = band.ReadAsArray()

            # append band b to numpy image array
            image[0, b, :, :] = data
    else:
        # check whether the image is evenly divisible in square tiles
        # of size (tile_size x tile_size)
        ntiles, padding = is_divisible((height, width), tile_size, pad)

        # image size after padding
        y_size = height + padding[0] + padding[2]
        x_size = width + padding[1] + padding[3]

        # print progress
        LOGGER.debug('Image size: {}'.format((height, width)))
        LOGGER.debug('Dividing image into {} tiles of size {} ...'
                     .format(ntiles, (tile_size, tile_size)))
        LOGGER.debug('Padding image (b, l, t, r): {}'.format(tuple(padding)))
        LOGGER.debug('Padded image size: {}'.format((y_size, x_size)))

        # get the indices of the top left corner for each tile
        topleft = tile_topleft_corner((y_size, x_size), tile_size)

        # whether to read all tiles or a single tile
        if tile is not None:
            ntiles = 1

        # create empty numpy array to store the tiles, prefilled with the
        # padding value so out-of-image areas keep cval
        image = np.ones((ntiles, bands, tile_size, tile_size)) * cval

        # iterate over the topleft corners of the tiles
        for k, corner in topleft.items():
            # in case a single tile is required, skip the rest of the tiles
            if tile is not None:
                if k != tile:
                    continue
                # set the key to 0 for correct array indexing when reading
                # a single tile from the image
                LOGGER.debug('Processing tile {} ...'.format(k))
                k = 0
            else:
                LOGGER.debug('Creating tile {} with top-left corner {} ...'
                             .format(k, corner))

            # calculate shift between padded and original image
            row = corner[0] - padding[2] if corner[0] > 0 else corner[0]
            col = corner[1] - padding[1] if corner[1] > 0 else corner[1]
            y_tl = row + padding[2] if row == 0 else 0
            x_tl = col + padding[1] if col == 0 else 0

            # iterate over the bands of the image
            for b in range(bands):
                # check if the current tile extend exists in the image
                nrows, ncols = check_tile_extend(
                    (height, width), (row, col), tile_size)

                # read the current tile from band b
                if isinstance(img, np.ndarray):
                    data = img[b, row:row+nrows, col:col+ncols]
                else:
                    band = img.GetRasterBand(b+1)
                    data = band.ReadAsArray(col, row, ncols, nrows)

                # append band b to numpy image array
                image[k, b, y_tl:nrows, x_tl:ncols] = data[0:(nrows - y_tl),
                                                           0:(ncols - x_tl)]

    # check if there are more than 1 band
    if not bands > 1:
        image = image.squeeze(axis=1)

    # check if there are more than 1 tile
    if not ntiles > 1:
        image = image.squeeze(axis=0)

    # close tif file: GDAL flushes and releases the dataset on deletion
    del img

    # return the image
    return image
def is_divisible(img_size, tile_size, pad=False):
    """Check whether an image is evenly divisible into square tiles.

    An image is evenly divisible if both its height and its width are
    multiples of ``tile_size``.

    Parameters
    ----------
    img_size : `tuple`
        The image size (height, width).
    tile_size : `int`
        The size of the tile.
    pad : `bool`, optional
        Whether to center pad the input image. The default is False.

    Raises
    ------
    ValueError
        Raised if the image is not evenly divisible and ``pad`` = False.

    Returns
    -------
    ntiles : `int`
        The number of tiles fitting ``img_size``.
    padding : `tuple`
        The amount of padding (bottom, left, top, right).
    """
    height, width = img_size

    # BUGFIX: the previous check tested whether the total number of pixels
    # (height * width) is a multiple of tile_size ** 2; that wrongly accepts
    # images such as (4, 9) with tile_size=6, which cannot be tiled. Each
    # dimension must be divisible by tile_size on its own.
    if not (height % tile_size or width % tile_size):
        # image evenly divisible: no padding required
        return (height // tile_size) * (width // tile_size), 4 * (0,)

    if not pad:
        raise ValueError('Image of size {} not evenly divisible in ({}, {}) '
                         'tiles.'.format(img_size, tile_size, tile_size))

    # the smallest image size that is evenly divisible into square tiles of
    # size (tile_size, tile_size)
    h_new = int(np.ceil(height / tile_size) * tile_size)
    w_new = int(np.ceil(width / tile_size) * tile_size)

    # center offsets
    dh = h_new - height
    dw = w_new - width

    # padding is symmetric for even offsets; for odd offsets the extra pixel
    # goes to the top (height) and to the right (width), as before
    padding = (dh // 2, dw // 2, dh - dh // 2, dw - dw // 2)

    # number of tiles on the padded image
    ntiles = (h_new * w_new) // tile_size ** 2

    return int(ntiles), padding
def check_tile_extend(img_size, topleft, tile_size):
    """Check if a tile exceeds the image size.

    Parameters
    ----------
    img_size : `tuple`
        The image size (height, width).
    topleft : `tuple`
        The topleft corner of the tile (y, x).
    tile_size : `int`
        The size of the tile.

    Returns
    -------
    nrows : `int`
        Number of rows of the tile within the image.
    ncols : `int`
        Number of columns of the tile within the image.
    """
    # a tile may exceed the image at the bottom/right border: clip each
    # dimension to the number of pixels remaining in the original image
    nrows = min(tile_size, img_size[0] - topleft[0])
    ncols = min(tile_size, img_size[1] - topleft[1])
    return nrows, ncols
def tile_topleft_corner(img_size, tile_size):
    """Return the topleft corners of the tiles in the image.

    Parameters
    ----------
    img_size : `tuple`
        The image size (height, width).
    tile_size : `int`
        The size of the tile.

    Returns
    -------
    indices : `dict`
        The keys of ``indices`` are the tile ids (`int`) and the values are
        the topleft corners (`tuple` = (y, x)) of the tiles.
    """
    # raises ValueError if the image is not evenly divisible into square
    # tiles of size (tile_size, tile_size)
    _, _ = is_divisible(img_size, tile_size, pad=False)

    # number of tiles along the height (rows) and width (columns)
    nrows = int(img_size[0] / tile_size)
    ncols = int(img_size[1] / tile_size)

    # enumerate the tiles row by row, from the topleft corner of the image
    corners = [(r * tile_size, c * tile_size)
               for r in range(nrows) for c in range(ncols)]
    return dict(enumerate(corners))
def reconstruct_scene(tiles):
    """Reconstruct a tiled image.

    Parameters
    ----------
    tiles : `torch.Tensor` or `numpy.ndarray`
        The tiled image, shape=(tiles, bands, tile_size, tile_size) or
        shape=(tiles, tile_size, tile_size).

    Returns
    -------
    image : `numpy.ndarray`
        The reconstructed image.
    """
    tiles = np.asarray(tiles)

    # infer number of bands and tile size from the array layout
    if tiles.ndim > 3:
        nbands, tile_size = tiles.shape[1], tiles.shape[2]
    else:
        nbands, tile_size = 1, tiles.shape[1]

    # the reconstructed image is assumed square: side length derived from
    # the number of tiles and the tile size
    side = int(np.sqrt(tiles.shape[0]) * tile_size)
    img_size = (side, side)

    # topleft corner of each tile in the reconstructed image
    corners = tile_topleft_corner(img_size, tile_size)

    # paste each tile at its corner position
    scene = np.zeros(shape=(nbands,) + img_size)
    for t in range(tiles.shape[0]):
        y, x = corners[t]
        scene[..., y:y + tile_size, x:x + tile_size] = tiles[t, ...]

    # drop the band axis for single-band images
    return scene.squeeze()
def accuracy_function(outputs, labels):
    """Calculate prediction accuracy.

    Parameters
    ----------
    outputs : `torch.Tensor` or array_like
        The model prediction.
    labels : `torch.Tensor` or array_like
        The ground truth.

    Returns
    -------
    accuracy : `float`
        Mean prediction accuracy.
    """
    # torch tensors: elementwise comparison, fraction of matching entries
    if isinstance(outputs, torch.Tensor):
        correct = (outputs == labels).float()
        return correct.mean().item()

    # anything else: compare as numpy arrays
    matches = np.asarray(outputs) == np.asarray(labels)
    return matches.mean().item()
def parse_landsat_scene(scene_id):
    """Parse a Landsat scene identifier.

    Parameters
    ----------
    scene_id : `str`
        A Landsat scene identifier.

    Returns
    -------
    scene : `dict` or `None`
        A dictionary containing scene metadata. If `None`, ``scene_id`` is not
        a valid Landsat scene identifier.
    """
    # building blocks of the Landsat Collection 1 naming convention
    sensor = 'L[COTEM]0[1-8]_'
    level = 'L[0-9][A-Z][A-Z]_'
    swath = '[0-2][0-9]{2}[0-1][0-9]{2}_'
    date = '[0-9]{4}[0-1][0-9][0-3][0-9]_'
    doy = '[0-9]{4}[0-3][0-9]{2}'
    collection = '0[0-9]_'
    category = '[A-Z]([A-Z]|[0-9])'

    # Landsat Collection 1 identifiers: underscore-separated fields
    c1_regex = re.compile(
        sensor + level + swath + date + date + collection + category)

    # pre-Collection identifiers: no separators, day-of-year date format
    c0_regex = re.compile(
        sensor.replace('_', '').replace('0', '') + swath.replace('_', '') +
        doy + '[A-Z]{3}' + '[0-9]{2}')

    # sensor id to sensor name: Landsat 1-7
    lsensors = {'E': 'Enhanced Thematic Mapper Plus',
                'T': 'Thematic Mapper',
                'M': 'Multispectral Scanner'}
    # sensor id to sensor name: Landsat 8
    l8sensors = {'C': 'Operational Land Imager (OLI) & Thermal Infrared Sensor'
                      ' (TIRS)',
                 'O': 'Operational Land Imager (OLI)',
                 'T': 'Thermal Infrared Sensor (TIRS)',
                 }

    c0_match = c0_regex.search(scene_id)
    c1_match = c1_regex.search(scene_id)

    scene = {}
    if c0_match:
        # pre-Collection identifier: metadata at fixed character positions
        sid = c0_match[0]
        scene['id'] = sid
        scene['satellite'] = 'Landsat {}'.format(sid[2])
        sensors = l8sensors if int(sid[2]) > 7 else lsensors
        scene['sensor'] = sensors[sid[1]]
        scene['path'] = sid[3:6]
        scene['row'] = sid[6:9]
        scene['date'] = doy2date(sid[9:13], sid[13:16])
        scene['gsi'] = sid[16:19]
        scene['version'] = sid[19:]
    elif c1_match:
        # Collection 1 identifier: metadata in underscore-separated fields
        sid = c1_match[0]
        parts = sid.split('_')
        scene['id'] = sid
        scene['satellite'] = 'Landsat {}'.format(parts[0][2:])
        sensors = l8sensors if int(parts[0][3]) > 7 else lsensors
        scene['sensor'] = sensors[parts[0][1]]
        scene['path'] = parts[2][0:3]
        scene['row'] = parts[2][3:]
        scene['date'] = datetime.datetime.strptime(parts[3], '%Y%m%d')
        scene['collection'] = int(parts[5])
        scene['version'] = parts[6]
    else:
        # not a Landsat scene identifier
        scene = None

    return scene
def parse_sentinel2_scene(scene_id):
    """Parse a Sentinel-2 scene identifier.

    Parameters
    ----------
    scene_id : `str`
        A Sentinel-2 scene identifier.

    Returns
    -------
    scene : `dict` or `None`
        A dictionary containing scene metadata. If `None`, ``scene_id`` is not
        a valid Sentinel-2 scene identifier.
    """
    # Sentinel 2 Level-1C products naming convention after 6th December 2016
    mission = 'S2[A-B]_'
    level = 'MSIL1C_'
    date = '[0-9]{4}[0-1][0-9][0-3][0-9]'
    time = 'T[0-2][0-9][0-5][0-9][0-5][0-9]_'
    processing = 'N[0-9]{4}_'
    orbit = 'R[0-1][0-9]{2}_'
    tile = 'T[0-9]{2}[A-Z]{3}_'
    level_1C = (mission + level + date + time + processing +
                orbit + tile + date + time.replace('_', ''))
    S2_L1C_New = re.compile(level_1C)

    # Sentinel 2 Level-1C products naming convention before 6th December 2016
    file_class = '[A-Z]{4}_'
    file_category = '[A-Z]{3}_'
    file_semantic = 'L[0-1]([ABC]|_)_[A-Z]{2}_'
    site = '[A-Z_]{4}_'
    aorbit = 'A[0-9]{6}_'
    S2_L1C_Old = re.compile(mission + file_class + file_category +
                            file_semantic + site + 'V' + date + time + aorbit +
                            tile.replace('_', ''))

    # ProSnow project naming convention
    ProSnow = re.compile(tile + date + time.replace('_', ''))

    # whether a scene identifier matches the Sentinel naming convention
    scene = {}
    if S2_L1C_Old.search(scene_id):
        # the match of the regular expression
        match = S2_L1C_Old.search(scene_id)[0]

        # split scene into respective parts
        parts = match.split('_')

        # the metadata of the scene identifier
        scene['id'] = match
        # BUGFIX: parts[1:] is the list of all remaining filename parts; the
        # satellite number is the digit in the mission id, e.g. 'S2A' -> '2',
        # consistent with the ProSnow branch below
        scene['satellite'] = 'Sentinel {}'.format(parts[0][1])
        scene['file class'] = parts[1]
        scene['file category'] = parts[2]
        scene['file semantic'] = parts[3]
        scene['site'] = parts[4]
        scene['orbit'] = parts[6]
        scene['date'] = datetime.datetime.strptime(
            parts[7].split('T')[0].replace('V', ''), '%Y%m%d')

    elif S2_L1C_New.search(scene_id):
        # the match of the regular expression
        match = S2_L1C_New.search(scene_id)[0]

        # split scene into respective parts
        parts = match.split('_')

        # the metadata of the scene identifier
        scene['id'] = match
        # BUGFIX: see above, 'S2A'/'S2B' -> 'Sentinel 2'
        scene['satellite'] = 'Sentinel {}'.format(parts[0][1])
        scene['product'] = parts[1]
        scene['date'] = datetime.datetime.strptime(parts[2].split('T')[0],
                                                   '%Y%m%d')
        scene['baseline'] = parts[3]
        scene['orbit'] = parts[4]
        scene['tile'] = parts[5]

    elif ProSnow.search(scene_id):
        # the match of the regular expression
        match = ProSnow.search(scene_id)[0]

        # split scene into respective parts
        parts = match.split('_')

        # the metadata of the scene identifier
        scene['id'] = match
        scene['satellite'] = 'Sentinel 2'
        scene['tile'] = parts[0]
        scene['date'] = datetime.datetime.strptime(parts[1].split('T')[0],
                                                   '%Y%m%d')

    else:
        scene = None

    return scene
def doy2date(year, doy):
    """Convert the (year, day of the year) date format to a datetime object.

    Parameters
    ----------
    year : `int`
        The year.
    doy : `int`
        The day of the year, starting at 1 for January 1st.

    Returns
    -------
    date : `datetime.datetime`
        The converted date.
    """
    # January 1st of the given year, shifted by the (1-based) day of the year
    first_of_year = datetime.datetime(int(year), 1, 1)
    return first_of_year + datetime.timedelta(days=int(doy) - 1)
def item_in_enum(name, enum):
    """Check if an item exists in an enumeration.

    Parameters
    ----------
    name : `str`
        Name of the item.
    enum : `enum.Enum`
        An instance of `enum.Enum`.

    Raises
    ------
    ValueError
        Raised if ``name`` is not in ``enum``.

    Returns
    -------
    value
        The value of ``name`` in ``enum``.
    """
    members = enum.__members__
    # guard clause: valid member, return its value
    if name in members:
        return members[name].value

    # unknown member: list the valid names in the error message
    valid = '\n'.join('- {}'.format(m) for m in members)
    raise ValueError('{} is not in {} enumeration. Valid names are: \n {}'
                     .format(name, enum.__name__, valid))
def destack_tiff(image, outpath=None, overwrite=False, remove=False,
                 suffix=''):
    """Destack a TIFF with more than one band into a TIFF file for each band.

    Each band in ``image`` is saved to ``outpath`` as distinct TIFF file.
    The default filenames are: "filename(``image``) + _B(i).tif", where i is
    the respective number of each band in ``image``. If ``suffix`` is
    specified, "_``suffix``" is appended to each filename before ".tif".

    Parameters
    ----------
    image : `str` or `pathlib.Path`
        The TIFF to destack.
    outpath : `str`, optional
        Path to save the output TIFF files. The default is None. If None,
        ``outpath`` is the path to ``image``.
    overwrite : `bool`, optional
        Whether to overwrite existing TIFF files. The default is False.
    remove : `bool`, optional
        Whether to remove ``image`` from disk after destacking. The default is
        False.
    suffix : `str`, optional
        String to append to the filename of ``image``. If specified, the TIFF
        filenames for each band in ``image`` are "filename(``image``) +
        _B(i)_ + ``suffix``.tif". The default is ''.

    Raises
    ------
    FileNotFoundError
        Raised if ``image`` does not exist.

    Returns
    -------
    None.
    """
    # stop gdal printing warnings and errors to STDERR
    gdal.PushErrorHandler('CPLQuietErrorHandler')

    # raise Python exceptions for gdal errors
    gdal.UseExceptions()

    # convert to pathlib.Path
    image = pathlib.Path(image)
    if not image.exists():
        raise FileNotFoundError('{} does not exist.'.format(image))

    # name of the TIFF (filename without extension)
    imgname = image.stem

    # default: output directory is equal to the input directory
    if outpath is None:
        outpath = image.parent
    else:
        outpath = pathlib.Path(outpath)
        # check if output directory exists
        if not outpath.exists():
            outpath.mkdir(parents=True, exist_ok=True)

    # open the TIFF
    img = gdal.Open(str(image))

    # check whether the file was already processed: at least one output file
    # per band already exists in outpath
    processed = list(outpath.glob(imgname + '_B*'))
    if len(processed) >= img.RasterCount and not overwrite:
        LOGGER.info('{} already processed.'.format(imgname))

    # destack the TIFF
    else:
        # image driver
        driver = gdal.GetDriverByName('GTiff')
        driver.Register()

        # image size and tiles
        cols = img.RasterXSize
        rows = img.RasterYSize
        bands = img.RasterCount

        # print progress
        LOGGER.info('Processing: {}'.format(imgname))

        # iterate the bands of the raster (gdal band indices are 1-based)
        for b in range(1, bands + 1):
            # read the data of band b
            band = img.GetRasterBand(b)
            data = band.ReadAsArray()

            # output file: replace for band name
            fname = str(outpath.joinpath(imgname + '_B{:d}.tif'.format(b)))
            if suffix:
                fname = fname.replace('.tif', '_{}.tif'.format(suffix))
            # single-band output dataset with the same data type as the input
            outDs = driver.Create(fname, cols, rows, 1, band.DataType)

            # define output band
            outband = outDs.GetRasterBand(1)

            # write array to output band
            outband.WriteArray(data)
            outband.FlushCache()

            # Set the geographic information
            outDs.SetProjection(img.GetProjection())
            outDs.SetGeoTransform(img.GetGeoTransform())

            # clear memory: deleting the gdal objects flushes and closes the
            # output dataset
            del outband, band, data, outDs

    # remove old stacked GeoTIFF: deleting the dataset releases the file
    # handle before the optional unlink below
    del img
    if remove:
        os.unlink(image)
    return
def standard_eo_structure(source_path, target_path, overwrite=False,
                          move=False, parser=parse_landsat_scene):
    """Modify the directory structure of a remote sensing dataset.

    This function assumes that ``source_path`` points to a directory
    containing remote sensing data, where each file in ``source_path`` and its
    sub-folders should contain a scene identifier in its filename. The scene
    identifier is used to restructure the dataset.

    Currently, Landsat and Sentinel-2 datasets are supported.

    The directory tree in ``source_path`` is modified to the following
    structure in ``target_path``:

        target_path/
            scene_id_1/
                files matching scene_id_1
            scene_id_2/
                files matching scene_id_2
            .
            .
            .
            scene_id_n/
                files matching scene_id_n

    Files whose names do not match a scene identifier are collected in
    ``target_path``/misc.

    Parameters
    ----------
    source_path : `str` or `pathlib.Path`
        Path to the remote sensing dataset.
    target_path : `str` or `pathlib.Path`
        Path to save the restructured dataset.
    overwrite : `bool`, optional
        Whether to overwrite existing files in ``target_path``.
        The default is False.
    move : `bool`, optional
        Whether to move the files from ``source_path`` to ``target_path``. If
        True, files in ``source_path`` are moved to ``target_path``, if False,
        files in ``source_path`` are copied to ``target_path``. The default is
        False. Note that if True, the whole ``source_path`` directory tree is
        removed after the files are moved.
    parser : `function`, optional
        The scene identifier parsing function. Depends on the sensor of the
        dataset. See e.g., `pysegcnn.core.utils.parse_landsat_scene`.

    Returns
    -------
    None.
    """
    # create a directory for each scene
    for dirpath, dirnames, filenames in os.walk(source_path):

        # check if there are files in the current folder
        if not filenames:
            continue

        # iterate over the files to modify
        for file in filenames:
            # get the path to the file
            old_path = os.path.join(dirpath, file)

            # get name of the scene: the parser searches the scene identifier
            # anywhere in the full path
            scene = parser(old_path)
            if scene is None:
                # path to copy files not matching a scene identifier
                new_path = pathlib.Path(target_path).joinpath('misc', file)

                # file a warning if the file does not match a scene identifier
                LOGGER.warning('{} does not match a scene identifier. Copying '
                               'to {}.'.format(os.path.basename(old_path),
                                               new_path.parent))
            else:
                # the name of the scene
                scene_name = scene['id']

                # define the new path to the file
                new_path = pathlib.Path(target_path).joinpath(scene_name, file)

            # move files to new directory, skipping existing files unless
            # overwrite is True
            if new_path.is_file() and not overwrite:
                LOGGER.info('{} already exists.'.format(new_path))
                continue
            else:
                new_path.parent.mkdir(parents=True, exist_ok=True)
                if move:
                    shutil.move(old_path, new_path)
                    LOGGER.info('mv {}'.format(new_path))
                else:
                    LOGGER.info('cp {}'.format(new_path))
                    shutil.copy(old_path, new_path)

    # remove old file location: the entire source directory tree is deleted
    # after all files have been moved
    if move:
        shutil.rmtree(source_path)
def get_radiometric_constants(metadata):
    """Retrieve the radiometric calibration constants.

    Parameters
    ----------
    metadata : `dict`
        The dictionary returned by ``read_landsat_metadata``.

    Returns
    -------
    oli : `dict`
        Radiometric rescaling factors of the OLI sensor.
    tir : `dict`
        Thermal conversion constants of the TIRS sensor.
    """
    # patterns matching the metadata keys of the rescaling factors (OLI) and
    # the thermal conversion constants (TIRS)
    oli_pattern = re.compile('(RADIANCE|REFLECTANCE)_(MULT|ADD)_BAND_\\d{1,2}')
    tir_pattern = re.compile('K(1|2)_CONSTANT_BAND_\\d{2}')

    def _select(pattern):
        # the metadata entries matching the pattern, values cast to float
        return {key: float(value) for key, value in metadata.items()
                if pattern.search(key) is not None}

    return _select(oli_pattern), _select(tir_pattern)
def landsat_radiometric_calibration(scene, outpath=None, exclude=None,
                                    radiance=False, overwrite=False,
                                    remove_raw=True):
    """Radiometric calibration of Landsat Collection Level 1 scenes.

    Convert the Landsat OLI bands to top of atmosphere radiance or reflectance
    and the TIRS bands to top of atmosphere brightness temperature.

    Conversion is performed following the `equations`_ provided by the USGS.

    The filename of each band is extended by one of the following suffixes,
    depending on the type of the radiometric calibration:

        'toa_ref': top of atmosphere reflectance
        'toa_rad': top of atmosphere radiance
        'toa_brt': top of atmosphere brightness temperature

    Parameters
    ----------
    scene : `str` or `pathlib.Path`
        Path to a Landsat scene in digital number format.
    outpath : `str` or `pathlib.Path`, optional
        Path to save the calibrated images. The default is None, which means
        saving to ``scene``.
    exclude : `list` [`str`], optional
        Bands to exclude from the radiometric calibration, e.g. ['B1']. The
        default is None, which means calibrating all bands.
    radiance : `bool`, optional
        Whether to calculate top of atmosphere radiance. The default is False,
        which means calculating top of atmosphere reflectance.
    overwrite : `bool`, optional
        Whether to overwrite the calibrated images. The default is False.
    remove_raw : `bool`, optional
        Whether to remove the raw digital number images. The default is True.

    Raises
    ------
    FileNotFoundError
        Raised if ``scene`` does not exist.

    Returns
    -------
    None.

    .. _equations:
        https://www.usgs.gov/land-resources/nli/landsat/using-usgs-landsat-level-1-data-product

    """
    # avoid the mutable default argument pitfall: bind a fresh list per call
    exclude = [] if exclude is None else exclude

    scene = pathlib.Path(scene)
    # check if the input scene exists
    if not scene.exists():
        raise FileNotFoundError('{} does not exist.'.format(scene))

    # default: output directory is equal to the input directory
    if outpath is None:
        outpath = scene
    else:
        outpath = pathlib.Path(outpath)
        # check if output directory exists
        if not outpath.exists():
            outpath.mkdir(parents=True, exist_ok=True)

    # the scene metadata file (e.g. *_MTL.txt)
    try:
        mpattern = re.compile('[mMtTlL].txt')
        metafile = [f for f in scene.iterdir() if mpattern.search(str(f))]
        metadata = read_landsat_metadata(metafile.pop())
    except IndexError:
        LOGGER.error('Can not calibrate scene {}: {} does not exist.'
                     .format(scene.name, scene.name + '_MTL.txt'))
        return

    # radiometric calibration constants
    oli, tir = get_radiometric_constants(metadata)

    # log current Landsat scene ID
    LOGGER.info('Landsat scene id: {}'.format(metadata['LANDSAT_SCENE_ID']))

    # images to process
    ipattern = re.compile('B\\d{1,2}(.*)\\.[tT][iI][fF]')
    images = [file for file in scene.iterdir() if ipattern.search(str(file))]

    # pattern to match calibrated images
    cal_pattern = re.compile('({}|{}|{}).[tT][iI][fF]'.format(*SUFFIXES))

    # check if any images were already processed
    processed = [file for file in images if cal_pattern.search(str(file))]
    if any(processed):
        LOGGER.info('The following images have already been processed:')

        # images that were already processed
        LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
            [str(file.name) for file in processed]))

        # overwrite: remove processed images and redo calibration
        if overwrite:
            LOGGER.info('Preparing to overwrite ...')

            # remove processed images
            for toremove in processed:
                # remove from disk
                os.unlink(toremove)
                LOGGER.info('rm {}'.format(toremove))

                # remove from list to process
                images.remove(toremove)

        # not overwriting, terminate calibration
        else:
            return

    # exclude defined bands from the calibration procedure
    # BUGFIX: the previous loop removed items from "images" while iterating
    # over it, which skips the element following each removal; build a new
    # list instead
    images = [i for i in images
              if re.search('B\\d{1,2}', str(i))[0] not in exclude]

    # image driver
    driver = gdal.GetDriverByName('GTiff')
    driver.Register()

    # iterate over the different bands
    for image in images:
        LOGGER.info('Processing: {}'.format(image.name))

        # read the image
        img = gdal.Open(str(image))
        band = img.GetRasterBand(1)

        # read data as array
        data = band.ReadAsArray()

        # mask of erroneous values, i.e. mask of values < 0
        mask = data < 0

        # output filename
        fname = outpath.joinpath(image.stem)

        # the current band in metadata notation, e.g. 'B10' -> 'BAND_10'
        band = re.search('B\\d{1,2}', str(image))[0].replace('B', 'BAND_')

        # check if the current band is a thermal band
        if band in ['BAND_10', 'BAND_11']:
            # output file name for TIRS bands
            fname = pathlib.Path(str(fname) + '_toa_brt.tif')

            # calculate top of atmosphere brightness temperature; negative
            # digital numbers produce NaNs/warnings which are masked below
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=RuntimeWarning)

                # top of atmosphere radiance
                rad = (oli['RADIANCE_MULT_{}'.format(band)] * data +
                       oli['RADIANCE_ADD_{}'.format(band)])

                # top of atmosphere brightness temperature
                den = np.log(tir['K1_CONSTANT_{}'.format(band)] / rad + 1)
                toa = tir['K2_CONSTANT_{}'.format(band)] / den

                # clear memory
                del den, data, rad
        else:
            # whether to calculate top of atmosphere radiance or reflectance
            if radiance:
                # output file name for OLI bands: radiance
                fname = pathlib.Path(str(fname) + '_toa_rad.tif')

                # calculate top of atmosphere radiance
                toa = (oli['RADIANCE_MULT_{}'.format(band)] * data +
                       oli['RADIANCE_ADD_{}'.format(band)])

                # clear memory
                del data
            else:
                # output file name for OLI bands: reflectance
                fname = pathlib.Path(str(fname) + '_toa_ref.tif')

                # solar zenith angle in radians
                zenith = np.radians(90 - float(metadata['SUN_ELEVATION']))

                # calculate top of the atmosphere reflectance, corrected for
                # the solar zenith angle
                ref = (oli['REFLECTANCE_MULT_{}'.format(band)] * data +
                       oli['REFLECTANCE_ADD_{}'.format(band)])
                toa = ref / np.cos(zenith)

                # clear memory
                del ref, data

        # mask erroneous values
        toa[mask] = np.nan

        # output file
        outDs = driver.Create(str(fname), img.RasterXSize, img.RasterYSize,
                              img.RasterCount, gdal.GDT_Float32)
        outband = outDs.GetRasterBand(1)

        # write array
        outband.WriteArray(toa)
        outband.FlushCache()

        # Set the geographic information
        outDs.SetProjection(img.GetProjection())
        outDs.SetGeoTransform(img.GetGeoTransform())

        # clear memory: deleting the gdal objects flushes and closes the
        # output dataset
        del outband, band, img, outDs, toa, mask

    # check if raw images should be removed
    if remove_raw:
        # raw digital number images: band files without a calibration suffix
        _dn = [i for i in scene.iterdir() if ipattern.search(str(i)) and
               not cal_pattern.search(str(i))]

        # remove raw digital number images
        for toremove in _dn:
            # remove from disk
            os.unlink(toremove)
            LOGGER.info('rm {}'.format(toremove))

    return