"""
Raster2STAC - Extract STAC format metadata from raster data 

This module provides the `Raster2STAC` class for extracting SpatioTemporal Asset Catalog (STAC)
metadata JSON files from raster data, provided either as an `xr.DataArray` or as a path to a
local .nc file (which will be converted to an `xr.DataArray`).
This allows the output data to be ingested into Eurac's STAC FastAPI.

Authors:
    Mercurio Lorenzo, Eurac Research - Inst. for Earth Observation, Bolzano/Bozen IT
    Michele Claus, Eurac Research - Inst. for Earth Observation, Bolzano/Bozen IT
Date: 2024-03-06
"""

import sys
import datetime 
import os 
import pystac
from pystac.utils import str_to_datetime
import rasterio
from pathlib import Path
import copy
# Import extension version
from rio_stac.stac import (
    PROJECTION_EXT_VERSION,
    RASTER_EXT_VERSION,
    EO_EXT_VERSION
)
# Import rio_stac methods
from rio_stac.stac import (
    get_dataset_geom,
    get_projection_info,
    get_raster_info,
    get_eobands_info,
    bbox_to_geom,
)
import pandas as pd
import numpy as np
import json
import ujson
import xarray as xr
from typing import Callable, Optional, Union
import logging
import boto3
import botocore
import dask
import openeo_processes_dask
from urllib.parse import urlparse, urlunparse
from fsspec.implementations.local import LocalFileSystem
import openeo_processes_dask.process_implementations.cubes._xr_interop
from openeo.local import LocalConnection
from .rioxarray_stac import rioxarray_get_dataset_geom, rioxarray_get_projection_info, rioxarray_get_raster_info
_log = logging.getLogger(__name__)
class Raster2STAC():
    """
    Raster2STAC Class - Converts raster data into the STAC format.

    Args:
        data: str or xr.DataArray
            Raster data as xr.DataArray or file path referring to a netCDF file.
        collection_id: str
            Identifier of the collection as a string (Example: 'blue-river-basin')
        collection_url: str
            Collection URL (must refer to the FastAPI URL where this collection will be uploaded).
        item_prefix: Optional[str] = ""
            Prefix prepended to the datetime-based item_id; useful when the same datetime can occur in multiple items.
        output_folder: Optional[str] = None
            Local folder for rasters and STAC metadata outputs. Default folder will be set as run timestamp folder 
            (ex: ./20231215103000/)
        description: Optional[str] = ""
            Description of the STAC collection.
        title: Optional[str] = None,
            Title of the STAC collection.
        ignore_warns: Optional[bool] = False,
            If True, warnings during processing (such as xr lib warnings) will be ignored.
        keywords: Optional[list] = None,
            Keywords associated with the STAC collection.
        providers: Optional[list] = None,
            Data providers associated with the STAC collection.
        stac_version: str = "1.0.0",
            Version of the STAC specification to use.
        s3_upload: bool = False,
            If True, upload files to Amazon S3 Bucket.
            1. For the "COG" output format: upload the COG files to S3.
            2. For the "KERCHUNK" output format: upload the netCDFs and the json Kerchunk files to S3.
        bucket_name: str = None,
            Part of AWS S3 configuration: bucket name.
        bucket_file_prefix: str = None,
             Part of AWS S3 configuration: prefix for files in the S3 bucket.
        aws_region: str = None,
             Part of AWS S3 configuration: AWS region for S3.
        aws_access_key: str = None,
             Part of AWS S3 configuration: AWS access key used to initialize the S3 client.
        aws_secret_key: str = None,
             Part of AWS S3 configuration: AWS secret key used to initialize the S3 client.
        version: Optional[str] = None,
            Version of the STAC collection.
        license: Optional[str] = None,
            License information about STAC collection and its assets.
        sci_doi: Optional[str] = None,
            Scientific DOI reference for the STAC collection.
        sci_citation: Optional[str] = None
            Scientific citation reference(s) for the STAC collection.
        links: Optional[list] = None
            Additional links to append to the STAC collection links.
    """



    def __init__(self,
                 data,
                 collection_id,
                 collection_url = None,
                 item_prefix: Optional[str] = "",
                 output_folder: Optional[str] = None,
                 description: Optional[str] = "",
                 title: Optional[str] = None,
                 ignore_warns: Optional[bool] = False,
                 keywords: Optional[list] = None,
                 providers: Optional[list] = None,
                 links: Optional[list] = None,
                 stac_version="1.0.0",
                 s3_upload=False,
                 bucket_name = None,
                 bucket_file_prefix = None,
                 aws_region = None,
                 aws_access_key = None,
                 aws_secret_key = None,
                 version = None,
                 license = None,
                 sci_doi = None,
                 sci_citation=None):

        if ignore_warns:
            import warnings
            warnings.filterwarnings("ignore")

        self.data = data
        self.X_DIM = None
        self.Y_DIM = None
        self.T_DIM = None
        self.B_DIM = None
        self.output_format = None
        self.media_type = None
        self.properties = {} # additional properties to add in the item
        self.collection_id = collection_id # name of collection the item belongs to
        self.collection_url = collection_url
        self.item_prefix = item_prefix
        self.description = description
        self.keywords = keywords
        self.providers = providers
        self.links = links
        self.stac_version = stac_version
        self.sci_doi = sci_doi
        self.extensions = [
            f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json", 
            f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json",
            f"https://stac-extensions.github.io/eo/{EO_EXT_VERSION}/schema.json",
        ]
        if output_folder is not None:
            self.output_folder = output_folder
        else:
            self.output_folder = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]   

        self.output_file = f"{self.collection_id}.json"
        Path(self.output_folder).mkdir(parents=True, exist_ok=True)
        self.stac_collection = None
        self.bucket_name = bucket_name
        self.bucket_file_prefix = bucket_file_prefix
        self.aws_region = aws_region
        self.s3_upload = s3_upload
        self.s3_client = None
        self.version = version 
        self.title = title

        if self.s3_upload:
            # Initializing an S3 client
            self.s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) #region_name=aws_region,
        # available_output_formats = ["csv","json_full"]
        # if output_format not in available_output_formats:
            # raise ValueError(f"output_format can be set to one of {available_output_formats}")
        # self.output_format = output_format
        self.license = license
        self.sci_citation = sci_citation

        #TODO: implement following attributes: self.overwrite, 

    def fix_path_slash(self, res_loc):
        return res_loc if res_loc.endswith('/') else res_loc + '/'
    # TODO/FIXME: maybe better to put this method as an external static function? (and s3_client attribute as global variable) 
    def upload_s3(self, file_path):
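        """Upload `file_path` to the configured S3 bucket under `bucket_file_prefix`; does nothing if the S3 client was not initialized."""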
        if self.s3_client is not None:
            prefix = self.bucket_file_prefix
            file_name = os.path.basename(file_path)
            object_name = f"{self.fix_path_slash(prefix)}{file_name}"
            try:
                self.s3_client.upload_file(file_path, self.bucket_name, object_name)
                _log.debug(f'Successfully uploaded {file_name} to {self.bucket_name} as {object_name}')
            except botocore.exceptions.NoCredentialsError:
                _log.debug('AWS credentials not found. Make sure you set the correct access key and secret key.')
            except botocore.exceptions.ClientError as e:
                _log.debug(f'Error uploading file: {e.response["Error"]["Message"]}')

    def get_root_url(self, url):
        parsed_url = urlparse(url)
        # Extract protocol + domain
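        # For example (hypothetical URL):
        #   "https://stac.example.com/collections/my-collection" -> "https://stac.example.com"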
        root_url = urlunparse((parsed_url.scheme, parsed_url.netloc, '', '', '', ''))
        return root_url

    def generate_kerchunk_stac(self):
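        """
        Generate Kerchunk reference JSON files and STAC metadata for a set of netCDF files.

        `self.data` is expected to be a list of lists of netCDF paths: the outer list groups
        files covering the same time range, the inner list holds one file per variable/band.
        One STAC Item is created per time range, with one asset per variable pointing to the
        corresponding Kerchunk JSON reference file. A sketch of the expected layout
        (file names are hypothetical):

            data = [
                ["/data/2020/temperature.nc", "/data/2020/precipitation.nc"],
                ["/data/2021/temperature.nc", "/data/2021/precipitation.nc"],
            ]
        """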
        from kerchunk.hdf import SingleHdf5ToZarr
        def gen_json(u, so, json_dir):
            fs = LocalFileSystem(skip_instance_cache=True)
            with fs.open(u, **so) as inf:
                h5chunks = SingleHdf5ToZarr(inf, u, inline_threshold=300)
                with open(f"{str(json_dir)}/{u.split('/')[-1]}.json", 'wb') as outf:
                    outf.write(ujson.dumps(h5chunks.translate()).encode())
            return f"{str(json_dir)}/{u.split('/')[-1]}.json"

        # Create the output folder for the Kerchunk files
        kerchunk_folder = os.path.join(self.output_folder,"kerchunk")
        Path(kerchunk_folder).mkdir(parents=True, exist_ok=True)
        
        kerchunk_files_list = []
        # Read the list of netCDFs
        for same_time_netcdfs in self.data:
            t_labels = []
            for var in same_time_netcdfs:
                source_nc = var 
                source_path = os.path.dirname(var)
                local_conn = LocalConnection(source_path)
                tmp = local_conn.load_collection(source_nc).execute()
                t_labels.append(tuple(tmp[tmp.openeo.temporal_dims[0]].values))
            t_steps = [len(x) for x in t_labels]
            if len(set(t_steps)) != 1:
                raise ValueError(f"The provided netCDFs contain a different number of dates! {same_time_netcdfs}")
            if len(set(t_labels)) != 1:
                raise ValueError(f"The provided netCDFs contain a different set of dates!")

            so = dict(mode='rb', anon=True, default_fill_cache=False)
            same_time_kerchunks = dask.compute(*[dask.delayed(gen_json)(var, so, kerchunk_folder) for var in same_time_netcdfs])
            kerchunk_files_list.append(same_time_kerchunks)
            
        
        # List of lists of Kerchunk JSON reference files:
        # - outer list: one entry per time range
        # - inner list: one entry per variable/band
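        # At this point kerchunk_files_list mirrors the input layout, e.g. (hypothetical paths):
        # [("<output_folder>/kerchunk/temperature.nc.json", "<output_folder>/kerchunk/precipitation.nc.json"), ...]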
        datasets_list = []
        for same_time_data in kerchunk_files_list:
            for d in same_time_data:
                if d.endswith(".json"):
                    self.data = xr.open_dataset(
                        "reference://",
                        engine="zarr",
                        decode_coords="all",
                        backend_kwargs={
                            "storage_options": {
                                "fo":d,
                            },
                            "consolidated": False
                        },chunks={}
                    ).to_dataarray(dim="bands")
                    IS_KERCHUNK = True
                    datasets_list.append(self.data)
                    # Need to create one Item per time/netCDF
        self.data = xr.combine_by_coords(datasets_list,combine_attrs="drop_conflicts")
        # raise ValueError("'data' parameter must be either xr.DataArray, a str (path to a netCDF) or a list of lists with paths to JSON Kerchunk files.") 

        self.X_DIM = self.data.openeo.x_dim
        self.Y_DIM = self.data.openeo.y_dim
        self.T_DIM = self.data.openeo.temporal_dims[0]
        self.B_DIM = self.data.openeo.band_dims[0]
        _log.debug(f'Extracted label dimensions from input are:\nx dimension:{self.X_DIM}\ny dimension:{self.Y_DIM}\nbands dimension:{self.B_DIM}\ntemporal dimension:{self.T_DIM}')
                
        self.output_format = "KERCHUNK"
        self.media_type = pystac.MediaType.JSON
        
        spatial_extents = []
        temporal_extents = []

        item_list = []  # Create a list to store the items

        # Get the time dimension values
        time_values = self.data[self.T_DIM].values

        eo_info = {}
        
        #resetting CSV file
        open(f"{Path(self.output_folder)}/inline_items.csv", 'w+').close()
        
        _log.debug("Cycling all timestamps")

        # Loop over the kerchunk files
        for same_time_data in kerchunk_files_list:
            _log.debug(f"\nts: {same_time_data}")
                
            time_ranges = []
            bands_data = {}
            for d in same_time_data:
                if d.endswith(".json"):
                    band_data = xr.open_dataset(
                            "reference://",
                            engine="zarr",
                            decode_coords="all",
                            backend_kwargs={
                            "storage_options": {
                                "fo":d,
                            },
                            "consolidated": False
                        },chunks={}
                    ).to_dataarray(dim="bands")
                    bands_data[d] = band_data
                    time_ranges.append(band_data[self.T_DIM].values)
            # for i,t_r in enumerate(time_ranges):
            #     if i==0:
            #         first_range = t_r
            #         _are_all_time_steps_equal = True
            #     else:
            #         _are_all_time_steps_equal = np.array_equal(first_range,t_r) and _are_all_time_steps_equal
                    
                    
#             _log.debug(f"Are the time steps provided in the kerchunks aligned {_are_all_time_steps_equal}")
            
#             if not _are_all_time_steps_equal:
#                 raise Exception(f"The time steps provided in the kerchunk files {same_time_data} are not the same, can't continue.")
            
            # Now we can create one STAC Item for this time range, with one asset each band/variable

            start_datetime = np.min(time_ranges[0])
            end_datetime = np.max(time_ranges[0])
            
            # Convert the time value to a datetime object
            # Format the timestamp as a string to use in the file name
            start_datetime_str = pd.Timestamp(start_datetime).strftime('%Y%m%d%H%M%S')
            end_datetime_str = pd.Timestamp(end_datetime).strftime('%Y%m%d%H%M%S')

            _log.debug(f"Extracted temporal extrema for this time range: {start_datetime_str} {end_datetime_str}")
          
            item_id = f"{f'{self.item_prefix}_' if self.item_prefix != '' else ''}{start_datetime_str}_{end_datetime_str}"

            # Create a unique directory for each time slice
            time_slice_dir = os.path.join(self.output_folder, f"{start_datetime_str}_{end_datetime_str}")

            Path(time_slice_dir).mkdir(parents=True, exist_ok=True)

            # Get the band name (you may need to adjust this part based on your data)
            # bands = self.data[self.B_DIM].values
            
            pystac_assets = []

            # Cycling all bands/variables
            _log.debug("Cycling all bands")

            eo_bands_list = []
            for b_d in bands_data:
                band = bands_data[b_d][self.B_DIM].values[0]
                kerchunk_file = b_d
                _log.debug(f"b: {band}")

                link_path = kerchunk_file 

#                 if self.s3_upload:                                     
#                     #Uploading file to s3                   
#                     _log.debug(f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}")
#                     self.upload_s3(path)
                    
#                     link_path = f"https://{self.bucket_name}.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"

                bboxes = []

                # Create an asset dictionary for this time slice
                # Get BBOX and Footprint
                _log.debug(bands_data[b_d].rio.crs)
                _log.debug(bands_data[b_d].rio.bounds())

                dataset_geom = rioxarray_get_dataset_geom(bands_data[b_d], densify_pts=0, precision=-1)
                bboxes.append(dataset_geom["bbox"])
                
                proj_info = {
                    f"proj:{name}": value
                    for name, value in rioxarray_get_projection_info(bands_data[b_d]).items()
                }

                raster_info = {
                    "raster:bands": rioxarray_get_raster_info(bands_data[b_d], max_size=1024)
                }

                band_dict = {
                    "name": band
                    }

                eo_bands_list.append(band_dict)

                eo_info["eo:bands"] = [band_dict]

                pystac_assets.append(
                    (
                        band, 
                        pystac.Asset(
                            href=link_path, 
                            media_type=self.media_type,
                            extra_fields={
                                **proj_info,
                                **raster_info, 
                                **eo_info
                            },
                            roles=["data","index"],
                        ),
                    )
                )
                
            
            eo_info["eo:bands"] = eo_bands_list

            minx, miny, maxx, maxy = zip(*bboxes)
            bbox = [min(minx), min(miny), max(maxx), max(maxy)]

            # item
            item = pystac.Item(
                id=item_id,
                geometry=bbox_to_geom(bbox),
                bbox=bbox,
                collection=None, #self.collection_id, #FIXME: raises an error if uncommented
                stac_extensions=self.extensions,
                datetime=None,
                start_datetime=pd.Timestamp(start_datetime),
                end_datetime=pd.Timestamp(end_datetime),
                properties=self.properties,
            )

            # Calculate the item's spatial extent and add it to the list
            spatial_extents.append(item.bbox)

            # Calculate the item's temporal extent and add it to the list
            # item_datetime = item.start_datetime
            temporal_extents.append([pd.Timestamp(start_datetime), pd.Timestamp(end_datetime)])

            for key, asset in pystac_assets:
                item.add_asset(key=key, asset=asset)
            
            item.validate()
            
            item.add_link(
                pystac.Link(
                    pystac.RelType.COLLECTION,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )
            
            item.add_link(
                pystac.Link(
                    pystac.RelType.PARENT,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )
            
            item.add_link(
                pystac.Link(
                    pystac.RelType.SELF,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/{item_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )

            item_dict = item.to_dict()

            #FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
            # now this link is added manually by editing the dict
            item_dict["links"].append({"rel": "root", "href": self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"), "type": "application/json"})

            
            # self.stac_collection.add_item(item)
            # Append the item to the list instead of adding it to the collection
            #item_dict = item.to_dict()
            item_dict["collection"] = self.collection_id

            # if self.output_format == "json_full":
            # item_list.append(copy.deepcopy(item_dict)) # If we don't get a deep copy, the properties datetime gets overwritten in the next iteration of the loop, don't know why.
            # elif self.output_format == "csv":
            item_oneline = json.dumps(item_dict, separators=(",", ":"), ensure_ascii=False)

            output_path = Path(self.output_folder)
            with open(f"{output_path}/inline_items.csv", 'a+') as out_csv:
                out_csv.write(f"{item_oneline}\n")


            jsons_path = f"{output_path}/items/"
            Path(jsons_path).mkdir(parents=True, exist_ok=True)

            with open(f"{self.fix_path_slash(jsons_path)}{item_id}.json", 'w+') as out_json:
                out_json.write(json.dumps(item_dict, indent=4))            
        
        # Calculate overall spatial extent
        minx, miny, maxx, maxy = zip(*spatial_extents)
        overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]

        # Calculate overall temporal extent
        min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
        max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
        overall_temporal_extent = [min_datetime, max_datetime]

        s_ext = pystac.SpatialExtent([overall_bbox])
        t_ext = pystac.TemporalExtent([overall_temporal_extent])

        extra_fields = {}

        extra_fields["stac_version"] = self.stac_version

        if self.keywords is not None:
            extra_fields["keywords"] = self.keywords

        if self.providers is not None:
            extra_fields["providers"] = self.providers

        if self.version is not None:
            extra_fields["version"] = self.version

        if self.title is not None:
            extra_fields["title"] = self.title

        if self.sci_citation is not None:
            extra_fields["sci:citation"] = self.sci_citation
        
        if self.sci_doi is not None:
            extra_fields["sci:doi"] = self.sci_doi
        
        if self.sci_citation is not None or self.sci_doi is not None:
            self.extensions.append("https://stac-extensions.github.io/scientific/v1.0.0/schema.json")

        extra_fields["summaries"] = eo_info

        extra_fields["stac_extensions"] = self.extensions

        cube_dimensions = {
            self.X_DIM: {
                "axis": "x",
                "type": "spatial",
                "extent": [float(self.data.coords[self.X_DIM].min()), float(self.data.coords[self.X_DIM].max())],
                "reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
            },
            self.Y_DIM: {
                "axis": "y",
                "type": "spatial",
                "extent": [float(self.data.coords[self.Y_DIM].min()), float(self.data.coords[self.Y_DIM].max())],
                "reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
            },

            self.T_DIM: {
                "type": "temporal",
                "extent": [str(self.data[self.T_DIM].min().values), str(self.data[self.T_DIM].max().values)],
            },

            self.B_DIM: {
                "type": "bands",
                "values": list(self.data[self.B_DIM].values),
            }
        }

        extra_fields["cube:dimensions"] = cube_dimensons

        self.stac_collection = pystac.collection.Collection(
            id=self.collection_id,
            description=self.description,
            extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
            extra_fields=extra_fields,
        )

        
        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.ITEMS,
                f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
                media_type=pystac.MediaType.JSON,
            )
        )
        
        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.PARENT,
                self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
                media_type=pystac.MediaType.JSON,
            )
        )

        self.stac_collection.add_link(
                pystac.Link(
                pystac.RelType.SELF,
                f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                media_type=pystac.MediaType.JSON,
            )
        )

        #self.stac_collection.remove_links(rel=pystac.RelType.ROOT)

        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.ROOT,
                self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
                media_type=pystac.MediaType.JSON,
            )
        )

            
        if self.license is not None:
            self.stac_collection.license = self.license

        # Create a single JSON file with all the items
        stac_collection_dict = self.stac_collection.to_dict()

        # in order to solve the double "root" link bug/issue       
        links_dict = stac_collection_dict["links"]

        ctr_roots = 0
        self_exists = False
        self_idx = 0

        for idx, link in enumerate(links_dict):
            if link["rel"] == "root":
                ctr_roots = ctr_roots + 1
            if link["rel"] == "self":
                self_exists = True
                self_idx = idx

        if ctr_roots == 2 and self_exists:
            for idx, link in enumerate(links_dict):
                if link["rel"] == "root" and link["href"] == links_dict[self_idx]["href"] and link["type"] == links_dict[self_idx]["type"]:
                    del links_dict[idx]
                    break
        
        if self.links is not None:
            stac_collection_dict["links"] = stac_collection_dict["links"] + self.links

        # if self.output_format == "json_full":       
            # stac_collection_dict["features"] = item_list  # Replace the "features" field with the list of items

        json_str = json.dumps(stac_collection_dict, indent=4)

        # Write the collection metadata JSON output file
        output_path = Path(self.output_folder) / Path(self.output_file)
        with open(output_path, "w+") as metadata:
            metadata.write(json_str)

        if self.s3_upload:
            #Uploading metadata JSON file to s3
            _log.debug(f"Uploading metatada JSON \"{output_path}\" to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(output_path)}")
            self.upload_s3(output_path)

    def generate_cog_stac(self):
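        """
        Convert the data cube to Cloud Optimized GeoTIFFs and generate STAC metadata.

        One STAC Item is created per timestamp, with one COG asset per band, written to
        `<output_folder>/<YYYYmmddHHMMSS>/<band>_<YYYYmmddHHMMSS>.tif`. Items are also
        serialized to `items/<item_id>.json` and, one JSON object per line, to
        `inline_items.csv`; the collection JSON is written to `<collection_id>.json`.
        If `s3_upload` is enabled, the COGs and the collection JSON are uploaded to S3.
        """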
        if isinstance(self.data, str):
            # Load the netCDF file into an xr.DataArray via the openEO local connection
            source_path = os.path.dirname(self.data)
            local_conn = LocalConnection(source_path)
            self.data = local_conn.load_collection(self.data).execute()

        self.output_format = "COG"
        self.media_type = pystac.MediaType.COG  # we could also use rio_stac.stac.get_media_type
        self.X_DIM = self.data.openeo.x_dim
        self.Y_DIM = self.data.openeo.y_dim
        self.T_DIM = self.data.openeo.temporal_dims[0]
        self.B_DIM = self.data.openeo.band_dims[0]
        _log.debug(f'Extracted label dimensions from input are:\nx dimension:{self.X_DIM}\ny dimension:{self.Y_DIM}\nbands dimension:{self.B_DIM}\ntemporal dimension:{self.T_DIM}')

        spatial_extents = []
        temporal_extents = []

        item_list = []  # Create a list to store the items

        # Get the time dimension values
        time_values = self.data[self.T_DIM].values
        eo_info = {}
        
        #resetting CSV file
        open(f"{Path(self.output_folder)}/inline_items.csv", 'w+').close()
        _log.debug("Cycling all timestamps")
        #Cycling all timestamps
        for t in time_values:
            _log.debug(f"\nts: {t}")
            # Convert the time value to a datetime object
            timestamp = pd.Timestamp(t)

            # Format the timestamp as a string to use in the file name
            time_str = timestamp.strftime('%Y%m%d%H%M%S')

            item_id = f"{f'{self.item_prefix}_' if self.item_prefix != '' else ''}{time_str}"

            # Create a unique directory for each time slice
            time_slice_dir = os.path.join(self.output_folder, time_str)

            if not os.path.exists(time_slice_dir):
                os.makedirs(time_slice_dir)

            # Get the band name (you may need to adjust this part based on your data)
            bands = self.data[self.B_DIM].values
            
            pystac_assets = []

            # Cycling all bands
            _log.debug("Cycling all bands")
            eo_bands_list = []

            for band in bands:
                _log.debug(f"b: {band}")
                curr_file_name = f"{band}_{time_str}.tif"
                # Define the GeoTIFF file path for this time slice and band
                path = os.path.join(time_slice_dir, curr_file_name)

                # Write the result to the GeoTIFF file
                self.data.loc[{self.T_DIM:t,self.B_DIM:band}].to_dataset(name=band).rio.to_raster(raster_path=path, driver='COG')

                link_path = path 

                if self.s3_upload:                                     
                    #Uploading file to s3                   
                    _log.debug(f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}")
                    self.upload_s3(path)
                    
                    link_path = f"https://{self.bucket_name}.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"

                bboxes = []

                # Create an asset dictionary for this time slice
                with rasterio.open(path) as src_dst:
                    # Get BBOX and Footprint
                    dataset_geom = get_dataset_geom(src_dst, densify_pts=0, precision=-1)
                    bboxes.append(dataset_geom["bbox"])

                    proj_info = {
                        f"proj:{name}": value
                        for name, value in get_projection_info(src_dst).items()
                    }

                    raster_info = {
                        "raster:bands": get_raster_info(src_dst, max_size=1024)
                    }

                    band_dict = get_eobands_info(src_dst)[0]

                    if isinstance(band_dict, dict):
                        del band_dict["name"]
                        band_dict["name"] = band_dict["description"]
                        del band_dict["description"]
                    else:
                        pass #band_dict = {}

                    eo_bands_list.append(band_dict) #TODO: add to dict, rename description with name and remove name 
                    
                    cloudcover = src_dst.get_tag_item("CLOUDCOVER", "IMAGERY")
                    # TODO: try to add this field to the COG. Currently not present in the files we write here.
                    if cloudcover is not None:
                        self.properties.update({"eo:cloud_cover": int(cloudcover)})

                    eo_info["eo:bands"] = [band_dict]

                    pystac_assets.append(
                        (
                            band, 
                            pystac.Asset(
                                href=link_path, 
                                media_type=self.media_type,
                                extra_fields={
                                    **proj_info,
                                    **raster_info, 
                                    **eo_info
                                },
                                roles=["data"],
                            ),
                        )
                    )

            eo_info["eo:bands"] = eo_bands_list

            minx, miny, maxx, maxy = zip(*bboxes)
            bbox = [min(minx), min(miny), max(maxx), max(maxy)]

            # metadata_item_path = f"{self.fix_path_slash(time_slice_dir)}metadata.json"

            # item
            item = pystac.Item(
                id=item_id,
                geometry=bbox_to_geom(bbox),
                bbox=bbox,
                collection=None, #self.collection_id, #FIXME: raises an error if uncommented
                stac_extensions=self.extensions,
                datetime=str_to_datetime(str(t)),
                properties=self.properties,
                # href=metadata_item_path # no more needed after removing JSON for every item approach 
            )

            # Calculate the item's spatial extent and add it to the list
            item_bbox = item.bbox
            spatial_extents.append(item_bbox)

            # Calculate the item's temporal extent and add it to the list
            item_datetime = item.datetime
            temporal_extents.append([item_datetime, item_datetime])


            for key, asset in pystac_assets:
                item.add_asset(key=key, asset=asset)
            
            item.validate()
            
            item.add_link(
                pystac.Link(
                    pystac.RelType.COLLECTION,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )
            
            item.add_link(
                pystac.Link(
                    pystac.RelType.PARENT,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )
            
            item.add_link(
                pystac.Link(
                    pystac.RelType.SELF,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items/{item_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )

            item_dict = item.to_dict()
            #FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
            # now this link is added manually by editing the dict
            item_dict["links"].append({"rel": "root", "href": self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"), "type": "application/json"})
            # self.stac_collection.add_item(item)
            # Append the item to the list instead of adding it to the collection
            #item_dict = item.to_dict()
            item_dict["collection"] = self.collection_id
            # if self.output_format == "json_full":
                # item_list.append(copy.deepcopy(item_dict)) # If we don't get a deep copy, the properties datetime gets overwritten in the next iteration of the loop.
            # elif self.output_format == "csv":
            item_oneline = json.dumps(item_dict, separators=(",", ":"), ensure_ascii=False)
            output_path = Path(self.output_folder)
            with open(f"{output_path}/inline_items.csv", 'a+') as out_csv:
                out_csv.write(f"{item_oneline}\n")
            jsons_path = f"{output_path}/items/"
            Path(jsons_path).mkdir(parents=True, exist_ok=True)

            with open(f"{self.fix_path_slash(jsons_path)}{item_id}.json", 'w+') as out_json:
                out_json.write(json.dumps(item_dict, indent=4))
        
        # Calculate overall spatial extent
        minx, miny, maxx, maxy = zip(*spatial_extents)
        overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]

        # Calculate overall temporal extent
        min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
        max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
        overall_temporal_extent = [min_datetime, max_datetime]

        s_ext = pystac.SpatialExtent([overall_bbox])
        t_ext = pystac.TemporalExtent([overall_temporal_extent])

        extra_fields = {}

        extra_fields["stac_version"] = self.stac_version

        if self.keywords is not None:
            extra_fields["keywords"] = self.keywords

        if self.providers is not None:
            extra_fields["providers"] = self.providers

        if self.version is not None:
            extra_fields["version"] = self.version

        if self.title is not None:
            extra_fields["title"] = self.title

        if self.sci_citation is not None:
            extra_fields["sci:citation"] = self.sci_citation
        
        if self.sci_doi is not None:
            extra_fields["sci:doi"] = self.sci_doi
        
        if self.sci_citation is not None or self.sci_doi is not None:
            self.extensions.append("https://stac-extensions.github.io/scientific/v1.0.0/schema.json")

        extra_fields["summaries"] = eo_info

        extra_fields["stac_extensions"] = self.extensions

        cube_dimensions = {
            self.X_DIM: {
                "axis": "x",
                "type": "spatial",
                "extent": [float(self.data.coords[self.X_DIM].min()), float(self.data.coords[self.X_DIM].max())],
                "reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
            },
            self.Y_DIM: {
                "axis": "y",
                "type": "spatial",
                "extent": [float(self.data.coords[self.Y_DIM].min()), float(self.data.coords[self.Y_DIM].max())],
                "reference_system": int((self.data.rio.crs.to_string()).split(':')[1])
            },

            self.T_DIM: {
                "type": "temporal",
                "extent": [str(self.data[self.T_DIM].min().values), str(self.data[self.T_DIM].max().values)],
            },

            self.B_DIM: {
                "type": "bands",
                "values": list(self.data[self.B_DIM].values),
            }
        }

        extra_fields["cube:dimensions"] = cube_dimensions

        self.stac_collection = pystac.collection.Collection(
            id=self.collection_id,
            description=self.description,
            extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
            extra_fields=extra_fields,
        )

        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.ITEMS,
                f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
                media_type=pystac.MediaType.JSON,
            )
        )
        
        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.PARENT,
                self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
                media_type=pystac.MediaType.JSON,
            )
        )

        self.stac_collection.add_link(
                pystac.Link(
                pystac.RelType.SELF,
                f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                media_type=pystac.MediaType.JSON,
            )
        )

        #self.stac_collection.remove_links(rel=pystac.RelType.ROOT)
        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.ROOT,
                self.get_root_url(f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"),
                media_type=pystac.MediaType.JSON,
            )
        )

            
        if self.license is not None:
            self.stac_collection.license = self.license

        # Create a single JSON file with all the items
        stac_collection_dict = self.stac_collection.to_dict()

        # in order to solve the double "root" link bug/issue       
        links_dict = stac_collection_dict["links"]

        ctr_roots = 0
        self_exists = False
        self_idx = 0

        for idx, link in enumerate(links_dict):
            if link["rel"] == "root":
                ctr_roots = ctr_roots + 1
            if link["rel"] == "self":
                self_exists = True
                self_idx = idx

        if ctr_roots == 2 and self_exists:
            for idx, link in enumerate(links_dict):
                if link["rel"] == "root" and link["href"] == links_dict[self_idx]["href"] and link["type"] == links_dict[self_idx]["type"]:
                    del links_dict[idx]
                    break
        
        if self.links is not None:
            stac_collection_dict["links"] = stac_collection_dict["links"] + self.links

        # if self.output_format == "json_full":       
            # stac_collection_dict["features"] = item_list  # Replace the "features" field with the list of items

        json_str = json.dumps(stac_collection_dict, indent=4)
        # Write the collection metadata JSON output file
        output_path = Path(self.output_folder) / Path(self.output_file)
        with open(output_path, "w+") as metadata:
            metadata.write(json_str)

        if self.s3_upload:
            #Uploading metadata JSON file to s3
            _log.debug(f"Uploading metadata JSON \"{output_path}\" to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(output_path)}")
            self.upload_s3(output_path)