Skip to content
Snippets Groups Projects
raster2stac.py 41.5 KiB
Newer Older
Raster2STAC - Generate STAC metadata from raster data

This module provides a class `Raster2STAC` for extracting from raster data, represented as an `xr.DataArray`
or a file path to netCDF(s) file(s), SpatioTemporal Asset Catalog (STAC) format metadata JSON files.
Claus Michele's avatar
Claus Michele committed
Authors: Mercurio Lorenzo, Eurac Research - Inst. for Earth Observation, Bolzano/Bozen IT
Authors: Michele Claus, Eurac Research - Inst. for Earth Observation, Bolzano/Bozen IT
Date: 2024-03-24
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
import datetime
import json
Claus Michele's avatar
Claus Michele committed
import logging
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
import os
from pathlib import Path
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
from urllib.parse import urlparse, urlunparse

Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
import boto3
import botocore
Claus Michele's avatar
Claus Michele committed
import dask
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
import numpy as np
import pandas as pd
import pystac
import rasterio
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
import ujson
import xarray as xr
from fsspec.implementations.local import LocalFileSystem
Claus Michele's avatar
Claus Michele committed
from openeo.local import LocalConnection
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
from pystac.utils import str_to_datetime
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
# Import rio_stac methods
# Import extension version
from rio_stac.stac import (
    EO_EXT_VERSION,
    PROJECTION_EXT_VERSION,
    RASTER_EXT_VERSION,
    bbox_to_geom,
    get_dataset_geom,
    get_eobands_info,
    get_projection_info,
    get_raster_info,
)

from .rioxarray_stac import (
    rioxarray_get_dataset_geom,
    rioxarray_get_projection_info,
    rioxarray_get_raster_info,
)
Claus Michele's avatar
Claus Michele committed
_log = logging.getLogger(__name__)
DATACUBE_EXT_VERSION = "v1.0.0"

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed

class Raster2STAC:
    Raster2STAC Class - Convert raster data format into STAC metadata.

    Args:
        data: str or xr.DataArray
Claus Michele's avatar
Claus Michele committed
            Raster data as xr.DataArray or file path referring to a netCDF file.
        collection_id: str
            Identifier of the collection as a string (Example: 'blue-river-basin')
        collection_url: str
            Collection URL (must refer to the FastAPI URL where this collection will be uploaded).
        item_prefix: Optional[str] = ""
            Prefix to add before the datetime as item_id, if the same datetime can occur in multiple items.
        output_folder: Optional[str] = None
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            Local folder for rasters and STAC metadata outputs. Default folder will be set as run timestamp folder
            (ex: ./20231215103000/)
        description: Optional[str] = ""
            Description of the STAC collection.
        title: Optional[str] = None,
            Title of the STAC collection.
        ignore_warns: Optional[bool] = False,
            If True, warnings during processing (such as xr lib warnings) will be ignored.
        keywords: Optional[list] = None,
            Keywords associated with the STAC item.
        providers: Optional[list] = None,
            Data providers associated with the STAC item.
        stac_version: str = "1.0.0",
            Version of the STAC specification to use.
        s3_upload: bool = True,
Claus Michele's avatar
Claus Michele committed
            If True, upload files to Amazon S3 Bucket.
Claus Michele's avatar
Claus Michele committed
            1. For the "COG" output format: upload to S3 the COG files
            2. For the "KERCHUNK" output format: upload the netCDFs and the json Kerchunk files to S3.
Claus Michele's avatar
Claus Michele committed
        bucket_name: str = None,
            Part of AWS S3 configuration: bucket name.
Claus Michele's avatar
Claus Michele committed
        bucket_file_prefix: str = None,
             Part of AWS S3 configuration: prefix for files in the S3 bucket.
        aws_region: str = None,
             Part of AWS S3 configuration: AWS region for S3.
        version: Optional[str] = None,
            Version of the STAC collection.
        license: Optional[str] = None,
            License information about STAC collection and its assets.
        sci_citation: Optional[str] = None
            Scientific citation(s) reference(s) about STAC collection.
        write_collection_assets = False,
            Include all assets in the STAC Collection, with unique keys.
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
    def __init__(
        self,
        data,
        collection_id,
        collection_url=None,
        item_prefix: Optional[str] = "",
        output_folder: Optional[str] = None,
        description: Optional[str] = "",
        title: Optional[str] = None,
        ignore_warns: Optional[bool] = False,
        keywords: Optional[list] = None,
        providers: Optional[list] = None,
        links: Optional[list] = None,
        stac_version="1.0.0",
        s3_upload=False,
        bucket_name=None,
        bucket_file_prefix=None,
        aws_region=None,
        aws_access_key=None,
        aws_secret_key=None,
        version=None,
        license=None,
        sci_doi=None,
        sci_citation=None,
        write_collection_assets=False,
    ):
            import warnings
            warnings.filterwarnings("ignore")

Claus Michele's avatar
Claus Michele committed
        self.data = data
Claus Michele's avatar
Claus Michele committed
        self.X_DIM = None
        self.Y_DIM = None
        self.T_DIM = None
        self.B_DIM = None
        self.output_format = None
        self.media_type = None
Claus Michele's avatar
Claus Michele committed

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        self.properties = {}  # additional properties to add in the item
        self.collection_id = collection_id  # name of collection the item belongs to
        self.collection_url = collection_url
        self.item_prefix = item_prefix
Claus Michele's avatar
Claus Michele committed
        self.description = description
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        self.keywords = keywords
        self.providers = providers
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        self.stac_version = stac_version
        self.sci_doi = sci_doi
        self.write_collection_assets = write_collection_assets
        self.extensions = [
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json",
            f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json",
            f"https://stac-extensions.github.io/eo/{EO_EXT_VERSION}/schema.json",
        ]
Claus Michele's avatar
Claus Michele committed

        if output_folder is not None:
            self.output_folder = output_folder
        else:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            self.output_folder = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S%f")[
                :-3
            ]
Claus Michele's avatar
Claus Michele committed
        self.output_file = f"{self.collection_id}.json"
Claus Michele's avatar
Claus Michele committed
        Path(self.output_folder).mkdir(parents=True, exist_ok=True)
        self.stac_collection = None
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        self.bucket_name = bucket_name
        self.bucket_file_prefix = bucket_file_prefix
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        self.aws_region = aws_region
        self.s3_upload = s3_upload
        self.s3_client = None
        self.version = version
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        self.title = title

        if self.s3_upload:
            # Initializing an S3 client
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            self.s3_client = boto3.client(
                "s3",
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
            )  # region_name=aws_region,
Claus Michele's avatar
Claus Michele committed
        # available_output_formats = ["csv","json_full"]
        # if output_format not in available_output_formats:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        # raise ValueError(f"output_format can be set to one of {available_output_formats}")
Claus Michele's avatar
Claus Michele committed
        # self.output_format = output_format
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        self.license = license
        self.sci_citation = sci_citation

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        # TODO: implement following attributes: self.overwrite,
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
    def fix_path_slash(self, res_loc):
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        return res_loc if res_loc.endswith("/") else res_loc + "/"
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
    # TODO/FIXME: maybe better to put this method as an external static function? (and s3_client attribute as global variable)
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
    def upload_s3(self, file_path):
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        if self.s3_client is not None:
            prefix = self.bucket_file_prefix
            file_name = os.path.basename(file_path)
            object_name = f"{self.fix_path_slash(prefix)}{file_name}"
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            try:
                self.s3_client.upload_file(file_path, self.bucket_name, object_name)
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                _log.debug(
                    f"Successfully uploaded {file_name} to {self.bucket_name} as {object_name}"
                )
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            except botocore.exceptions.NoCredentialsError:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                _log.debug(
                    "AWS credentials not found. Make sure you set the correct access key and secret key."
                )
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            except botocore.exceptions.ClientError as e:
Claus Michele's avatar
Claus Michele committed
                _log.debug(f'Error uploading file: {e.response["Error"]["Message"]}')
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
    def get_root_url(self, url):
        parsed_url = urlparse(url)
        # Extract protocol + domain
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        root_url = urlunparse((parsed_url.scheme, parsed_url.netloc, "", "", "", ""))
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        return root_url

Claus Michele's avatar
Claus Michele committed
    def generate_kerchunk_stac(self):
        from kerchunk.hdf import SingleHdf5ToZarr
Claus Michele's avatar
Claus Michele committed
        def gen_json(u, so, json_dir):
            fs = LocalFileSystem(skip_instant_cache=True)
            with fs.open(u, **so) as inf:
                h5chunks = SingleHdf5ToZarr(inf, u, inline_threshold=300)
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                with open(f"{str(json_dir)}/{u.split('/')[-1]}.json", "wb") as outf:
Claus Michele's avatar
Claus Michele committed
                    outf.write(ujson.dumps(h5chunks.translate()).encode())
            return f"{str(json_dir)}/{u.split('/')[-1]}.json"

        # Create the output folder for the Kerchunk files
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        kerchunk_folder = os.path.join(self.output_folder, "kerchunk")
Claus Michele's avatar
Claus Michele committed
        Path(kerchunk_folder).mkdir(parents=True, exist_ok=True)
Claus Michele's avatar
Claus Michele committed
        kerchunk_files_list = []
        # Read the list of netCDFs
        for same_time_netcdfs in self.data:
            t_labels = []
            for var in same_time_netcdfs:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                source_nc = var
Claus Michele's avatar
Claus Michele committed
                source_path = os.path.dirname(var)
                local_conn = LocalConnection(source_path)
                tmp = local_conn.load_collection(source_nc).execute()
                t_labels.append(tuple(tmp[tmp.openeo.temporal_dims[0]].values))
            t_steps = [len(x) for x in t_labels]
            if len(set(t_steps)) != 1:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                raise ValueError(
                    f"The provided netCDFs contain a different number of dates! {same_time_netcdfs}"
                )
Claus Michele's avatar
Claus Michele committed
            if len(set(t_labels)) != 1:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                raise ValueError(
                    "The provided netCDFs contain a different set of dates!"
Claus Michele's avatar
Claus Michele committed

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            so = dict(mode="rb", anon=True, default_fill_cache=False)
            same_time_kerchunks = dask.compute(
                *[
                    dask.delayed(gen_json)(var, so, kerchunk_folder)
                    for var in same_time_netcdfs
                ]
            )
Claus Michele's avatar
Claus Michele committed
            kerchunk_files_list.append(same_time_kerchunks)
Claus Michele's avatar
Claus Michele committed
        # List of List json Kerchunk.
        # First list: each element (list) different year/time
        # Second list: each element different variables
        datasets_list = []
        for same_time_data in kerchunk_files_list:
            for d in same_time_data:
                if d.endswith(".json"):
                    self.data = xr.open_dataset(
                        "reference://",
                        engine="zarr",
                        decode_coords="all",
                        backend_kwargs={
                            "storage_options": {
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                                "fo": d,
Claus Michele's avatar
Claus Michele committed
                            },
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                            "consolidated": False,
                        },
                        chunks={},
Claus Michele's avatar
Claus Michele committed
                    ).to_dataarray(dim="bands")
                    # IS_KERCHUNK = True # UNUSED VARIABLE IS_KERCHUNK
Claus Michele's avatar
Claus Michele committed
                    datasets_list.append(self.data)
                    # Need to create one Item per time/netCDF
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        self.data = xr.combine_by_coords(datasets_list, combine_attrs="drop_conflicts")
        # raise ValueError("'data' paramter must be either xr.DataArray, a str (path to a netCDF) or a list of lists with paths to JSON Kerchunk files.")
Claus Michele's avatar
Claus Michele committed

        self.X_DIM = self.data.openeo.x_dim
        self.Y_DIM = self.data.openeo.y_dim
        self.T_DIM = self.data.openeo.temporal_dims[0]
        self.B_DIM = self.data.openeo.band_dims[0]
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        _log.debug(
            f"Extracted label dimensions from input are:\nx dimension:{self.X_DIM}\ny dimension:{self.Y_DIM}\nbands dimension:{self.B_DIM}\ntemporal dimension:{self.T_DIM}"
        )

Claus Michele's avatar
Claus Michele committed
        self.output_format = "KERCHUNK"
        self.media_type = pystac.MediaType.JSON
Claus Michele's avatar
Claus Michele committed
        spatial_extents = []
        temporal_extents = []

        # Get the time dimension values
        # time_values = self.data[self.T_DIM].values # unused variable
Claus Michele's avatar
Claus Michele committed

        eo_info = {}
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed

        # resetting CSV file
        open(f"{Path(self.output_folder)}/inline_items.csv", "w+")

Claus Michele's avatar
Claus Michele committed
        _log.debug("Cycling all timestamps")

        # Loop over the kerchunk files
        for same_time_data in kerchunk_files_list:
            _log.debug(f"\nts: {same_time_data}")
Claus Michele's avatar
Claus Michele committed
            time_ranges = []
            bands_data = {}
            for d in same_time_data:
                if d.endswith(".json"):
                    band_data = xr.open_dataset(
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                        "reference://",
                        engine="zarr",
                        decode_coords="all",
                        backend_kwargs={
Claus Michele's avatar
Claus Michele committed
                            "storage_options": {
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                                "fo": d,
Claus Michele's avatar
Claus Michele committed
                            },
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                            "consolidated": False,
                        },
                        chunks={},
Claus Michele's avatar
Claus Michele committed
                    ).to_dataarray(dim="bands")
                    bands_data[d] = band_data
                    time_ranges.append(band_data[self.T_DIM].values)
            # for i,t_r in enumerate(time_ranges):
            #     if i==0:
            #         first_range = t_r
            #         _are_all_time_steps_equal = True
            #     else:
            #         _are_all_time_steps_equal = np.array_equal(first_range,t_r) and _are_all_time_steps_equal
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed

            #             _log.debug(f"Are the time steps provided in the kerchunks aligned {_are_all_time_steps_equal}")

            #             if not _are_all_time_steps_equal:
            #                 raise Exception(f"The time steps provided in the kerchunk files {same_time_data} are not the same, can't continue.")

Claus Michele's avatar
Claus Michele committed
            # Now we can create one STAC Item for this time range, with one asset each band/variable

            start_datetime = np.min(time_ranges[0])
            end_datetime = np.max(time_ranges[0])
Claus Michele's avatar
Claus Michele committed
            # Convert the time value to a datetime object
            # Format the timestamp as a string to use in the file name
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            start_datetime_str = pd.Timestamp(start_datetime).strftime("%Y%m%d%H%M%S")
            end_datetime_str = pd.Timestamp(end_datetime).strftime("%Y%m%d%H%M%S")

            _log.debug(
                f"Extracted temporal extrema for this time range: {start_datetime_str} {end_datetime_str}"
            )
Claus Michele's avatar
Claus Michele committed

            item_id = f"{f'{self.item_prefix}_' if self.item_prefix != '' else ''}{start_datetime_str}_{end_datetime_str}"

            # Create a unique directory for each time slice
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            time_slice_dir = os.path.join(
                self.output_folder, f"{start_datetime_str}_{end_datetime_str}"
            )
Claus Michele's avatar
Claus Michele committed

            Path(time_slice_dir).mkdir(parents=True, exist_ok=True)

            # Get the band name (you may need to adjust this part based on your data)
            # bands = self.data[self.B_DIM].values
Claus Michele's avatar
Claus Michele committed
            pystac_assets = []

            # Cycling all bands/variables
            _log.debug("Cycling all bands")

            eo_bands_list = []
            for b_d in bands_data:
                band = bands_data[b_d][self.B_DIM].values[0]
                kerchunk_file = b_d
                _log.debug(f"b: {band}")

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                link_path = kerchunk_file
Claus Michele's avatar
Claus Michele committed

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                #                 if self.s3_upload:
                #                     #Uploading file to s3
                #                     _log.debug(f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}")
                #                     self.upload_s3(path)

                #                     link_path = f"https://{self.bucket_name}.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"
Claus Michele's avatar
Claus Michele committed

                bboxes = []

                # Create an asset dictionary for this time slice
                # Get BBOX and Footprint
                _log.debug(bands_data[b_d].rio.crs)
                _log.debug(bands_data[b_d].rio.bounds())

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                dataset_geom = rioxarray_get_dataset_geom(
                    bands_data[b_d], densify_pts=0, precision=-1
                )
Claus Michele's avatar
Claus Michele committed
                bboxes.append(dataset_geom["bbox"])
Claus Michele's avatar
Claus Michele committed
                proj_info = {
                    f"proj:{name}": value
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                    for name, value in rioxarray_get_projection_info(
                        bands_data[b_d]
                    ).items()
Claus Michele's avatar
Claus Michele committed
                }

                raster_info = {
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                    "raster:bands": rioxarray_get_raster_info(
                        bands_data[b_d], max_size=1024
                    )
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                band_dict = {"name": band}
Claus Michele's avatar
Claus Michele committed

                eo_bands_list.append(band_dict)

                eo_info["eo:bands"] = [band_dict]

                pystac_assets.append(
                    (
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                        band,
Claus Michele's avatar
Claus Michele committed
                        pystac.Asset(
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                            href=link_path,
Claus Michele's avatar
Claus Michele committed
                            media_type=self.media_type,
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                            extra_fields={**proj_info, **raster_info, **eo_info},
                            roles=["data", "index"],
Claus Michele's avatar
Claus Michele committed
            eo_info["eo:bands"] = eo_bands_list

            minx, miny, maxx, maxy = zip(*bboxes)
            bbox = [min(minx), min(miny), max(maxx), max(maxy)]

            # item
            item = pystac.Item(
                id=item_id,
                geometry=bbox_to_geom(bbox),
                bbox=bbox,
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                collection=None,  # self.collection_id, #FIXME: da errore se lo si decommenta
Claus Michele's avatar
Claus Michele committed
                stac_extensions=self.extensions,
                datetime=None,
                start_datetime=pd.Timestamp(start_datetime),
                end_datetime=pd.Timestamp(end_datetime),
                properties=self.properties,
            )

            # Calculate the item's spatial extent and add it to the list
            spatial_extents.append(item.bbox)

            # Calculate the item's temporal extent and add it to the list
            # item_datetime = item.start_datetime
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            temporal_extents.append(
                [pd.Timestamp(start_datetime), pd.Timestamp(end_datetime)]
            )
Claus Michele's avatar
Claus Michele committed

            for key, asset in pystac_assets:
                item.add_asset(key=key, asset=asset)
Claus Michele's avatar
Claus Michele committed
            item.validate()
Claus Michele's avatar
Claus Michele committed
            item.add_link(
                pystac.Link(
                    pystac.RelType.COLLECTION,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )
Claus Michele's avatar
Claus Michele committed
            item.add_link(
                pystac.Link(
                    pystac.RelType.PARENT,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )
Claus Michele's avatar
Claus Michele committed
            item.add_link(
                pystac.Link(
                    pystac.RelType.SELF,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/{item_id}",
                    media_type=pystac.MediaType.JSON,
                )
            )

            item_dict = item.to_dict()

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            # FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
Claus Michele's avatar
Claus Michele committed
            # now this link is added manually by editing the dict
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            item_dict["links"].append(
                {
                    "rel": "root",
                    "href": self.get_root_url(
                        f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"
                    ),
                    "type": "application/json",
                }
            )
Claus Michele's avatar
Claus Michele committed

            # self.stac_collection.add_item(item)
            # Append the item to the list instead of adding it to the collection
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            # item_dict = item.to_dict()
Claus Michele's avatar
Claus Michele committed
            item_dict["collection"] = self.collection_id

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            item_oneline = json.dumps(
                item_dict, separators=(",", ":"), ensure_ascii=False
            )
Claus Michele's avatar
Claus Michele committed

            output_path = Path(self.output_folder)
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            with open(f"{output_path}/inline_items.csv", "a+") as out_csv:
Claus Michele's avatar
Claus Michele committed
                out_csv.write(f"{item_oneline}\n")

            jsons_path = f"{output_path}/items/"
            Path(jsons_path).mkdir(parents=True, exist_ok=True)

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            with open(
                f"{self.fix_path_slash(jsons_path)}{item_id}.json", "w+"
            ) as out_json:
                out_json.write(json.dumps(item_dict, indent=4))

Claus Michele's avatar
Claus Michele committed
        # Calculate overall spatial extent
        minx, miny, maxx, maxy = zip(*spatial_extents)
        overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]

        # Calculate overall temporal extent
        min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
        max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
        overall_temporal_extent = [min_datetime, max_datetime]

        s_ext = pystac.SpatialExtent([overall_bbox])
        t_ext = pystac.TemporalExtent([overall_temporal_extent])

        extra_fields = {}

        extra_fields["stac_version"] = self.stac_version

        if self.keywords is not None:
            extra_fields["keywords"] = self.keywords

        if self.providers is not None:
            extra_fields["providers"] = self.providers

        if self.version is not None:
            extra_fields["version"] = self.version

        if self.title is not None:
            extra_fields["title"] = self.title

        if self.sci_citation is not None:
            extra_fields["sci:citation"] = self.sci_citation
Claus Michele's avatar
Claus Michele committed
        if self.sci_doi is not None:
            extra_fields["sci:doi"] = self.sci_doi
Claus Michele's avatar
Claus Michele committed
        if self.sci_citation is not None or self.sci_doi is not None:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            self.extensions.append(
                "https://stac-extensions.github.io/scientific/v1.0.0/schema.json"
            )
Claus Michele's avatar
Claus Michele committed

        extra_fields["summaries"] = eo_info
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        self.extensions.append(
            "https://stac-extensions.github.io/datacube/v2.2.0/schema.json"
        )
Claus Michele's avatar
Claus Michele committed
        extra_fields["stac_extensions"] = self.extensions

        cube_dimensons = {
            self.X_DIM: {
                "axis": "x",
                "type": "spatial",
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                "extent": [
                    float(self.data.coords[self.X_DIM].min()),
                    float(self.data.coords[self.X_DIM].max()),
                ],
                "reference_system": int((self.data.rio.crs.to_string()).split(":")[1]),
Claus Michele's avatar
Claus Michele committed
            },
            self.Y_DIM: {
                "axis": "y",
                "type": "spatial",
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                "extent": [
                    float(self.data.coords[self.Y_DIM].min()),
                    float(self.data.coords[self.Y_DIM].max()),
                ],
                "reference_system": int((self.data.rio.crs.to_string()).split(":")[1]),
Claus Michele's avatar
Claus Michele committed
            },
            self.T_DIM: {
                "type": "temporal",
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                "extent": [
                    str(self.data[self.T_DIM].min().values),
                    str(self.data[self.T_DIM].max().values),
                ],
Claus Michele's avatar
Claus Michele committed
            },
            self.B_DIM: {
                "type": "bands",
                "values": list(self.data[self.B_DIM].values),
Claus Michele's avatar
Claus Michele committed
        }

        extra_fields["cube:dimensions"] = cube_dimensons

        self.stac_collection = pystac.collection.Collection(
            id=self.collection_id,
            description=self.description,
            extent=pystac.Extent(spatial=s_ext, temporal=t_ext),
            extra_fields=extra_fields,
        )

        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.ITEMS,
                f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items",
                media_type=pystac.MediaType.JSON,
            )
        )
Claus Michele's avatar
Claus Michele committed
        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.PARENT,
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                self.get_root_url(
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"
                ),
Claus Michele's avatar
Claus Michele committed
                media_type=pystac.MediaType.JSON,
            )
        )

        self.stac_collection.add_link(
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            pystac.Link(
Claus Michele's avatar
Claus Michele committed
                pystac.RelType.SELF,
                f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                media_type=pystac.MediaType.JSON,
            )
        )

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        # self.stac_collection.remove_links(rel=pystac.RelType.ROOT)
Claus Michele's avatar
Claus Michele committed

        self.stac_collection.add_link(
            pystac.Link(
                pystac.RelType.ROOT,
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                self.get_root_url(
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items"
                ),
Claus Michele's avatar
Claus Michele committed
                media_type=pystac.MediaType.JSON,
            )
        )

        if self.license is not None:
            self.stac_collection.license = self.license

        # Create a single JSON file with all the items
        stac_collection_dict = self.stac_collection.to_dict()

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        # in order to solve the double "root" link bug/issue
Claus Michele's avatar
Claus Michele committed
        links_dict = stac_collection_dict["links"]

        ctr_roots = 0
        self_exists = False
        self_idx = 0

        for idx, link in enumerate(links_dict):
            if link["rel"] == "root":
                ctr_roots = ctr_roots + 1
            if link["rel"] == "self":
                self_exists = True
                self_idx = idx

        if ctr_roots == 2 and self_exists:
            for idx, link in enumerate(links_dict):
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                if (
                    link["rel"] == "root"
                    and link["href"] == links_dict[self_idx]["href"]
                    and link["type"] == links_dict[self_idx]["type"]
                ):
Claus Michele's avatar
Claus Michele committed
                    del links_dict[idx]
                    break
Claus Michele's avatar
Claus Michele committed
        if self.links is not None:
            stac_collection_dict["links"] = stac_collection_dict["links"] + self.links

        json_str = json.dumps(stac_collection_dict, indent=4)

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        # printing metadata.json test output file
Claus Michele's avatar
Claus Michele committed
        output_path = Path(self.output_folder) / Path(self.output_file)
        with open(output_path, "w+") as metadata:
            metadata.write(json_str)

        if self.s3_upload:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            # Uploading metadata JSON file to s3
            _log.debug(
                f'Uploading metatada JSON "{output_path}" to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(output_path)}'
            )
Claus Michele's avatar
Claus Michele committed
            self.upload_s3(output_path)

    def generate_cog_stac(self):
Claus Michele's avatar
Claus Michele committed
        if isinstance(self.data, xr.Dataset):
            # store datasets in  a placeholder
            self.data_ds = self.data.copy(deep=True)
            self.data = self.data.to_dataarray(dim="bands")
        elif isinstance(self.data, xr.DataArray):
            pass
        elif isinstance(self.data, str):
            source_path = os.path.dirname(self.data)
            local_conn = LocalConnection(source_path)
            self.data = local_conn.load_collection(self.data).execute()
        else:
            raise Exception(
                "Please provide a path to a valid file or an xArray DataArray or Dataset object!"
            )
Claus Michele's avatar
Claus Michele committed

        self.output_format = "COG"
        self.media_type = pystac.MediaType.COG
Claus Michele's avatar
Claus Michele committed
        self.X_DIM = self.data.openeo.x_dim
        self.Y_DIM = self.data.openeo.y_dim
        self.T_DIM = self.data.openeo.temporal_dims[0]
        self.B_DIM = self.data.openeo.band_dims[0]
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        _log.debug(
            f"Extracted label dimensions from input are:\nx dimension:{self.X_DIM}\ny dimension:{self.Y_DIM}\nbands dimension:{self.B_DIM}\ntemporal dimension:{self.T_DIM}"
        )
Claus Michele's avatar
Claus Michele committed

Claus Michele's avatar
Claus Michele committed
        if self.data_ds is None:
            self.data_ds = self.data.to_dataset(dim=self.B_DIM)

        spatial_extents = []
        temporal_extents = []

        collection_assets = {}
        # Get the time dimension values
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        time_values = self.data[self.T_DIM].values
        eo_info = {}
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed

        # resetting CSV file
        open(f"{Path(self.output_folder)}/inline_items.csv", "w+")

Claus Michele's avatar
Claus Michele committed
        _log.debug("Cycling all timestamps")
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        # Cycling all timestamps
        for t in time_values:
Claus Michele's avatar
Claus Michele committed
            _log.debug(f"\nts: {t}")
            # Convert the time value to a datetime object
            timestamp = pd.Timestamp(t)

            # Format the timestamp as a string to use in the file name
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            time_str = timestamp.strftime("%Y%m%d%H%M%S")
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            item_id = (
                f"{f'{self.item_prefix}_' if self.item_prefix != '' else ''}{time_str}"
            )
            # Create a unique directory for each time slice
            time_slice_dir = os.path.join(self.output_folder, time_str)

            if not os.path.exists(time_slice_dir):
                os.makedirs(time_slice_dir)

            # Get the band name (you may need to adjust this part based on your data)
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            bands = self.data[self.B_DIM].values
            pystac_assets = []

            # Cycling all bands
Claus Michele's avatar
Claus Michele committed
            _log.debug("Cycling all bands")
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            eo_bands_list = []

            for band in bands:
Claus Michele's avatar
Claus Michele committed
                _log.debug(f"b: {band}")
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                curr_file_name = f"{band}_{time_str}.tif"
                # Define the GeoTIFF file path for this time slice and band
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                path = os.path.join(time_slice_dir, curr_file_name)

                # Write the result to the GeoTIFF file
                if isinstance(self.data, xr.DataArray):
                    self.data.loc[{self.T_DIM: t, self.B_DIM: band}].to_dataset(
                        name=band
                    ).rio.to_raster(raster_path=path, driver="COG")
                else:
                    cog_file = self.data_ds.loc[{self.T_DIM: t}][band]
                    cog_file.attrs["long_name"] = f"{band}"
                    cog_file.to_dataset(name=band).rio.to_raster(
                        raster_path=path, driver="COG"
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                link_path = path
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                if self.s3_upload:
                    # Uploading file to s3
                    _log.debug(
                        f"Uploading {path} to {self.fix_path_slash(self.bucket_file_prefix)}{os.path.basename(path)}"
                    )
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                    self.upload_s3(path)
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                    link_path = f"https://{self.bucket_name}.{self.aws_region}.amazonaws.com/{self.fix_path_slash(self.bucket_file_prefix)}{curr_file_name}"
                bboxes = []

                # Create an asset dictionary for this time slice
                with rasterio.open(path) as src_dst:
                    # Get BBOX and Footprint
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                    dataset_geom = get_dataset_geom(
                        src_dst, densify_pts=0, precision=-1
                    )
                    bboxes.append(dataset_geom["bbox"])

                    proj_info = {
                        f"proj:{name}": value
                        for name, value in get_projection_info(src_dst).items()
                    }

                    raster_info = {
                        "raster:bands": get_raster_info(src_dst, max_size=1024)
                    }

Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                    band_dict = get_eobands_info(src_dst)[0]

                    # if type(band_dict) == dict:
                    if isinstance(band_dict, dict):
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                        del band_dict["name"]
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                        band_dict["name"] = band_dict["description"]
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                        del band_dict["description"]
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
                    else:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                        pass  # band_dict = {}

                    eo_bands_list.append(
                        band_dict
                    )  # TODO: add to dict, rename description with name and remove name
                    cloudcover = src_dst.get_tag_item("CLOUDCOVER", "IMAGERY")
                    # TODO: try to add this field to the COG. Currently not present in the files we write here.
                    if cloudcover is not None:
                        self.properties.update({"eo:cloud_cover": int(cloudcover)})

                    eo_info["eo:bands"] = [band_dict]
                    asset = pystac.Asset(
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                        href=link_path,
                        media_type=self.media_type,
                        extra_fields={**proj_info, **raster_info, **eo_info},
                        roles=["data"],
                    )
                    pystac_assets.append(
                        (
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
                            band,
                    if self.write_collection_assets:
                        collection_assets[f"{item_id}_{band}"] = asset
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            eo_info["eo:bands"] = eo_bands_list

            minx, miny, maxx, maxy = zip(*bboxes)
            bbox = [min(minx), min(miny), max(maxx), max(maxy)]

            # item
            item = pystac.Item(
                geometry=bbox_to_geom(bbox),
                bbox=bbox,
Claus Michele's avatar
Claus Michele committed
                collection=None,
                stac_extensions=self.extensions,
                datetime=str_to_datetime(str(t)),
                properties=self.properties,
            )
            # Calculate the item's spatial extent and add it to the list
            item_bbox = item.bbox
            spatial_extents.append(item_bbox)

            # Calculate the item's temporal extent and add it to the list
            item_datetime = item.datetime
            temporal_extents.append([item_datetime, item_datetime])

            for key, asset in pystac_assets:
                item.add_asset(key=key, asset=asset)
            item.validate()
            item.add_link(
                pystac.Link(
                    pystac.RelType.COLLECTION,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
            item.add_link(
                pystac.Link(
                    pystac.RelType.PARENT,
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}",
                    media_type=pystac.MediaType.JSON,
            item.add_link(
                pystac.Link(
                    pystac.RelType.SELF,
Claus Michele's avatar
Claus Michele committed
                    f"{self.fix_path_slash(self.collection_url)}{self.collection_id}/items/{item_id}",
                    media_type=pystac.MediaType.JSON,
            item_dict = item.to_dict()
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            # FIXME: persistent pystac bug or logical error (urllib error when adding root link to current item)
            # now this link is added manually by editing the dict
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            item_dict["links"].append(
                {
                    "rel": "root",
                    "href": self.get_root_url(
                        f"{self.fix_path_slash(self.collection_url)}{self.collection_id}"
                    ),
                    "type": "application/json",
                }
            )
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
            # self.stac_collection.add_item(item)
            # Append the item to the list instead of adding it to the collection
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            # item_dict = item.to_dict()
            item_dict["collection"] = self.collection_id
Claus Michele's avatar
Claus Michele committed
            # if self.output_format == "json_full":
            # elif self.output_format == "csv":
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            item_oneline = json.dumps(
                item_dict, separators=(",", ":"), ensure_ascii=False
            )
Claus Michele's avatar
Claus Michele committed
            output_path = Path(self.output_folder)
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            with open(f"{output_path}/inline_items.csv", "a+") as out_csv:
Claus Michele's avatar
Claus Michele committed
                out_csv.write(f"{item_oneline}\n")
Claus Michele's avatar
Claus Michele committed
            jsons_path = f"{output_path}/items/"
            Path(jsons_path).mkdir(parents=True, exist_ok=True)

Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            with open(
                f"{self.fix_path_slash(jsons_path)}{item_id}.json", "w+"
            ) as out_json:
Claus Michele's avatar
Claus Michele committed
                out_json.write(json.dumps(item_dict, indent=4))
        # Calculate overall spatial extent
        minx, miny, maxx, maxy = zip(*spatial_extents)
        overall_bbox = [min(minx), min(miny), max(maxx), max(maxy)]

        # Calculate overall temporal extent
        min_datetime = min(temporal_extents, key=lambda x: x[0])[0]
        max_datetime = max(temporal_extents, key=lambda x: x[1])[1]
        overall_temporal_extent = [min_datetime, max_datetime]

        s_ext = pystac.SpatialExtent([overall_bbox])
        t_ext = pystac.TemporalExtent([overall_temporal_extent])

Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        extra_fields = {}

        extra_fields["stac_version"] = self.stac_version

        if self.keywords is not None:
            extra_fields["keywords"] = self.keywords

        if self.providers is not None:
            extra_fields["providers"] = self.providers

        if self.version is not None:
            extra_fields["version"] = self.version

        if self.title is not None:
            extra_fields["title"] = self.title

        if self.sci_citation is not None:
            extra_fields["sci:citation"] = self.sci_citation
        if self.sci_doi is not None:
            extra_fields["sci:doi"] = self.sci_doi
        if self.sci_citation is not None or self.sci_doi is not None:
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
            self.extensions.append(
                "https://stac-extensions.github.io/scientific/v1.0.0/schema.json"
            )
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed

        extra_fields["summaries"] = eo_info
Rufai Omowunmi Balogun's avatar
Rufai Omowunmi Balogun committed
        self.extensions.append(
            "https://stac-extensions.github.io/datacube/v2.2.0/schema.json"
        )
Lorenzo.Mercurio's avatar
Lorenzo.Mercurio committed
        extra_fields["stac_extensions"] = self.extensions

        cube_dimensons = {
            self.X_DIM: {
                "axis": "x",