Raster2STAC - Generate STAC metadata from raster data
This module provides a class `Raster2STAC` for generating SpatioTemporal Asset Catalog (STAC) metadata JSON files
from raster data, provided either as an `xr.DataArray` or as a file path to one or more netCDF files.
Authors:
Mercurio Lorenzo, Eurac Research - Inst. for Earth Observation, Bolzano/Bozen IT
Michele Claus, Eurac Research - Inst. for Earth Observation, Bolzano/Bozen IT
import datetime
import json
import logging
import os
from pathlib import Path
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import dask
import numpy as np
import pandas as pd
import pystac
from pystac.utils import str_to_datetime
import rasterio
import ujson
import xarray as xr
from typing import Callable, Optional, Union
from urllib.parse import urlparse, urlunparse
from fsspec.implementations.local import LocalFileSystem
# Import extension versions
from rio_stac.stac import (
PROJECTION_EXT_VERSION,
RASTER_EXT_VERSION,
EO_EXT_VERSION
)
# Import rio_stac methods
from rio_stac.stac import (
get_dataset_geom,
get_projection_info,
get_raster_info,
get_eobands_info,
bbox_to_geom,
)
import openeo_processes_dask
import openeo_processes_dask.process_implementations.cubes._xr_interop
from openeo.local import LocalConnection
from .rioxarray_stac import rioxarray_get_dataset_geom, rioxarray_get_projection_info, rioxarray_get_raster_info
_log = logging.getLogger(__name__)
"""
Raster2STAC Class - Converts raster data into the STAC format.
Args:
data: str or xr.DataArray
Raster data as xr.DataArray or file path referring to a netCDF file.
collection_id: str
Identifier of the collection as a string (Example: 'blue-river-basin')
collection_url: str
Collection URL (must refer to the FastAPI URL where this collection will be uploaded).
item_prefix: Optional[str] = ""
Prefix prepended to the datetime when building the item_id, useful when the same datetime can occur in multiple items.
output_folder: Optional[str] = None
Local folder for the raster and STAC metadata outputs. Defaults to a folder named after the run timestamp
(e.g. ./20231215103000/).
description: Optional[str] = ""
Description of the STAC collection.
title: Optional[str] = None,
Title of the STAC collection.
ignore_warns: Optional[bool] = False,
If True, warnings during processing (such as xr lib warnings) will be ignored.
keywords: Optional[list] = None,
Keywords associated with the STAC item.
providers: Optional[list] = None,
Data providers associated with the STAC item.
stac_version: str = "1.0.0",
Version of the STAC specification to use.
s3_upload: bool = True,
If True, upload the outputs to S3:
1. For the "COG" output format: upload the COG files to S3.
2. For the "KERCHUNK" output format: upload the netCDFs and the JSON Kerchunk files to S3.
bucket_name: str = None,
Part of AWS S3 configuration: bucket name.
bucket_file_prefix: str = None,
Part of AWS S3 configuration: prefix for files in the S3 bucket.
aws_region: str = None,
Part of AWS S3 configuration: AWS region for S3.
aws_access_key: str = None,
Part of AWS S3 configuration: AWS access key ID.
aws_secret_key: str = None,
Part of AWS S3 configuration: AWS secret access key.
version: Optional[str] = None,
Version of the STAC collection.
license: Optional[str] = None,
License information about STAC collection and its assets.
sci_citation: Optional[str] = None
Scientific citation(s) reference(s) about STAC collection.
write_collection_assets = False,
Include all assets in the STAC Collection, with unique keys.
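Example:
A minimal usage sketch (illustrative only: the path, collection id, URL and
license below are placeholders, not values provided by this module):

    r2s = Raster2STAC(
        data="/path/to/datacube.nc",
        collection_id="my-collection",
        collection_url="https://my-stac-api.example.com/collections",
        description="Example collection generated from a netCDF datacube",
        license="CC-BY-4.0",
        s3_upload=False,
    )
    r2s.generate_cog_stac()

For the Kerchunk workflow, `data` is instead a list of lists of netCDF paths
and `generate_kerchunk_stac()` is called.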
"""
def __init__(self,
data,
collection_id: str,
collection_url: str,
item_prefix: Optional[str] = "",
output_folder: Optional[str] = None,
output_file: Optional[str] = None,
description: Optional[str] = "",
title: Optional[str] = None,
ignore_warns: Optional[bool] = False,
keywords: Optional[list] = None,
providers: Optional[list] = None,
links: Optional[list] = None,
stac_version: str = "1.0.0",
s3_upload=False,
bucket_name=None,
bucket_file_prefix=None,
aws_region=None,
aws_access_key=None,
aws_secret_key=None,
version: Optional[str] = None,
license: Optional[str] = None,
sci_citation=None,
sci_doi=None,
write_collection_assets=False
):
if ignore_warns == True:
import warnings
warnings.filterwarnings("ignore")
self.data = data
self.X_DIM = None
self.Y_DIM = None
self.T_DIM = None
self.B_DIM = None
self.output_format = None
self.properties = {} # additional properties to add in the item
self.collection_id = collection_id # name of collection the item belongs to
self.collection_url = collection_url
self.item_prefix = item_prefix
self.description = description
self.keywords = keywords
self.providers = providers
self.links = links
self.stac_version = stac_version
self.version = version
self.write_collection_assets = write_collection_assets
self.extensions = [
f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json",
f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json",
f"https://stac-extensions.github.io/eo/{EO_EXT_VERSION}/schema.json",
# f"https://stac-extensions.github.io/datacube/{DATACUBE_EXT_VERSION}/schema.json",
]
if output_folder is not None:
self.output_folder = output_folder
else:
self.output_folder = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
Path(self.output_folder).mkdir(parents=True, exist_ok=True)
self.bucket_name = bucket_name
self.bucket_file_prefix = bucket_file_prefix
self.aws_region = aws_region
self.s3_upload = s3_upload
self.s3_client = None
self.title = title
if self.s3_upload:
# Initializing an S3 client
self.s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) #region_name=aws_region,
# available_output_formats = ["csv","json_full"]
# if output_format not in available_output_formats:
# raise ValueError(f"output_format can be set to one of {available_output_formats}")
# self.output_format = output_format
self.license = license
self.sci_citation = sci_citation
self.sci_doi = sci_doi
self.output_file = output_file
#TODO: implement following attributes: self.overwrite,
def fix_path_slash(self, res_loc):
return res_loc if res_loc.endswith('/') else res_loc + '/'
# TODO/FIXME: maybe better to put this method as an external static function? (and s3_client attribute as global variable)
def upload_s3(self, file_path):
if self.s3_client is not None:
prefix = self.bucket_file_prefix
file_name = os.path.basename(file_path)
object_name = f"{self.fix_path_slash(prefix)}{file_name}"
try:
self.s3_client.upload_file(file_path, self.bucket_name, object_name)
_log.debug(f'Successfully uploaded {file_name} to {self.bucket_name} as {object_name}')
except NoCredentialsError:
_log.debug('AWS credentials not found. Make sure you set the correct access key and secret key.')
except ClientError as e:
_log.debug(f'Error uploading file: {e.response["Error"]["Message"]}')
def get_root_url(self, url):
parsed_url = urlparse(url)
# Extract protocol + domain
root_url = urlunparse((parsed_url.scheme, parsed_url.netloc, '', '', '', ''))
return root_url
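# Example (hypothetical URL): get_root_url("https://stac.example.com/collections/my-collection")
# returns "https://stac.example.com", i.e. only the scheme and the domain.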
538
def generate_kerchunk_stac(self):
from kerchunk.hdf import SingleHdf5ToZarr
def gen_json(u, so, json_dir):
fs = LocalFileSystem(skip_instance_cache=True)
with fs.open(u, **so) as inf:
h5chunks = SingleHdf5ToZarr(inf, u, inline_threshold=300)
with open(f"{str(json_dir)}/{u.split('/')[-1]}.json", 'wb') as outf:
outf.write(ujson.dumps(h5chunks.translate()).encode())
return f"{str(json_dir)}/{u.split('/')[-1]}.json"
# Create the output folder for the Kerchunk files
kerchunk_folder = os.path.join(self.output_folder,"kerchunk")
Path(kerchunk_folder).mkdir(parents=True, exist_ok=True)
kerchunk_files_list = []
# Read the list of netCDFs
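# In this mode `self.data` is expected to be a list of lists of netCDF paths:
# the outer list groups files covering the same time range, the inner list holds
# one file per variable/band. Illustrative example (paths are placeholders):
# [
#     ["/data/2020/temperature.nc", "/data/2020/precipitation.nc"],
#     ["/data/2021/temperature.nc", "/data/2021/precipitation.nc"],
# ]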
for same_time_netcdfs in self.data:
t_labels = []
for var in same_time_netcdfs:
source_nc = var
source_path = os.path.dirname(var)
local_conn = LocalConnection(source_path)
tmp = local_conn.load_collection(source_nc).execute()
t_labels.append(tuple(tmp[tmp.openeo.temporal_dims[0]].values))
t_steps = [len(x) for x in t_labels]
if len(set(t_steps)) != 1:
raise ValueError(f"The provided netCDFs contain a different number of dates! {same_time_netcdfs}")
if len(set(t_labels)) != 1:
raise ValueError(f"The provided netCDFs contain a different set of dates!")
so = dict(mode='rb', anon=True, default_fill_cache=False)
same_time_kerchunks = dask.compute(*[dask.delayed(gen_json)(var, so, kerchunk_folder) for var in same_time_netcdfs])
kerchunk_files_list.append(same_time_kerchunks)
# List of lists of JSON Kerchunk files.
# Outer list: one element per year/time range.
# Inner list: one element per variable.
datasets_list = []
for same_time_data in kerchunk_files_list:
for d in same_time_data:
if d.endswith(".json"):
self.data = xr.open_dataset(
"reference://",
engine="zarr",
decode_coords="all",
backend_kwargs={
"storage_options": {
"fo":d,
},
"consolidated": False
},chunks={}
).to_dataarray(dim="bands")
IS_KERCHUNK = True
datasets_list.append(self.data)
# Need to create one Item per time/netCDF
self.data = xr.combine_by_coords(datasets_list,combine_attrs="drop_conflicts")
# raise ValueError("'data' parameter must be either xr.DataArray, a str (path to a netCDF) or a list of lists with paths to JSON Kerchunk files.")
self.X_DIM = self.data.openeo.x_dim
self.Y_DIM = self.data.openeo.y_dim
self.T_DIM = self.data.openeo.temporal_dims[0]
self.B_DIM = self.data.openeo.band_dims[0]
_log.debug(f'Extracted label dimensions from input are:\nx dimension:{self.X_DIM}\ny dimension:{self.Y_DIM}\nbands dimension:{self.B_DIM}\ntemporal dimension:{self.T_DIM}')
self.output_format = "KERCHUNK"
self.media_type = pystac.MediaType.JSON
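# For the Kerchunk workflow each asset is a JSON reference file, which consumers can open
# with xarray via engine="zarr" and a "reference://" URL, as done above.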
spatial_extents = []
temporal_extents = []
item_list = [] # Create a list to store the items
# Get the time dimension values
time_values = self.data[self.T_DIM].values
eo_info = {}
#resetting CSV file
open(f"{Path(self.output_folder)}/inline_items.csv", 'w+')
_log.debug("Cycling all timestamps")
# Loop over the kerchunk files
for same_time_data in kerchunk_files_list:
_log.debug(f"\nts: {same_time_data}")
time_ranges = []
bands_data = {}
for d in same_time_data:
if d.endswith(".json"):
band_data = xr.open_dataset(
"reference://",
engine="zarr",
decode_coords="all",
backend_kwargs={
"storage_options": {
"fo":d,
},
"consolidated": False
},chunks={}
).to_dataarray(dim="bands")
bands_data[d] = band_data
time_ranges.append(band_data[self.T_DIM].values)
# for i,t_r in enumerate(time_ranges):
# if i==0:
# first_range = t_r
# _are_all_time_steps_equal = True
# else:
# _are_all_time_steps_equal = np.array_equal(first_range,t_r) and _are_all_time_steps_equal
# _log.debug(f"Are the time steps provided in the kerchunks aligned {_are_all_time_steps_equal}")
# if not _are_all_time_steps_equal:
# raise Exception(f"The time steps provided in the kerchunk files {same_time_data} are not the same, can't continue.")
# Now we can create one STAC Item for this time range, with one asset each band/variable
start_datetime = np.min(time_ranges[0])
end_datetime = np.max(time_ranges[0])
# Convert the time value to a datetime object
# Format the timestamp as a string to use in the file name
start_datetime_str = pd.Timestamp(start_datetime).strftime('%Y%m%d%H%M%S')
end_datetime_str = pd.Timestamp(end_datetime).strftime('%Y%m%d%H%M%S')
_log.debug(f"Extracted temporal extrema for this time range: {start_datetime_str} {end_datetime_str}")
item_id = f"{f'{self.item_prefix}_' if self.item_prefix != '' else ''}{start_datetime_str}_{end_datetime_str}"
# Create a unique directory for each time slice
time_slice_dir = os.path.join(self.output_folder, f"{start_datetime_str}_{end_datetime_str}")
Path(time_slice_dir).mkdir(parents=True, exist_ok=True)
# Get the band name (you may need to adjust this part based on your data)
# bands = self.data[self.B_DIM].values
pystac_assets = []
# Cycling all bands/variables
_log.debug("Cycling all bands")
eo_bands_list = []
for b_d in bands_data:
band = bands_data[b_d][self.B_DIM].values[0]
kerchunk_file = b_d
_log.debug(f"b: {band}")
link_path = kerchunk_file
# if self.s3_upload:
# #Uploading file to s3
# _log.debug(f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}")
# self.upload_s3(path)
# link_path = f"https://{self.bucket_name}.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"
bboxes = []
# Create an asset dictionary for this time slice
# Get BBOX and Footprint
_log.debug(bands_data[b_d].rio.crs)
_log.debug(bands_data[b_d].rio.bounds())
dataset_geom = rioxarray_get_dataset_geom(bands_data[b_d], densify_pts=0, precision=-1)
bboxes.append(dataset_geom["bbox"])
proj_info = {
f"proj:{name}": value
for name, value in rioxarray_get_projection_info(bands_data[b_d]).items()
}
raster_info = {
"raster:bands": rioxarray_get_raster_info(bands_data[b_d], max_size=1024)
}
band_dict = {
"name": band
}
eo_bands_list.append(band_dict)
eo_info["eo:bands"] = [band_dict]
pystac_assets.append(
(
band,
pystac.Asset(
href=link_path,
media_type=self.media_type,
extra_fields={
**proj_info,
**raster_info,
**eo_info
},
roles=["data","index"],
),
)
)
eo_info["eo:bands"] = eo_bands_list
minx, miny, maxx, maxy = zip(*bboxes)
bbox = [min(minx), min(miny), max(maxx), max(maxy)]
# item
item = pystac.Item(
id=item_id,
geometry=bbox_to_geom(bbox),
bbox=bbox,
collection=None, #self.collection_id, #FIXME: raises an error if uncommented
stac_extensions=self.extensions,
datetime=None,
start_datetime=pd.Timestamp(start_datetime),
end_datetime=pd.Timestamp(end_datetime),
properties=self.properties,
)
# Calculate the item's spatial extent and add it to the list
spatial_extents.append(item.bbox)
# Calculate the item's temporal extent and add it to the list
# item_datetime = item.start_datetime
temporal_extents.append([pd.Timestamp(start_datetime), pd.Timestamp(end_datetime)])
for key, asset in pystac_assets:
item.add_asset(key=key, asset=asset)
item.validate()
item.add_link(
pystac.Link(
pystac.RelType.COLLECTION,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
item.add_link(
pystac.Link(
pystac.RelType.PARENT,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
item.add_link(
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/{item_id}",
media_type=pystac.MediaType.JSON,
)
)
item_dict = item.to_dict()
#FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
# now this link is added manually by editing the dict
item_dict["links"].append({"rel": "root", "href": self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"), "type": "application/json"})
# self.stac_collection.add_item(item)
# Append the item to the list instead of adding it to the collection
#item_dict = item.to_dict()
item_dict["collection"] = self.collection_id
# if self.output_format == "json_full":
# item_list.append(copy.deepcopy(item_dict)) # If we don't get a deep copy, the properties datetime gets overwritten in the next iteration of the loop, don't know why.
# elif self.output_format == "csv":
item_oneline = json.dumps(item_dict, separators=(",", ":"), ensure_ascii=False)
output_path = Path(self.output_folder)
with open(f"{output_path}/inline_items.csv", 'a+') as out_csv:
out_csv.write(f"{item_oneline}\n")
jsons_path = f"{output_path}/items/"
Path(jsons_path).mkdir(parents=True, exist_ok=True)
with open(f"{self.fix_path_slash(jsons_path)}{item_id}.json", 'w+') as out_json:
out_json.write(json.dumps(item_dict, indent=4))
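# Two outputs are written per item: a compact single-line JSON record appended to
# inline_items.csv (one STAC Item per line, convenient for bulk ingestion) and a
# pretty-printed JSON file under items/<item_id>.json.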
# Calculate overall spatial extent
minx, miny, maxx, maxy = zip(*spatial_extents)
overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]
# Calculate overall temporal extent
min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
overall_temporal_extent = [min_datetime, max_datetime]
s_ext = pystac.SpatialExtent([overall_bbox])
t_ext = pystac.TemporalExtent([overall_temporal_extent])
extra_fields = {}
extra_fields["stac_version"] = self.stac_version
if self.keywords is not None:
extra_fields["keywords"] = self.keywords
if self.providers is not None:
extra_fields["providers"] = self.providers
if self.version is not None:
extra_fields["version"] = self.version
if self.title is not None:
extra_fields["title"] = self.title
if self.sci_citation is not None:
extra_fields["sci:citation"] = self.sci_citation
if self.sci_doi is not None:
extra_fields["sci:doi"] = self.sci_doi
if self.sci_citation is not None or self.sci_doi is not None:
self.extensions.append("https://stac-extensions.github.io/scientific/v1.0.0/schema.json")
self.extensions.append("https://stac-extensions.github.io/datacube/v2.2.0/schema.json")
extra_fields["stac_extensions"] = self.extensions
cube_dimensions = {
self.X_DIM: {
"axis": "x",
"type": "spatial",
"extent": [float(self.data.coords[self.X_DIM].min()), float(self.data.coords[self.X_DIM].max())],
"reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
},
self.Y_DIM: {
"axis": "y",
"type": "spatial",
"extent": [float(self.data.coords[self.Y_DIM].min()), float(self.data.coords[self.Y_DIM].max())],
"reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
},
self.T_DIM: {
"type": "temporal",
"extent": [str(self.data[self.T_DIM].min().values), str(self.data[self.T_DIM].max().values)],
},
self.B_DIM: {
"type": "bands",
"values": list(self.data[self.B_DIM].values),
}
}
extra_fields["cube:dimensions"] = cube_dimensons
self.stac_collection = pystac.collection.Collection(
id=self.collection_id,
description=self.description,
extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
extra_fields=extra_fields,
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.ITEMS,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
media_type=pystac.MediaType.JSON,
)
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.PARENT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
)
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
#self.stac_collection.remove_links(rel=pystac.RelType.ROOT)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.ROOT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
)
)
if self.license is not None:
self.stac_collection.license = self.license
# Create a single JSON file with all the items
stac_collection_dict = self.stac_collection.to_dict()
# in order to solve the double "root" link bug/issue
links_dict = stac_collection_dict["links"]
ctr_roots = 0
self_exists = False
self_idx = 0
for idx, link in enumerate(links_dict):
if link["rel"] == "root":
ctr_roots = ctr_roots + 1
if link["rel"] == "self":
self_exists = True
self_idx = idx
if ctr_roots == 2 and self_exists:
for idx, link in enumerate(links_dict):
if link["rel"] == "root" and link["href"] == links_dict[self_idx]["href"] and link["type"] == links_dict[self_idx]["type"]:
del links_dict[idx]
break
if self.links is not None:
stac_collection_dict["links"] = stac_collection_dict["links"] + self.links
# if self.output_format == "json_full":
# stac_collection_dict["features"] = item_list # Replace the "features" field with the list of items
json_str = json.dumps(stac_collection_dict, indent=4)
# Write the collection metadata JSON output file
output_path = Path(self.output_folder) / Path(self.output_file)
with open(output_path, "w+") as metadata:
metadata.write(json_str)
if self.s3_upload:
#Uploading metadata JSON file to s3
_log.debug(f"Uploading metatada JSON \"{output_path}\" to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(output_path)}")
self.upload_s3(output_path)
def generate_cog_stac(self):
if isinstance(self.data, xr.DataArray) or isinstance(self.data, str):
if(isinstance(self.data, xr.DataArray)):
pass
elif(isinstance(self.data, str)):
source_path = os.path.dirname(self.data)
local_conn = LocalConnection(source_path)
self.data = local_conn.load_collection(self.data).execute()
self.output_format = "COG"
self.media_type = pystac.MediaType.COG # we could also use rio_stac.stac.get_media_type
self.X_DIM = self.data.openeo.x_dim
self.Y_DIM = self.data.openeo.y_dim
self.T_DIM = self.data.openeo.temporal_dims[0]
self.B_DIM = self.data.openeo.band_dims[0]
_log.debug(f'Extracted label dimensions from input are:\nx dimension:{self.X_DIM}\ny dimension:{self.Y_DIM}\nbands dimension:{self.B_DIM}\ntemporal dimension:{self.T_DIM}')
spatial_extents = []
temporal_extents = []
# item_list = [] # Create a list to store the items
collection_assets = {}
open(f"{Path(self.output_folder)}/inline_items.csv", 'w+').close()
eo_info = {}
# Get the time dimension values and loop over all timestamps
time_values = self.data[self.T_DIM].values
for t in time_values:
# Convert the time value to a datetime object
timestamp = pd.Timestamp(t)
# Format the timestamp as a string to use in the file name
time_str = timestamp.strftime('%Y%m%d%H%M%S')
item_id = f"{f'{self.item_prefix}_' if self.item_prefix != '' else ''}{time_str}"
# Create a unique directory for each time slice
time_slice_dir = os.path.join(self.output_folder, time_str)
if not os.path.exists(time_slice_dir):
os.makedirs(time_slice_dir)
# Get the band names (you may need to adjust this part based on your data)
pystac_assets = []
eo_bands_list = []
# Cycling all bands
for band in self.data[self.B_DIM].values:
# Define the GeoTIFF file path for this time slice and band
# (the file naming scheme below is an assumption; the original name is elided in the source)
curr_file_name = f"{band}_{time_str}.tif"
path = os.path.join(time_slice_dir, curr_file_name)
# Write the result to the GeoTIFF file
self.data.loc[{self.T_DIM:t,self.B_DIM:band}].to_dataset(name=band).rio.to_raster(raster_path=path, driver='COG')
link_path = path
if self.s3_upload:
#Uploading file to s3
_log.debug(f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}")
self.upload_s3(path)
link_path = f"https://{self.bucket_name}.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"
bboxes = []
# Create an asset dictionary for this time slice
with rasterio.open(path) as src_dst:
# Get BBOX and Footprint
dataset_geom = get_dataset_geom(src_dst, densify_pts=0, precision=-1)
bboxes.append(dataset_geom["bbox"])
proj_info = {
f"proj:{name}": value
for name, value in get_projection_info(src_dst).items()
}
raster_info = {
"raster:bands": get_raster_info(src_dst, max_size=1024)
}
band_dict = get_eobands_info(src_dst)[0]
if isinstance(band_dict, dict):
del(band_dict["name"])
band_dict["name"] = band_dict["description"]
del(band_dict["description"])
eo_bands_list.append(band_dict) #TODO: add to dict, rename description with name and remove name
eo_info["eo:bands"] = [band_dict]  # as in generate_kerchunk_stac, each asset carries its own band entry
cloudcover = src_dst.get_tag_item("CLOUDCOVER", "IMAGERY")
# TODO: try to add this field to the COG. Currently not present in the files we write here.
if cloudcover is not None:
self.properties.update({"eo:cloud_cover": int(cloudcover)})
asset = pystac.Asset(
href=link_path,
media_type=self.media_type,
extra_fields={
**proj_info,
**raster_info,
**eo_info
},
)
pystac_assets.append(
(
band,
asset,
)
)
if self.write_collection_assets:
collection_assets[f"{item_id}_{band}"] = asset
minx, miny, maxx, maxy = zip(*bboxes)
bbox = [min(minx), min(miny), max(maxx), max(maxy)]
# metadata_item_path = f"{self.fix_path_slash(time_slice_dir)}metadata.json"
# item
item = pystac.Item(
id=item_id,
geometry=bbox_to_geom(bbox),
bbox=bbox,
collection=None, #self.collection_id, #FIXME: raises an error if uncommented
stac_extensions=self.extensions,
datetime=str_to_datetime(str(t)),
properties=self.properties,
# href=metadata_item_path # no longer needed after dropping the JSON-per-item approach
)
# Calculate the item's spatial extent and add it to the list
item_bbox = item.bbox
spatial_extents.append(item_bbox)
# Calculate the item's temporal extent and add it to the list
item_datetime = item.datetime
temporal_extents.append([item_datetime, item_datetime])
for key, asset in pystac_assets:
item.add_asset(key=key, asset=asset)
item.add_link(
pystac.Link(
pystac.RelType.COLLECTION,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
item.add_link(
pystac.Link(
pystac.RelType.PARENT,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
item.add_link(
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items/{item_id}",
media_type=pystac.MediaType.JSON,
)
)
item_dict = item.to_dict()
#FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
# now this link is added manually by editing the dict
item_dict["links"].append({"rel": "root", "href": self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"), "type": "application/json"})
# Append the item to the list instead of adding it to the collection
#item_dict = item.to_dict()
item_dict["collection"] = self.collection_id
# if self.output_format == "json_full":
# item_list.append(copy.deepcopy(item_dict)) # If we don't get a deep copy, the properties datetime gets overwritten in the next iteration of the loop.
# elif self.output_format == "csv":
item_oneline = json.dumps(item_dict, separators=(",", ":"), ensure_ascii=False)
output_path = Path(self.output_folder)
with open(f"{output_path}/inline_items.csv", 'a+') as out_csv:
out_csv.write(f"{item_oneline}\n")
jsons_path = f"{output_path}/items/"
Path(jsons_path).mkdir(parents=True, exist_ok=True)
with open(f"{self.fix_path_slash(jsons_path)}{item_id}.json", 'w+') as out_json:
out_json.write(json.dumps(item_dict, indent=4))
# Calculate overall spatial extent
minx, miny, maxx, maxy = zip(*spatial_extents)
overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]
# Calculate overall temporal extent
min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
overall_temporal_extent = [min_datetime, max_datetime]
s_ext = pystac.SpatialExtent([overall_bbox])
t_ext = pystac.TemporalExtent([overall_temporal_extent])
extra_fields = {}
extra_fields["stac_version"] = self.stac_version
if self.keywords is not None:
extra_fields["keywords"] = self.keywords
if self.providers is not None:
extra_fields["providers"] = self.providers
if self.version is not None:
extra_fields["version"] = self.version
if self.title is not None:
extra_fields["title"] = self.title
if self.sci_citation is not None:
extra_fields["sci:citation"] = self.sci_citation
if self.sci_doi is not None:
extra_fields["sci:doi"] = self.sci_doi
if self.sci_citation is not None or self.sci_doi is not None:
self.extensions.append("https://stac-extensions.github.io/scientific/v1.0.0/schema.json")
self.extensions.append("https://stac-extensions.github.io/datacube/v2.2.0/schema.json")
extra_fields["stac_extensions"] = self.extensions
cube_dimensions = {
self.X_DIM: {
"axis": "x",
"type": "spatial",
"extent": [float(self.data.coords[self.X_DIM].min()), float(self.data.coords[self.X_DIM].max())],
"reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
},
self.Y_DIM: {
"axis": "y",
"type": "spatial",
"extent": [float(self.data.coords[self.Y_DIM].min()), float(self.data.coords[self.Y_DIM].max())],
"reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
},
self.T_DIM: {
"type": "temporal",
"extent": [str(self.data[self.T_DIM].min().values), str(self.data[self.T_DIM].max().values)],
},
self.B_DIM: {
"type": "bands",
"values": list(self.data[self.B_DIM].values),
}
}
extra_fields["cube:dimensions"] = cube_dimensons
if self.write_collection_assets:
self.stac_collection = pystac.collection.Collection(
id=self.collection_id,
description=self.description,
extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
extra_fields=extra_fields,
assets = collection_assets
)
else:
self.stac_collection = pystac.collection.Collection(
id=self.collection_id,
description=self.description,
extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
extra_fields=extra_fields
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.ITEMS,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
media_type=pystac.MediaType.JSON,
)
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.PARENT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
)
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
#self.stac_collection.remove_links(rel=pystac.RelType.ROOT)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.ROOT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
)
)
if self.license is not None:
self.stac_collection.license = self.license
# Create a single JSON file with all the items