Newer
Older
Create STAC Item from a rasterio dataset.
Modified from https://github.com/developmentseed/rio-stac/blob/main/rio_stac/stac.py
Using as main data model an xArray object, accessing the properties using rioxarray instead of rasterio
"""
import datetime
import math
import os
import warnings
from contextlib import ExitStack
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy
import pystac
import rasterio
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from pystac.utils import str_to_datetime
from rasterio import transform, warp
from rasterio.features import bounds as feature_bounds
from rasterio.io import DatasetReader, DatasetWriter, MemoryFile
from rasterio.vrt import WarpedVRT
PROJECTION_EXT_VERSION = "v1.1.0"
RASTER_EXT_VERSION = "v1.1.0"
EO_EXT_VERSION = "v1.1.0"
EPSG_4326 = rasterio.crs.CRS.from_epsg(4326)
def bbox_to_geom(bbox: Tuple[float, float, float, float]) -> Dict:
"""Return a geojson geometry from a bbox."""
return {
"type": "Polygon",
"coordinates": [
[
[bbox[0], bbox[1]],
[bbox[2], bbox[1]],
[bbox[2], bbox[3]],
[bbox[0], bbox[3]],
[bbox[0], bbox[1]],
]
],
}
def rioxarray_get_dataset_geom(
src_dst: xr.DataArray,
densify_pts: int = 0,
precision: int = -1,
) -> Dict:
"""Get Raster Footprint."""
if densify_pts < 0:
raise ValueError("`densify_pts` must be positive")
if src_dst.rio.crs is not None:
# 1. Create Polygon from raster bounds
geom = bbox_to_geom(src_dst.rio.bounds())
# 2. Densify the Polygon geometry
if src_dst.rio.crs != EPSG_4326 and densify_pts:
# Derived from code found at
# https://stackoverflow.com/questions/64995977/generating-equidistance-points-along-the-boundary-of-a-polygon-but-cw-ccw
coordinates = numpy.asarray(geom["coordinates"][0])
densified_number = len(coordinates) * densify_pts
existing_indices = numpy.arange(0, densified_number, densify_pts)
interp_indices = numpy.arange(existing_indices[-1] + 1)
interp_x = numpy.interp(interp_indices, existing_indices, coordinates[:, 0])
interp_y = numpy.interp(interp_indices, existing_indices, coordinates[:, 1])
geom = {
"type": "Polygon",
"coordinates": [[(x, y) for x, y in zip(interp_x, interp_y)]],
}
# 3. Reproject the geometry to "epsg:4326"
geom = warp.transform_geom(
src_dst.rio.crs, EPSG_4326, geom, precision=precision
)
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
bbox = feature_bounds(geom)
else:
warnings.warn(
"Input file doesn't have CRS information, setting geometry and bbox to (-180,-90,180,90)."
)
bbox = (-180.0, -90.0, 180.0, 90.0)
geom = bbox_to_geom(bbox)
return {"bbox": list(bbox), "footprint": geom}
def rioxarray_get_projection_info(
src_dst: xr.DataArray,
) -> Dict:
"""Get projection metadata.
The STAC projection extension allows for three different ways to describe the coordinate reference system
associated with a raster :
- EPSG code
- WKT2
- PROJJSON
All are optional, and they can be provided altogether as well. Therefore, as long as one can be obtained from
the data, we add it to the returned dictionary.
see: https://github.com/stac-extensions/projection
"""
projjson = None
wkt2 = None
epsg = None
if src_dst.rio.crs is not None:
# EPSG
epsg = src_dst.rio.crs.to_epsg() if src_dst.rio.crs.is_epsg_code else None
# PROJJSON
try:
projjson = src_dst.rio.crs.to_dict(projjson=True)
except (AttributeError, TypeError) as ex:
warnings.warn(f"Could not get PROJJSON from dataset : {ex}")
pass
# WKT2
try:
wkt2 = src_dst.rio.crs.to_wkt()
except Exception as ex:
warnings.warn(f"Could not get WKT2 from dataset : {ex}")
pass
meta = {
"epsg": epsg,
"geometry": bbox_to_geom(src_dst.rio.bounds()),
"bbox": list(src_dst.rio.bounds()),
"shape": [src_dst.rio.height, src_dst.rio.width],
"transform": list(src_dst.rio.transform()),
}
if projjson is not None:
meta["projjson"] = projjson
if wkt2 is not None:
meta["wkt2"] = wkt2
return meta
def get_eobands_info(
src_dst: Union[DatasetReader, DatasetWriter, WarpedVRT, MemoryFile],
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
) -> List:
"""Get eo:bands metadata.
see: https://github.com/stac-extensions/eo#item-properties-or-asset-fields
"""
eo_bands = []
colors = src_dst.colorinterp
for ix in src_dst.indexes:
band_meta = {"name": f"b{ix}"}
descr = src_dst.descriptions[ix - 1]
color = colors[ix - 1].name
# Description metadata or Colorinterp or Nothing
description = descr or color
if description:
band_meta["description"] = description
eo_bands.append(band_meta)
return eo_bands
def _rioxarray_get_stats(arr: numpy.ndarray, **kwargs: Any) -> Dict:
"""Calculate array statistics."""
# Avoid non masked nan/inf values
arr = arr.values
numpy.ma.fix_invalid(arr, copy=False)
sample, edges = numpy.histogram(arr)
return {
"statistics": {
"mean": float(arr.mean()),
"minimum": float(arr.min()),
"maximum": float(arr.max()),
"stddev": float(arr.std()),
"valid_percent": float(numpy.count_nonzero(arr)) / float(arr.size) * 100,
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
},
"histogram": {
"count": len(edges),
"min": float(edges.min()),
"max": float(edges.max()),
"buckets": sample.tolist(),
},
}
def rioxarray_get_raster_info( # noqa: C901
src_dst: xr.DataArray,
max_size: int = 1024,
) -> List[Dict]:
"""Get raster metadata.
see: https://github.com/stac-extensions/raster#raster-band-object
"""
height = src_dst.rio.height
width = src_dst.rio.width
if max_size:
if max(width, height) > max_size:
ratio = height / width
if ratio > 1:
height = max_size
width = math.ceil(height / ratio)
else:
width = max_size
height = math.ceil(width * ratio)
meta: List[Dict] = []
# area_or_point = src_dst.tags().get("AREA_OR_POINT", "").lower()
# Missing `bits_per_sample` and `spatial_resolution`
# It should contain only one band/variable
# for band in src_dst.indexes:
if src_dst.attrs["scale_factor"]:
value = {
"data_type": str(src_dst.dtype),
"scale": src_dst.attrs["scale_factor"],
}
else:
value = {
"data_type": str(src_dst.dtype),
"scale": 1,
}
# add offset
if src_dst.attrs["add_offset"]:
value["offset"] = src_dst.attrs["add_offset"]
else:
value["offset"] = 0
# if area_or_point:
# value["sampling"] = area_or_point
# If the Nodata is not set we don't forward it.
if src_dst.rio.nodata is not None:
if numpy.isnan(src_dst.rio.nodata):
value["nodata"] = "nan"
elif numpy.isposinf(src_dst.rio.nodata):
value["nodata"] = "inf"
elif numpy.isneginf(src_dst.rio.nodata):
value["nodata"] = "-inf"
else:
value["nodata"] = (src_dst.rio.nodata).astype((src_dst.dtype)[:2])
# TODO: check if we can get the unit
# if src_dst.rio.units[0] is not None:
# value["unit"] = src_dst.rio.units[0]
value.update(_rioxarray_get_stats(src_dst))
meta.append(value)
return meta
def get_media_type(
src_dst: Union[DatasetReader, DatasetWriter, WarpedVRT, MemoryFile],
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
) -> Optional[pystac.MediaType]:
"""Find MediaType for a raster dataset."""
driver = src_dst.driver
if driver == "GTiff":
if src_dst.crs:
return pystac.MediaType.GEOTIFF
else:
return pystac.MediaType.TIFF
elif driver in [
"JP2ECW",
"JP2KAK",
"JP2LURA",
"JP2MrSID",
"JP2OpenJPEG",
"JPEG2000",
]:
return pystac.MediaType.JPEG2000
elif driver in ["HDF4", "HDF4Image"]:
return pystac.MediaType.HDF
elif driver in ["HDF5", "HDF5Image"]:
return pystac.MediaType.HDF5
elif driver == "JPEG":
return pystac.MediaType.JPEG
elif driver == "PNG":
return pystac.MediaType.PNG
warnings.warn("Could not determine the media type from GDAL driver.")
return None
def create_stac_item(
source: Union[str, DatasetReader, DatasetWriter, WarpedVRT, MemoryFile],
input_datetime: Optional[datetime.datetime] = None,
extensions: Optional[List[str]] = None,
collection: Optional[str] = None,
collection_url: Optional[str] = None,
properties: Optional[Dict] = None,
id: Optional[str] = None,
assets: Optional[Dict[str, pystac.Asset]] = None,
asset_name: str = "asset",
asset_roles: Optional[List[str]] = None,
asset_media_type: Optional[Union[str, pystac.MediaType]] = "auto",
asset_href: Optional[str] = None,
with_proj: bool = False,
with_raster: bool = False,
with_eo: bool = False,
raster_max_size: int = 1024,
geom_densify_pts: int = 0,
geom_precision: int = -1,
) -> pystac.Item:
"""Create a Stac Item.
Args:
source (str or opened rasterio dataset): input path or rasterio dataset.
input_datetime (datetime.datetime, optional): datetime associated with the item.
extensions (list of str): input list of extensions to use in the item.
collection (str, optional): name of collection the item belongs to.
collection_url (str, optional): Link to the STAC Collection.
properties (dict, optional): additional properties to add in the item.
id (str, optional): id to assign to the item (default to the source basename).
assets (dict, optional): Assets to set in the item. If set we won't create one from the source.
asset_name (str, optional): asset name in the Assets object.
asset_roles (list of str, optional): list of str | list of asset's roles.
asset_media_type (str or pystac.MediaType, optional): asset's media type.
asset_href (str, optional): asset's URI (default to input path).
with_proj (bool): Add the `projection` extension and properties (default to False).
with_raster (bool): Add the `raster` extension and properties (default to False).
with_eo (bool): Add the `eo` extension and properties (default to False).
raster_max_size (int): Limit array size from which to get the raster statistics. Defaults to 1024.
geom_densify_pts (int): Number of points to add to each edge to account for nonlinear edges transformation (Note: GDAL uses 21).
geom_precision (int): If >= 0, geometry coordinates will be rounded to this number of decimal.
Returns:
pystac.Item: valid STAC Item.
"""
properties = properties or {}
extensions = extensions or []
asset_roles = asset_roles or []
with ExitStack() as ctx:
if isinstance(source, (DatasetReader, DatasetWriter, WarpedVRT)):
dataset = source
else:
dataset = ctx.enter_context(rasterio.open(source))
if dataset.gcps[0]:
src_dst = ctx.enter_context(
WarpedVRT(
dataset,
src_crs=dataset.gcps[1],
src_transform=transform.from_gcps(dataset.gcps[0]),
)
)
else:
src_dst = dataset
dataset_geom = rioxarray_get_dataset_geom(
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
src_dst,
densify_pts=geom_densify_pts,
precision=geom_precision,
)
media_type = (
get_media_type(dataset) if asset_media_type == "auto" else asset_media_type
)
if "start_datetime" not in properties and "end_datetime" not in properties:
# Try to get datetime from https://gdal.org/user/raster_data_model.html#imagery-domain-remote-sensing
dst_date = src_dst.get_tag_item("ACQUISITIONDATETIME", "IMAGERY")
dst_datetime = str_to_datetime(dst_date) if dst_date else None
input_datetime = (
input_datetime or dst_datetime or datetime.datetime.utcnow()
)
# add projection properties
if with_proj:
extensions.append(
f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json",
)
properties.update(
{
f"proj:{name}": value
for name, value in rioxarray_get_projection_info(src_dst).items()
}
)
# add raster properties
raster_info = {}
if with_raster:
extensions.append(
f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json",
)
raster_info = {
"raster:bands": rioxarray_get_raster_info(
dataset, max_size=raster_max_size
)
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
}
eo_info: Dict[str, List] = {}
if with_eo:
extensions.append(
f"https://stac-extensions.github.io/eo/{EO_EXT_VERSION}/schema.json",
)
eo_info = {"eo:bands": get_eobands_info(src_dst)}
cloudcover = src_dst.get_tag_item("CLOUDCOVER", "IMAGERY")
if cloudcover is not None:
properties.update({"eo:cloud_cover": int(cloudcover)})
# item
item = pystac.Item(
id=id or os.path.basename(dataset.name),
geometry=dataset_geom["footprint"],
bbox=dataset_geom["bbox"],
collection=collection,
stac_extensions=extensions,
datetime=input_datetime,
properties=properties,
)
# if we add a collection we MUST add a link
if collection:
item.add_link(
pystac.Link(
pystac.RelType.COLLECTION,
collection_url or collection,
media_type=pystac.MediaType.JSON,
)
)
# item.assets
if assets:
for key, asset in assets.items():
item.add_asset(key=key, asset=asset)
else:
item.add_asset(
key=asset_name,
asset=pystac.Asset(
href=asset_href or dataset.name,
media_type=media_type,
extra_fields={**raster_info, **eo_info},
roles=asset_roles,
),
)
return item