Package max_ard

Tools for working with Maxar ARD

Provides

  • Tools for interacting with the ARD API and files
  • SDK objects
  • CLI tools
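
A minimal usage sketch (the bucket path and limit below are hypothetical), importing the SDK objects from the package root:

    from max_ard import ARDCollection, Order

    # open a collection of delivered ARD tiles stored under an S3 prefix
    collection = ARDCollection("s3://my-ard-bucket/my-prefix")

    # list recent orders for the authenticated user
    orders = Order.list_orders(limit=5)
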
Expand source code
"""Tools for working with Maxar ARD

Provides
--------
- Tools for interacting with the ARD API and files 
- SDK objects
- CLI tools

"""

from max_ard.admin import AccountManager
from max_ard.ard_collection import ARDCollection
from max_ard.order import Order
from max_ard.select import Select, SelectResult
from max_ard.monitor import Monitor

__all__ = ["ARDCollection", "Order", "Select", "SelectResult", "AccountManager"]

Sub-modules

max_ard.admin

Perform admin functions …

max_ard.ard_collection

Collection objects representing stored ARD data …

max_ard.base_collections

Base collection types for ARD …

max_ard.commands

Command line tools for ARD

max_ard.dependency_support

Helpers for optional dependencies and platform quirks

max_ard.exceptions

ARD Exceptions

max_ard.io

Export ARD collections to other file formats …

max_ard.metadata

Search ARD tile metadata …

max_ard.monitor

ARD Monitors …

max_ard.order

Place ARD Orders …

max_ard.outputs

max_ard.processing

Tools and utilities for processing ARD images …

max_ard.select

Create ARD Selects and view results …

max_ard.session

Authenticated sessions for communicating with the ARD API endpoints

max_ard.storage

Utilities for storage backends …

Classes

class ARDCollection (path: str, aoi: Optional[Any] = None, acq_id_in: Optional[Iterable[str]] = None, zone_in: Optional[Iterable[int]] = None, earliest_date: Union[str, datetime.datetime, datetime.date, None] = None, latest_date: Union[str, datetime.datetime, datetime.date, None] = None, profile: Optional[str] = None, public: Optional[bool] = False, **kwargs)

ARDCollections represent collections of ARD tiles. Currently the tiles can be stored in S3 or locally.

Parameters

path : str
Path to S3 prefix or STAC collection.
aoi : shapely.geometry or str, optional
Limit to finding tiles that cover this AOI, can be shapely geometry or most textual representations.
acq_id_in : iterable of str, optional
Limit to finding tiles from these acquisitions.
zone_in : iterable of int, optional
Limit to finding tiles in these zones.
earliest_date : str or datetime.date or datetime.datetime, optional
Limit to finding tiles after this date (strings must be YYYY-MM-DD).
latest_date : str or datetime.date or datetime.datetime, optional
Limit to finding tiles before this date.
profile : str, optional
AWS Profile to use when tiles are in S3.
public : bool
Access cloud data without authentication (for public buckets).

The following parameters are also settable attributes; changing them will trigger a rescan.

Attributes

path
 
aoi
 
acq_id_in
 
zone_in
 
earliest_date
 
latest_date
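
A minimal sketch of opening a collection (the bucket path, AOI, and dates are hypothetical); any of the filter attributes can be changed later to trigger a rescan:

    from max_ard import ARDCollection

    collection = ARDCollection(
        "s3://my-ard-bucket/my-prefix",
        aoi="POLYGON((-105.1 39.9, -105.1 40.1, -104.9 40.1, -104.9 39.9, -105.1 39.9))",
        zone_in=[13],
        earliest_date="2021-01-01",
        latest_date="2021-12-31",
    )

    # narrowing the date range later triggers a rescan on the next access
    collection.earliest_date = "2021-06-01"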
 
Expand source code
class ARDCollection(BaseCollection):
    """ARDCollections represent collections of S3 tiles. Currently the tiles
    can be stored in S3 or locally.

    Parameters
    ----------
    path : str
        Path to S3 prefix or STAC collection.
    aoi : shapely.geometry or str, optional
        Limit to finding tiles that cover this AOI, can be shapely geometry or most textual representations.
    acq_id_in : iterable of str, optional
        Limit to finding tiles from these acquisitions.
    zone_in : iterable of int, optional
        Limit to finding tiles in these zones.
    earliest_date : str or datetime.date or datetime.datetime, optional
        Limit to finding tiles after this date (strings must be YYYY-MM-DD).
    latest_date : str or datetime.date or datetime.datetime, optional
        Limit to finding tiles before this date.
    profile : str, optional
        AWS Profile to use when tiles are in S3.
    public : bool
        Access cloud data without authentication (for public buckets).


    The following parameters are also settable attributes; changing them will trigger a rescan

    Attributes
    ----------
    path
    aoi
    acq_id_in
    zone_in
    earliest_date
    latest_date
    """

    def __init__(
        self,
        path: str,
        aoi: Optional[Any] = None,
        acq_id_in: Optional[Iterable[str]] = None,
        zone_in: Optional[Iterable[int]] = None,
        earliest_date: Optional[Union[str, datetime, date]] = None,
        latest_date: Optional[Union[str, datetime, date]] = None,
        profile: Optional[str] = None,
        public: Optional[bool] = False,
        **kwargs,
    ) -> None:

        self._dirty = True
        self._updating = False
        super().__init__()

        # TODO we may want to normalize AOIs to WGS84
        # but right now `covers` is probably capable enough
        self.aoi = aoi

        # disable authentication for public buckets
        # 'anon' is what fsspec calls it but 'public' makes more sense
        # however 'anon' can also have implications for Azure so
        # we should be able to override it just in case

        if "anon" in kwargs:
            anon = kwargs["anon"]
        else:
            # using the recommended Azure connection string access, anon needs to be True
            if path.startswith("az"):
                anon = True
            else:
                anon = public
        # For GDAL S3 locations we can turn off signing so
        # you can have expired credentials
        if anon and path.startswith("s3"):
            os.environ["AWS_NO_SIGN_REQUEST"] = "YES"

        # validate some inputs that have been problematic in the past
        assert zone_in is None or all(type(z) == int for z in zone_in), "Zones must be integers"
        assert zone_in is None or all(z - 1 in range(60) for z in zone_in), "Invalid zone numbers"
        if not zone_in:
            zone_in = None
        self.zone_in = zone_in

        assert acq_id_in is None or all(
            type(c) == str for c in acq_id_in
        ), "Catalog IDs must be strings"
        if not acq_id_in:
            acq_id_in = None
        self.acq_id_in = acq_id_in

        # store dates as strings, reformat if needed
        def format_date(d):
            if d is None:
                return None
            try:
                return d.strftime("%Y-%m-%d")
            except:
                assert re.match(
                    r"\d{4}-\d{2}-\d{2}", d
                ), "Dates must be YYYY-MM-DD strings, or date/datetime objects"
                return d

        self.earliest_date = format_date(earliest_date)
        self.latest_date = format_date(latest_date)

        # Set up the path and initialize the filesystem source
        self.path = path

        # this might need to be smarter (os.path?) for windows slashes
        if self.path[-1] == "/":
            self.path = self.path[:-1]
        if os.path.exists(self.path):
            self.storage_type = "file"
            self.fs_path = os.path.abspath(self.path)
        else:
            parsed = urlparse(self.path)
            if parsed.scheme == "":
                raise ValueError("Local path does not exist")
            if parsed.scheme not in ["s3", "gs", "az"]:
                raise ValueError("Unrecognized protocol (use s3://, gs://, or az://")
            self.storage_type = parsed.scheme
            # might not need this, s3fs doesn't care about paths leading with protocols
            # need to check gdal, etc
            self.fs_path = parsed.netloc + parsed.path

        if self.storage_type == "az":
            # workarounds to make Azure credentials easier to deal with
            from max_ard.dependency_support.azure import sync_envvars

            sync_envvars()

        self.fs = filesystem(self.storage_type, anon=anon, profile=profile)

        try:
            self.fs.ls(self.path)
        except:
            raise ValueError("Access error: check your path for errors and access permissions")

    def __setattr__(self, name, value):
        """Some attributes are read-only properties
        Related setters need to reset the collection state"""

        if name == "acq_ids":
            raise ValueError(".acq_ids is read-only - set .acq_id_in instead")
        if name == "zones":
            raise ValueError(".zones is read-only - set .zone_in instead")
        if name == "start_date":
            raise ValueError(".start_date is read-only - set .earliest_date instead")
        if name == "acq_ids":
            raise ValueError(".end_date is read-only - set .latest_date instead")
        if name in ["acq_id_in", "zone_in", "earliest_date", "latest_date", "aoi"]:
            # resets the bins
            self._dirty = True
        object.__setattr__(self, name, value)

    def __getattribute__(self, name):
        dirty = object.__getattribute__(self, "_dirty")
        updating = object.__getattribute__(self, "_updating")
        if (
            dirty
            and not updating
            and name
            in [
                "tiles",
                "acquisitions",
                "acquisition_ids",
                "stacks",
                "cells",
                "get_stack",
                "get_acquisition",
                "dates",
                "earliest_date",
                "latest_date",
                "zones",
                "read_windows",
                "write_windows",
            ]
        ):
            object.__setattr__(self, "_updating", True)
            self._scan_files()
            object.__setattr__(self, "_dirty", False)
            object.__setattr__(self, "_updating", False)
        return object.__getattribute__(self, name)

    def __repr__(self) -> str:
        return f"<ARDCollection at {self.path}/>"

    def _scan_files(self) -> None:
        self._reset()

        if self.aoi is not None:
            cells = set([f"{c.zone}/{c.quadkey}" for c in covers(self.aoi)])
        else:
            cells = []

        # STAC source
        if self.path.endswith("json"):

            with self.fs.open(self.path) as f:
                doc = json.load(f)

            # STAC Item
            if "type" in doc.keys():
                items = [self.path]

            # STAC Collection
            else:
                root_path = self.fs_path.split("order_collections")[0]
                items = []
                for link in doc["links"]:
                    if link["rel"] != "child":
                        continue
                    path = root_path + link["href"].replace("../", "")
                    with self.fs.open(path) as f:
                        links = json.load(f)["links"]
                        for link in links:
                            if link["rel"] == "item":
                                path = root_path + link["href"].replace("../", "")
                                items.append(path)

        # Filesystem source
        else:
            # build leading glob pattern based on zone & quadkey
            if self.aoi is not None:
                # shard on first 5 digits on quadkey
                # for parallel fetches
                qkbs = set()
                for qk in cells:
                    qkbs.add(qk[:5])
                paths = [f"{k}*/*/*.json" for k in qkbs]
            else:
                if not self.zone_in or len(self.zone_in) == 0:
                    paths = ["*/*/*/*.json"]
                else:
                    paths = [f"{z}/*/*/*.json" for z in self.zone_in]
            items = []
            for path in paths:
                full_path = f"{self.path}/{path}"
                for item in self.fs.glob(full_path):
                    items.append(item)

        # Filter out items
        for item in items:
            tile = ARDTile.from_doc(self.fs, item)
            if self.aoi is not None:
                if f"{tile.zone}/{tile.quadkey}" not in cells:
                    continue
            if self.acq_id_in is not None and tile.acq_id not in self.acq_id_in:
                continue
            if self.earliest_date is not None:
                if tile.date < self.earliest_date:
                    continue
            if self.latest_date is not None:
                if tile.date > self.latest_date:
                    continue
            self.add_tile(tile)

    def clear_filesystem_cache(self):
        """
        Clear the local cache of a remote filesystem

        Remote file systems (S3, Azure, Google Cloud) cache files locally for speed.
        If the remote files have changed while using an ARDCollection, you can clear
        the cached files so that new files will be loaded.

        Parameters
        ----------

        Returns
        -------
        """
        self.fs.clear_instance_cache()

    @classmethod
    def from_order(cls, order, **kwargs):
        """
        Create an ARDCollection from an order ID.

        Accepts all filter keywords as used by class initialization.

        Parameters
        ----------
        order_id : str or Order
            Order object or Order ID to open.
        **kwargs
            Filter keywords as used by class initialization.

        Returns
        -------
        ARDCollection
        """

        if type(order) == str:
            order = Order.from_id(order)
        if not order.finished:
            raise NotFinished

        output_config = order.response.order.output_config
        if "bucket" in output_config:
            bucket = output_config["bucket"]
            prefix = output_config["prefix"]
            protocol = "s3"
        elif "amazon_s3" in output_config:
            bucket = output_config["amazon_s3"]["bucket"]
            prefix = output_config["amazon_s3"]["prefix"]
            protocol = "s3"
        elif "google_cloud_storage" in order:
            bucket = output_config["google_cloud_storage"]["bucket"]
            prefix = output_config["google_cloud_storage"]["prefix"]
            protocol = "gs"
        elif "azure_blob_storage" in order:
            bucket = output_config["azure_blob_storage"]["container"]
            prefix = output_config["azure_blob_storage"]["prefix"]
            protocol = "az"
        path = f"{protocol}://{bucket}/{prefix}/order_collections/{order.order_id}_root_collection.json"
        self = cls(path, **kwargs)
        return self

    def read_windows(self, *args, **kwargs):
        """See `max_ard.processing.read_windows`"""
        return read_windows(self, *args, **kwargs)

    def write_windows(self, *args, **kwargs):
        """See `max_ard.processing.write_windows`"""
        return write_windows(self, *args, **kwargs)

Ancestors

  • BaseCollection

Static methods

def from_order(order, **kwargs)

Create an ARDCollection from an order ID.

Accepts all filter keywords as used by class initialization.

Parameters

order_id : str or Order
Order object or Order ID to open.
**kwargs
Filter keywords as used by class initialization.

Returns

ARDCollection
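
A hedged sketch of typical use (the order ID is hypothetical); note that the order must be finished or NotFinished is raised:

    from max_ard import ARDCollection, Order

    order = Order.from_id("1234567890")  # hypothetical order ID
    if order.finished:
        collection = ARDCollection.from_order(order, earliest_date="2021-06-01")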
 
Expand source code
@classmethod
def from_order(cls, order, **kwargs):
    """
    Create an ARDCollection from an order ID.

    Accepts all filter keywords as used by class initialization.

    Parameters
    ----------
    order_id : str or Order
        Order object or Order ID to open.
    **kwargs
        Filter keywords as used by class initialization.

    Returns
    -------
    ARDCollection
    """

    if type(order) == str:
        order = Order.from_id(order)
    if not order.finished:
        raise NotFinished

    output_config = order.response.order.output_config
    if "bucket" in output_config:
        bucket = output_config["bucket"]
        prefix = output_config["prefix"]
        protocol = "s3"
    elif "amazon_s3" in output_config:
        bucket = output_config["amazon_s3"]["bucket"]
        prefix = output_config["amazon_s3"]["prefix"]
        protocol = "s3"
    elif "google_cloud_storage" in order:
        bucket = output_config["google_cloud_storage"]["bucket"]
        prefix = output_config["google_cloud_storage"]["prefix"]
        protocol = "gs"
    elif "azure_blob_storage" in order:
        bucket = output_config["azure_blob_storage"]["container"]
        prefix = output_config["azure_blob_storage"]["prefix"]
        protocol = "az"
    path = f"{protocol}://{bucket}/{prefix}/order_collections/{order.order_id}_root_collection.json"
    self = cls(path, **kwargs)
    return self

Methods

def clear_filesystem_cache(self)

Clear the local cache of a remote filesystem

Remote file systems (S3, Azure, Google Cloud) cache files locally for speed. If the remote files have changed while using an ARDCollection, you can clear the cached files so that new files will be loaded.


Expand source code
def clear_filesystem_cache(self):
    """
    Clear the local cache of a remote filesystem

    Remote file systems (S3, Azure, Google Cloud) cache files locally for speed.
    If the remote files have changed while using an ARDCollection, you can clear
    the cached files so that new files will be loaded.

    Parameters
    ----------

    Returns
    -------
    """
    self.fs.clear_instance_cache()
def read_windows(self, *args, **kwargs)
Expand source code
def read_windows(self, *args, **kwargs):
    """See `max_ard.processing.read_windows`"""
    return read_windows(self, *args, **kwargs)
def write_windows(self, *args, **kwargs)
Expand source code
def write_windows(self, *args, **kwargs):
    """See `max_ard.processing.write_windows`"""
    return write_windows(self, *args, **kwargs)

class AccountManager (account_id=None, session=None)

Manages account-related actions

Arguments

account_id : str (optional)
Account ID to manage. If not provided, will use the current user's account
session : Requests session object (optional)
The session to use to communicate with the API. If not provided, uses the current user's authenticated session
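
A brief sketch of typical admin use (the dates are illustrative); the manager defaults to the current user's account and authenticated session:

    from max_ard import AccountManager

    manager = AccountManager()
    usage = manager.get_account_usage(start_date="2021-01-01", end_date="2021-12-31")
    print(manager.properties)
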
Expand source code
class AccountManager:
    """Manages account-related actions

    Arguments
    ---------
    account_id : str (optional)
        Account ID to manage. If not provided, will use the current user's account
    session : Requests session object (optional)
        The session to use to communicate with the API. If not provided, uses the current user's authenticated session
    """

    __all__ = ["get_account_usage", "get_user_usage"]

    def __init__(self, account_id=None, session=None) -> None:
        if not session:
            session = get_user_session()
        self.session = session
        if not account_id:
            account_id = get_self(self.session)["user"]["account_id"]
        self.account_id = account_id

    def admin_url(self, *args):
        return ard_url("admin", "account", self.account_id, *args)

    @property
    def properties(self):
        return self.session.get(self.admin_url()).json()["account"]

    ########
    # Usage
    ########

    def _validate_dates(self, *args):
        """Validate that dates are YYYY-MM-DD

        Arguments
        ---------
        *args : str
            One or more date strings to validate

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If one or more dates was not YYYY-MM-DD"""

        for date in args:
            if date not in [None, ""]:
                try:
                    datetime.strptime(date, "%Y-%m-%d")
                except ValueError:
                    raise ValueError("Dates must be YYYY-MM-DD")

    def get_account_usage(self, start_date=None, end_date=None):
        """Get account-level data usage

        Parameters
        ----------
        start_date : str (optional)
            Start date to calculate usage
        end_date : str (optional)
            End date to calculate usage

        Returns
        -------
        AdminUsage
            Object model of data usage
        """

        url = ard_url("usage", "account", self.account_id)

        self._validate_dates(start_date, end_date)

        params = {"start_date": start_date, "end_date": end_date}

        r = self.session.get(url, params=params)
        return AdminUsage(**r.json()["usage"])

    def get_user_usage(self, username=None, start_date=None, end_date=None):
        """Get user-level data usage

        Parameters
        ----------
        username : str (optional)
            User name to calculate usage for, if not provided the current user is used
        start_date : str (optional)
            Start date to calculate usage
        end_date : str (optional)
            End date to calculate usage

        Returns
        -------
        AdminUsage
            Object model of data usage
        """

        if not username:
            username = get_self(self.session)["user"]["user_id"]

        url = ard_url("usage", "user", username)

        self._validate_dates(start_date, end_date)

        params = {"start_date": start_date, "end_date": end_date}

        r = self.session.get(url, params=params)
        return AdminUsage(**r.json()["usage"])

    #####
    # Credentials Storage
    #####

    def add_credentials(self, name, credentials, description=""):
        """Saves credentials in the Crendentials API

        The stored credentials key (SAS Url for Azure or Base64-encoded Credentials JSON for GCS) is never returned.
        Only the name, description, and other metadata about the credentials object are provided.

        This is only available to Admin users. However all users in the account may use a stored credential name
        to provide write access when ordering data.

        Parameters
        ----------
        name : str
            Name of the stored credentials, should have URL-friendly characters (no spaces)
        credentials : str
            Credentials secret to store (SAS Url for Azure or Base64 Credentials JSON for GCS).
            If this is a path to a GCS JSON file, it will be read and Base64 encoded.
        description : str (optional)
            Description of the credentials

        Returns
        -------
        dict
            API response JSON dictionary
        """

        if urllib.parse.quote(name) != name:
            raise ValueError(
                "Stored credential names should not use characters that require URL-encoding"
            )
        url = self.admin_url("credentials", name)

        try:
            with open(credentials, "rb") as binary_file:
                encoded = base64.b64encode(binary_file.read())
                credentials = encoded.decode("ascii")

        except (FileNotFoundError, OSError):
            pass

        payload = {"credentials": credentials, "description": description}
        response = self.session.put(url, json=payload)

        if response.status_code == 204:
            return None
        else:
            return response.json()

    def get_credentials(self, name, raw=False):
        """Retrieves information about stored credentials.

        The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned,
        only the name, description, and other metadata about the credentials object.

        This is only available to Admin users. However all users in the account may use a stored credential name
        to provide write access when ordering data.

        Parameters
        ----------
        name : str
            Name of the stored credentials
        raw : bool (optional)
            Return the response JSON

        Returns
        -------
        RegisteredCredentials
            Object model of credentials
        """

        url = self.admin_url("credentials", name)
        response = self.session.get(url)
        if raw:
            return response.json()
        else:
            return RegisteredCredentials(**response.json()["registered_credentials"])

    def list_credentials(self, raw=False):
        """Retrieves all stored credentials in an account.

        The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned,
        only the name, description, and other metadata about the credentials object.

        This is only available to Admin users. However all users in the account may use a stored credential name
        to provide write access when ordering data.

        Parameters
        ----------
        raw : bool (optional)
            Return the response JSON

        Returns
        -------
        List of RegisteredCredentials
            List of object models of credentials
        """

        # responses can be paginated
        id_key = lambda credential: credential["registered_credentials"]["credentials_id"]
        url = self.admin_url("credentials")

        credentials = paginated_response(self.session, url, id_key)

        if raw:
            return credentials
        else:
            object_key = lambda credential: credential["registered_credentials"]
            return hydrate(RegisteredCredentials, credentials, key=object_key)

    def delete_credentials(self, name):
        """Deletes stored credentials by name.

        This is only available to Admin users. However all users in the account may use a stored credential name
        to provide write access when ordering data.

        Parameters
        ----------
        name : str
            Name of the credential object

        """
        url = self.admin_url("credentials", name)
        return self.session.delete(url)

    #####
    # Monitors
    #####

    def list_monitors(self, raw=False):
        """Retrieves all monitors for an account.

        Parameters
        ----------
        raw : bool (optional)
            Return the response JSON

        Returns
        -------
        List of Monitors
            List of object models of monitors
        """

        id_key = lambda monitor: monitor["monitor_id"]

        url = self.admin_url("monitor")
        monitors = paginated_response(self.session, url, id_key)

        if raw:
            return monitors
        else:
            return [Monitor.from_response(monitor) for monitor in monitors]

Instance variables

var properties
Expand source code
@property
def properties(self):
    return self.session.get(self.admin_url()).json()["account"]

Methods

def add_credentials(self, name, credentials, description='')

Saves credentials in the Credentials API

The stored credentials key (SAS Url for Azure or Base64-encoded Credentials JSON for GCS) is never returned. Only the name, description, and other metadata about the credentials object are provided.

This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.

Parameters

name : str
Name of the stored credentials, should have URL-friendly characters (no spaces)
credentials : str
Credentials secret to store (SAS Url for Azure or Base64 Credentials JSON for GCS). If this is a path to a GCS JSON file, it will be read and Base64 encoded.
description : str (optional)
Description of the credentials

Returns

dict
API response JSON dictionary
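
A hedged example of storing GCS writer credentials (the credential name and file path are hypothetical); when a path to a credentials JSON file is given, it is read and Base64-encoded before upload:

    manager = AccountManager()
    manager.add_credentials(
        "my-gcs-writer",
        "/path/to/service-account.json",
        description="Writer credentials for GCS ARD deliveries",
    )
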
Expand source code
def add_credentials(self, name, credentials, description=""):
    """Saves credentials in the Crendentials API

    The stored credentials key (SAS Url for Azure or Base64-encoded Credentials JSON for GCS) is never returned.
    Only the name, description, and other metadata about the credentials object are provided.

    This is only available to Admin users. However all users in the account may use a stored credential name
    to provide write access when ordering data.

    Parameters
    ----------
    name : str
        Name of the stored credentials, should have URL-friendly characters (no spaces)
    credentials : str
        Credentials secret to store (SAS Url for Azure or Base64 Credentials JSON for GCS).
        If this is a path to a GCS JSON file, it will be read and Base64 encoded.
    description : str (optional)
        Description of the credentials

    Returns
    -------
    dict
        API response JSON dictionary
    """

    if urllib.parse.quote(name) != name:
        raise ValueError(
            "Stored credential names should not use characters that require URL-encoding"
        )
    url = self.admin_url("credentials", name)

    try:
        with open(credentials, "rb") as binary_file:
            encoded = base64.b64encode(binary_file.read())
            credentials = encoded.decode("ascii")

    except (FileNotFoundError, OSError):
        pass

    payload = {"credentials": credentials, "description": description}
    response = self.session.put(url, json=payload)

    if response.status_code == 204:
        return None
    else:
        return response.json()
def admin_url(self, *args)
Expand source code
def admin_url(self, *args):
    return ard_url("admin", "account", self.account_id, *args)
def delete_credentials(self, name)

Deletes stored credentials by name.

This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.

Parameters

name : str
Name of the credential object
Expand source code
def delete_credentials(self, name):
    """Deletes stored credentials by name.

    This is only available to Admin users. However all users in the account may use a stored credential name
    to provide write access when ordering data.

    Parameters
    ----------
    name : str
        Name of the credential object

    """
    url = self.admin_url("credentials", name)
    return self.session.delete(url)
def get_account_usage(self, start_date=None, end_date=None)

Get account-level data usage

Parameters

start_date : str (optional)
Start date to calculate usage
end_date : str (optional)
End date to calculate usage

Returns

AdminUsage
Object model of data usage
Expand source code
def get_account_usage(self, start_date=None, end_date=None):
    """Get account-level data usage

    Parameters
    ----------
    start_date : str (optional)
        Start date to calculate usage
    end_date : str (optional)
        End date to calculate usage

    Returns
    -------
    AdminUsage
        Object model of data usage
    """

    url = ard_url("usage", "account", self.account_id)

    self._validate_dates(start_date, end_date)

    params = {"start_date": start_date, "end_date": end_date}

    r = self.session.get(url, params=params)
    return AdminUsage(**r.json()["usage"])
def get_credentials(self, name, raw=False)

Retrieves information about stored credentials.

The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object.

This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.

Parameters

name : str
Name of the stored credentials
raw : bool (optional)
Return the response JSON

Returns

RegisteredCredentials
Object model of credentials
Expand source code
def get_credentials(self, name, raw=False):
    """Retrieves information about stored credentials.

    The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned,
    only the name, description, and other metadata about the credentials object.

    This is only available to Admin users. However all users in the account may use a stored credential name
    to provide write access when ordering data.

    Parameters
    ----------
    name : str
        Name of the stored credentials
    raw : bool (optional)
        Return the response JSON

    Returns
    -------
    RegisteredCredentials
        Object model of credentials
    """

    url = self.admin_url("credentials", name)
    response = self.session.get(url)
    if raw:
        return response.json()
    else:
        return RegisteredCredentials(**response.json()["registered_credentials"])
def get_user_usage(self, username=None, start_date=None, end_date=None)

Get user-level data usage

Parameters

username : str (optional)
User name to calculate usage for, if not provided the current user is used
start_date : str (optional)
Start date to calculate usage
end_date : str (optional)
End date to calculate usage

Returns

AdminUsage
Object model of data usage
Expand source code
def get_user_usage(self, username=None, start_date=None, end_date=None):
    """Get user-level data usage

    Parameters
    ----------
    username : str (optional)
        User name to calculate usage for, if not provided the current user is used
    start_date : str (optional)
        Start date to calculate usage
    end_date : str (optional)
        End date to calculate usage

    Returns
    -------
    AdminUsage
        Object model of data usage
    """

    if not username:
        username = get_self(self.session)["user"]["user_id"]

    url = ard_url("usage", "user", username)

    self._validate_dates(start_date, end_date)

    params = {"start_date": start_date, "end_date": end_date}

    r = self.session.get(url, params=params)
    return AdminUsage(**r.json()["usage"])
def list_credentials(self, raw=False)

Retrieves all stored credentials in an account.

The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object.

This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.

Parameters

raw : bool (optional)
Return the response JSON

Returns

List of RegisteredCredentials
List of object models of credentials
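
A short sketch of listing and removing stored credentials (the credential name is hypothetical):

    manager = AccountManager()
    for cred in manager.list_credentials():
        print(cred)

    manager.delete_credentials("my-gcs-writer")
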
Expand source code
def list_credentials(self, raw=False):
    """Retrieves all stored credentials in an account.

    The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned,
    only the name, description, and other metadata about the credentials object.

    This is only available to Admin users. However all users in the account may use a stored credential name
    to provide write access when ordering data.

    Parameters
    ----------
    raw : bool (optional)
        Return the response JSON

    Returns
    -------
    List of RegisteredCredentials
        List of object models of credentials
    """

    # responses can be paginated
    id_key = lambda credential: credential["registered_credentials"]["credentials_id"]
    url = self.admin_url("credentials")

    credentials = paginated_response(self.session, url, id_key)

    if raw:
        return credentials
    else:
        object_key = lambda credential: credential["registered_credentials"]
        return hydrate(RegisteredCredentials, credentials, key=object_key)
def list_monitors(self, raw=False)

Retrieves all monitors for an account.

Parameters

raw : bool (optional)
Return the response JSON

Returns

List of Monitors
List of object models of monitors
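
A minimal sketch; pass raw=True to get the API response dictionaries instead of Monitor objects:

    manager = AccountManager()
    for monitor in manager.list_monitors():
        print(monitor)
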
Expand source code
def list_monitors(self, raw=False):
    """Retrieves all monitors for an account.

    Parameters
    ----------
    raw : bool (optional)
        Return the response JSON

    Returns
    -------
    List of Monitors
        List of object models of monitors
    """

    id_key = lambda monitor: monitor["monitor_id"]

    url = self.admin_url("monitor")
    monitors = paginated_response(self.session, url, id_key)

    if raw:
        return monitors
    else:
        return [Monitor.from_response(monitor) for monitor in monitors]
class Order (acquisitions=None, select_id=None, destination=None, output_config=None, settings={}, intersects=None, bbox=None, role_arn=None, dry_run=False, bba=False, metadata={}, session=None)

An ARD API Order object

Parameters

acquisitions : iterable of str or dict, optional
An iterable of acquisitions to order, see Notes
select_id : str
A Select ID to order, see Notes
destination : str, optional
For S3 locations only, a path to the storage location, such as s3://my-bucket/my-prefix
output_config : dict, optional
If not using destination, an output configuration dictionary, see API documentation for examples
intersects : Geometry-like object or str, optional
Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes)
bbox : iterable of numeric, optional
Like intersects, a bounding box in WGS84 coordinates: [XMIN, YMIN, XMAX, YMAX]
role_arn : str, optional
A trusted Role ARN for the writer to assume so it can write tiles. This is not used if the S3 bucket policy allows writing tiles to the bucket.
dry_run : bool, optional
When true, runs pre-order checks to check if order is valid but does not generate imagery
bba : bool, optional
When true, Block Bundle Adjustment will be applied to the order.
metadata : dict, optional
User-supplied metadata
settings : dict, optional
Dictionary of settings to override outputs, see Notes
session : max_ard.session object, optional
A user_session or application_session object

Attributes

session : max_ard.session object or None
A user_session or application_session object
dry_run : bool
True if this is a dry run, as passed in the dry_run parameter
submitted : bool
True if the Order has been submitted via Order.submit()
request : Pydantic model
Parameters are loaded into a Pydantic model of the HTTP API request
response : Pydantic model
Pydantic model of the server response to API call. Status and submit calls return the same payload.

Notes

An order must specify acquisitions or select_id.

If ordering by select ID, do not include acquisition IDs or an AOI in the order request.

Acquisitions can be a list of acquisition IDs, or a list of dictionaries with the keys id and cells. If no cells are specified, it is assumed all cells are wanted (subject to clipping by an AOI or BBOX):

acquisitions=["103001009E8G3C90"]

or

acquisitions=[
    {
        "id": "103001007B478000",
        "cells": ["Z17-031313123113", "Z17-031313123112"]
    },
    {
        "id": "103001009E8C7C00",
        "cells": ["Z17-031313123113"]
    },
    {
        "id": "103001009E8G3C90"
    }
]

Intersects inputs:

  • Geometry objects: Shapely shapes, objects supporting __geo_interface__, GeoJSON-like dicts, GeoJSON and WKT strings
  • Geometry iterables: iterables of the above, Fiona readers
  • File paths: most spatial file formats. WKT and GeoJSON are supported with the base install; other formats require Fiona for reading

Settings dictionary defaults: You can override one or more of the following values:

{
    "bundle_adjust": false,
    "cloud_mask": true,
    "data_mask": true,
    "healthy_vegetation_mask": true,
    "ms_analytic": true,
    "ms_saturation_mask": true,
    "pan_analytic": true,
    "pan_flare_mask": true,
    "terrain_shadow_mask": true,
    "visual": true,
    "water_mask": true
}
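
A hedged end-to-end sketch (the destination bucket and email address are hypothetical; the acquisition ID is the example from the Notes above):

    import time

    from max_ard import Order

    order = Order(
        acquisitions=["103001009E8G3C90"],
        destination="s3://my-ard-bucket/my-prefix",
        settings={"visual": True, "pan_analytic": False},
    )
    order.add_email_notification("me@example.com")
    order.submit()

    # poll until tile generation finishes
    while order.running:
        time.sleep(60)
    print(order.status)
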
Expand source code
class Order(Submitted):
    """An ARD API Order object

    Parameters
    ----------
    acquisitions : iterable of str or dict, optional
        An iterable of acquisitions to order, see Notes
    select_id : str
        A Select ID to order, see Notes
    destination : str, optional
        For S3 locations only, a path to the storage location, such as s3://my-bucket/my-prefix
    output_config : dict, optional
        If not using `destination`, an output configuration dictionary, see API documentation for examples
    intersects : Geometry-like object or str, optional
        Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes)
    bbox : iterable of numeric, optional
        Like `intersects`, a bounding box in WGS84 coordinates: [XMIN, YMIN, XMAX, YMAX]
    role_arn : str, optional
        A trusted Role ARN for the writer to assume so it can write tiles.
        This is not used if the S3 bucket policy allows writing tiles to the bucket.
    dry_run : bool, optional
        When true, runs pre-order checks to check if order is valid but does not generate imagery
    bba : bool, optional
        When true, Block Bundle Adjustment will be applied to the order.
    metadata : dict, optional
        User-supplied metadata
    settings : dict, optional
        Dictionary of settings to override outputs, see Notes
    session : session object, optional
        A user_session or application_session object

    Attributes
    ----------
    session : session object or None
        A user_session or application_session object
    dry_run : bool
        True if this is a dry run, as passed in the `dry_run` parameter
    submitted : bool
        True if the Order has been submitted via `max_ard.order.Order.submit`
    request : Pydantic model
        Parameters are loaded into a Pydantic model of the HTTP API request
    response : Pydantic model
        Pydantic model of the server response to API call. Status and submit calls
        return the same payload.

    Notes
    -----
    An order must specify `acquisitions` or `select_id`.

    If ordering by select ID, do not include acquisition IDs or an AOI in the order request.

    Acquisitions can be a list of acquisition IDs, or a list of dictionaries with the keys
    `id` and `cells`. If no cells are specified, it is assumed all cells are wanted
    (subject to clipping by an AOI or BBOX):


      acquisitions=["103001009E8G3C90"]

      or

      acquisitions=[
        {
            "id": "103001007B478000",
            "cells": ["Z17-031313123113", "Z17-031313123112"]
        },
        {
            "id": "103001009E8C7C00",
            "cells": ["Z17-031313123113"]
        },
        {
            "id": "103001009E8G3C90"
        }
     ]

    Intersects inputs:
        Geometry objects: Shapely shapes, objects supporting __geo_interface__, geojson-like dicts,
            geojson and wkt strings
        Geometry iterables: iterables of above, Fiona readers
        File paths: most spatial file formats. WKT and Geojson supported with base install, other formats
            require Fiona for reading



    Settings dictionary defaults:
        You can override one or more of the following values:

        {
            "bundle_adjust": false,
            "cloud_mask": true,
            "data_mask": true,
            "healthy_vegetation_mask": true,
            "ms_analytic": true,
            "ms_saturation_mask": true,
            "pan_analytic": true,
            "pan_flare_mask": true,
            "terrain_shadow_mask": true,
            "visual": true,
            "water_mask": true
        }"""

    def __init__(
        self,
        acquisitions=None,
        select_id=None,
        destination=None,
        output_config=None,
        settings={},
        intersects=None,
        bbox=None,
        role_arn=None,
        dry_run=False,
        bba=False,
        metadata={},
        session=None,
    ):

        self.session = session or get_user_session()

        # check booleans aren't strings
        if type(dry_run) is not bool:
            raise ValueError(f"Parameter `dry_run` needs to be a boolean, not {type(dry_run)}")
        if type(bba) is not bool:
            raise ValueError(f"Parameter `bba` needs to be a boolean, not {type(bba)}")

        # bba to be moved to `settings` instead of top level param
        if "bundle_adjust" not in settings:
            settings["bundle_adjust"] = bba

        # set up the output config
        if destination is not None:
            destination = destination.replace("s3://", "")
            parts = destination.split("/", 1)
            bucket = parts[0]
            try:
                prefix = parts[1]
            except IndexError:
                prefix = ""
            # old style output config
            destination = {"bucket": bucket, "prefix": prefix, "role_arn": role_arn}
            output_config = self._validate_output({"destination": destination})

        else:
            if output_config is not None:
                output_config = self._validate_output(output_config)

        # set up acquisitions
        if type(acquisitions) == str:
            acquisitions = [Acquisition(id=acquisitions)]
        elif type(acquisitions) == list or type(acquisitions) == tuple:
            if len(acquisitions) > 0:
                if type(acquisitions[0]) == str:
                    acquisitions = [Acquisition(id=v) for v in acquisitions]
        elif type(acquisitions) == dict:
            acquisitions = [Acquisition(id=k, cells=v) for k, v in acquisitions.items()]
        elif isinstance(acquisitions, BaseCollection):
            acquisitions = [Acquisition(**acq) for acq in acquisitions.as_order()]

        # if the intersects is a Shapely geom, convert it to wkt
        if intersects is not None:
            intersects = convert_to_shapely(intersects).wkt

        self.request = OrderRequest(
            acquisitions=acquisitions,
            select_id=select_id,
            intersects=intersects,
            bbox=bbox,
            output_config=output_config,
            settings=settings,
            dry_run=dry_run,
            metadata=metadata,
        )
        self.dry_run = dry_run
        self.response = None

    def _validate_output(self, output_config):

        platforms = {
            "azure_blob_storage": AZ_Config,
            "google_cloud_storage": GCS_Config,
            "amazon_s3": S3_Config,
            "destination": Output_Config,
        }
        platform = list(output_config.keys())[0]
        config_model = platforms.get(platform, None)
        if config_model is None:
            raise ValueError(
                "Output config format not recognized, please see documentation for examples"
            )
        params = output_config[platform].copy()
        # validation on object creation
        config_obj = config_model(**params)

        params = {k: v for k, v in config_obj.dict().items() if v is not None}

        # convert credential files to b64 encoding
        if platform == "google_cloud_storage":
            if "credentials_id" not in params:
                location = params["service_credentials"]

                try:
                    with open(location, "rb") as binary_file:
                        encoded = base64.b64encode(binary_file.read())
                        params["service_credentials"] = encoded.decode("ascii")
                except (FileNotFoundError, OSError):
                    try:
                        creds = base64.b64decode(location)
                        assert "type" in json.loads(creds)
                    except:
                        raise ValueError(
                            "GCS service credentials should be either a Base64-encoded string"
                            + " of the JSON credentials file contents or a path to the JSON credentials file"
                        )
            else:
                if "service_credentials" in params:
                    c_id = params["credentials_id"]
                    warnings.warn(
                        "Both a stored credential ID and service credential argument were provided. "
                        + f'The stored credential ID "{c_id}" will be used'
                    )
                    del params["service_credentials"]

        if platform != "destination":
            return {platform: params}
        else:
            return params

    def add_email_notification(self, address):
        """Add an email notification to the order

        Parameters
        ----------
        address : str
            Email address to receive order notifications"""
        self.request.notifications.append(EmailNotification(address=address))

    def add_sns_notification(self, topic_arn):
        """Add an AWS SNS notification topic to receive order notifications

        Parameters
        ----------
        topic_arn : str
            AWS SNS topic ARN to receive order notifications"""
        self.request.notifications.append(SNSNotification(topic_arn=topic_arn))

    def __repr__(self):
        try:
            return f"<Order {self.order_id} ({self.status})>"
        except NotSubmitted:
            return "<Order (Not submitted)>"

    @property
    def submitted(self):
        if self.response:
            return self.response.id is not None
        else:
            return False

    @property
    @Submitted.required
    def finished(self):
        """The Order finished processing but may have failed"""

        return self.status != "RUNNING"

    @property
    @Submitted.required
    def running(self):
        """The Order is running"""

        return self.status == "RUNNING"

    @property
    @Submitted.required
    def succeeded(self):
        """The Order has finished running and has succeeded"""

        return self.status == "SUCCEEDED"

    @property
    @Submitted.required
    def usage(self):
        if self.response.order is None:
            # just submitted, refresh the order status
            response = self.get_order(self.order_id)
            self.response = OrderResponse(**response)
        return self.response.order.usage

    @property
    @Submitted.required
    def failed(self):
        """The Order has finished running but failed"""

        return self.status in ["FAILED", "ERROR"]

    @property
    @Submitted.required
    def order_id(self):
        """The Order ID"""

        return self.response.id

    @property
    @Submitted.required
    def status(self):
        """State of the order process: 'RUNNING', 'SUCCEEDED, or 'FAILED'"""

        if self.response.status == "RUNNING":
            response = self.get_order(self.order_id)
            self.response = OrderResponse(**response)
        return self.response.status

    @property
    def state(self):
        """Legacy version of `status`, will be deprecated in the future"""
        return self.status

    @classmethod
    def from_id(cls, order_id, session=None):
        """Create a Order object from an ID

        Parameters
        ----------
        order_id: str
            Order ID to hydrate into an Order object
        session : Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        Order"""

        response = cls.get_order(order_id, session)
        instance = cls()
        instance.response = OrderResponse(**response)
        return instance

    @classmethod
    def list_orders(
        cls,
        limit=None,
        starting_after="",
        ending_before="",
        start_date="",
        end_date="",
        filter=None,
        session=None,
    ):
        """Fetch user's orders

        Parameters
        ----------
        limit: int or None, default None
            maximum number of orders to fetch, None (default) means unlimited
        starting_after: str
            the order_id after which further responses will be returned, paging forward
        ending_before: str
            the order_id before which further responses will be returned, paging backwards
        start_date: str
            starting date to filter, ISO-8601 YYYY-MM-DD
        end_date: str
            ending date to filter, ISO-8601 YYYY-MM-DD
        filter: str
            filter results that match values contained in the given key separated by a colon.
            Example: 'metadata.downstream_customer_id:abdc-534-b4dc47'
        session: Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        list
            Order objects matching parameters"""

        session = session or get_user_session()

        params = {
            "starting_after": starting_after,
            "ending_before": ending_before,
            "start_date": start_date,
            "end_date": end_date,
        }

        if filter is not None:
            params["filter"] = filter

        key = lambda order: order["id"]
        responses = paginated_response(session, ard_url("order"), key, limit, **params)

        return hydrate_with_responses(Order, OrderResponse, responses, session=session)

    @classmethod
    def get_order(cls, order_id, session=None):
        """Fetch raw data about an Order from an ID

        Parameters
        ----------
        order_id : str
            Order ID to fetch metadata for
        session: Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        dict
            API data for the given Order"""

        if not session:
            session = get_user_session()

        r = session.get(ard_url("order", "status", order_id))
        return r.json()

    @classmethod
    def send_order(cls, payload, session=None):
        """Send a request to the Order API

        Parameters
        ----------
        payload : dict
            Order API request payload
        session : Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        dict
            API response data for the given Order"""

        if not session:
            session = get_user_session()
        r = session.post(ard_url("order"), json=payload)
        return r.json()

    def submit(self):
        """Submit this Order to the API"""

        response = self.send_order(self.request.to_payload(), session=self.session)
        if self.submitted:
            warnings.warn("The Order has already been submitted")
            return
        if self.dry_run:
            self.response = OrderResponse(
                id="dry run",
                status="SUCCEEDED",
                order={"usage": response["usage"]},
                status_message="Order dry-run validation successful.",
            )
        else:
            self.response = OrderResponse(
                id=response["id"], status="RUNNING", status_message="Order submitted and running."
            )

Ancestors

  • Submitted

Static methods

def from_id(order_id, session=None)

Create an Order object from an ID

Parameters

order_id : str
Order ID to hydrate into an Order object
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

Order
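
A brief sketch of rehydrating an existing order (the ID is hypothetical):

    order = Order.from_id("1234567890")
    print(order.status, order.succeeded)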
 
Expand source code
@classmethod
def from_id(cls, order_id, session=None):
    """Create a Order object from an ID

    Parameters
    ----------
    order_id: str
        Order ID to hydrate into an Order object
    session : Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    Order"""

    response = cls.get_order(order_id, session)
    instance = cls()
    instance.response = OrderResponse(**response)
    return instance
def get_order(order_id, session=None)

Fetch raw data about an Order from an ID

Parameters

order_id : str
Order ID to fetch metadata for
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

dict
API data for the given Order
Expand source code
@classmethod
def get_order(cls, order_id, session=None):
    """Fetch raw data about an Order from an ID

    Parameters
    ----------
    order_id : str
        Order ID to fetch metadata for
    session: Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    dict
        API data for the given Order"""

    if not session:
        session = get_user_session()

    r = session.get(ard_url("order", "status", order_id))
    return r.json()
def list_orders(limit=None, starting_after='', ending_before='', start_date='', end_date='', filter=None, session=None)

Fetch user's orders

Parameters

limit : int or None, default None
maximum number of orders to fetch, None (default) means unlimited
starting_after : str
the order_id after which further responses will be returned, paging forward
ending_before : str
the order_id before which further responses will be returned, paging backwards
start_date : str
starting date to filter, ISO-8601 YYYY-MM-DD
end_date : str
ending date to filter, ISO-8601 YYYY-MM-DD
filter : str
filter results that match values contained in the given key separated by a colon. Example: 'metadata.downstream_customer_id:abdc-534-b4dc47'
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

list
Order objects matching parameters
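
A hedged sketch of paging and filtering (the metadata filter value is hypothetical):

    orders = Order.list_orders(
        limit=25,
        start_date="2021-01-01",
        end_date="2021-12-31",
        filter="metadata.project:my-project",
    )
    for order in orders:
        print(order.order_id, order.status)
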
Expand source code
@classmethod
def list_orders(
    cls,
    limit=None,
    starting_after="",
    ending_before="",
    start_date="",
    end_date="",
    filter=None,
    session=None,
):
    """Fetch user's orders

    Parameters
    ----------
    limit: int or None, default None
        maximum number of orders to fetch, None (default) means unlimited
    starting_after: str
        the order_id after which further responses will be returned, paging forward
    ending_before: str
        the order_id before which further responses will be returned, paging backwards
    start_date: str
        starting date to filter, ISO-8601 YYYY-MM-DD
    end_date: str
        ending date to filter, ISO-8601 YYYY-MM-DD
    filter: str
        filter results that match values contained in the given key separated by a colon.
        Example: 'metadata.downstream_customer_id:abdc-534-b4dc47'
    session: Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    list
        Order objects matching parameters"""

    session = session or get_user_session()

    params = {
        "starting_after": starting_after,
        "ending_before": ending_before,
        "start_date": start_date,
        "end_date": end_date,
    }

    if filter is not None:
        params["filter"] = filter

    key = lambda order: order["id"]
    responses = paginated_response(session, ard_url("order"), key, limit, **params)

    return hydrate_with_responses(Order, OrderResponse, responses, session=session)
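
A minimal sketch of paging through recent orders; the dates and filter value are illustrative only:

from max_ard.order import Order

orders = Order.list_orders(
    limit=10,                      # stop after 10 orders
    start_date="2023-03-01",       # hypothetical date range
    end_date="2023-03-31",
    filter="metadata.downstream_customer_id:abdc-534-b4dc47",
)
for order in orders:
    print(order.order_id, order.status)
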
def send_order(payload, session=None)

Send a request to the Order API

Parameters

payload : dict
Order API request payload
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

dict
API response data for the given Order
Expand source code
@classmethod
def send_order(cls, payload, session=None):
    """Send a request to the Order API

    Parameters
    ----------
    payload : dict
        Order API request payload
    session : Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    dict
        API response data for the given Order"""

    if not session:
        session = get_user_session()
    r = session.post(ard_url("order"), json=payload)
    return r.json()
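
send_order posts a raw payload and returns the API response as a dict; submit() wraps this same call and stores the response on the object. A minimal sketch, assuming order is an unsubmitted Order built elsewhere:

payload = order.request.to_payload()   # the same payload submit() would send
response = Order.send_order(payload)
print(response)                        # raw API response dict
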

Instance variables

var failed

The Order has finished running but failed

Expand source code
@property
@Submitted.required
def failed(self):
    """The Order has finished running but failed"""

    return self.status in ["FAILED", "ERROR"]
var finished

The Order finished processing but may have failed

Expand source code
@property
@Submitted.required
def finished(self):
    """The Order finished processing but may have failed"""

    return self.status != "RUNNING"
var order_id

The Order ID

Expand source code
@property
@Submitted.required
def order_id(self):
    """The Order ID"""

    return self.response.id
var running

The Order is running

Expand source code
@property
@Submitted.required
def running(self):
    """The Order is running"""

    return self.status == "RUNNING"
var state

Legacy version of status, will be deprecated in the future

Expand source code
@property
def state(self):
    """Legacy version of `status`, will be deprecated in the future"""
    return self.status
var status

State of the order process: 'RUNNING', 'SUCCEEDED', or 'FAILED'

Expand source code
@property
@Submitted.required
def status(self):
    """State of the order process: 'RUNNING', 'SUCCEEDED, or 'FAILED'"""

    if self.response.status == "RUNNING":
        response = self.get_order(self.order_id)
        self.response = OrderResponse(**response)
    return self.response.status
var submitted
Expand source code
@property
def submitted(self):
    if self.response:
        return self.response.id is not None
    else:
        return False
var succeeded

The Order has finished running and has succeeded

Expand source code
@property
@Submitted.required
def succeeded(self):
    """The Order has finished running and has succeeded"""

    return self.status == "SUCCEEDED"
var usage
Expand source code
@property
@Submitted.required
def usage(self):
    if self.response.order is None:
        # just submitted, refresh the order status
        response = self.get_order(self.order_id)
        self.response = OrderResponse(**response)
    return self.response.order.usage
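
Taken together, these properties support a simple status check on a submitted order. A minimal sketch, assuming order is a submitted Order:

if not order.finished:
    print("still running")
elif order.succeeded:
    print("usage:", order.usage)
else:
    print("order failed with status", order.status)
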

Methods

def add_email_notification(self, address)

Add an email notification to the order

Parameters

address : str
Email address to receive order notifications
Expand source code
def add_email_notification(self, address):
    """Add an email notification to the order

    Parameters
    ----------
    address : str
        Email address to receive order notifications"""
    self.request.notifications.append(EmailNotification(address=address))
def add_sns_notification(self, topic_arn)

Add an AWS SNS notification topic to receive order notifications

Parameters

topic_arn : str
AWS SNS topic ARN to receive order notifications
Expand source code
def add_sns_notification(self, topic_arn):
    """Add an AWS SNS notification topic to receive order notifications

    Parameters
    ----------
    topic_arn : str
        AWS SNS topic ARN to receive order notifications"""
    self.request.notifications.append(SNSNotification(topic_arn=topic_arn))
def submit(self)

Submit this Order to the API

Expand source code
def submit(self):
    """Submit this Order to the API"""

    response = self.send_order(self.request.to_payload(), session=self.session)
    if self.submitted:
        warnings.warn("The Order has already been submitted")
        return
    if self.dry_run:
        self.response = OrderResponse(
            id="dry run",
            status="SUCCEEDED",
            order={"usage": response["usage"]},
            status_message="Order dry-run validation successful.",
        )
    else:
        self.response = OrderResponse(
            id=response["id"], status="RUNNING", status_message="Order submitted and running."
        )
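
A minimal sketch of attaching notifications and submitting; the email address and SNS topic ARN are hypothetical, and order is assumed to be an unsubmitted Order:

order.add_email_notification("ard-user@example.com")                         # hypothetical address
order.add_sns_notification("arn:aws:sns:us-east-1:123456789012:ard-orders")  # hypothetical topic ARN
order.submit()
print(order.order_id, order.status)
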
class Select (acq_ids=None, datetime=None, intersects=None, bbox=None, query={}, stack_depth=None, image_age_category=None, session=None)

An ARD API Select object

Parameters

acq_ids : iterable of str
An iterable of acquisition IDs to search for
datetime : str
Date or date range string
intersects : Geometry-like object or str, optional
Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes)
bbox : iterable of numeric
Bounding box in WGS84 coordinates, [west, south, east, north]
query : dict
Query dictionary
stack_depth : int or None, optional
Maximum number of tiles to return
image_age_category : iterable of str, optional
One or more image age categories to filter results by
session : max_ard.session object, optional
A user_session or application_session object

Attributes

session : max_ard.session object or None
A user_session or application_session object
submitted : bool
True if Select has been submitted via Select.submit()
request : Pydantic model
Parameters are loaded into a Pydantic model of the HTTP API request
response : Pydantic model
Pydantic model of the server response to API call. Status and submit calls return the same payload

Notes

Intersects inputs:
Geometry objects: Shapely shapes, objects supporting __geo_interface__, GeoJSON-like dicts, GeoJSON and WKT strings.
Geometry iterables: iterables of the above, Fiona readers.
File paths: most spatial file formats. WKT and GeoJSON are supported with the base install; other formats require Fiona for reading.
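
A minimal sketch of building a Select from a WKT point and a date range; the geometry, date string, and query dictionary are hypothetical values shown for illustration only:

from max_ard import Select

select = Select(
    intersects="POINT(-106.8 39.2)",                   # hypothetical AOI as a WKT string
    datetime="2022-01-01/2022-06-30",                  # hypothetical date range string
    stack_depth=3,                                     # maximum number of tiles to return
    query={"aoi:cloud_free_percentage": {"gte": 90}},  # hypothetical query filter
)
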

Expand source code
class Select(Succeeded, Submitted):
    """An ARD API Select object

    Parameters
    ----------
    acq_ids : iterable of str
        An iterable of acquisition IDs to search for
    datetime : str
        Date or date range string
    intersects : Geometry-like object or str, optional
        Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes)
    bbox : iterable of numeric
        Bounding box in WGS84 coordinates, [west, south, east, north]
    query : dict
        Query dictionary
    stack_depth: int or None, optional
        Maximum number of tiles to return
    image_age_category : iterable of str, optional
        One or more image age categories to filter results by
    session : session object, optional
        A user_session or application_session object

    Attributes
    ----------
    session : session object or None
        A user_session or application_session object
    submitted : bool
        True if Select has been submitted via `max_ard.select.Select.submit`
    request : Pydantic model
        Parameters are loaded into a Pydantic model of the HTTP API request
    response : Pydantic model
        Pydantic model of the server response to API call. Status and submit calls
        return the same payload


    Notes
    -----

    Intersects inputs:
        Geometry objects: Shapely shapes, objects supporting __geo_interface__, geojson-like dicts,
            geojson and wkt strings
        Geometry iterables: iterables of above, Fiona readers
        File paths: most spatial file formats. WKT and Geojson supported with base install, other formats
            require Fiona for reading"""

    # store an authenticated Requests session in the class

    def __init__(
        self,
        acq_ids=None,
        datetime=None,
        intersects=None,
        bbox=None,
        query={},
        stack_depth=None,
        image_age_category=None,
        session=None,
    ):

        self.session = session or get_user_session()

        if intersects is not None:
            intersects = convert_to_shapely(intersects).wkt

        self.request = SelectRequest(
            ids=acq_ids,
            datetime=datetime,
            intersects=intersects,
            bbox=bbox,
            stack_depth=stack_depth,
            query=query,
            image_age_category=image_age_category,
        )
        self.response = None

    @property
    def submitted(self):
        if self.response:
            return self.response.id is not None
        else:
            return False

    @property
    @Submitted.required
    def running(self):
        """The Select is currently running"""

        return self.status == "RUNNING"

    @property
    @Submitted.required
    def finished(self):
        """The Select has finished running but may have failed"""

        return self.status != "RUNNING"

    @property
    @Submitted.required
    def succeeded(self):
        """The Select has finished running and has succeeded"""

        return self.status == "SUCCEEDED"

    @property
    @Submitted.required
    def failed(self):
        """The Select has finished running but has failed"""

        return self.status in ["FAILED", "ERROR"]

    @Submitted.required
    def wait_for_success(self, interval: int = 5) -> None:
        """Wait for the Select to succeed

        Parameters
        ----------
        interval: numeric, optional
            polling interval for success, default is 5 secs

        Raises
        ------
        SelectError
            An error in the selection caused it to fail"""

        while self.status == "RUNNING":
            sleep(interval)
        if self.status == "FAILED":
            error = self.response.error_message
            msg = f'{error["Error"]}: {error["Cause"]}'
            raise SelectError(msg)

    @property
    @Submitted.required
    def status(self):
        """Status of the select process: 'RUNNING', 'FINISHED', or 'FAILED'"""
        if self.response.status == "RUNNING":
            response = self.get_select(self.select_id, self.session)
            self.response = SelectResponse(**response)
        return self.response.status

    @property
    def state(self):
        """Alternate name for `max_ard.Select.status`, will be deprecated"""
        return self.status

    @property
    @Submitted.required
    def select_id(self):
        """ID of the Select"""

        return self.response.id

    @property
    @Succeeded.required
    def usage(self):
        """Dictionary of data usage metrics"""

        return self.response.usage

    @classmethod
    def from_id(cls, select_id: str, session=None):
        """Create a Select object from an ID

        Parameters
        ----------
        select_id: str
            Select ID to hydrate into a Select object
        session : Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        Select"""

        if not session:
            session = get_user_session()
        instance = cls()
        instance.session = session
        response = cls.get_select(select_id, session)
        instance.response = SelectResponse(**response)
        if "request_details" in response:
            instance.request = SelectRequest(**response["request_details"])
        return instance

    @classmethod
    def get_select(cls, select_id, session=None):
        """Fetch raw data about a Select from an ID

        Parameters
        ----------
        select_id : str
            Select ID to fetch metadata for
        session: Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        dict
            API data for the given Select"""

        if not session:
            session = get_user_session()

        r = session.get(ard_url("select", "request", select_id))
        return r.json()

    @classmethod
    def send_select(cls, payload, session=None):
        """Send a request to the Select API

        Parameters
        ----------
        payload : dict
            Select API request payload
        session : Session object, optional
            Authenticated session, such as from get_client_session()

        Returns
        -------
        dict
            API response data for the given Select"""

        if not session:
            session = get_user_session()
        r = session.post(ard_url("select"), json=payload)
        return r.json()

    def submit(self):
        """Submit this Select to the API"""

        response = self.send_select(self.request.to_payload(), self.session)
        if self.submitted:
            warnings.warn("The Select has already been submitted")
            return
        self.response = SelectResponse(**response)
        # update request in case defaults were applied
        self.request = SelectRequest(**self.response.request_details)

    @lru_cache()
    @Succeeded.required
    def get_link_contents(self, name):
        """Get the contents of an Select result file via its signed link

        Parameters
        ----------
        name : str
            The Select result file name

        Returns
        -------
        str
            Link contents"""

        temp_url = self.get_signed_link(name)
        # unauthenticated, don't send token
        r = requests.get(temp_url)
        r.raise_for_status()
        return r.text

    @Succeeded.required
    def get_signed_link(self, name):
        """Get the signed link for a Select result file

        Parameters
        ----------
        name : str
            The Select result file name

        Returns
        -------
        str
            signed URL"""

        url = self.response.links[name]
        r = self.session.get(url)
        r.raise_for_status()
        return r.json()["download_link"]

    @Succeeded.required
    def copy_file(self, name, dir="."):
        """Copy a Select result file to a local location

        Parameters
        ----------
        name : str
            The Select result file name
        dir : str, optional
            Local directory location to copy to, file will retain its name"""

        # TODO get the output filename
        path = Path(dir, f"{self.select_id}.{name}")

        with open(path, "w") as out:
            file = self.get_link_contents(name)
            out.write(file)

    @property
    @lru_cache()
    @Succeeded.required
    def results(self):
        """The results of a select converted to Python objects"""

        return SelectResult.from_geojson(json.loads(self.get_link_contents("geojson")))

    def __repr__(self) -> str:
        if not self.submitted:
            return f"<ARD Select (unsubmitted)>"
        elif self.succeeded:
            return f"<ARD Select {self.select_id}>"
        else:
            return f"<ARD Select ({self.status})>"

Ancestors

Static methods

def from_id(select_id: str, session=None)

Create a Select object from an ID

Parameters

select_id : str
Select ID to hydrate into a Select object
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

Select
 
Expand source code
@classmethod
def from_id(cls, select_id: str, session=None):
    """Create a Select object from an ID

    Parameters
    ----------
    select_id: str
        Select ID to hydrate into a Select object
    session : Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    Select"""

    if not session:
        session = get_user_session()
    instance = cls()
    instance.session = session
    response = cls.get_select(select_id, session)
    instance.response = SelectResponse(**response)
    if "request_details" in response:
        instance.request = SelectRequest(**response["request_details"])
    return instance
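
As with orders, an existing select can be rehydrated from its ID. A minimal sketch (the select ID is hypothetical):

from max_ard import Select

select = Select.from_id("5e1ba3b7-6f8a-4d6b-9f0e-8c3d2a1b0c9d")   # hypothetical select ID
print(select.status)
if select.succeeded:
    print(select.usage)
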
def get_select(select_id, session=None)

Fetch raw data about a Select from an ID

Parameters

select_id : str
Select ID to fetch metadata for
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

dict
API data for the given Select
Expand source code
@classmethod
def get_select(cls, select_id, session=None):
    """Fetch raw data about a Select from an ID

    Parameters
    ----------
    select_id : str
        Select ID to fetch metadata for
    session: Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    dict
        API data for the given Select"""

    if not session:
        session = get_user_session()

    r = session.get(ard_url("select", "request", select_id))
    return r.json()
def send_select(payload, session=None)

Send a request to the Select API

Parameters

payload : dict
Select API request payload
session : Session object, optional
Authenticated session, such as from get_client_session()

Returns

dict
API response data for the given Select
Expand source code
@classmethod
def send_select(cls, payload, session=None):
    """Send a request to the Select API

    Parameters
    ----------
    payload : dict
        Select API request payload
    session : Session object, optional
        Authenticated session, such as from get_client_session()

    Returns
    -------
    dict
        API response data for the given Select"""

    if not session:
        session = get_user_session()
    r = session.post(ard_url("select"), json=payload)
    return r.json()

Instance variables

var failed

The Select has finished running but has failed

Expand source code
@property
@Submitted.required
def failed(self):
    """The Select has finished running but has failed"""

    return self.status in ["FAILED", "ERROR"]
var finished

The Select has finished running but may have failed

Expand source code
@property
@Submitted.required
def finished(self):
    """The Select has finished running but may have failed"""

    return self.status != "RUNNING"
var results

The results of a select converted to Python objects

Expand source code
@property
@lru_cache()
@Succeeded.required
def results(self):
    """The results of a select converted to Python objects"""

    return SelectResult.from_geojson(json.loads(self.get_link_contents("geojson")))
var running

The Select is currently running

Expand source code
@property
@Submitted.required
def running(self):
    """The Select is currently running"""

    return self.status == "RUNNING"
var select_id

ID of the Select

Expand source code
@property
@Submitted.required
def select_id(self):
    """ID of the Select"""

    return self.response.id
var state

Alternate name for Select.status, will be deprecated

Expand source code
@property
def state(self):
    """Alternate name for `max_ard.Select.status`, will be deprecated"""
    return self.status
var status

Status of the select process: 'RUNNING', 'SUCCEEDED', or 'FAILED'

Expand source code
@property
@Submitted.required
def status(self):
    """Status of the select process: 'RUNNING', 'FINISHED', or 'FAILED'"""
    if self.response.status == "RUNNING":
        response = self.get_select(self.select_id, self.session)
        self.response = SelectResponse(**response)
    return self.response.status
var submitted
Expand source code
@property
def submitted(self):
    if self.response:
        return self.response.id is not None
    else:
        return False
var succeeded

The Select has finished running and has succeeded

Expand source code
@property
@Submitted.required
def succeeded(self):
    """The Select has finished running and has succeeded"""

    return self.status == "SUCCEEDED"
var usage

Dictionary of data usage metrics

Expand source code
@property
@Succeeded.required
def usage(self):
    """Dictionary of data usage metrics"""

    return self.response.usage

Methods

def copy_file(self, name, dir='.')

Copy a Select result file to a local location

Parameters

name : str
The Select result file name
dir : str, optional
Local directory location to copy to, file will retain its name
Expand source code
@Succeeded.required
def copy_file(self, name, dir="."):
    """Copy a Select result file to a local location

    Parameters
    ----------
    name : str
        The Select result file name
    dir : str, optional
        Local directory location to copy to, file will retain its name"""

    # TODO get the output filename
    path = Path(dir, f"{self.select_id}.{name}")

    with open(path, "w") as out:
        file = self.get_link_contents(name)
        out.write(file)

def get_link_contents(self, name)

Get the contents of a Select result file via its signed link

Parameters

name : str
The Select result file name

Returns

str
Link contents
Expand source code
@lru_cache()
@Succeeded.required
def get_link_contents(self, name):
    """Get the contents of an Select result file via its signed link

    Parameters
    ----------
    name : str
        The Select result file name

    Returns
    -------
    str
        Link contents"""

    temp_url = self.get_signed_link(name)
    # unauthenticated, don't send token
    r = requests.get(temp_url)
    r.raise_for_status()
    return r.text

def get_signed_link(self, name)

Get the signed link for a Select result file

Parameters

name : str
The Select result file name

Returns

str
signed URL
Expand source code
@Succeeded.required
def get_signed_link(self, name):
    """Get the signed link for a Select result file

    Parameters
    ----------
    name : str
        The Select result file name

    Returns
    -------
    str
        signed URL"""

    url = self.response.links[name]
    r = self.session.get(url)
    r.raise_for_status()
    return r.json()["download_link"]
def submit(self)

Submit this Select to the API

Expand source code
def submit(self):
    """Submit this Select to the API"""

    response = self.send_select(self.request.to_payload(), self.session)
    if self.submitted:
        warnings.warn("The Select has already been submitted")
        return
    self.response = SelectResponse(**response)
    # update request in case defaults were applied
    self.request = SelectRequest(**self.response.request_details)
def wait_for_success(self, interval: int = 5) ‑> None

Wait for the Select to succeed

Parameters

interval : numeric, optional
polling interval for success, default is 5 secs

Raises

SelectError
An error in the selection caused it to fail
Expand source code
@Submitted.required
def wait_for_success(self, interval: int = 5) -> None:
    """Wait for the Select to succeed

    Parameters
    ----------
    interval: numeric, optional
        polling interval for success, default is 5 secs

    Raises
    ------
    SelectError
        An error in the selection caused it to fail"""

    while self.status == "RUNNING":
        sleep(interval)
    if self.status == "FAILED":
        error = self.response.error_message
        msg = f'{error["Error"]}: {error["Cause"]}'
        raise SelectError(msg)
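
Putting the pieces together, a typical Select round trip looks roughly like the sketch below; the AOI and date are hypothetical:

from max_ard import Select

select = Select(intersects="POINT(-106.8 39.2)", datetime="2022-06-01")
select.submit()
select.wait_for_success()        # raises SelectError if the select fails
print(select.usage)
print(select.results)            # e.g. <SelectResult (12 tiles in 3 acquisitions) >
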
class SelectResult

The results of a Select or MetaSelect operation.

This object converts the GeoJSON FeatureCollection returned by a Select or MetaSelect into Python objects

Expand source code
class SelectResult(BaseCollection):
    """The results of a Select or MetaSelect operation.

    This object is converted from the GeoJSON FeatureCollections
    returned by Selects and MetaSelects into Python objects"""

    def __init__(self):
        super().__init__()

    @classmethod
    def from_geojson(cls, geojson):
        self = cls()
        for feature in geojson["features"]:
            if "best_matches" in feature["properties"]:
                for match in feature["properties"]["best_matches"]:
                    self.add_tile(SelectTile(match))
            else:
                try:
                    self.add_tile(SelectTile(feature["properties"]))
                except (KeyError, TypeError):
                    pass
        return self

    def __repr__(self) -> str:
        return (
            f"<SelectResult ({len(self.tiles)} tiles in {len(self.acquisitions)} acquisitions) >"
        )

Ancestors

Static methods

def from_geojson(geojson)
Expand source code
@classmethod
def from_geojson(cls, geojson):
    self = cls()
    for feature in geojson["features"]:
        if "best_matches" in feature["properties"]:
            for match in feature["properties"]["best_matches"]:
                self.add_tile(SelectTile(match))
        else:
            try:
                self.add_tile(SelectTile(feature["properties"]))
            except (KeyError, TypeError):
                pass
    return self
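
from_geojson expects the parsed GeoJSON FeatureCollection as a dict, the same structure that Select.results loads from the 'geojson' result file. A minimal sketch, assuming "select.geojson" is a result file saved locally (for example with copy_file):

import json

from max_ard import SelectResult

with open("select.geojson") as f:            # hypothetical local copy of the result file
    result = SelectResult.from_geojson(json.load(f))

print(result)                                # <SelectResult (N tiles in M acquisitions) >
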

Inherited members