Skip to content
Snippets Groups Projects
Commit df0d1412 authored by Frisinghelli Daniel's avatar Frisinghelli Daniel
Browse files

Added Sentinel 2 L1C top of atmosphere reflectance computation.

parent e221bb2f
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,7 @@ import zipfile
import datetime
import warnings
import platform
import xml.etree.ElementTree as ET
# externals
import gdal
......@@ -614,8 +615,12 @@ def parse_sentinel2_scene(scene_id):
granule_semantic + site + 'V' + date + time +
aorbit + det_or_tile + baseline)
# ProSnow project naming convention
ProSnow = re.compile(tile + date + time.replace('_', ''))
# Sentitel 2 granule naming convention
S2_L1C_Granule_Only = re.compile('L[0-1][ABC]_' + tile + 'A[0-9]{6}_' +
date + time.replace('_', ''))
# Sentinel 2 tile naming convetion
S2_L1C_Tile = re.compile(tile + date + time.replace('_', ''))
# whether a scene identifier matches the Sentinel naming convention
scene = {}
......@@ -624,7 +629,7 @@ def parse_sentinel2_scene(scene_id):
# the match of the regular expression
match = S2_L1C_Old.search(scene_id)[0]
# split scene into respective parts
# split scene into respective part
parts = match.split('_')
# the metadata of the scene identifier
......@@ -677,23 +682,38 @@ def parse_sentinel2_scene(scene_id):
parts[7].split('T')[0].replace('V', ''), '%Y%m%d')
scene['tile'] = parts[9]
elif ProSnow.search(scene_id):
elif S2_L1C_Granule_Only.search(scene_id):
# the match of the regular expression
match = ProSnow.search(scene_id)[0]
match = S2_L1C_Granule_Only.search(scene_id)[0]
# split scene into respective parts
parts = match.split('_')
# the metadata of the scene identifier
scene['id'] = match
scene['satellite'] = 'S2'
scene['tile'] = parts[0]
scene['date'] = datetime.datetime.strptime(parts[1].split('T')[0],
'%Y%m%d')
scene['satellite'] = 'Sentinel 2'
scene['file class'] = parts[0]
scene['orbit'] = parts[2]
scene['date'] = datetime.datetime.strptime(
parts[3].split('T')[0].replace('V', ''), '%Y%m%d')
scene['tile'] = parts[1]
else:
scene = None
elif S2_L1C_Tile.search(scene_id):
# the match of the regular expression
match = S2_L1C_Tile.search(scene_id)[0]
# split scene into respective parts
parts = match.split('_')
# the metadata of the scene identifier
scene['id'] = match
scene['satellite'] = 'Sentinel 2'
scene['file class'] = 'L1C'
scene['date'] = datetime.datetime.strptime(
parts[1].split('T')[0].replace('V', ''), '%Y%m%d')
scene['tile'] = parts[0]
return scene
......@@ -866,7 +886,7 @@ def destack_tiff(image, outpath=None, overwrite=False, remove=False,
def standard_eo_structure(source_path, target_path, overwrite=False,
move=False, parser=parse_landsat_scene):
move=False, parser=parse_landsat_scene, skip=True):
"""Modify the directory structure of a remote sensing dataset.
This function assumes that ``source_path`` points to a directory containing
......@@ -906,6 +926,9 @@ def standard_eo_structure(source_path, target_path, overwrite=False,
parser : `function`, optional
The scene identifier parsing function. Depends on the sensor of the
dataset. See e.g., :py:func:`pysegcnn.core.utils.parse_landsat_scene`.
skip : `bool`, optional
Whether to skip files not matching a scene identifier after ``parser``.
The default is `True`.
"""
# create a directory for each scene
......@@ -917,13 +940,16 @@ def standard_eo_structure(source_path, target_path, overwrite=False,
# iterate over the files to modify
for file in filenames:
# get the path to the file
old_path = os.path.join(dirpath, file)
# get name of the scene
scene = parser(old_path)
if scene is None:
scene = parser(file)
if not scene:
# check whether to skip file not matching a scene identifier
if skip:
continue
# path to copy files not matching a scene identifier
new_path = pathlib.Path(target_path).joinpath('misc', file)
......@@ -1204,7 +1230,7 @@ def landsat_radiometric_calibration(scene, outpath=None, exclude=[],
# exclude defined bands from the calibration procedure
for i in images:
current_band = re.search('B\\d{1,2}', str(i))[0]
current_band = ipattern.search(str(i))[0].split('.')[0]
if current_band in exclude:
images.remove(i)
......@@ -1318,6 +1344,163 @@ def landsat_radiometric_calibration(scene, outpath=None, exclude=[],
return
def s2_l1c_toa_ref(scene, outpath=None, exclude=[], overwrite=False,
remove_raw=True):
"""Convert a `Sentinel-2 L1C`_ product to top of atmosphere reflectance.
Parameters
----------
scene : `str` or :py:class:`pathlib.Path`
Path to a Landsat scene in digital number format.
outpath : `str` or :py:class:`pathlib.Path`, optional
Path to save the calibrated images. The default is `None`, which means
saving in the same directory ``scene``.
exclude : `list` [`str`], optional
Bands to exclude from the radiometric calibration. The default is `[]`.
overwrite : `bool`, optional
Whether to overwrite the calibrated images. The default is `False`.
remove_raw : `bool`, optional
Whether to remove the raw digitial number images. The default is `True`
.
Raises
------
FileNotFoundError
Raised if ``scene`` does not exist.
.. _Sentinel-2 L1C:
https://sentinel.esa.int/web/sentinel/technical-guides/sentinel-2-msi/level-1c/algorithm
"""
scene = pathlib.Path(scene)
# check if the input scene exists
if not scene.exists():
raise FileNotFoundError('{} does not exist.'.format(scene))
# default: output directory is equal to the input directory
if outpath is None:
outpath = scene
else:
outpath = pathlib.Path(outpath)
# check if output directory exists
if not outpath.exists():
outpath.mkdir(parents=True, exist_ok=True)
# search the metadata file
try:
mpattern = re.compile('MTD_MSIL1C.xml')
metafile = [f for f in scene.iterdir() if mpattern.search(str(f))].pop()
except IndexError:
LOGGER.error('Can not calibrate scene {}: {} does not exist.'
.format(scene.name, mpattern.pattern))
return
# get quantification value to convert to top of atmosphere reflectance
tree = ET.parse(metafile)
root = tree.getroot()
Q = np.float(root.find('.//QUANTIFICATION_VALUE').text)
# print current scene id
LOGGER.info('Sentinel 2 scene id: {}'.format(
parse_landsat_scene(scene.name)['id']))
# images to process
ipattern = re.compile('B[0-9](?:[0-9]|A)\.jp2')
images = [file for file in scene.iterdir() if ipattern.search(str(file))]
# pattern to match calibrated images
cal_pattern = re.compile('({}|{}|{}).[tT][iI][fF]'.format(*SUFFIXES))
# check if any images were already processed
processed = [file for file in images if cal_pattern.search(str(file))]
if any(processed):
LOGGER.info('The following images have already been processed:')
# images that were already processed
LOGGER.info(('\n ' + (len(__name__) + 1) * ' ').join(
[str(file.name) for file in processed]))
# overwrite: remove processed images and redo calibration
if overwrite:
LOGGER.info('Preparing to overwrite ...')
# remove processed images
for toremove in processed:
# remove from disk
os.unlink(toremove)
LOGGER.info('rm {}'.format(toremove))
# remove from list to process
images.remove(toremove)
# not overwriting, terminate calibration
else:
return
# exclude defined bands from the calibration procedure
for i in images:
current_band = ipattern.search(str(i))[0].split('.')[0]
if current_band in exclude:
images.remove(i)
# image driver
driver = gdal.GetDriverByName('GTiff')
driver.Register()
# iterate over the different bands
for image in images:
LOGGER.info('Processing: {}'.format(image.name))
# read the image
img = gdal.Open(str(image))
band = img.GetRasterBand(1)
# read data as array
data = band.ReadAsArray()
# mask of erroneous values, i.e. mask of values < 0
mask = data < 0
# output filename
fname = outpath.joinpath(image.stem + '_toa_ref.tif')
# convert to top of atmosphere reflectance
toa = data.astype(np.float32) / Q
# mask erroneous values
toa[mask] = np.nan
# output file
outDs = driver.Create(str(fname), img.RasterXSize, img.RasterYSize,
img.RasterCount, gdal.GDT_Float32)
outband = outDs.GetRasterBand(1)
# write array
outband.WriteArray(toa)
outband.FlushCache()
# Set the geographic information
outDs.SetProjection(img.GetProjection())
outDs.SetGeoTransform(img.GetGeoTransform())
# clear memory
del outband, band, img, outDs, toa, mask
# check whether to remove raw archive data
if remove_raw:
# raw digital number images
_dn = [i for i in scene.iterdir() if ipattern.search(str(i)) and
not cal_pattern.search(str(i))]
# remove raw digital number images
for toremove in _dn:
# remove from disk
os.unlink(toremove)
LOGGER.info('rm {}'.format(toremove))
return
def check_filename_length(filename):
"""Extended-length paths on Windows.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment