import datetime
import os
import pystac
from pystac.utils import str_to_datetime
import rasterio
# Import extension version
from rio_stac.stac import (
PROJECTION_EXT_VERSION,
RASTER_EXT_VERSION,
EO_EXT_VERSION
)
# Import rio_stac methods
from rio_stac.stac import (
get_dataset_geom,
get_projection_info,
get_raster_info,
get_eobands_info,
bbox_to_geom,
)
import pandas as pd
import json
import copy
import logging
import boto3
import botocore.exceptions
import xarray as xr
from pathlib import Path
from typing import Callable, Optional, Union
import openeo_processes_dask
from urllib.parse import urlparse, urlunparse
# Module-level logger (restored: `_log` is used throughout but was never defined)
_log = logging.getLogger(__name__)
#_log.setLevel(logging.INFO)
conf_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'conf.json')
# loading confs from conf.json
with open(conf_path, 'r') as f:
conf_data = json.load(f)
# loading AWS credentials/confs
aws_access_key = conf_data["s3"]["aws_access_key"]
aws_secret_key = conf_data["s3"]["aws_secret_key"]
class Raster2STAC():
    def __init__(self, data: xr.DataArray,
                 collection_id: Optional[str] = None,  # collection id as string (same for collection and items)
                 collection_url: Optional[str] = None,
                 output_folder: Optional[str] = None,
                 output_file: Optional[str] = None,
                 title: Optional[str] = None,
                 keywords: Optional[list] = None,  # down below: if None, don't put that key in the structure
                 providers: Optional[list] = None,  # down below: if None, don't put that key in the structure
                 bucket_name: Optional[str] = None,
                 bucket_file_prefix=conf_data["s3"]["bucket_file_prefix"],
                 aws_region=conf_data["s3"]["aws_region"],
                 s3_upload: bool = False,
                 verbose: bool = False,
                 version=None,
                 output_format="json_full",
                 license=None,
                 write_json_items=False,
                 sci_citation=None):
        # output_file, bucket_name, s3_upload and verbose are referenced in the body
        # below, so they are restored in the signature; their defaults are assumptions.
        self.data = data
        self.collection_id = collection_id
        self.collection_url = collection_url
        # Assumption: self.stac_version is read when building the collection, but its
        # original assignment was lost; pystac's own STAC version is used here.
        self.stac_version = pystac.get_stac_version()
self.X_DIM = self.data.openeo.x_dim
self.Y_DIM = self.data.openeo.y_dim
self.T_DIM = self.data.openeo.temporal_dims[0]
self.B_DIM = self.data.openeo.band_dims[0]
self.pystac_assets = []
self.media_type = None
# additional properties to add in the item
self.properties = {}
# datetime associated with the item
self.input_datetime = None
self.keywords = keywords
self.providers = providers
self.extensions = [
f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json",
f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json",
f"https://stac-extensions.github.io/eo/{EO_EXT_VERSION}/schema.json",
]
        self.set_media_type(pystac.MediaType.COG)  # we could also use rio_stac.stac.get_media_type
if output_folder is not None:
self.output_folder = output_folder
else:
self.output_folder = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
if output_file is not None:
self.output_file = output_file
else:
self.output_file = "collection.json"
if not os.path.exists(self.output_folder):
os.mkdir(self.output_folder)
self.bucket_name = bucket_name
self.bucket_file_prefix = bucket_file_prefix
        self.aws_region = aws_region
        self.s3_upload = s3_upload
        self.verbose = verbose  # restored: self.verbose is checked throughout but its assignment was lost
        self.s3_client = None
        self.version = version
        self.title = title
if self.s3_upload:
# Initializing an S3 client
self.s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) #region_name=aws_region,
self.output_format = output_format
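        # Only the single-file "json_full" output is pushed to S3; any other output
        # format (e.g. the line-delimited items file) is written locally only.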
if self.output_format != "json_full":
self.s3_upload = False
self.license = license
self.write_json_items = write_json_items
self.sci_citation = sci_citation
def fix_path_slash(self, res_loc):
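        # e.g. "my/prefix" -> "my/prefix/", while "my/prefix/" stays unchanged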
return res_loc if res_loc.endswith('/') else res_loc + '/'
    def set_media_type(self, media_type: pystac.MediaType):
        self.media_type = media_type  # restored body: the assignment was lost
    # TODO/FIXME: maybe better to put this method as an external static function? (and s3_client attribute as global variable)
def upload_s3(self, file_path):
if self.s3_client is not None:
prefix = self.bucket_file_prefix
file_name = os.path.basename(file_path)
object_name = f"{self.fix_path_slash(prefix)}{file_name}"
try:
self.s3_client.upload_file(file_path, self.bucket_name, object_name)
if self.verbose:
_log.debug(f'Successfully uploaded {file_name} to {self.bucket_name} as {object_name}')
except botocore.exceptions.NoCredentialsError:
if self.verbose:
_log.debug('AWS credentials not found. Make sure you set the correct access key and secret key.')
except botocore.exceptions.ClientError as e:
if self.verbose:
_log.debug(f'Error uploading file: {e.response["Error"]["Message"]}')
def get_root_url(self, url):
parsed_url = urlparse(url)
# Extract protocol + domain
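        # e.g. "https://stac.example.com/collections/foo" -> "https://stac.example.com"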
root_url = urlunparse((parsed_url.scheme, parsed_url.netloc, '', '', '', ''))
return root_url
    def generate_stac(self):
        spatial_extents = []
        temporal_extents = []
        item_list = []  # Create a list to store the items
        eo_bands_list = []  # Collects the eo:bands dicts of all assets (restored: it is appended to below)
        # Cycling all timestamps (restored loop header: `t` is used throughout the body)
        for t in self.data[self.T_DIM].values:
            if self.verbose:
                _log.debug(f"Processing timestamp {t}")  # assumption: the original log call was lost
            # Convert the time value to a datetime object
            timestamp = pd.Timestamp(t)
            # Format the timestamp as a string to use in the file name
            time_str = timestamp.strftime('%Y%m%d%H%M%S')
            # Create a unique directory for each time slice
            time_slice_dir = os.path.join(self.output_folder, time_str)
            if not os.path.exists(time_slice_dir):
                os.makedirs(time_slice_dir)
            pystac_assets = []
            # Cycling all bands (restored loop header: `band` is used throughout the body)
            for band in self.data[self.B_DIM].values:
                # Define the GeoTIFF file path for this time slice and band
                # (assumption: the original file-name pattern was lost; band name + timestamp is used here)
                curr_file_name = f"{band}_{time_str}.tif"
                path = os.path.join(time_slice_dir, curr_file_name)
                # Write the result to the GeoTIFF file
                self.data.loc[{self.T_DIM: t, self.B_DIM: band}].to_dataset(name=band).rio.to_raster(raster_path=path, driver='COG')
link_path = path
if self.s3_upload:
#Uploading file to s3
_log.debug(f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}")
self.upload_s3(path)
                    # Assumption: standard virtual-hosted-style S3 endpoint (bucket.s3.region.amazonaws.com)
                    link_path = f"https://{self.bucket_name}.s3.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"
bboxes = []
# Create an asset dictionary for this time slice
with rasterio.open(path) as src_dst:
# Get BBOX and Footprint
dataset_geom = get_dataset_geom(src_dst, densify_pts=0, precision=-1)
bboxes.append(dataset_geom["bbox"])
proj_info = {
f"proj:{name}": value
for name, value in get_projection_info(src_dst).items()
}
raster_info = {
"raster:bands": get_raster_info(src_dst, max_size=1024)
}
                    band_dict = get_eobands_info(src_dst)[0]
                    if isinstance(band_dict, dict):
                        # Rename: drop the generic "name" and promote "description" to "name"
                        band_dict.pop("name", None)
                        band_dict["name"] = band_dict.pop("description")
                    eo_bands_list.append(band_dict)
                    # Restored: `eo_info` is read in the asset below, but its definition was lost
                    eo_info = {"eo:bands": [band_dict]}
cloudcover = src_dst.get_tag_item("CLOUDCOVER", "IMAGERY")
# TODO: try to add this field to the COG. Currently not present in the files we write here.
if cloudcover is not None:
self.properties.update({"eo:cloud_cover": int(cloudcover)})
                pystac_assets.append(
                    (
                        band,
                        pystac.Asset(
                            href=link_path,  # restored: pystac.Asset requires an href; the computed link_path fits here
                            media_type=self.media_type,
                            extra_fields={
                                **proj_info,
                                **raster_info,
                                **eo_info
                            },
                            roles=None,
                        ),
                    )
                )
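            # zip(*bboxes) regroups the collected [minx, miny, maxx, maxy] lists into
            # per-coordinate columns; their min/max give the union bbox for the item.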
minx, miny, maxx, maxy = zip(*bboxes)
bbox = [min(minx), min(miny), max(maxx), max(maxy)]
# metadata_item_path = f"{self.fix_path_slash(time_slice_dir)}metadata.json"
# item
            item = pystac.Item(
                id=time_str,
                geometry=bbox_to_geom(bbox),
                bbox=bbox,
                stac_extensions=self.extensions,
                datetime=str_to_datetime(str(t)),
                properties=self.properties,
                # href=metadata_item_path  # no longer needed after removing the JSON-per-item approach
            )
# Calculate the item's spatial extent and add it to the list
item_bbox = item.bbox
spatial_extents.append(item_bbox)
# Calculate the item's temporal extent and add it to the list
item_datetime = item.datetime
temporal_extents.append([item_datetime, item_datetime])
for key, asset in pystac_assets:
item.add_asset(key=key, asset=asset)
item_dict = item.to_dict() #FIXME: declared and assigned now for root issue in item link (see below)
#if we add a collection we MUST add a link
if self.collection_id and self.collection_url:
item.add_link(
pystac.Link(
pystac.RelType.COLLECTION,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
item.add_link(
pystac.Link(
pystac.RelType.PARENT,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
item.add_link(
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/{time_str}",
media_type=pystac.MediaType.JSON,
)
)
item_dict = item.to_dict()
#FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
# now this link is added manually by editing the dict
item_dict["links"].append({"rel": "root", "href": self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"), "type": "application/json"})
"""item.add_link(
pystac.Link(
pystac.RelType.ROOT,
#f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"),
media_type=pystac.MediaType.JSON,
)
# Append the item to the list instead of adding it to the collection
#item_dict = item.to_dict()
item_dict["collection"] = self.collection_id #self.title
if self.output_format == "json_full":
                item_list.append(copy.deepcopy(item_dict))  # Deep copy needed: self.properties is shared across all items, so without it the stored properties (e.g. datetime) would be overwritten on later iterations.
else:
item_oneline = json.dumps(item_dict, separators=(",", ":"), ensure_ascii=False)
output_path = Path(self.output_folder)
with open(f"{output_path}/items.csv", 'a+') as out_csv:
out_csv.write(f"{item_oneline}\n")
if self.write_json_items:
jsons_path = f"{output_path}/items-json/"
if not os.path.exists(jsons_path):
os.mkdir(jsons_path)
with open(f"{self.fix_path_slash(jsons_path)}{self.collection_id}-{time_str}.json", 'w+') as out_json:
out_json.write(json.dumps(item_dict, indent=4))
#break
# Calculate overall spatial extent
minx, miny, maxx, maxy = zip(*spatial_extents)
overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]
# Calculate overall temporal extent
min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
overall_temporal_extent = [min_datetime, max_datetime]
s_ext = pystac.SpatialExtent([overall_bbox])
t_ext = pystac.TemporalExtent([overall_temporal_extent])
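        # pystac wraps both in lists because STAC collections may declare several
        # sub-extents; here one overall bbox and one time interval are used.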
        # Collection-level eo summaries (restored: `eo_info` is read below but its
        # definition was lost in the elided block above)
        eo_info = {"eo:bands": eo_bands_list}
        extra_fields = {}
        extra_fields["stac_version"] = self.stac_version
if self.keywords is not None:
extra_fields["keywords"] = self.keywords
if self.providers is not None:
extra_fields["providers"] = self.providers
if self.version is not None:
extra_fields["version"] = self.version
if self.title is not None:
extra_fields["title"] = self.title
if self.sci_citation is not None:
extra_fields["sci:citation"] = self.sci_citation
extra_fields["summaries"] = eo_info
extra_fields["stac_extensions"] = self.extensions
        cube_dimensions = {
self.X_DIM: {
"axis": "x",
"type": "spatial",
"extent": [float(self.data.coords[self.X_DIM].min()), float(self.data.coords[self.X_DIM].max())],
"reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
},
self.Y_DIM: {
"axis": "y",
"type": "spatial",
"extent": [float(self.data.coords[self.Y_DIM].min()), float(self.data.coords[self.Y_DIM].max())],
"reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
},
self.T_DIM: {
#"step": "P1D",
"type": "temporal",
"extent": [str(self.data[self.T_DIM].min().values), str(self.data[self.T_DIM].max().values)],
},
self.B_DIM: {
"type": "bands",
"values": list(self.data[self.B_DIM].values),
}
}
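        # These dimension descriptions follow the STAC datacube extension's
        # "cube:dimensions" model, so the collection can be read as a data cube.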
extra_fields["cube:dimensions"] = cube_dimensons
        self.stac_collection = pystac.collection.Collection(
            id=self.collection_id,
            # `description` is required by pystac; the original argument was lost in
            # the elided block, so an empty string is used as a placeholder.
            description="",
            extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
            extra_fields=extra_fields,  # restored: extra_fields is built above and was clearly passed here
        )
#if we add a collection we MUST add a link
if self.collection_id and self.collection_url:
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.ITEMS,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
media_type=pystac.MediaType.JSON,
)
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.PARENT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
)
)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
)
)
#self.stac_collection.remove_links(rel=pystac.RelType.ROOT)
self.stac_collection.add_link(
pystac.Link(
pystac.RelType.ROOT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
)
)
"""
self.stac_collection.add_links(
[
pystac.Link(
pystac.RelType.ROOT,
self.get_root_url(f"{self.fix_path_slash(collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
),
pystac.Link(
pystac.RelType.SELF,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
media_type=pystac.MediaType.JSON,
),
pystac.Link(
pystac.RelType.PARENT,
self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
media_type=pystac.MediaType.JSON,
),
pystac.Link(
pystac.RelType.ITEMS,
f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
media_type=pystac.MediaType.JSON,
)
]
)
"""
if self.license is not None:
self.stac_collection.license = self.license
# Create a single JSON file with all the items
stac_collection_dict = self.stac_collection.to_dict()
# in order to solve the double "root" link bug/issue
links_dict = stac_collection_dict["links"]
ctr_roots = 0
self_exists = False
self_idx = 0
for idx, link in enumerate(links_dict):
if link["rel"] == "root":
ctr_roots = ctr_roots + 1
if link["rel"] == "self":
self_exists = True
self_idx = idx
if ctr_roots == 2 and self_exists:
for idx, link in enumerate(links_dict):
if link["rel"] == "root" and link["href"] == links_dict[self_idx]["href"] and link["type"] == links_dict[self_idx]["type"]:
del links_dict[idx]
break
        if self.output_format == "json_full":
            stac_collection_dict["features"] = item_list  # Replace the "features" field with the list of items
        json_str = json.dumps(stac_collection_dict, indent=4)
        # Writing the collection JSON output file
        output_path = Path(self.output_folder) / Path(self.output_file)
        with open(output_path, "w+") as metadata:
            metadata.write(json_str)  # restored: the write call was lost
        if self.s3_upload:
            # Uploading metadata JSON file to S3
            _log.debug(f"Uploading metadata JSON \"{output_path}\" to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(output_path)}")
            self.upload_s3(str(output_path))
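

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module): a hedged example of
# how Raster2STAC might be driven. Assumptions: openeo_processes_dask registers
# the `.openeo` xarray accessor used above, and rioxarray provides the `rio`
# accessor. All IDs, URLs and paths below are hypothetical placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import numpy as np

    # A tiny in-memory cube: 1 timestamp, 2 bands, 4x4 pixels, WGS84.
    cube = xr.DataArray(
        np.random.rand(1, 2, 4, 4).astype("float32"),
        dims=["t", "bands", "y", "x"],
        coords={
            "t": [np.datetime64("2023-06-01")],
            "bands": ["B04", "B08"],
            "y": np.linspace(46.6, 46.5, 4),
            "x": np.linspace(11.3, 11.4, 4),
        },
    ).rio.write_crs("EPSG:4326")

    r2s = Raster2STAC(
        data=cube,
        collection_id="SAMPLE_COLLECTION",                       # hypothetical ID
        collection_url="https://stac.example.com/collections/",  # hypothetical URL
        output_folder="stac_output",
        s3_upload=False,  # keep everything local for this example
        verbose=True,
    )
    r2s.generate_stac()  # writes per-band COGs plus stac_output/collection.json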