Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
"""
Modified from https://github.com/developmentseed/rio-stac/blob/main/rio_stac/stac.py
Using as main data model an xArray object, accessing the properties using rioxarray instead of rasterio
"""
"""Create STAC Item from a rasterio dataset."""
import datetime
import math
import os
import warnings
from contextlib import ExitStack
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy
import pystac
import rasterio
import xarray as xr
import rioxarray
from pystac.utils import str_to_datetime
from rasterio import transform, warp
from rasterio.features import bounds as feature_bounds
from rasterio.io import DatasetReader, DatasetWriter, MemoryFile
from rasterio.vrt import WarpedVRT
PROJECTION_EXT_VERSION = "v1.1.0"
RASTER_EXT_VERSION = "v1.1.0"
EO_EXT_VERSION = "v1.1.0"
EPSG_4326 = rasterio.crs.CRS.from_epsg(4326)
def bbox_to_geom(bbox: Tuple[float, float, float, float]) -> Dict:
"""Return a geojson geometry from a bbox."""
return {
"type": "Polygon",
"coordinates": [
[
[bbox[0], bbox[1]],
[bbox[2], bbox[1]],
[bbox[2], bbox[3]],
[bbox[0], bbox[3]],
[bbox[0], bbox[1]],
]
],
}
def rioxarray_get_dataset_geom(
src_dst: xr.DataArray,
densify_pts: int = 0,
precision: int = -1,
) -> Dict:
"""Get Raster Footprint."""
if densify_pts < 0:
raise ValueError("`densify_pts` must be positive")
if src_dst.rio.crs is not None:
# 1. Create Polygon from raster bounds
geom = bbox_to_geom(src_dst.rio.bounds())
# 2. Densify the Polygon geometry
if src_dst.rio.crs != EPSG_4326 and densify_pts:
# Derived from code found at
# https://stackoverflow.com/questions/64995977/generating-equidistance-points-along-the-boundary-of-a-polygon-but-cw-ccw
coordinates = numpy.asarray(geom["coordinates"][0])
densified_number = len(coordinates) * densify_pts
existing_indices = numpy.arange(0, densified_number, densify_pts)
interp_indices = numpy.arange(existing_indices[-1] + 1)
interp_x = numpy.interp(interp_indices, existing_indices, coordinates[:, 0])
interp_y = numpy.interp(interp_indices, existing_indices, coordinates[:, 1])
geom = {
"type": "Polygon",
"coordinates": [[(x, y) for x, y in zip(interp_x, interp_y)]],
}
# 3. Reproject the geometry to "epsg:4326"
geom = warp.transform_geom(src_dst.rio.crs, EPSG_4326, geom, precision=precision)
bbox = feature_bounds(geom)
else:
warnings.warn(
"Input file doesn't have CRS information, setting geometry and bbox to (-180,-90,180,90)."
)
bbox = (-180.0, -90.0, 180.0, 90.0)
geom = bbox_to_geom(bbox)
return {"bbox": list(bbox), "footprint": geom}
def rioxarray_get_projection_info(
src_dst: xr.DataArray,
) -> Dict:
"""Get projection metadata.
The STAC projection extension allows for three different ways to describe the coordinate reference system
associated with a raster :
- EPSG code
- WKT2
- PROJJSON
All are optional, and they can be provided altogether as well. Therefore, as long as one can be obtained from
the data, we add it to the returned dictionary.
see: https://github.com/stac-extensions/projection
"""
projjson = None
wkt2 = None
epsg = None
if src_dst.rio.crs is not None:
# EPSG
epsg = src_dst.rio.crs.to_epsg() if src_dst.rio.crs.is_epsg_code else None
# PROJJSON
try:
projjson = src_dst.rio.crs.to_dict(projjson=True)
except (AttributeError, TypeError) as ex:
warnings.warn(f"Could not get PROJJSON from dataset : {ex}")
pass
# WKT2
try:
wkt2 = src_dst.rio.crs.to_wkt()
except Exception as ex:
warnings.warn(f"Could not get WKT2 from dataset : {ex}")
pass
meta = {
"epsg": epsg,
"geometry": bbox_to_geom(src_dst.rio.bounds()),
"bbox": list(src_dst.rio.bounds()),
"shape": [src_dst.rio.height, src_dst.rio.width],
"transform": list(src_dst.rio.transform()),
}
if projjson is not None:
meta["projjson"] = projjson
if wkt2 is not None:
meta["wkt2"] = wkt2
return meta
def get_eobands_info(
src_dst: Union[DatasetReader, DatasetWriter, WarpedVRT, MemoryFile]
) -> List:
"""Get eo:bands metadata.
see: https://github.com/stac-extensions/eo#item-properties-or-asset-fields
"""
eo_bands = []
colors = src_dst.colorinterp
for ix in src_dst.indexes:
band_meta = {"name": f"b{ix}"}
descr = src_dst.descriptions[ix - 1]
color = colors[ix - 1].name
# Description metadata or Colorinterp or Nothing
description = descr or color
if description:
band_meta["description"] = description
eo_bands.append(band_meta)
return eo_bands
def _rioxarray_get_stats(arr: numpy.ndarray, **kwargs: Any) -> Dict:
"""Calculate array statistics."""
# Avoid non masked nan/inf values
arr = arr.values
numpy.ma.fix_invalid(arr, copy=False)
sample, edges = numpy.histogram(arr)
return {
"statistics": {
"mean": float(arr.mean()),
"minimum": float(arr.min()),
"maximum": float(arr.max()),
"stddev": float(arr.std()),
"valid_percent": float(numpy.count_nonzero(arr))
/ float(arr.size)
* 100,
},
"histogram": {
"count": len(edges),
"min": float(edges.min()),
"max": float(edges.max()),
"buckets": sample.tolist(),
},
}
def rioxarray_get_raster_info( # noqa: C901
src_dst: xr.DataArray,
max_size: int = 1024,
) -> List[Dict]:
"""Get raster metadata.
see: https://github.com/stac-extensions/raster#raster-band-object
"""
height = src_dst.rio.height
width = src_dst.rio.width
if max_size:
if max(width, height) > max_size:
ratio = height / width
if ratio > 1:
height = max_size
width = math.ceil(height / ratio)
else:
width = max_size
height = math.ceil(width * ratio)
meta: List[Dict] = []
# area_or_point = src_dst.tags().get("AREA_OR_POINT", "").lower()
# Missing `bits_per_sample` and `spatial_resolution`
# It should contain only one band/variable
# for band in src_dst.indexes:
value = {
"data_type": str(src_dst.dtype),
"scale": 1, # TODO: load scale and offset if present
"offset": 0,
}
# if area_or_point:
# value["sampling"] = area_or_point
# If the Nodata is not set we don't forward it.
if src_dst.rio.nodata is not None:
if numpy.isnan(src_dst.rio.nodata):
value["nodata"] = "nan"
elif numpy.isposinf(src_dst.rio.nodata):
value["nodata"] = "inf"
elif numpy.isneginf(src_dst.rio.nodata):
value["nodata"] = "-inf"
else:
value["nodata"] = src_dst.rio.nodata
# TODO: check if we can get the unit
# if src_dst.rio.units[0] is not None:
# value["unit"] = src_dst.rio.units[0]
value.update(_rioxarray_get_stats(src_dst))
meta.append(value)
return meta
def get_media_type(
src_dst: Union[DatasetReader, DatasetWriter, WarpedVRT, MemoryFile]
) -> Optional[pystac.MediaType]:
"""Find MediaType for a raster dataset."""
driver = src_dst.driver
if driver == "GTiff":
if src_dst.crs:
return pystac.MediaType.GEOTIFF
else:
return pystac.MediaType.TIFF
elif driver in [
"JP2ECW",
"JP2KAK",
"JP2LURA",
"JP2MrSID",
"JP2OpenJPEG",
"JPEG2000",
]:
return pystac.MediaType.JPEG2000
elif driver in ["HDF4", "HDF4Image"]:
return pystac.MediaType.HDF
elif driver in ["HDF5", "HDF5Image"]:
return pystac.MediaType.HDF5
elif driver == "JPEG":
return pystac.MediaType.JPEG
elif driver == "PNG":
return pystac.MediaType.PNG
warnings.warn("Could not determine the media type from GDAL driver.")
return None
def create_stac_item(
source: Union[str, DatasetReader, DatasetWriter, WarpedVRT, MemoryFile],
input_datetime: Optional[datetime.datetime] = None,
extensions: Optional[List[str]] = None,
collection: Optional[str] = None,
collection_url: Optional[str] = None,
properties: Optional[Dict] = None,
id: Optional[str] = None,
assets: Optional[Dict[str, pystac.Asset]] = None,
asset_name: str = "asset",
asset_roles: Optional[List[str]] = None,
asset_media_type: Optional[Union[str, pystac.MediaType]] = "auto",
asset_href: Optional[str] = None,
with_proj: bool = False,
with_raster: bool = False,
with_eo: bool = False,
raster_max_size: int = 1024,
geom_densify_pts: int = 0,
geom_precision: int = -1,
) -> pystac.Item:
"""Create a Stac Item.
Args:
source (str or opened rasterio dataset): input path or rasterio dataset.
input_datetime (datetime.datetime, optional): datetime associated with the item.
extensions (list of str): input list of extensions to use in the item.
collection (str, optional): name of collection the item belongs to.
collection_url (str, optional): Link to the STAC Collection.
properties (dict, optional): additional properties to add in the item.
id (str, optional): id to assign to the item (default to the source basename).
assets (dict, optional): Assets to set in the item. If set we won't create one from the source.
asset_name (str, optional): asset name in the Assets object.
asset_roles (list of str, optional): list of str | list of asset's roles.
asset_media_type (str or pystac.MediaType, optional): asset's media type.
asset_href (str, optional): asset's URI (default to input path).
with_proj (bool): Add the `projection` extension and properties (default to False).
with_raster (bool): Add the `raster` extension and properties (default to False).
with_eo (bool): Add the `eo` extension and properties (default to False).
raster_max_size (int): Limit array size from which to get the raster statistics. Defaults to 1024.
geom_densify_pts (int): Number of points to add to each edge to account for nonlinear edges transformation (Note: GDAL uses 21).
geom_precision (int): If >= 0, geometry coordinates will be rounded to this number of decimal.
Returns:
pystac.Item: valid STAC Item.
"""
properties = properties or {}
extensions = extensions or []
asset_roles = asset_roles or []
with ExitStack() as ctx:
if isinstance(source, (DatasetReader, DatasetWriter, WarpedVRT)):
dataset = source
else:
dataset = ctx.enter_context(rasterio.open(source))
if dataset.gcps[0]:
src_dst = ctx.enter_context(
WarpedVRT(
dataset,
src_crs=dataset.gcps[1],
src_transform=transform.from_gcps(dataset.gcps[0]),
)
)
else:
src_dst = dataset
dataset_geom = get_dataset_geom(
src_dst,
densify_pts=geom_densify_pts,
precision=geom_precision,
)
media_type = (
get_media_type(dataset) if asset_media_type == "auto" else asset_media_type
)
if "start_datetime" not in properties and "end_datetime" not in properties:
# Try to get datetime from https://gdal.org/user/raster_data_model.html#imagery-domain-remote-sensing
dst_date = src_dst.get_tag_item("ACQUISITIONDATETIME", "IMAGERY")
dst_datetime = str_to_datetime(dst_date) if dst_date else None
input_datetime = (
input_datetime or dst_datetime or datetime.datetime.utcnow()
)
# add projection properties
if with_proj:
extensions.append(
f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json",
)
properties.update(
{
f"proj:{name}": value
for name, value in get_projection_info(src_dst).items()
}
)
# add raster properties
raster_info = {}
if with_raster:
extensions.append(
f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json",
)
raster_info = {
"raster:bands": get_raster_info(dataset, max_size=raster_max_size)
}
eo_info: Dict[str, List] = {}
if with_eo:
extensions.append(
f"https://stac-extensions.github.io/eo/{EO_EXT_VERSION}/schema.json",
)
eo_info = {"eo:bands": get_eobands_info(src_dst)}
cloudcover = src_dst.get_tag_item("CLOUDCOVER", "IMAGERY")
if cloudcover is not None:
properties.update({"eo:cloud_cover": int(cloudcover)})
# item
item = pystac.Item(
id=id or os.path.basename(dataset.name),
geometry=dataset_geom["footprint"],
bbox=dataset_geom["bbox"],
collection=collection,
stac_extensions=extensions,
datetime=input_datetime,
properties=properties,
)
# if we add a collection we MUST add a link
if collection:
item.add_link(
pystac.Link(
pystac.RelType.COLLECTION,
collection_url or collection,
media_type=pystac.MediaType.JSON,
)
)
# item.assets
if assets:
for key, asset in assets.items():
item.add_asset(key=key, asset=asset)
else:
item.add_asset(
key=asset_name,
asset=pystac.Asset(
href=asset_href or dataset.name,
media_type=media_type,
extra_fields={**raster_info, **eo_info},
roles=asset_roles,
),
)
return item