cfa-cloudops Modules API Reference

cfa.cloudops.auth

Helper functions for Azure authentication.

CredentialHandler dataclass

Data structure for Azure credentials.

Lazy and cached: credentials are retrieved from a keyvault only when needed and are cached thereafter.
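
For example (a minimal sketch; the endpoint and secret ID below are placeholders), the keyvault round-trip happens only on the first access of a cached property:

    >>> handler = CredentialHandler()
    >>> handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
    >>> handler.azure_keyvault_sp_secret_id = "my-secret"
    >>> secret = handler.service_principal_secret   # retrieved from the keyvault
    >>> secret_again = handler.service_principal_secret  # served from the cache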

Source code in cfa/cloudops/auth.py
@dataclass
class CredentialHandler:
    """Data structure for Azure credentials.

    Lazy and cached: credentials are retrieved from a keyvault only when needed
    and are cached thereafter.
    """

    azure_subscription_id: str = None
    azure_resource_group_name: str = None
    azure_user_assigned_identity: str = None
    azure_subnet_id: str = None

    azure_keyvault_endpoint: str = None
    azure_keyvault_sp_secret_id: str = None
    azure_tenant_id: str = None
    azure_sp_client_id: str = None
    azure_batch_endpoint_subdomain: str = (
        d.default_azure_batch_endpoint_subdomain
    )
    azure_batch_account: str = None
    azure_batch_location: str = "eastus"
    azure_batch_resource_url: str = d.default_azure_batch_resource_url
    azure_blob_storage_endpoint_subdomain: str = (
        d.default_azure_blob_storage_endpoint_subdomain
    )
    azure_blob_storage_account: str = None

    azure_container_registry_account: str = None
    azure_container_registry_domain: str = (
        d.default_azure_container_registry_domain
    )
    azure_federated_token_file: str = None
    method: str = None

    def require_attr(self, attributes: str | list[str], goal: str = None):
        """Check that attributes required for a given operation are defined.

        Raises an informative error message if the required attribute is not defined.

        Args:
            attributes: String or list of strings naming the required attribute(s).
            goal: String naming the value that the attributes are required for obtaining,
                to make error messages more informative. If None, use a more generic message.

        Raises:
            AttributeError: If any required ``attributes`` are None.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.require_attr(["azure_tenant_id"], "authentication")
            AttributeError: A non-None value for attribute azure_tenant_id is required...
        """
        attributes = ensure_listlike(attributes)
        for attr in attributes:
            attr_val = getattr(self, attr)
            if attr_val is None:
                err_msg = (
                    f"A non-None value for attribute {attr} is required "
                ) + (
                    f"to obtain a value for {goal}."
                    if goal is not None
                    else "for this operation."
                )
                raise AttributeError(err_msg)

    @property
    def azure_batch_endpoint(self) -> str:
        """Azure batch endpoint URL.

        Constructed programmatically from account name, location, and subdomain.

        Returns:
            str: The endpoint URL.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_batch_account = "mybatchaccount"
            >>> handler.azure_batch_location = "eastus"
            >>> handler.azure_batch_endpoint_subdomain = "batch.azure.com"
            >>> handler.azure_batch_endpoint
            'https://mybatchaccount.eastus.batch.azure.com'
        """
        self.require_attr(
            [
                "azure_batch_account",
                "azure_batch_endpoint_subdomain",
            ],
            goal="Azure batch endpoint URL",
        )
        return construct_batch_endpoint(
            self.azure_batch_account,
            self.azure_batch_location,
            self.azure_batch_endpoint_subdomain,
        )

    @property
    def azure_blob_storage_endpoint(self) -> str:
        """Azure blob storage endpoint URL.

        Constructed programmatically from the account name and endpoint subdomain.

        Returns:
            str: The endpoint URL.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_blob_storage_account = "mystorageaccount"
            >>> handler.azure_blob_storage_endpoint_subdomain = "blob.core.windows.net"
            >>> handler.azure_blob_storage_endpoint
            'https://mystorageaccount.blob.core.windows.net'
        """
        self.require_attr(
            [
                "azure_blob_storage_account",
                "azure_blob_storage_endpoint_subdomain",
            ],
            goal="Azure blob storage endpoint URL",
        )
        return construct_blob_account_endpoint(
            self.azure_blob_storage_account,
            self.azure_blob_storage_endpoint_subdomain,
        )

    @property
    def azure_container_registry_endpoint(self) -> str:
        """Azure container registry endpoint URL.

        Constructed programmatically from the account name and registry domain.

        Returns:
            str: The endpoint URL.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_container_registry_account = "myregistry"
            >>> handler.azure_container_registry_domain = "azurecr.io"
            >>> handler.azure_container_registry_endpoint
            'myregistry.azurecr.io'
        """
        self.require_attr(
            [
                "azure_container_registry_account",
                "azure_container_registry_domain",
            ],
            goal="Azure container registry endpoint URL",
        )
        return construct_azure_container_registry_endpoint(
            self.azure_container_registry_account,
            self.azure_container_registry_domain,
        )

    @cached_property
    def user_credential(self) -> ManagedIdentityCredential:
        """Azure user credential.

        Returns:
            ManagedIdentityCredential: The Azure user credential using ManagedIdentityCredential.

        Example:
            >>> handler = CredentialHandler()
            >>> credential = handler.user_credential
            >>> # Use credential with Azure SDK clients
        """
        return ManagedIdentityCredential()

    @cached_property
    def service_principal_secret(self):
        """A service principal secret retrieved from Azure Key Vault.

        Returns:
            str: The secret value.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
            >>> handler.azure_keyvault_sp_secret_id = "my-secret"
            >>> secret = handler.service_principal_secret
        """
        self.require_attr(
            ["azure_keyvault_endpoint", "azure_keyvault_sp_secret_id"],
            goal="service_principal_secret",
        )

        return get_sp_secret(
            self.azure_keyvault_endpoint,
            self.azure_keyvault_sp_secret_id,
            self.user_credential,
        )

    @cached_property
    def federated_credential(self) -> WorkloadIdentityCredential:
        """A WorkloadIdentityCredential for federated (workload identity) authentication.

        Requires the azure_tenant_id, azure_sp_client_id, and
        azure_federated_token_file attributes.

        Returns:
            WorkloadIdentityCredential: The federated credential.
        """
        self.require_attr(
            [
                "azure_tenant_id",
                "azure_sp_client_id",
                "azure_federated_token_file",
            ]
        )
        return WorkloadIdentityCredential(
            tenant_id=self.azure_tenant_id,
            client_id=self.azure_sp_client_id,
            token_file_path=self.azure_federated_token_file,
        )

    @cached_property
    def batch_service_principal_credentials(self):
        """Service Principal credentials for authenticating to Azure Batch.

        Returns:
            ServicePrincipalCredentials: The credentials configured for Azure Batch access.

        Example:
            >>> handler = CredentialHandler()
            >>> # Set required attributes...
            >>> credentials = handler.batch_service_principal_credentials
            >>> # Use with Azure Batch client
        """
        self.require_attr(
            [
                "azure_tenant_id",
                "azure_sp_client_id",
                "azure_batch_resource_url",
            ],
            goal="batch_service_principal_credentials",
        )
        return ServicePrincipalCredentials(
            client_id=self.azure_sp_client_id,
            tenant=self.azure_tenant_id,
            secret=self.service_principal_secret,
            resource=self.azure_batch_resource_url,
        )

    @cached_property
    def client_secret_sp_credential(self):
        """A client secret credential created using the service principal secret.

        Returns:
            ClientSecretCredential: The credential configured with service principal details.

        Example:
            >>> handler = CredentialHandler()
            >>> # Set required attributes...
            >>> credential = handler.client_secret_sp_credential
            >>> # Use with Azure SDK clients
        """
        self.require_attr(["azure_tenant_id", "azure_sp_client_id"])
        return ClientSecretCredential(
            tenant_id=self.azure_tenant_id,
            client_secret=self.service_principal_secret,
            client_id=self.azure_sp_client_id,
        )

    @cached_property
    def client_secret_credential(self):
        """A client secret credential created using the azure_client_secret attribute.

        Returns:
            ClientSecretCredential: The credential configured with client secret details.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_tenant_id = "tenant-id"
            >>> handler.azure_sp_client_id = "client-id"
            >>> handler.azure_client_secret = "client-secret" #pragma: allowlist secret
            >>> credential = handler.client_secret_credential
        """
        self.require_attr(
            [
                "azure_tenant_id",
                "azure_sp_client_id",
                "azure_client_secret",
            ]
        )
        return ClientSecretCredential(
            tenant_id=self.azure_tenant_id,
            client_secret=self.azure_client_secret,
            client_id=self.azure_sp_client_id,
        )

    @cached_property
    def compute_node_identity_reference(self):
        """An object defining a compute node identity reference.

        Specifically, a ComputeNodeIdentityReference object associated to the
        CredentialHandler's user-assigned identity.

        Returns:
            models.ComputeNodeIdentityReference: The identity reference.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_user_assigned_identity = "/subscriptions/.../resourceGroups/..."
            >>> identity_ref = handler.compute_node_identity_reference
        """
        self.require_attr(
            ["azure_user_assigned_identity"],
            goal="Compute node identity reference",
        )
        return models.ComputeNodeIdentityReference(
            resource_id=self.azure_user_assigned_identity
        )

    @cached_property
    def azure_container_registry(self):
        """An object pointing to an Azure Container Registry.

        Specifically, a ContainerRegistry instance corresponding to the particular
        Azure Container Registry account specified in the CredentialHandler, if any,
        with authentication via the compute_node_identity_reference defined by
        CredentialHandler, if any.

        Returns:
            models.ContainerRegistry: A properly instantiated ContainerRegistry object.

        Raises:
            ValueError: If the container registry endpoint is invalid.

        Example:
            >>> handler = CredentialHandler()
            >>> # Set required attributes...
            >>> registry = handler.azure_container_registry
        """
        self.require_attr(
            [
                "azure_container_registry_account",
                "azure_container_registry_domain",
                "azure_user_assigned_identity",
            ],
            goal=("Azure Container Registry `ContainerRegistry` instance"),
        )

        valid, msg = is_valid_acr_endpoint(
            self.azure_container_registry_endpoint
        )
        if not valid:
            raise ValueError(msg)

        return models.ContainerRegistry(
            user_name=self.azure_container_registry_account,
            registry_server=self.azure_container_registry_endpoint,
            identity_reference=self.compute_node_identity_reference,
        )

azure_batch_endpoint property

Azure batch endpoint URL.

Constructed programmatically from account name, location, and subdomain.

Returns:

    str: The endpoint URL.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_batch_account = "mybatchaccount"
    >>> handler.azure_batch_location = "eastus"
    >>> handler.azure_batch_endpoint_subdomain = "batch.azure.com"
    >>> handler.azure_batch_endpoint
    'https://mybatchaccount.eastus.batch.azure.com'

azure_blob_storage_endpoint property

Azure blob storage endpoint URL.

Constructed programmatically from the account name and endpoint subdomain.

Returns:

    str: The endpoint URL.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_blob_storage_account = "mystorageaccount"
    >>> handler.azure_blob_storage_endpoint_subdomain = "blob.core.windows.net"
    >>> handler.azure_blob_storage_endpoint
    'https://mystorageaccount.blob.core.windows.net'

azure_container_registry cached property

An object pointing to an Azure Container Registry.

Specifically, a ContainerRegistry instance corresponding to the particular Azure Container Registry account specified in the CredentialHandler, if any, with authentication via the compute_node_identity_reference defined by CredentialHandler, if any.

Returns:

    models.ContainerRegistry: A properly instantiated ContainerRegistry object.

Raises:

    ValueError: If the container registry endpoint is invalid.

Example:

    >>> handler = CredentialHandler()
    >>> # Set required attributes...
    >>> registry = handler.azure_container_registry

azure_container_registry_endpoint property

Azure container registry endpoint URL.

Constructed programmatically from the account name and registry domain.

Returns:

    str: The endpoint URL.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_container_registry_account = "myregistry"
    >>> handler.azure_container_registry_domain = "azurecr.io"
    >>> handler.azure_container_registry_endpoint
    'myregistry.azurecr.io'

batch_service_principal_credentials cached property

Service Principal credentials for authenticating to Azure Batch.

Returns:

    ServicePrincipalCredentials: The credentials configured for Azure Batch access.

Example:

    >>> handler = CredentialHandler()
    >>> # Set required attributes...
    >>> credentials = handler.batch_service_principal_credentials
    >>> # Use with Azure Batch client

client_secret_credential cached property

A client secret credential created using the azure_client_secret attribute.

Returns:

    ClientSecretCredential: The credential configured with client secret details.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_tenant_id = "tenant-id"
    >>> handler.azure_sp_client_id = "client-id"
    >>> handler.azure_client_secret = "client-secret"  # pragma: allowlist secret
    >>> credential = handler.client_secret_credential

client_secret_sp_credential cached property

A client secret credential created using the service principal secret.

Returns:

    ClientSecretCredential: The credential configured with service principal details.

Example:

    >>> handler = CredentialHandler()
    >>> # Set required attributes...
    >>> credential = handler.client_secret_sp_credential
    >>> # Use with Azure SDK clients

compute_node_identity_reference cached property

An object defining a compute node identity reference.

Specifically, a ComputeNodeIdentityReference object associated to the CredentialHandler's user-assigned identity.

Returns:

    models.ComputeNodeIdentityReference: The identity reference.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_user_assigned_identity = "/subscriptions/.../resourceGroups/..."
    >>> identity_ref = handler.compute_node_identity_reference
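
federated_credential cached property

A WorkloadIdentityCredential for federated (workload identity) authentication.

Constructed from the handler's azure_tenant_id, azure_sp_client_id, and azure_federated_token_file attributes, all of which must be set (see the source above).

Returns:

    WorkloadIdentityCredential: The federated credential.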

service_principal_secret cached property

A service principal secret retrieved from Azure Key Vault.

Returns:

    str: The secret value.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
    >>> handler.azure_keyvault_sp_secret_id = "my-secret"
    >>> secret = handler.service_principal_secret

user_credential cached property

Azure user credential.

Returns:

    ManagedIdentityCredential: The Azure user credential using ManagedIdentityCredential.

Example:

    >>> handler = CredentialHandler()
    >>> credential = handler.user_credential
    >>> # Use credential with Azure SDK clients

require_attr(attributes, goal=None)

Check that attributes required for a given operation are defined.

Raises an informative error message if the required attribute is not defined.

Parameters:

    attributes (str | list[str]): String or list of strings naming the
        required attribute(s). Required.
    goal (str): String naming the value that the attributes are required for
        obtaining, to make error messages more informative. If None, use a
        more generic message. Defaults to None.

Raises:

    AttributeError: If any required attributes are None.

Example:

    >>> handler = CredentialHandler()
    >>> handler.require_attr(["azure_tenant_id"], "authentication")
    AttributeError: A non-None value for attribute azure_tenant_id is required...

Source code in cfa/cloudops/auth.py
def require_attr(self, attributes: str | list[str], goal: str = None):
    """Check that attributes required for a given operation are defined.

    Raises an informative error message if the required attribute is not defined.

    Args:
        attributes: String or list of strings naming the required attribute(s).
        goal: String naming the value that the attributes are required for obtaining,
            to make error messages more informative. If None, use a more generic message.

    Raises:
        AttributeError: If any required ``attributes`` are None.

    Example:
        >>> handler = CredentialHandler()
        >>> handler.require_attr(["azure_tenant_id"], "authentication")
        AttributeError: A non-None value for attribute azure_tenant_id is required...
    """
    attributes = ensure_listlike(attributes)
    for attr in attributes:
        attr_val = getattr(self, attr)
        if attr_val is None:
            err_msg = (
                f"A non-None value for attribute {attr} is required "
            ) + (
                f"to obtain a value for {goal}."
                if goal is not None
                else "for this operation."
            )
            raise AttributeError(err_msg)

EnvCredentialHandler

Bases: CredentialHandler

Azure Credentials populated from available environment variables.

Subclass of CredentialHandler that populates attributes from environment variables at instantiation, with the opportunity to override those values via keyword arguments passed to the constructor.

Parameters:

    dotenv_path (str): Path to .env file to load environment variables from.
        If None, uses default .env file discovery. Defaults to None.
    **kwargs: Keyword arguments defining additional attributes or overriding
        those set in the environment variables. Passed as the config_dict
        argument to config.get_config_val.

Example:

    >>> # Load from environment variables
    >>> handler = EnvCredentialHandler()

    >>> # Override specific values
    >>> handler = EnvCredentialHandler(azure_tenant_id="custom-tenant-id")

    >>> # Load from custom .env file
    >>> handler = EnvCredentialHandler(dotenv_path="/path/to/.env")

Source code in cfa/cloudops/auth.py
class EnvCredentialHandler(CredentialHandler):
    """Azure Credentials populated from available environment variables.

    Subclass of CredentialHandler that populates attributes from environment
    variables at instantiation, with the opportunity to override those values
    via keyword arguments passed to the constructor.

    Args:
        dotenv_path (str, optional): Path to .env file to load environment variables from.
            If None, uses default .env file discovery.
        **kwargs: Keyword arguments defining additional attributes or overriding
            those set in the environment variables. Passed as the ``config_dict``
            argument to ``config.get_config_val``.

    Example:
        >>> # Load from environment variables
        >>> handler = EnvCredentialHandler()

        >>> # Override specific values
        >>> handler = EnvCredentialHandler(azure_tenant_id="custom-tenant-id")

        >>> # Load from custom .env file
        >>> handler = EnvCredentialHandler(dotenv_path="/path/to/.env")
    """

    def __init__(self, dotenv_path: str = None, **kwargs) -> None:
        """Initialize the EnvCredentialHandler.

        Loads environment variables from .env file and populates credential attributes from them.

        Args:
            dotenv_path (str, optional): Path to .env file to load environment variables from.
                If None, uses default .env file discovery.
            **kwargs: Additional keyword arguments to override specific credential attributes.
        """
        load_env_vars(dotenv_path=dotenv_path)
        get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

        for key in self.__dataclass_fields__.keys():
            self.__setattr__(key, get_conf(key))

__init__(dotenv_path=None, **kwargs)

Initialize the EnvCredentialHandler.

Loads environment variables from .env file and populates credential attributes from them.

Parameters:

    dotenv_path (str): Path to .env file to load environment variables from.
        If None, uses default .env file discovery. Defaults to None.
    **kwargs: Additional keyword arguments to override specific credential
        attributes.
Source code in cfa/cloudops/auth.py
def __init__(self, dotenv_path: str = None, **kwargs) -> None:
    """Initialize the EnvCredentialHandler.

    Loads environment variables from .env file and populates credential attributes from them.

    Args:
        dotenv_path (str, optional): Path to .env file to load environment variables from.
            If None, uses default .env file discovery.
        **kwargs: Additional keyword arguments to override specific credential attributes.
    """
    load_env_vars(dotenv_path=dotenv_path)
    get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

    for key in self.__dataclass_fields__.keys():
        self.__setattr__(key, get_conf(key))

SPCredentialHandler

Bases: CredentialHandler
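
Azure credentials populated from a service principal client secret. Subclass of CredentialHandler whose constructor accepts the tenant, subscription, client ID, and client secret directly, or loads them from environment variables (see __init__ below).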

Source code in cfa/cloudops/auth.py
class SPCredentialHandler(CredentialHandler):
    def __init__(
        self,
        azure_tenant_id: str = None,
        azure_subscription_id: str = None,
        azure_sp_client_id: str = None,
        azure_client_secret: str = None,
        dotenv_path: str = None,
        **kwargs,
    ):
        """Initialize a Service Principal Credential Handler.

        Creates a credential handler that uses Azure Service Principal authentication
        for accessing Azure resources. Credentials can be provided directly as parameters
        or loaded from environment variables. If not provided directly, the handler will
        attempt to load credentials from environment variables or a .env file.

        Args:
            azure_tenant_id: Azure Active Directory tenant ID. If None, will attempt
                to load from AZURE_TENANT_ID environment variable.
            azure_subscription_id: Azure subscription ID. If None, will attempt
                to load from AZURE_SUBSCRIPTION_ID environment variable.
            azure_sp_client_id: Azure Service Principal client ID (application ID).
                If None, will attempt to load from AZURE_SP_CLIENT_ID environment variable.
            azure_client_secret: Azure Service Principal client secret. If None, will
                attempt to load from AZURE_CLIENT_SECRET environment variable.
            dotenv_path: Path to .env file to load environment variables from.
                If None, uses default .env file discovery.

        Raises:
            ValueError: If AZURE_TENANT_ID is not found in environment variables
                and not provided as parameter.
            ValueError: If AZURE_SUBSCRIPTION_ID is not found in environment variables
                and not provided as parameter.
            ValueError: If AZURE_SP_CLIENT_ID is not found in environment variables
                and not provided as parameter.
            ValueError: If AZURE_CLIENT_SECRET is not found in environment variables
                and not provided as parameter.

        Example:
            >>> # Using direct parameters
            >>> handler = SPCredentialHandler(
            ...     azure_tenant_id="12345678-1234-1234-1234-123456789012",
            ...     azure_subscription_id="87654321-4321-4321-4321-210987654321",
            ...     azure_sp_client_id="abcdef12-3456-7890-abcd-ef1234567890",
            ...     azure_client_secret="your-secret-here" #pragma: allowlist secret
            ... )

            >>> # Using environment variables
            >>> handler = SPCredentialHandler()  # Loads from env vars

            >>> # Using custom .env file
            >>> handler = SPCredentialHandler(dotenv_path="/path/to/.env")
        """
        self.method = "sp"
        # load env vars, including client secret if available
        load_dotenv(dotenv_path=dotenv_path, override=True)

        self.azure_tenant_id = (
            azure_tenant_id
            if azure_tenant_id is not None
            else os.environ["AZURE_TENANT_ID"]
        )
        self.azure_subscription_id = (
            azure_subscription_id
            if azure_subscription_id is not None
            else os.environ["AZURE_SUBSCRIPTION_ID"]
        )
        self.azure_sp_client_id = (
            azure_sp_client_id
            if azure_sp_client_id is not None
            else os.environ["AZURE_SP_CLIENT_ID"]
        )
        self.azure_client_secret = (
            azure_client_secret
            if azure_client_secret is not None
            else os.environ["AZURE_CLIENT_SECRET"]
        )

        # check if tenant_id, client_id, subscription_id, and client_secret_id exist, else find in os env vars
        if "AZURE_TENANT_ID" not in os.environ and not azure_tenant_id:
            raise ValueError(
                "AZURE_TENANT_ID not found in env variables and not provided."
            )
        if (
            "AZURE_SUBSCRIPTION_ID" not in os.environ
            and not azure_subscription_id
        ):
            raise ValueError(
                "AZURE_SUBSCRIPTION_ID not found in env variables and not provided."
            )
        if "AZURE_SP_CLIENT_ID" not in os.environ and not azure_sp_client_id:
            raise ValueError(
                "AZURE_SP_CLIENT_ID not found in env variables and not provided."
            )
        if "AZURE_CLIENT_SECRET" not in os.environ and not azure_client_secret:
            raise ValueError(
                "AZURE_CLIENT_SECRET not found in env variables and not provided."
            )
        d.set_env_vars()

        get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

        for key in self.__dataclass_fields__.keys():
            self.__setattr__(key, get_conf(key))

__init__(azure_tenant_id=None, azure_subscription_id=None, azure_sp_client_id=None, azure_client_secret=None, dotenv_path=None, **kwargs)

Initialize a Service Principal Credential Handler.

Creates a credential handler that uses Azure Service Principal authentication for accessing Azure resources. Credentials can be provided directly as parameters or loaded from environment variables. If not provided directly, the handler will attempt to load credentials from environment variables or a .env file.

Parameters:

    azure_tenant_id (str): Azure Active Directory tenant ID. If None, will
        attempt to load from the AZURE_TENANT_ID environment variable.
        Defaults to None.
    azure_subscription_id (str): Azure subscription ID. If None, will attempt
        to load from the AZURE_SUBSCRIPTION_ID environment variable.
        Defaults to None.
    azure_sp_client_id (str): Azure Service Principal client ID (application
        ID). If None, will attempt to load from the AZURE_SP_CLIENT_ID
        environment variable. Defaults to None.
    azure_client_secret (str): Azure Service Principal client secret. If None,
        will attempt to load from the AZURE_CLIENT_SECRET environment
        variable. Defaults to None.
    dotenv_path (str): Path to .env file to load environment variables from.
        If None, uses default .env file discovery. Defaults to None.

Raises:

    ValueError: If AZURE_TENANT_ID is not found in environment variables and
        not provided as a parameter.
    ValueError: If AZURE_SUBSCRIPTION_ID is not found in environment variables
        and not provided as a parameter.
    ValueError: If AZURE_SP_CLIENT_ID is not found in environment variables
        and not provided as a parameter.
    ValueError: If AZURE_CLIENT_SECRET is not found in environment variables
        and not provided as a parameter.

Example:

    >>> # Using direct parameters
    >>> handler = SPCredentialHandler(
    ...     azure_tenant_id="12345678-1234-1234-1234-123456789012",
    ...     azure_subscription_id="87654321-4321-4321-4321-210987654321",
    ...     azure_sp_client_id="abcdef12-3456-7890-abcd-ef1234567890",
    ...     azure_client_secret="your-secret-here"  # pragma: allowlist secret
    ... )

    >>> # Using environment variables
    >>> handler = SPCredentialHandler()  # Loads from env vars

    >>> # Using custom .env file
    >>> handler = SPCredentialHandler(dotenv_path="/path/to/.env")

Source code in cfa/cloudops/auth.py
def __init__(
    self,
    azure_tenant_id: str = None,
    azure_subscription_id: str = None,
    azure_sp_client_id: str = None,
    azure_client_secret: str = None,
    dotenv_path: str = None,
    **kwargs,
):
    """Initialize a Service Principal Credential Handler.

    Creates a credential handler that uses Azure Service Principal authentication
    for accessing Azure resources. Credentials can be provided directly as parameters
    or loaded from environment variables. If not provided directly, the handler will
    attempt to load credentials from environment variables or a .env file.

    Args:
        azure_tenant_id: Azure Active Directory tenant ID. If None, will attempt
            to load from AZURE_TENANT_ID environment variable.
        azure_subscription_id: Azure subscription ID. If None, will attempt
            to load from AZURE_SUBSCRIPTION_ID environment variable.
        azure_sp_client_id: Azure Service Principal client ID (application ID).
            If None, will attempt to load from AZURE_SP_CLIENT_ID environment variable.
        azure_client_secret: Azure Service Principal client secret. If None, will
            attempt to load from AZURE_CLIENT_SECRET environment variable.
        dotenv_path: Path to .env file to load environment variables from.
            If None, uses default .env file discovery.

    Raises:
        ValueError: If AZURE_TENANT_ID is not found in environment variables
            and not provided as parameter.
        ValueError: If AZURE_SUBSCRIPTION_ID is not found in environment variables
            and not provided as parameter.
        ValueError: If AZURE_SP_CLIENT_ID is not found in environment variables
            and not provided as parameter.
        ValueError: If AZURE_CLIENT_SECRET is not found in environment variables
            and not provided as parameter.

    Example:
        >>> # Using direct parameters
        >>> handler = SPCredentialHandler(
        ...     azure_tenant_id="12345678-1234-1234-1234-123456789012",
        ...     azure_subscription_id="87654321-4321-4321-4321-210987654321",
        ...     azure_sp_client_id="abcdef12-3456-7890-abcd-ef1234567890",
        ...     azure_client_secret="your-secret-here" #pragma: allowlist secret
        ... )

        >>> # Using environment variables
        >>> handler = SPCredentialHandler()  # Loads from env vars

        >>> # Using custom .env file
        >>> handler = SPCredentialHandler(dotenv_path="/path/to/.env")
    """
    self.method = "sp"
    # load env vars, including client secret if available
    load_dotenv(dotenv_path=dotenv_path, override=True)

    self.azure_tenant_id = (
        azure_tenant_id
        if azure_tenant_id is not None
        else os.environ["AZURE_TENANT_ID"]
    )
    self.azure_subscription_id = (
        azure_subscription_id
        if azure_subscription_id is not None
        else os.environ["AZURE_SUBSCRIPTION_ID"]
    )
    self.azure_sp_client_id = (
        azure_sp_client_id
        if azure_sp_client_id is not None
        else os.environ["AZURE_SP_CLIENT_ID"]
    )
    self.azure_client_secret = (
        azure_client_secret
        if azure_client_secret is not None
        else os.environ["AZURE_CLIENT_SECRET"]
    )

    # check if tenant_id, client_id, subscription_id, and client_secret_id exist, else find in os env vars
    if "AZURE_TENANT_ID" not in os.environ and not azure_tenant_id:
        raise ValueError(
            "AZURE_TENANT_ID not found in env variables and not provided."
        )
    if (
        "AZURE_SUBSCRIPTION_ID" not in os.environ
        and not azure_subscription_id
    ):
        raise ValueError(
            "AZURE_SUBSCRIPTION_ID not found in env variables and not provided."
        )
    if "AZURE_SP_CLIENT_ID" not in os.environ and not azure_sp_client_id:
        raise ValueError(
            "AZURE_SP_CLIENT_ID not found in env variables and not provided."
        )
    if "AZURE_CLIENT_SECRET" not in os.environ and not azure_client_secret:
        raise ValueError(
            "AZURE_CLIENT_SECRET not found in env variables and not provided."
        )
    d.set_env_vars()

    get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

    for key in self.__dataclass_fields__.keys():
        self.__setattr__(key, get_conf(key))

get_client_secret_sp_credential(vault_url, vault_sp_secret_id, tenant_id, application_id, user_credential=None)

Get a ClientSecretCredential for a given Azure service principal.

Parameters:

    vault_url (str): URL for the Azure keyvault to access. Required.
    vault_sp_secret_id (str): Service principal secret ID within the keyvault.
        Required.
    tenant_id (str): Tenant ID for the service principal credential. Required.
    application_id (str): Application ID for the service principal credential.
        Required.
    user_credential: User credential for the Azure user, as an azure-identity
        credential class instance. Passed to get_sp_secret. If None,
        get_sp_secret will use a ManagedIdentityCredential instantiated at
        runtime. See its documentation for more. Defaults to None.

Returns:

    ClientSecretCredential: A ClientSecretCredential for the given service
        principal.

Example:

    >>> credential = get_client_secret_sp_credential(
    ...     "https://myvault.vault.azure.net/",
    ...     "my-secret-id",
    ...     "tenant-id",
    ...     "application-id"
    ... )

Source code in cfa/cloudops/auth.py
def get_client_secret_sp_credential(
    vault_url: str,
    vault_sp_secret_id: str,
    tenant_id: str,
    application_id: str,
    user_credential=None,
) -> ClientSecretCredential:
    """Get a ClientSecretCredential for a given Azure service principal.

    Args:
        vault_url: URL for the Azure keyvault to access.
        vault_sp_secret_id: Service principal secret ID within the keyvault.
        tenant_id: Tenant ID for the service principal credential.
        application_id: Application ID for the service principal credential.
        user_credential: User credential for the Azure user, as an azure-identity
            credential class instance. Passed to ``get_sp_secret``. If None,
            ``get_sp_secret`` will use a ManagedIdentityCredential instantiated
            at runtime. See its documentation for more.

    Returns:
        ClientSecretCredential: A ClientSecretCredential for the given service principal.

    Example:
        >>> credential = get_client_secret_sp_credential(
        ...     "https://myvault.vault.azure.net/",
        ...     "my-secret-id",
        ...     "tenant-id",
        ...     "application-id"
        ... )
    """
    sp_secret = get_sp_secret(
        vault_url, vault_sp_secret_id, user_credential=user_credential
    )
    sp_credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=application_id,
        client_secret=sp_secret,
    )

    return sp_credential

get_compute_node_identity_reference(credential_handler=None)

Get a valid ComputeNodeIdentityReference using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

    credential_handler (CredentialHandler): Credential handler for connecting
        and authenticating to Azure resources. If None, create a blank
        EnvCredentialHandler, which attempts to obtain needed credentials
        using information available in local environment variables (see its
        documentation for details). Defaults to None.

Returns:

    models.ComputeNodeIdentityReference: A ComputeNodeIdentityReference
        created according to the specified configuration.

Example:

    >>> # Using default environment-based handler
    >>> identity_ref = get_compute_node_identity_reference()

    >>> # Using custom handler
    >>> handler = CredentialHandler()
    >>> identity_ref = get_compute_node_identity_reference(handler)

Source code in cfa/cloudops/auth.py
def get_compute_node_identity_reference(
    credential_handler: CredentialHandler = None,
) -> models.ComputeNodeIdentityReference:
    """Get a valid ComputeNodeIdentityReference using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).

    Returns:
        models.ComputeNodeIdentityReference: A ComputeNodeIdentityReference created
            according to the specified configuration.

    Example:
        >>> # Using default environment-based handler
        >>> identity_ref = get_compute_node_identity_reference()

        >>> # Using custom handler
        >>> handler = CredentialHandler()
        >>> identity_ref = get_compute_node_identity_reference(handler)
    """
    ch = credential_handler
    if ch is None:
        ch = EnvCredentialHandler()

    return ch.compute_node_identity_reference

get_service_principal_credentials(vault_url, vault_sp_secret_id, tenant_id, application_id, resource_url=d.default_azure_batch_resource_url, user_credential=None)

Get a ServicePrincipalCredentials object for a given Azure service principal.

Parameters:

    vault_url (str): URL for the Azure keyvault to access. Required.
    vault_sp_secret_id (str): Service principal secret ID within the keyvault.
        Required.
    tenant_id (str): Tenant ID for the service principal credential. Required.
    application_id (str): Application ID for the service principal credential.
        Required.
    resource_url (str): URL of the Azure resource. Defaults to the value of
        defaults.default_azure_batch_resource_url.
    user_credential: User credential for the Azure user, as an azure-identity
        credential class instance. Passed to get_sp_secret. If None,
        get_sp_secret will use a ManagedIdentityCredential instantiated at
        runtime. See the get_sp_secret documentation for details. Defaults
        to None.

Returns:

    ServicePrincipalCredentials: A ServicePrincipalCredentials object for the
        service principal.

Example:

    >>> credentials = get_service_principal_credentials(
    ...     "https://myvault.vault.azure.net/",
    ...     "my-secret-id",
    ...     "tenant-id",
    ...     "application-id"
    ... )

Source code in cfa/cloudops/auth.py
def get_service_principal_credentials(
    vault_url: str,
    vault_sp_secret_id: str,
    tenant_id: str,
    application_id: str,
    resource_url: str = d.default_azure_batch_resource_url,
    user_credential=None,
) -> ServicePrincipalCredentials:
    """Get a ServicePrincipalCredentials object for a given Azure service principal.

    Args:
        vault_url: URL for the Azure keyvault to access.
        vault_sp_secret_id: Service principal secret ID within the keyvault.
        tenant_id: Tenant ID for the service principal credential.
        application_id: Application ID for the service principal credential.
        resource_url: URL of the Azure resource. Defaults to the value of
            ``defaults.default_azure_batch_resource_url``.
        user_credential: User credential for the Azure user, as an azure-identity
            credential class instance. Passed to ``get_sp_secret``. If None,
            ``get_sp_secret`` will use a ManagedIdentityCredential instantiated
            at runtime. See the ``get_sp_secret`` documentation for details.

    Returns:
        ServicePrincipalCredentials: A ServicePrincipalCredentials object for the
            service principal.

    Example:
        >>> credentials = get_service_principal_credentials(
        ...     "https://myvault.vault.azure.net/",
        ...     "my-secret-id",
        ...     "tenant-id",
        ...     "application-id"
        ... )
    """
    sp_secret = get_sp_secret(
        vault_url, vault_sp_secret_id, user_credential=user_credential
    )
    sp_credential = ServicePrincipalCredentials(
        tenant=tenant_id,
        client_id=application_id,
        secret=sp_secret,
        resource=resource_url,
    )

    return sp_credential

get_sp_secret(vault_url, vault_sp_secret_id, user_credential=None)

Get a service principal secret from an Azure keyvault.

Parameters:

    vault_url (str): URL for the Azure keyvault to access. Required.
    vault_sp_secret_id (str): Service principal secret ID within the keyvault.
        Required.
    user_credential: User credential for the Azure user, as an azure-identity
        credential class instance. If None, will use a ManagedIdentityCredential
        instantiated at runtime. Defaults to None.

Returns:

    str: The retrieved value of the service principal secret.

Example:

    >>> secret = get_sp_secret(
    ...     "https://myvault.vault.azure.net/",
    ...     "my-secret-id"
    ... )

Source code in cfa/cloudops/auth.py
def get_sp_secret(
    vault_url: str,
    vault_sp_secret_id: str,
    user_credential=None,
) -> str:
    """Get a service principal secret from an Azure keyvault.

    Args:
        vault_url: URL for the Azure keyvault to access.
        vault_sp_secret_id: Service principal secret ID within the keyvault.
        user_credential: User credential for the Azure user, as an azure-identity
            credential class instance. If None, will use a ManagedIdentityCredential
            instantiated at runtime.

    Returns:
        str: The retrieved value of the service principal secret.

    Example:
        >>> secret = get_sp_secret(
        ...     "https://myvault.vault.azure.net/",
        ...     "my-secret-id"
        ... )
    """
    if user_credential is None:
        user_credential = ManagedIdentityCredential()

    secret_client = SecretClient(
        vault_url=vault_url, credential=user_credential
    )
    sp_secret = secret_client.get_secret(vault_sp_secret_id).value

    return sp_secret

load_env_vars(dotenv_path=None)

Load environment variables and Azure subscription information.

Loads variables from a .env file (if specified), retrieves Azure subscription information using ManagedIdentityCredential, and sets default environment variables.

Parameters:

    dotenv_path: Path to .env file to load. If None, uses default .env file
        discovery. Defaults to None.

Example:

    >>> load_env_vars()  # Load from default .env
    >>> load_env_vars("/path/to/.env")  # Load from specific file

Source code in cfa/cloudops/auth.py
def load_env_vars(dotenv_path=None):
    """Load environment variables and Azure subscription information.

    Loads variables from a .env file (if specified), retrieves Azure subscription
    information using ManagedIdentityCredential, and sets default environment variables.

    Args:
        dotenv_path: Path to .env file to load. If None, uses default .env file discovery.

    Example:
        >>> load_env_vars()  # Load from default .env
        >>> load_env_vars("/path/to/.env")  # Load from specific file
    """
    load_dotenv(dotenv_path=dotenv_path, override=True)
    # get ManagedIdentityCredential to pull SubscriptionClient
    mid_cred = ManagedIdentityCredential()
    sub_c = SubscriptionClient(mid_cred)
    # pull in account info and save to environment vars
    account_info = list(sub_c.subscriptions.list())[0]
    os.environ["AZURE_SUBSCRIPTION_ID"] = account_info.subscription_id
    os.environ["AZURE_TENANT_ID"] = account_info.tenant_id
    os.environ["AZURE_RESOURCE_GROUP_NAME"] = account_info.display_name
    # save default values
    d.set_env_vars()

cfa.cloudops.automation

run_experiment(exp_config, dotenv_path=None, **kwargs)

Run jobs and tasks automatically based on the provided experiment config.

Parameters:

    exp_config (str): Path to the experiment config file (TOML). Required.
    dotenv_path (str | None): Path to .env file to load environment variables
        from. If None, uses default .env file discovery. Defaults to None.
    **kwargs: Additional keyword arguments; unused in the source shown below.
Source code in cfa/cloudops/automation.py
def run_experiment(exp_config: str, dotenv_path: str | None = None, **kwargs):
    """Run jobs and tasks automatically based on the provided experiment config.

    Args:
        exp_config (str): Path to the experiment config file (TOML).
        dotenv_path (str | None): Path to .env file to load environment
            variables from. If None, uses default .env file discovery.
    """

    # read files
    exp_toml = toml.load(exp_config)

    try:
        client = CloudClient(dotenv_path=dotenv_path)
    except Exception:
        print("could not create CloudClient object.")
        return None

    # check pool included in exp_toml and exists in azure
    if "pool_name" in exp_toml["job"].keys():
        if not batch_helpers.check_pool_exists(
            resource_group_name=client.cred.azure_resource_group_name,
            account_name=client.cred.azure_batch_account,
            pool_name=exp_toml["job"]["pool_name"],
            batch_mgmt_client=client.batch_mgmt_client,
        ):
            print(
                f"pool name {exp_toml['job']['pool_name']} does not exist in the Azure environment."
            )
            return None
        pool_name = exp_toml["job"]["pool_name"]
    else:
        print("could not find 'pool_name' key in 'setup' section of exp toml.")
        print("please specify a pool name to use.")
        return None

    # upload files if the section exists
    if "upload" in exp_toml.keys():
        container_name = exp_toml["upload"]["container_name"]
        if "location_in_blob" in exp_toml["upload"].keys():
            location_in_blob = exp_toml["upload"]["location_in_blob"]
        else:
            location_in_blob = ""
        if "folders" in exp_toml["upload"].keys():
            client.upload_folders(
                folder_names=exp_toml["upload"]["folders"],
                location_in_blob=location_in_blob,
                container_name=container_name,
            )
        if "files" in exp_toml["upload"].keys():
            client.upload_files(
                files=exp_toml["upload"]["files"],
                location_in_blob=location_in_blob,
                container_name=container_name,
            )

    # create the job
    job_name = exp_toml["job"]["job_name"]
    if "save_logs_to_blob" in exp_toml["job"].keys():
        save_logs_to_blob = exp_toml["job"]["save_logs_to_blob"]
    else:
        save_logs_to_blob = None
    if "logs_folder" in exp_toml["job"].keys():
        logs_folder = exp_toml["job"]["logs_folder"]
    else:
        logs_folder = None
    if "task_retries" in exp_toml["job"].keys():
        task_retries = exp_toml["job"]["task_retries"]
    else:
        task_retries = 0

    client.create_job(
        job_name=job_name,
        pool_name=pool_name,
        save_logs_to_blob=save_logs_to_blob,
        logs_folder=logs_folder,
        task_retries=task_retries,
    )

    # create the tasks for the experiment
    # get the container to use if necessary
    if "container" in exp_toml["job"].keys():
        container = exp_toml["job"]["container"]
    else:
        container = None

    # submit the experiment tasks
    ex = exp_toml["experiment"]
    if "exp_yaml" in ex.keys():
        client.add_tasks_from_yaml(
            job_name=job_name,
            base_cmd=ex["base_cmd"],
            file_path=ex["exp_yaml"],
        )
    else:
        var_list = [key for key in ex.keys() if key != "base_cmd"]
        var_values = []
        for var in var_list:
            var_values.append(ex[var])
        v_v = list(itertools.product(*var_values))
        for params in v_v:
            j = {}
            for i, value in enumerate(params):
                j.update({var_list[i]: value})
            client.add_task(
                job_name=job_name,
                command_line=ex["base_cmd"].format(**j),
                container_image_name=container,
            )

    if "monitor_job" in exp_toml["job"].keys():
        if exp_toml["job"]["monitor_job"] is True:
            client.monitor_job(job_name)
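
For reference, here is a sketch of an experiment config consistent with the keys read above (all names, paths, and commands are illustrative, not part of the library):

    [job]
    job_name = "my-experiment"
    pool_name = "my-pool"
    monitor_job = true

    [upload]
    container_name = "input-data"
    folders = ["config", "data"]

    [experiment]
    base_cmd = "python run.py --seed {seed} --scenario {scenario}"
    seed = [1, 2, 3]
    scenario = ["baseline", "intervention"]

Each key in [experiment] other than base_cmd is treated as a list of values; one task is created per element of the Cartesian product of those lists, with the values substituted into base_cmd via str.format. Alternatively, an exp_yaml key may point to a YAML file that is passed to client.add_tasks_from_yaml.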

run_tasks(task_config, dotenv_path=None, **kwargs)

Run jobs and tasks automatically based on the provided task config.

Parameters:

    task_config (str): Path to the task config file (TOML). Required.
    dotenv_path (str | None): Path to .env file to load environment variables
        from. If None, uses default .env file discovery. Defaults to None.
    **kwargs: Additional keyword arguments; unused in the source shown below.

Source code in cfa/cloudops/automation.py
def run_tasks(
    task_config: str, dotenv_path: str | None = None, **kwargs
) -> None:
    """Run jobs and tasks automatically based on the provided task config.
    Args:
        task_config (str): path to task config file (toml)
    """

    # read files
    task_toml = toml.load(task_config)

    try:
        client = CloudClient(dotenv_path=dotenv_path)
    except Exception:
        print("could not create AzureClient object.")
        return None

    # check pool included in task_toml and exists in azure
    if "pool_name" in task_toml["job"].keys():
        if not batch_helpers.check_pool_exists(
            resource_group_name=client.cred.azure_resource_group_name,
            account_name=client.cred.azure_batch_account,
            pool_name=task_toml["job"]["pool_name"],
            batch_mgmt_client=client.batch_mgmt_client,
        ):
            print(
                f"pool name {task_toml['job']['pool_name']} does not exist in the Azure environment."
            )
            return None
        pool_name = task_toml["job"]["pool_name"]
    else:
        print(
            "could not find 'pool_name' key in 'setup' section of task config toml."
        )
        print("please specify a pool name to use.")
        return None

    # upload files if the section exists
    if "upload" in task_toml.keys():
        container_name = task_toml["upload"]["container_name"]
        if "location_in_blob" in task_toml["upload"].keys():
            location_in_blob = task_toml["upload"]["location_in_blob"]
        else:
            location_in_blob = ""
        if "folders" in task_toml["upload"].keys():
            client.upload_folders(
                folder_names=task_toml["upload"]["folders"],
                location_in_blob=location_in_blob,
                container_name=container_name,
            )
        if "files" in task_toml["upload"].keys():
            client.upload_files(
                files=task_toml["upload"]["files"],
                location_in_blob=location_in_blob,
                container_name=container_name,
            )

    # create the job
    job_name = task_toml["job"]["job_name"]
    if "save_logs_to_blob" in task_toml["job"].keys():
        save_logs_to_blob = task_toml["job"]["save_logs_to_blob"]
    else:
        save_logs_to_blob = None
    if "logs_folder" in task_toml["job"].keys():
        logs_folder = task_toml["job"]["logs_folder"]
    else:
        logs_folder = None
    if "task_retries" in task_toml["job"].keys():
        task_retries = task_toml["job"]["task_retries"]
    else:
        task_retries = 0

    client.create_job(
        job_name=job_name,
        pool_name=pool_name,
        save_logs_to_blob=save_logs_to_blob,
        logs_folder=logs_folder,
        task_retries=task_retries,
    )

    # create the tasks for the experiment
    # get the container to use if necessary
    if "container" in task_toml["job"].keys():
        container = task_toml["job"]["container"]
    else:
        container = None

    # submit the tasks
    tt = task_toml["task"]
    df = pd.json_normalize(tt)
    df.insert(0, "task_id", pd.Series("", index=range(df.shape[0])))
    # when kicking off a task we save the taskid to the row in df
    for i, item in enumerate(tt):
        if "depends_on" in item.keys():
            d_list = []
            for d in item["depends_on"]:
                d_task = df[df["name"] == d]["task_id"].values[0]
                d_list.append(d_task)
        else:
            d_list = None
        # check for other attributes
        if "run_dependent_tasks_on_fail" in item.keys():
            run_dependent_tasks_on_fail = item["run_dependent_tasks_on_fail"]
        else:
            run_dependent_tasks_on_fail = False
        # submit the task
        tid = client.add_task(
            job_name=job_name,
            command_line=item["cmd"],
            depends_on=d_list,
            run_dependent_tasks_on_fail=run_dependent_tasks_on_fail,
            container_image_name=container,
        )
        df.loc[i, "task_id"] = tid

    if "monitor_job" in task_toml["job"].keys():
        if task_toml["job"]["monitor_job"] is True:
            client.monitor_job(job_name)
    return None
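
For reference, here is a sketch of a task config consistent with the keys read above (names and commands are illustrative, not part of the library); each [[task]] entry supplies a cmd and may declare a name, depends_on, and run_dependent_tasks_on_fail:

    [job]
    job_name = "my-tasks"
    pool_name = "my-pool"
    monitor_job = true

    [[task]]
    name = "prep"
    cmd = "python prep.py"

    [[task]]
    name = "model"
    cmd = "python model.py"
    depends_on = ["prep"]
    run_dependent_tasks_on_fail = false

Dependencies are resolved by name: each entry in depends_on is looked up among the tasks already submitted, and the corresponding Azure task IDs are passed to client.add_task.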

cfa.cloudops.autoscale

cfa.cloudops.blob

Functions for interacting with Azure Blob Storage.

create_storage_container_if_not_exists(blob_storage_container_name, blob_service_client)

Create an Azure blob storage container if it does not already exist.

Parameters:

    blob_storage_container_name (str): Name of the storage container. Required.
    blob_service_client (BlobServiceClient): The blob service client to use
        when looking for and potentially creating the storage container.
        Required.

Example:

    >>> from azure.storage.blob import BlobServiceClient
    >>> client = BlobServiceClient(account_url="...", credential="...")
    >>> create_storage_container_if_not_exists("my-container", client)
    Container [my-container] created.

Source code in cfa/cloudops/blob.py
def create_storage_container_if_not_exists(
    blob_storage_container_name: str, blob_service_client: BlobServiceClient
) -> None:
    """Create an Azure blob storage container if it does not already exist.

    Args:
        blob_storage_container_name: Name of the storage container.
        blob_service_client: The blob service client to use when looking
            for and potentially creating the storage container.

    Example:
        >>> from azure.storage.blob import BlobServiceClient
        >>> client = BlobServiceClient(account_url="...", credential="...")
        >>> create_storage_container_if_not_exists("my-container", client)
        Container [my-container] created.
    """
    container_client = blob_service_client.get_container_client(
        container=blob_storage_container_name
    )
    if not container_client.exists():
        container_client.create_container()
        print("Container [{}] created.".format(blob_storage_container_name))
    else:
        print(
            "Container [{}] already exists.".format(
                blob_storage_container_name
            )
        )

download_from_storage_container(file_paths, blob_storage_container_name, blob_service_client=None, local_root_dir='.', remote_root_dir='.', **kwargs)

Download a list of files from an Azure blob storage container.

Preserves relative directory structure.

Parameters:

    file_paths (str | list[str]): File or list of files to download, as string
        paths relative to remote_root_dir. A single string will be coerced to
        a length-one list. Required.
    blob_storage_container_name (str): Name of the blob storage container from
        which to download the files. Must already exist. Required.
    blob_service_client (BlobServiceClient): BlobServiceClient to use when
        downloading. If None, attempt to create one via
        client.get_blob_service_client using provided **kwargs, if any.
        Defaults to None.
    local_root_dir (str): Root directory for the relative file paths in local
        storage. Defaults to "." (use the local working directory).
    remote_root_dir (str): Root directory for the relative file paths within
        the blob storage container. Defaults to "." (start at the blob
        storage container root).
    **kwargs: Keyword arguments passed to client.get_blob_service_client.

Raises:

    Exception: If the blob storage container does not exist.

Example:

    >>> from azure.storage.blob import BlobServiceClient
    >>> client = BlobServiceClient(account_url="...", credential="...")
    >>> download_from_storage_container(
    ...     ["file1.txt", "subdir/file2.txt"],
    ...     "my-container",
    ...     client,
    ...     local_root_dir="/local/path",
    ...     remote_root_dir="uploads"
    ... )
    Downloading file 0 of 2
    Downloaded 2 files from blob storage container

Source code in cfa/cloudops/blob.py
def download_from_storage_container(
    file_paths: str | list[str],
    blob_storage_container_name: str,
    blob_service_client: BlobServiceClient = None,
    local_root_dir: str = ".",
    remote_root_dir: str = ".",
    **kwargs,
) -> None:
    """Download a list of files from an Azure blob storage container.

    Preserves relative directory structure.

    Args:
        file_paths: File or list of files to download, as string paths relative to
            ``remote_root_dir``. A single string will be coerced to a length-one list.
        blob_storage_container_name: Name of the blob storage container from which
            to download the files. Must already exist.
        blob_service_client: BlobServiceClient to use when downloading.
            If None, attempt to create one via ``client.get_blob_service_client``
            using provided ``**kwargs``, if any.
        local_root_dir: Root directory for the relative file paths in local storage.
            Defaults to "." (use the local working directory).
        remote_root_dir: Root directory for the relative file paths within the blob
            storage container. Defaults to "." (start at the blob storage container root).
        **kwargs: Keyword arguments passed to ``client.get_blob_service_client``.

    Raises:
        Exception: If the blob storage container does not exist.

    Example:
        >>> from azure.storage.blob import BlobServiceClient
        >>> client = BlobServiceClient(account_url="...", credential="...")
        >>> download_from_storage_container(
        ...     ["file1.txt", "subdir/file2.txt"],
        ...     "my-container",
        ...     client,
        ...     local_root_dir="/local/path",
        ...     remote_root_dir="uploads"
        ... )
        Downloading file 0 of 2
        Downloaded 2 files from blob storage container
    """

    file_paths = ensure_listlike(file_paths)
    n_total_files = len(file_paths)

    if blob_service_client is None:
        blob_service_client = get_blob_service_client(**kwargs)

    for i_file, file_path in enumerate(file_paths):
        if i_file % (1 + int(n_total_files / 10)) == 0:
            print(f"Downloading file {i_file} of {n_total_files}")

        local_file_path = os.path.join(local_root_dir, file_path)
        remote_file_path = os.path.join(remote_root_dir, file_path)

        blob_client = blob_service_client.get_blob_client(
            container=blob_storage_container_name, blob=remote_file_path
        )
        with open(local_file_path, "wb") as target_file:
            download_stream = blob_client.download_blob()
            target_file.write(download_stream.readall())

    print(f"Downloaded {n_total_files} files from blob storage container")

get_node_mount_config(storage_containers, account_names, identity_references, shared_relative_mount_path='', mount_names=None, blobfuse_options='', cache_blobfuse=False, **kwargs)

Get configuration for mounting Azure Blob Storage containers to Azure Batch nodes via blobfuse.

Parameters:

storage_containers (str | list[str], required): Name(s) of the Azure Blob storage container(s) to mount.
account_names (str | list[str], required): Name(s) of the Azure Blob storage account(s) in which to look for the storage container(s). If a single value, look for all storage containers within the same storage account. Otherwise, look for each container within the corresponding account. The function will raise an error if there is more than one account_names value but a different number of storage_containers, as then the mapping is ambiguous.
identity_references (ComputeNodeIdentityReference | list[ComputeNodeIdentityReference], required): Valid ComputeNodeIdentityReference objects for the node to use when connecting to the storage_containers, or an iterable of such objects with one object for each of the storage_containers.
shared_relative_mount_path (str, default ''): Path relative to the fsmounts directory within the running node at which to mount the storage containers ('' mounts within fsmounts itself).
mount_names (list[str], default None): Iterable of names (or paths) for the individual mounted storage containers relative to the shared_relative_mount_path. If None, use the storage container names given in storage_containers as the mount_names.
blobfuse_options (str | list[str], default ''): Additional options passed to blobfuse.
cache_blobfuse (bool, default False): Whether to cache Blob storage.
**kwargs: Additional keyword arguments passed to the models.AzureBlobFileSystemConfiguration constructor.

Returns:

list[models.MountConfiguration]: A list of instantiated MountConfiguration objects describing the desired storage container mounts.

Raises:

ValueError: If the number of mount_names doesn't match storage_containers, or if the number of account_names or identity_references doesn't match storage_containers and isn't exactly 1.

Example

>>> from azure.batch import models
>>> identity_ref = models.ComputeNodeIdentityReference(
...     resource_id="/subscriptions/.../resourceGroups/.../providers/..."
... )
>>> mount_configs = get_node_mount_config(
...     storage_containers=["container1", "container2"],
...     account_names="mystorageaccount",
...     identity_references=identity_ref,
...     shared_relative_mount_path="data",
...     cache_blobfuse=True
... )
>>> len(mount_configs)
2

Source code in cfa/cloudops/blob.py
def get_node_mount_config(
    storage_containers: str | list[str],
    account_names: str | list[str],
    identity_references: (
        models.ComputeNodeIdentityReference
        | list[models.ComputeNodeIdentityReference]
    ),
    shared_relative_mount_path: str = "",
    mount_names: list[str] = None,
    blobfuse_options: str | list[str] = "",
    cache_blobfuse: bool = False,
    **kwargs,
) -> list[models.MountConfiguration]:
    """Get configuration for mounting Azure Blob Storage containers to Azure Batch nodes via blobfuse.

    Args:
        storage_containers: Name(s) of the Azure Blob storage container(s) to mount.
        account_names: Name(s) of the Azure Blob storage account(s) in which to look
            for the storage container(s). If a single value, look for all storage
            containers within the same storage account. Otherwise, look for each
            container within the corresponding account. The function will raise an
            error if there is more than one ``account_names`` value but a different
            number of ``storage_containers``, as then the mapping is ambiguous.
        identity_references: Valid ComputeNodeIdentityReference objects for the node
            to use when connecting to the ``storage_containers``, or an iterable of
            such objects with one object for each of the ``storage_containers``.
        shared_relative_mount_path: Path relative to the ``fsmounts`` directory within
            the running node at which to mount the storage containers. Defaults to ""
            (mount within ``fsmounts`` itself).
        mount_names: Iterable of names (or paths) for the individual mounted storage
            containers relative to the ``shared_relative_mount_path``. If None, use
            the storage container names given in ``storage_containers`` as the
            ``mount_names``.
        blobfuse_options: Additional options passed to blobfuse. Defaults to "".
        cache_blobfuse: Whether to cache Blob storage. Defaults to False.
        **kwargs: Additional keyword arguments passed to the
            ``models.AzureBlobFileSystemConfiguration`` constructor.

    Returns:
        list[models.MountConfiguration]: A list of instantiated MountConfiguration
            objects describing the desired storage container mounts.

    Raises:
        ValueError: If the number of mount_names doesn't match storage_containers,
            or if the number of account_names or identity_references doesn't match
            storage_containers and isn't exactly 1.

    Example:
        >>> from azure.batch import models
        >>> identity_ref = models.ComputeNodeIdentityReference(
        ...     resource_id="/subscriptions/.../resourceGroups/.../providers/..."
        ... )
        >>> mount_configs = get_node_mount_config(
        ...     storage_containers=["container1", "container2"],
        ...     account_names="mystorageaccount",
        ...     identity_references=identity_ref,
        ...     shared_relative_mount_path="data",
        ...     cache_blobfuse=True
        ... )
        >>> len(mount_configs)
        2
    """
    storage_containers = ensure_listlike(storage_containers)
    account_names = ensure_listlike(account_names)
    identity_references = ensure_listlike(identity_references)

    n_containers = len(storage_containers)
    n_accounts = len(account_names)
    n_identity_refs = len(identity_references)

    if mount_names is None:
        mount_names = storage_containers
    else:
        mount_names = ensure_listlike(mount_names)
    n_mount_names = len(mount_names)

    if n_mount_names != n_containers:
        raise ValueError(
            "Must provide exactly as many "
            "`mount_names` as `storage_containers` "
            "to mount, or set `mount_names=None`, "
            "in which case the storage container "
            "names in `storage_containers` will "
            "be used as the names for mounting "
            "the containers. Got "
            f"{n_mount_names} `mount_names` and "
            f"{n_containers} `storage_containers`."
        )

    if n_containers != n_accounts:
        if n_accounts == 1:
            account_names *= n_containers
        else:
            raise ValueError(
                "Must either provide a single `account_names`"
                "value (as a string or a length-1 list) "
                "to cover all `storage_containers` values "
                "or provide one `account_names` value for "
                "each `storage_containers` value. Got "
                f"{n_accounts} `account_names` and "
                f"{n_containers} `storage_containers`."
            )

    if n_containers != n_identity_refs:
        if n_identity_refs == 1:
            identity_references *= n_containers
        else:
            raise ValueError(
                "Must either provide a single `identity_references`"
                "value (as a single ComputeNodeIdentityReference "
                "object or a length-1 list containing a "
                "ComputeNodeIdentityReference object) "
                "to cover all `storage_containers` values "
                "or provide one `identity_references` value for "
                "each `storage_containers` value. Got "
                f"{n_identity_refs} `identity_references` and "
                f"{n_containers} `storage_containers`."
            )

    relative_mount_paths = [
        os.path.join(shared_relative_mount_path, mount_name)
        for mount_name in mount_names
    ]
    if cache_blobfuse:
        blob_str = ""
    else:
        blob_str = " -o direct_io"

    return [
        models.MountConfiguration(
            azure_blob_file_system_configuration=(
                models.AzureBlobFileSystemConfiguration(
                    account_name=account_name,
                    container_name=container_name,
                    relative_mount_path=relative_mount_path,
                    blobfuse_options=blobfuse_options + blob_str,
                    identity_reference=identity_reference,
                    **kwargs,
                )
            )
        )
        for (
            account_name,
            container_name,
            relative_mount_path,
            identity_reference,
        ) in zip(
            account_names,
            storage_containers,
            relative_mount_paths,
            identity_references,
        )
    ]
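
The returned list is meant to be attached to a pool definition. A hedged sketch: mount_configuration is the azure.mgmt.batch models.Pool field of that name (an assumption about downstream use, not something this function performs), and get_default_pool_config comes from cfa.cloudops.defaults:

# Attach the mount configurations from the example above to a pool.
pool = get_default_pool_config(
    pool_name="my-pool",
    subnet_id="/subscriptions/.../subnets/default",
    user_assigned_identity="/subscriptions/.../userAssignedIdentities/...",
)
pool.mount_configuration = mount_configs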

upload_to_storage_container(file_paths, blob_storage_container_name, blob_service_client, local_root_dir='.', remote_root_dir='.')

Upload a file or list of files to an Azure blob storage container.

This function preserves relative directory structure among the uploaded files within the storage container.

Parameters:

file_paths (str | list[str], required): File or list of files to upload, as string paths relative to local_root_dir. A single string will be coerced to a length-one list.
blob_storage_container_name (str, required): Name of the blob storage container to which to upload the files. Must already exist.
blob_service_client (BlobServiceClient, required): BlobServiceClient to use when uploading.
local_root_dir (str, default '.'): Root directory for the relative file paths in local storage ('.' means the local working directory).
remote_root_dir (str, default '.'): Root directory for the relative file paths within the blob storage container ('.' means the container root).

Raises:

Exception: If the blob storage container does not exist.

Example

>>> from azure.storage.blob import BlobServiceClient
>>> client = BlobServiceClient(account_url="...", credential="...")
>>> upload_to_storage_container(
...     ["file1.txt", "subdir/file2.txt"],
...     "my-container",
...     client,
...     local_root_dir="/local/path",
...     remote_root_dir="uploads"
... )
Uploading file 0 of 2
Uploaded 2 files to blob storage container

Source code in cfa/cloudops/blob.py
def upload_to_storage_container(
    file_paths: str | list[str],
    blob_storage_container_name: str,
    blob_service_client: BlobServiceClient,
    local_root_dir: str = ".",
    remote_root_dir: str = ".",
) -> None:
    """Upload a file or list of files to an Azure blob storage container.

    This function preserves relative directory structure among the
    uploaded files within the storage container.

    Args:
        file_paths: File or list of files to upload, as string paths relative to
            ``local_root_dir``. A single string will be coerced to a length-one list.
        blob_storage_container_name: Name of the blob storage container to which
            to upload the files. Must already exist.
        blob_service_client: BlobServiceClient to use when uploading.
        local_root_dir: Root directory for the relative file paths in local storage.
            Defaults to "." (use the local working directory).
        remote_root_dir: Root directory for the relative file paths within the blob
            storage container. Defaults to "." (start at the blob storage container root).

    Raises:
        Exception: If the blob storage container does not exist.

    Example:
        >>> from azure.storage.blob import BlobServiceClient
        >>> client = BlobServiceClient(account_url="...", credential="...")
        >>> upload_to_storage_container(
        ...     ["file1.txt", "subdir/file2.txt"],
        ...     "my-container",
        ...     client,
        ...     local_root_dir="/local/path",
        ...     remote_root_dir="uploads"
        ... )
        Uploading file 0 of 2
        Uploaded 2 files to blob storage container
    """

    file_paths = ensure_listlike(file_paths)

    n_total_files = len(file_paths)

    for i_file, file_path in enumerate(file_paths):
        if i_file % (1 + int(n_total_files / 10)) == 0:
            print("Uploading file {} of {}".format(i_file, n_total_files))

        local_file_path = os.path.join(local_root_dir, file_path)
        remote_file_path = os.path.join(remote_root_dir, file_path)

        blob_client = blob_service_client.get_blob_client(
            container=blob_storage_container_name, blob=remote_file_path
        )
        with open(local_file_path, "rb") as upload_data:
            blob_client.upload_blob(upload_data, overwrite=True)

    print("Uploaded {} files to blob storage container".format(n_total_files))

cfa.cloudops.client

Helper functions for setting up valid Azure clients.

get_batch_management_client(credential_handler=None, **kwargs)

Get an Azure Batch management client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

credential_handler (CredentialHandler, default None): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).
**kwargs: Additional keyword arguments passed to the BatchManagementClient constructor.

Returns:

BatchManagementClient: A client instantiated according to the specified configuration.

Example

>>> # Using default environment-based credentials
>>> client = get_batch_management_client()

>>> # Using a custom credential handler
>>> handler = CredentialHandler()
>>> client = get_batch_management_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_batch_management_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> BatchManagementClient:
    """Get an Azure Batch management client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the BatchManagementClient constructor.

    Returns:
        BatchManagementClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_batch_management_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_batch_management_client(credential_handler=handler)
    """

    ch = credential_handler
    if ch is None:
        ch = EnvCredentialHandler()
    if ch.method == "sp":
        return BatchManagementClient(
            credential=ch.client_secret_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    elif ch.method == "fc":
        return BatchManagementClient(
            credential=ch.federated_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    else:
        return BatchManagementClient(
            credential=ch.user_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )

get_batch_service_client(credential_handler=None, **kwargs)

Get an Azure batch service client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

credential_handler (CredentialHandler, default None): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).
**kwargs: Additional keyword arguments passed to the BatchServiceClient constructor.

Returns:

BatchServiceClient: A client instantiated according to the specified configuration.

Example

>>> # Using default environment-based credentials
>>> client = get_batch_service_client()

>>> # Using a custom credential handler
>>> handler = CredentialHandler()
>>> client = get_batch_service_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_batch_service_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> BatchServiceClient:
    """Get an Azure batch service client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the BatchServiceClient constructor.

    Returns:
        BatchServiceClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_batch_service_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_batch_service_client(credential_handler=handler)
    """
    ch = credential_handler
    if ch is None:
        ch = EnvCredentialHandler()
    if ch.method == "sp":
        return BatchServiceClient(
            credentials=ch.client_secret_credential,
            batch_url=ch.azure_batch_endpoint,
            **kwargs,
        )
    elif ch.method == "fc":
        return BatchServiceClient(
            # BatchServiceClient takes `credentials` and `batch_url`,
            # matching the other branches.
            credentials=ch.federated_credential,
            batch_url=ch.azure_batch_endpoint,
            **kwargs,
        )
    else:
        return BatchServiceClient(
            credentials=ch.batch_service_principal_credentials,
            batch_url=ch.azure_batch_endpoint,
            **kwargs,
        )

get_blob_service_client(credential_handler=None, **kwargs)

Get an Azure blob service client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

credential_handler (CredentialHandler, default None): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).
**kwargs: Additional keyword arguments passed to the BlobServiceClient constructor.

Returns:

BlobServiceClient: A client instantiated according to the specified configuration.

Example

>>> # Using default environment-based credentials
>>> client = get_blob_service_client()

>>> # Using a custom credential handler
>>> handler = CredentialHandler()
>>> client = get_blob_service_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_blob_service_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> BlobServiceClient:
    """Get an Azure blob service client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the BlobServiceClient constructor.

    Returns:
        BlobServiceClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_blob_service_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_blob_service_client(credential_handler=handler)
    """
    ch = credential_handler
    if ch is None:
        ch = EnvCredentialHandler()
    if ch.method == "sp":
        return BlobServiceClient(
            account_url=ch.azure_blob_storage_endpoint,
            credential=ch.client_secret_credential,
            **kwargs,
        )
    elif ch.method == "fc":
        return BlobServiceClient(
            credential=ch.federated_credential,
            account_url=ch.azure_blob_storage_endpoint,
            **kwargs,
        )
    else:
        return BlobServiceClient(
            account_url=ch.azure_blob_storage_endpoint,
            credential=ch.user_credential,
            **kwargs,
        )
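
These client getters pair directly with the helpers in cfa.cloudops.blob. A short sketch, assuming environment-based credentials are available:

# Build a client from environment variables, then reuse it
# across the blob helpers documented above.
client = get_blob_service_client()
create_storage_container_if_not_exists("my-container", client)
upload_to_storage_container(["file1.txt"], "my-container", client)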

get_compute_management_client(credential_handler=None, **kwargs)

Get an Azure compute management client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

credential_handler (CredentialHandler, default None): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).
**kwargs: Additional keyword arguments passed to the ComputeManagementClient constructor.

Returns:

ComputeManagementClient: A client instantiated according to the specified configuration.

Example

>>> # Using default environment-based credentials
>>> client = get_compute_management_client()

>>> # Using a custom credential handler
>>> handler = CredentialHandler()
>>> client = get_compute_management_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_compute_management_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> ComputeManagementClient:
    """Get an Azure compute management client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the ComputeManagementClient constructor.

    Returns:
        ComputeManagementClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_compute_management_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_compute_management_client(credential_handler=handler)
    """
    ch = credential_handler
    if ch is None:
        ch = EnvCredentialHandler()
    if ch.method == "sp":
        return ComputeManagementClient(
            credential=ch.client_secret_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    elif ch.method == "fc":
        return ComputeManagementClient(
            credential=ch.federated_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    else:
        return ComputeManagementClient(
            credential=ch.user_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )

cfa.cloudops.defaults

Default configurations for Azure resources.

assign_container_config(pool_config, container_config)

Assign a container configuration to a Pool object (in place).

Parameters:

pool_config (Pool, required): Pool configuration object to modify.
container_config (ContainerConfiguration, required): ContainerConfiguration object to add to the Pool configuration object.

Returns:

models.Pool: The modified Pool object.

Example

>>> from azure.mgmt.batch import models
>>> pool = get_default_pool_config("test", "subnet", "identity")
>>> container_config = models.ContainerConfiguration(type="dockerCompatible")
>>> modified_pool = assign_container_config(pool, container_config)
>>> # Pool is modified in place and returned
>>> assert modified_pool is pool

Source code in cfa/cloudops/defaults.py
def assign_container_config(
    pool_config: models.Pool, container_config: models.ContainerConfiguration
) -> models.Pool:
    """Assign a container configuration to a Pool object (in place).

    Args:
        pool_config: Pool configuration object to modify.
        container_config: ContainerConfiguration object to add to the Pool
            configuration object.

    Returns:
        models.Pool: The modified Pool object.

    Example:
        >>> from azure.mgmt.batch import models
        >>> pool = get_default_pool_config("test", "subnet", "identity")
        >>> container_config = models.ContainerConfiguration(type="dockerCompatible")
        >>> modified_pool = assign_container_config(pool, container_config)
        >>> # Pool is modified in place and returned
        >>> assert modified_pool is pool
    """
    (
        pool_config.deployment_configuration.virtual_machine_configuration.container_configuration
    ) = container_config
    return pool_config
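
In practice the container configuration usually also names the images (and, for private registries, credentials) the pool should pre-fetch. A hedged sketch: container_image_names is an azure-mgmt-batch ContainerConfiguration field (an assumption about that SDK, not part of this module's API):

from azure.mgmt.batch import models

# A fuller container configuration for the pool from the example above.
container_config = models.ContainerConfiguration(
    type="dockerCompatible",
    container_image_names=["myregistry.azurecr.io/myapp:latest"],
)
pool = assign_container_config(pool, container_config)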

get_default_pool_config(pool_name, subnet_id, user_assigned_identity, **kwargs)

Instantiate a Pool instance with default configuration.

Creates a Pool with the given pool name and subnet id, the default pool identity given by get_default_pool_identity, and other defaults specified in default_pool_config_dict and default_network_config_dict.

Parameters:

pool_name (str, required): Name for the pool. Passed as the display_name argument to the Pool constructor.
subnet_id (str, required): Subnet id for the pool, as a string. Should typically be obtained from a configuration file or an environment variable, often via a CredentialHandler instance.
user_assigned_identity (str, required): User-assigned identity for the pool, as a string. Passed to get_default_pool_identity.
**kwargs: Additional keyword arguments passed to the Pool constructor, potentially overriding settings from default_pool_config_dict.

Returns:

models.Pool: The instantiated Pool object.

Example

>>> pool = get_default_pool_config(
...     pool_name="my-batch-pool",
...     subnet_id="/subscriptions/.../subnets/default",
...     user_assigned_identity="/subscriptions/.../resourceGroups/..."
... )
>>> print(pool.display_name)
'my-batch-pool'
>>> print(pool.vm_size)
'standard_d4s_v3'

Source code in cfa/cloudops/defaults.py
def get_default_pool_config(
    pool_name: str, subnet_id: str, user_assigned_identity: str, **kwargs
) -> models.Pool:
    """Instantiate a Pool instance with default configuration.

    Creates a Pool with the given pool name and subnet id, the default pool identity
    given by get_default_pool_identity, and other defaults specified in
    default_pool_config_dict and default_network_config_dict.

    Args:
        pool_name: Name for the pool. Passed as the ``display_name`` argument
            to the Pool constructor.
        subnet_id: Subnet id for the pool, as a string. Should typically be obtained
            from a configuration file or an environment variable, often via a
            CredentialHandler instance.
        user_assigned_identity: User-assigned identity for the pool, as a string.
            Passed to get_default_pool_identity.
        **kwargs: Additional keyword arguments passed to the Pool constructor,
            potentially overriding settings from default_pool_config_dict.

    Returns:
        models.Pool: The instantiated Pool object.

    Example:
        >>> pool = get_default_pool_config(
        ...     pool_name="my-batch-pool",
        ...     subnet_id="/subscriptions/.../subnets/default",
        ...     user_assigned_identity="/subscriptions/.../resourceGroups/..."
        ... )
        >>> print(pool.display_name)
        'my-batch-pool'
        >>> print(pool.vm_size)
        'standard_d4s_v3'
    """
    return models.Pool(
        identity=get_default_pool_identity(user_assigned_identity),
        display_name=pool_name,
        network_configuration=models.NetworkConfiguration(
            subnet_id=subnet_id, **default_network_config_dict
        ),
        **{**default_pool_config_dict, **kwargs},
    )
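
A pool built this way is typically submitted through the management client from cfa.cloudops.client. A hedged sketch: pool.create and its argument names belong to azure-mgmt-batch's PoolOperations (an assumption about the consuming SDK), and the resource group and account values are placeholders:

# Create the pool from the example above via the management plane.
mgmt_client = get_batch_management_client()
mgmt_client.pool.create(
    resource_group_name="my-resource-group",  # placeholder
    account_name="mybatchaccount",  # placeholder
    pool_name="my-batch-pool",
    parameters=pool,
)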

get_default_pool_identity(user_assigned_identity)

Get the default BatchPoolIdentity instance for azuretools.

Associates a blank UserAssignedIdentities instance to the provided user_assigned_identity string.

Parameters:

user_assigned_identity (str, required): User-assigned identity, as a string.

Returns:

models.BatchPoolIdentity: Instantiated BatchPoolIdentity instance using the provided user-assigned identity.

Example

>>> identity = get_default_pool_identity(
...     "/subscriptions/.../resourceGroups/.../providers/..."
... )
>>> print(identity.type)
<PoolIdentityType.user_assigned: 'UserAssigned'>

Source code in cfa/cloudops/defaults.py
def get_default_pool_identity(
    user_assigned_identity: str,
) -> models.BatchPoolIdentity:
    """Get the default BatchPoolIdentity instance for azuretools.

    Associates a blank UserAssignedIdentities instance to the provided
    user_assigned_identity string.

    Args:
        user_assigned_identity: User-assigned identity, as a string.

    Returns:
        models.BatchPoolIdentity: Instantiated BatchPoolIdentity instance
            using the provided user-assigned identity.

    Example:
        >>> identity = get_default_pool_identity(
        ...     "/subscriptions/.../resourceGroups/.../providers/..."
        ... )
        >>> print(identity.type)
        <PoolIdentityType.user_assigned: 'UserAssigned'>
    """
    return models.BatchPoolIdentity(
        type=models.PoolIdentityType.user_assigned,
        user_assigned_identities={
            user_assigned_identity: models.UserAssignedIdentities()
        },
    )

remaining_task_autoscale_formula(task_sample_interval_minutes=15, max_number_vms=10)

Get an autoscaling formula that rescales pools based on the remaining task count.

Parameters:

task_sample_interval_minutes (int, default 15): Task sampling interval, in minutes, as an integer.
max_number_vms (int, default 10): Maximum number of virtual machines to spin up, regardless of the number of remaining tasks.

Returns:

str: The autoscale formula, as a string.

Example

>>> # Default settings (15 min interval, max 10 VMs)
>>> formula = remaining_task_autoscale_formula()
>>> print(type(formula))  # <class 'str'>

>>> # Custom settings
>>> formula = remaining_task_autoscale_formula(
...     task_sample_interval_minutes=30,
...     max_number_vms=20
... )
>>> print("cappedPoolSize = 20" in formula)  # True

Source code in cfa/cloudops/defaults.py
def remaining_task_autoscale_formula(
    task_sample_interval_minutes: int = 15,
    max_number_vms: int = 10,
):
    """Get an autoscaling formula that rescales pools based on the remaining task count.

    Args:
        task_sample_interval_minutes: Task sampling interval, in minutes, as an integer.
            Defaults to 15.
        max_number_vms: Maximum number of virtual machines to spin up, regardless of
            the number of remaining tasks. Defaults to 10.

    Returns:
        str: The autoscale formula, as a string.

    Example:
        >>> # Default settings (15 min interval, max 10 VMs)
        >>> formula = remaining_task_autoscale_formula()
        >>> print(type(formula))  # <class 'str'>

        >>> # Custom settings
        >>> formula = remaining_task_autoscale_formula(
        ...     task_sample_interval_minutes=30,
        ...     max_number_vms=20
        ... )
        >>> print("cappedPoolSize = 20" in formula)  # True
    """
    autoscale_formula_template = """// In this example, the pool size
    // is adjusted based on the number of tasks in the queue.
    // Note that both comments and line breaks are acceptable in formula strings.

    // Get pending tasks for the past 15 minutes.
    $samples = $ActiveTasks.GetSamplePercent(TimeInterval_Minute * {task_sample_interval_minutes});
    // If we have fewer than 70 percent data points, we use the last sample point, otherwise we use the maximum of last sample point and the history average.
    $tasks = $samples < 70 ? max(0, $ActiveTasks.GetSample(1)) :
    max( $ActiveTasks.GetSample(1), avg($ActiveTasks.GetSample(TimeInterval_Minute * {task_sample_interval_minutes})));
    // If number of pending tasks is not 0, set targetVM to pending tasks, otherwise half of current dedicated.
    $targetVMs = $tasks > 0 ? $tasks : max(0, $TargetDedicatedNodes / 2);
    // The pool size is capped at {max_number_vms}, if target VM value is more than that, set it to {max_number_vms}.
    cappedPoolSize = {max_number_vms};
    $TargetDedicatedNodes = max(0, min($targetVMs, cappedPoolSize));
    // Set node deallocation mode - keep nodes active only until tasks finish
    $NodeDeallocationOption = taskcompletion;"""

    autoscale_formula = autoscale_formula_template.format(
        task_sample_interval_minutes=task_sample_interval_minutes,
        max_number_vms=max_number_vms,
    )

    return autoscale_formula
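
The returned string can be supplied wherever Azure Batch expects an autoscale formula. A minimal sketch, assuming an azure-batch BatchServiceClient named batch_client; pool.enable_auto_scale and its parameters are that SDK's API (an assumption), and the pool id is a placeholder:

from datetime import timedelta

# Apply the formula to an existing pool.
formula = remaining_task_autoscale_formula(max_number_vms=20)
batch_client.pool.enable_auto_scale(
    pool_id="my-pool",  # placeholder
    auto_scale_formula=formula,
    auto_scale_evaluation_interval=timedelta(minutes=15),
)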

set_env_vars()

Set default Azure environment variables.

Sets default values for Azure service endpoints and creates new variables as a function of existing environment variables.

Example

>>> import os
>>> set_env_vars()
>>> print(os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"])
'batch.azure.com/'
>>> print(os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"])
'azurecr.io'

Source code in cfa/cloudops/defaults.py
def set_env_vars():
    """Set default Azure environment variables.

    Sets default values for Azure service endpoints and creates new variables
    as a function of existing environment variables.

    Example:
        >>> import os
        >>> set_env_vars()
        >>> print(os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"])
        'batch.azure.com/'
        >>> print(os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"])
        'azurecr.io'
    """
    # save default values
    os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"] = "batch.azure.com/"
    os.environ["AZURE_BATCH_RESOURCE_URL"] = "https://batch.core.windows.net/"
    os.environ["AZURE_KEYVAULT_ENDPOINT_SUBDOMAIN"] = "vault.azure.net"
    os.environ["AZURE_BLOB_STORAGE_ENDPOINT_SUBDOMAIN"] = (
        "blob.core.windows.net/"
    )
    os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"] = "azurecr.io"
    # create new variables as a function of env vars
    os.environ["AZURE_BATCH_ENDPOINT"] = (
        f"https://{os.getenv('AZURE_BATCH_ACCOUNT')}.{os.getenv('AZURE_BATCH_LOCATION')}.{default_azure_batch_endpoint_subdomain}"
    )
    os.environ["AZURE_KEYVAULT_ENDPOINT"] = (
        f"https://{os.getenv('AZURE_KEYVAULT_NAME')}.{default_azure_keyvault_endpoint_subdomain}"
    )
    os.environ["AZURE_BLOB_STORAGE_ENDPOINT"] = (
        f"https://{os.getenv('AZURE_BLOB_STORAGE_ACCOUNT')}.{default_azure_blob_storage_endpoint_subdomain}"
    )
    os.environ["ACR_TAG_PREFIX"] = (
        f"{os.getenv('AZURE_CONTAINER_REGISTRY_ACCOUNT')}.{default_azure_container_registry_domain}/"
    )
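
The derived endpoints are only meaningful if the account and location variables they are built from are already set. A small sketch of those inputs, taken from the source above; the values are placeholders:

import os

# Inputs read by set_env_vars when deriving endpoints.
os.environ["AZURE_BATCH_ACCOUNT"] = "mybatchaccount"
os.environ["AZURE_BATCH_LOCATION"] = "eastus"
os.environ["AZURE_KEYVAULT_NAME"] = "mykeyvault"
os.environ["AZURE_BLOB_STORAGE_ACCOUNT"] = "mystorageaccount"
os.environ["AZURE_CONTAINER_REGISTRY_ACCOUNT"] = "myregistry"

set_env_vars()
print(os.environ["AZURE_BATCH_ENDPOINT"])
# https://mybatchaccount.eastus.batch.azure.com/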

cfa.cloudops.endpoints

Helper functions for constructing Azure endpoint URLs.

construct_azure_container_registry_endpoint(azure_container_registry_account, azure_container_registry_domain=d.default_azure_container_registry_domain)

Construct an Azure container registry endpoint URL from the account name and domain.

Parameters:

azure_container_registry_account (str, required): Name of the Azure container registry account.
azure_container_registry_domain (str, default d.default_azure_container_registry_domain): Domain for the Azure container registry. Typically "azurecr.io", the default.

Returns:

str: The registry endpoint URL.

Example

>>> url = construct_azure_container_registry_endpoint("myregistry")
>>> print(url)
'https://myregistry.azurecr.io'

>>> url = construct_azure_container_registry_endpoint("myregistry", "custom.domain.io")
>>> print(url)
'https://myregistry.custom.domain.io'

Source code in cfa/cloudops/endpoints.py
def construct_azure_container_registry_endpoint(
    azure_container_registry_account: str,
    azure_container_registry_domain: str = d.default_azure_container_registry_domain,
) -> str:
    """Construct an Azure container registry endpoint URL from the account name and domain.

    Args:
        azure_container_registry_account: Name of the Azure container registry account.
        azure_container_registry_domain: Domain for the Azure container registry.
            Typically "azurecr.io", the default.

    Returns:
        str: The registry endpoint URL.

    Example:
        >>> url = construct_azure_container_registry_endpoint("myregistry")
        >>> print(url)
        'https://myregistry.azurecr.io'

        >>> url = construct_azure_container_registry_endpoint("myregistry", "custom.domain.io")
        >>> print(url)
        'https://myregistry.custom.domain.io'
    """
    return _construct_https_url(
        f"{azure_container_registry_account}.{azure_container_registry_domain}"
    )

construct_batch_endpoint(batch_account, batch_location, batch_endpoint_subdomain=d.default_azure_batch_endpoint_subdomain)

Construct an Azure Batch endpoint URL from the account name, location, and subdomain.

Parameters:

batch_account (str, required): Name of the Azure batch account.
batch_location (str, required): Location of the Azure batch servers, e.g. "eastus".
batch_endpoint_subdomain (str, default d.default_azure_batch_endpoint_subdomain): Azure batch endpoint subdomains and domains that follow the account and location, e.g. "batch.azure.com/", the default.

Returns:

str: The endpoint URL.

Example

>>> url = construct_batch_endpoint("mybatch", "eastus")
>>> print(url)
'https://mybatch.eastus.batch.azure.com/'

>>> url = construct_batch_endpoint("mybatch", "westus", "custom.domain.com/")
>>> print(url)
'https://mybatch.westus.custom.domain.com/'

Source code in cfa/cloudops/endpoints.py
def construct_batch_endpoint(
    batch_account: str,
    batch_location: str,
    batch_endpoint_subdomain: str = d.default_azure_batch_endpoint_subdomain,
) -> str:
    """Construct an Azure Batch endpoint URL from the account name, location, and subdomain.

    Args:
        batch_account: Name of the Azure batch account.
        batch_location: Location of the Azure batch servers, e.g. "eastus".
        batch_endpoint_subdomain: Azure batch endpoint subdomains and domains
            that follow the account and location, e.g. "batch.azure.com/", the default.

    Returns:
        str: The endpoint URL.

    Example:
        >>> url = construct_batch_endpoint("mybatch", "eastus")
        >>> print(url)
        'https://mybatch.eastus.batch.azure.com/'

        >>> url = construct_batch_endpoint("mybatch", "westus", "custom.domain.com/")
        >>> print(url)
        'https://mybatch.westus.custom.domain.com/'
    """
    return _construct_https_url(
        f"{batch_account}.{batch_location}.{batch_endpoint_subdomain}"
    )

construct_blob_account_endpoint(blob_account, blob_endpoint_subdomain=d.default_azure_blob_storage_endpoint_subdomain)

Construct an Azure blob storage account endpoint URL.

Parameters:

blob_account (str, required): Name of the Azure blob storage account.
blob_endpoint_subdomain (str, default d.default_azure_blob_storage_endpoint_subdomain): Azure blob endpoint subdomains and domains that follow the account, e.g. "blob.core.windows.net/", the default.

Returns:

str: The endpoint URL.

Example

>>> url = construct_blob_account_endpoint("mystorageaccount")
>>> print(url)
'https://mystorageaccount.blob.core.windows.net/'

>>> url = construct_blob_account_endpoint("mystorageaccount", "custom.blob.domain/")
>>> print(url)
'https://mystorageaccount.custom.blob.domain/'

Source code in cfa/cloudops/endpoints.py
def construct_blob_account_endpoint(
    blob_account: str,
    blob_endpoint_subdomain: str = d.default_azure_blob_storage_endpoint_subdomain,
) -> str:
    """Construct an Azure blob storage account endpoint URL.

    Args:
        blob_account: Name of the Azure blob storage account.
        blob_endpoint_subdomain: Azure blob endpoint subdomains and domains
            that follow the account, e.g. "blob.core.windows.net/", the default.

    Returns:
        str: The endpoint URL.

    Example:
        >>> url = construct_blob_account_endpoint("mystorageaccount")
        >>> print(url)
        'https://mystorageaccount.blob.core.windows.net/'

        >>> url = construct_blob_account_endpoint("mystorageaccount", "custom.blob.domain/")
        >>> print(url)
        'https://mystorageaccount.custom.blob.domain/'
    """
    return _construct_https_url(f"{blob_account}.{blob_endpoint_subdomain}")

construct_blob_container_endpoint(blob_container, blob_account, blob_endpoint_subdomain=d.default_azure_blob_storage_endpoint_subdomain)

Construct an endpoint URL for a blob storage container.

Constructs the URL from the container name, account name, and endpoint subdomain.

Parameters:

blob_container (str, required): Name of the blob storage container.
blob_account (str, required): Name of the Azure blob storage account.
blob_endpoint_subdomain (str, default d.default_azure_blob_storage_endpoint_subdomain): Azure Blob endpoint subdomains and domains that follow the account name, e.g. "blob.core.windows.net/", the default.

Returns:

str: The endpoint URL.

Example

>>> url = construct_blob_container_endpoint("mycontainer", "mystorageaccount")
>>> print(url)
'https://mystorageaccount.blob.core.windows.net/mycontainer'

>>> url = construct_blob_container_endpoint("data", "storage", "custom.blob.domain/")
>>> print(url)
'https://storage.custom.blob.domain/data'

Source code in cfa/cloudops/endpoints.py
def construct_blob_container_endpoint(
    blob_container: str,
    blob_account: str,
    blob_endpoint_subdomain: str = d.default_azure_blob_storage_endpoint_subdomain,
) -> str:
    """Construct an endpoint URL for a blob storage container.

    Constructs the URL from the container name, account name, and endpoint subdomain.

    Args:
        blob_container: Name of the blob storage container.
        blob_account: Name of the Azure blob storage account.
        blob_endpoint_subdomain: Azure Blob endpoint subdomains and domains
            that follow the account name, e.g. "blob.core.windows.net/", the default.

    Returns:
        str: The endpoint URL.

    Example:
        >>> url = construct_blob_container_endpoint("mycontainer", "mystorageaccount")
        >>> print(url)
        'https://mystorageaccount.blob.core.windows.net/mycontainer'

        >>> url = construct_blob_container_endpoint("data", "storage", "custom.blob.domain/")
        >>> print(url)
        'https://storage.custom.blob.domain/data'
    """
    return urljoin(
        construct_blob_account_endpoint(blob_account, blob_endpoint_subdomain),
        quote(blob_container),
    )

is_valid_acr_endpoint(endpoint)

Check whether an Azure container registry endpoint is valid given CFA ACR configurations.

Parameters:

endpoint (str, required): Azure Container Registry endpoint to validate.

Returns:

tuple[bool, str | None]: First entry: True if validation passes, else False. Second entry: None if validation passes, else a string indicating what failed validation.

Example

>>> valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io")
>>> print(valid)  # True
>>> print(error)  # None

>>> valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io/")
>>> print(valid)  # False
>>> print("trailing slash" in error)  # True

>>> valid, error = is_valid_acr_endpoint("https://azurecr.io")
>>> print(valid)  # False
>>> print("subdomain" in error)  # True

Source code in cfa/cloudops/endpoints.py
def is_valid_acr_endpoint(endpoint: str) -> tuple[bool, str | None]:
    """Check whether an Azure container registry endpoint is valid given CFA ACR configurations.

    Args:
        endpoint: Azure Container Registry endpoint to validate.

    Returns:
        tuple[bool, str | None]: First entry: True if validation passes, else False.
            Second entry: None if validation passes, else a string indicating
            what failed validation.

    Example:
        >>> valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io")
        >>> print(valid)  # True
        >>> print(error)  # None

        >>> valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io/")
        >>> print(valid)  # False
        >>> print("trailing slash" in error)  # True

        >>> valid, error = is_valid_acr_endpoint("https://azurecr.io")
        >>> print(valid)  # False
        >>> print("subdomain" in error)  # True
    """
    if endpoint.endswith("/"):
        return (
            False,
            (
                "Azure Container Registry URLs "
                "must not end with a trailing "
                "slash, as this can hamper DNS "
                "lookups of the private registry endpoint. "
                f"Got {endpoint}"
            ),
        )

    domain = urlparse(endpoint).netloc

    if not domain.endswith("azurecr.io"):
        return (
            False,
            (
                "Azure Container Registry URLs "
                "must have the domain "
                f"`azurecr.io`. Got `{domain}`."
            ),
        )

    if domain.startswith("azurecr.io"):
        return (
            False,
            (
                "Azure container registry URLs "
                "must have a subdomain, typically "
                "corresponding to the particular "
                "private registry name."
                f"Got {endpoint}"
            ),
        )

    return (True, None)
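
In calling code, the tuple return lends itself to a simple guard. A minimal sketch:

# Fail fast on an invalid registry endpoint.
valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io")
if not valid:
    raise ValueError(error)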

cfa.cloudops.job

Utilities for working with Azure Batch jobs.

create_job(client, job, verify_pool=True, exist_ok=False, verbose=False, **kwargs)

Create an Azure Batch job if it does not already exist.

Returns True if the job was created successfully. By default, verifies that the Azure Batch pool specified for the job exists, erroring if the pool cannot be found.

If the job itself already exists, errors by default but can also be configured to proceed without modifying or deleting the existing job.

Parameters:

client (BatchServiceClient, required): BatchServiceClient to use when creating the job.
job (JobAddParameter, required): JobAddParameter instance defining the job to add.
verify_pool (bool, default True): Whether to verify that the specified pool for the job exists before attempting to create the job, erroring if it cannot be found.
exist_ok (bool, default False): Whether to proceed if the job already exists, without attempting to update, modify, or overwrite it. If False, error when the job already exists.
verbose (bool, default False): Whether to print a message to stdout on success, or on failure because the job already exists.
**kwargs: Additional keyword arguments passed to azure.batch.BatchServiceClient.job.add.

Returns:

bool: True if the job is successfully created. False if the job already exists and exist_ok is set to True.

Raises:

ValueError: If the pool for the job cannot be found and verify_pool is True.
BatchErrorException: If the job exists and exist_ok is not True.

Example

>>> from azure.batch import BatchServiceClient, models
>>> client = BatchServiceClient(credentials=..., batch_url=...)
>>> job = models.JobAddParameter(
...     id="my-job",
...     pool_info=models.PoolInformation(pool_id="my-pool")
... )
>>>
>>> # Create job with pool verification
>>> success = create_job(client, job)
>>> print(success)  # True if created
>>>
>>> # Create job allowing existing jobs
>>> success = create_job(client, job, exist_ok=True, verbose=True)
Job my-job exists.

Source code in cfa/cloudops/job.py
def create_job(
    client: BatchServiceClient,
    job: models.JobAddParameter,
    verify_pool: bool = True,
    exist_ok: bool = False,
    verbose: bool = False,
    **kwargs,
) -> bool:
    """Create an Azure Batch job if it does not already exist.

    Returns True if the job was created successfully. By default, verifies that the
    Azure Batch pool specified for the job exists, erroring if the pool cannot be found.

    If the job itself already exists, errors by default but can also be configured to
    proceed without modifying or deleting the existing job.

    Args:
        client: BatchServiceClient to use when creating the job.
        job: JobAddParameter instance defining the job to add.
        verify_pool: Verify that the specified pool for the job exists before
            attempting to create the job, and error if it cannot be found.
            Defaults to True.
        exist_ok: Proceed if the job already exists (without attempting to
            update/modify/overwrite it)? Defaults to False (error if the job already exists).
        verbose: Message to stdout on success or failure due to job already existing?
            Defaults to False.
        **kwargs: Additional keyword arguments passed to
            ``azure.batch.BatchServiceClient.job.add``.

    Returns:
        bool: True if the job is successfully created. False if the job already
            exists and ``exist_ok`` is set to True.

    Raises:
        ValueError: If the pool for the job cannot be found and ``verify_pool`` is True.
        models.BatchErrorException: If the job exists and ``exist_ok`` is not True.

    Example:
        >>> from azure.batch import BatchServiceClient, models
        >>> client = BatchServiceClient(credentials=..., batch_url=...)
        >>> job = models.JobAddParameter(
        ...     id="my-job",
        ...     pool_info=models.PoolInformation(pool_id="my-pool")
        ... )
        >>>
        >>> # Create job with pool verification
        >>> success = create_job(client, job)
        >>> print(success)  # True if created, False if already exists with exist_ok=True
        >>>
        >>> # Create job allowing existing jobs
        >>> success = create_job(client, job, exist_ok=True, verbose=True)
        Job my-job exists.
    """
    if verify_pool:
        pool_id = job.pool_info.pool_id
        if not client.pool.exists(pool_id):
            raise ValueError(
                f"Attempt to create job {job.id} on "
                f"pool {pool_id}, but could not find "
                "the requested pool. Check that this "
                "pool id is correct and that a pool "
                "with that id exists"
            )
    try:
        client.job.add(job, **kwargs)
        if verbose:
            print(f"Created job {job.id}.")
        return True
    except models.BatchErrorException as e:
        if not (e.error.code == "JobExists" and exist_ok):
            raise e
        if verbose:
            print(f"Job {job.id} exists.")
        return False

cfa.cloudops.task

Functions for manipulating tasks within an Azure batch job.

create_bind_mount_string(az_mount_dir, source_path, target_path)

Create a valid OCI bind mount string for an OCI container running in Azure batch.

Creates a bind mount string for mounting things from Azure blob storage.

Parameters:

az_mount_dir (str, required): Directory in which to look for directories or volumes to mount.
source_path (str, required): Path relative to az_mount_dir to use as the source.
target_path (str, required): Absolute path within the container to bind to the source path.

Returns:

str: A properly formatted OCI --mount type=bind command, as a string.

Example

>>> mount_str = create_bind_mount_string(
...     "/mnt/batch/tasks/fsmounts",
...     "data",
...     "/app/data"
... )
>>> print(mount_str)
'--mount type=bind,source=/mnt/batch/tasks/fsmounts/data,target=/app/data'

Source code in cfa/cloudops/task.py
def create_bind_mount_string(
    az_mount_dir: str, source_path: str, target_path: str
) -> str:
    """Create a valid OCI bind mount string for an OCI container running in Azure batch.

    Creates a bind mount string for mounting things from Azure blob storage.

    Args:
        az_mount_dir: Directory in which to look for directories or volumes to mount.
        source_path: Path relative to ``az_mount_dir`` to use as the source.
        target_path: Absolute path within the container to bind to the source path.

    Returns:
        str: A properly formatted OCI --mount type=bind command, as a string.

    Example:
        >>> mount_str = create_bind_mount_string(
        ...     "/mnt/batch/tasks/fsmounts",
        ...     "data",
        ...     "/app/data"
        ... )
        >>> print(mount_str)
        '--mount type=bind,source=/mnt/batch/tasks/fsmounts/data,target=/app/data'
    """
    mount_template = "--mount type=bind,source={}/{},target={}"
    return mount_template.format(az_mount_dir, source_path, target_path)
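
One detail worth keeping in mind: the ``az_mount_dir`` value is substituted into the mount string verbatim, so an environment-variable reference such as the "$AZ_BATCH_NODE_MOUNTS_DIR" default used elsewhere in this module is not expanded by Python; it is resolved by the shell on the compute node at task runtime.

    >>> create_bind_mount_string("$AZ_BATCH_NODE_MOUNTS_DIR", "data", "/app/data")
    '--mount type=bind,source=$AZ_BATCH_NODE_MOUNTS_DIR/data,target=/app/data'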

get_container_settings(container_image_name, az_mount_dir='$AZ_BATCH_NODE_MOUNTS_DIR', working_directory=None, mount_pairs=None, additional_options='', registry=None, **kwargs)

Create a valid set of container settings with bind mounts for an OCI container.

Creates container settings with bind mounts specified in mount_pairs, for an OCI container run in an Azure batch task.

Parameters:

    container_image_name (str): Name of the OCI container image to use. Required.
    az_mount_dir (str): Directory in which to look for directories or volumes
        to mount. Default: '$AZ_BATCH_NODE_MOUNTS_DIR'.
    working_directory (str | ContainerWorkingDirectory): Working directory for
        the task within the container, passed as the working_directory parameter
        to the TaskContainerSettings constructor. If None (the default), defer to
        the Azure batch default (note that this will not typically be the same as
        the container image's own WORKDIR). Otherwise specify it with a
        ContainerWorkingDirectory instance or use the string "containerImageDefault"
        to use the container's own WORKDIR. See the documentation for
        TaskContainerSettings for more details. Default: None.
    mount_pairs (list[dict]): Pairs of 'source' and 'target' directories to mount
        when the container is run, as a list of dictionaries with 'source' and
        'target' keys. Default: None.
    additional_options (str): Additional flags and options to pass to the
        container run command, as a string. Default: ''.
    registry (ContainerRegistry): ContainerRegistry instance specifying a private
        container registry from which to fetch task containers. Default: None.
    **kwargs: Additional keyword arguments passed to the TaskContainerSettings
        constructor.

Returns:

    TaskContainerSettings: A TaskContainerSettings object instantiated according
        to the specified input.

Example

    >>> mount_pairs = [
    ...     {"source": "data", "target": "/app/data"},
    ...     {"source": "output", "target": "/app/output"}
    ... ]
    >>> settings = get_container_settings(
    ...     "myregistry.azurecr.io/myapp:latest",
    ...     mount_pairs=mount_pairs,
    ...     additional_options="--env MODE=production"
    ... )
    >>> print(settings.image_name)
    'myregistry.azurecr.io/myapp:latest'

Source code in cfa/cloudops/task.py
def get_container_settings(
    container_image_name: str,
    az_mount_dir: str = "$AZ_BATCH_NODE_MOUNTS_DIR",
    working_directory: str | ContainerWorkingDirectory = None,
    mount_pairs: list[dict] = None,
    additional_options: str = "",
    registry: ContainerRegistry = None,
    **kwargs,
) -> TaskContainerSettings:
    """Create a valid set of container settings with bind mounts for an OCI container.

    Creates container settings with bind mounts specified in mount_pairs,
    for an OCI container run in an Azure batch task.

    Args:
        container_image_name: Name of the OCI container image to use.
        az_mount_dir: Directory in which to look for directories or volumes to mount.
        working_directory: Working directory for the task within the container, passed
            as the working_directory parameter to the TaskContainerSettings constructor.
            If None (the default), then defer to the Azure batch default (note that this
            will _not_ typically be the same as the container image's own WORKDIR).
            Otherwise specify it with a ContainerWorkingDirectory instance or use the string
            "containerImageDefault" to use the container's own WORKDIR. See the
            documentation for TaskContainerSettings for more details.
        mount_pairs: Pairs of 'source' and 'target' directories to mount when the
            container is run, as a list of dictionaries with 'source' and 'target' keys.
        additional_options: Additional flags and options to pass to the container
            run command, as a string. Defaults to "".
        registry: ContainerRegistry instance specifying a private container registry
            from which to fetch task containers. Defaults to None.
        **kwargs: Additional keyword arguments passed to the TaskContainerSettings constructor.

    Returns:
        TaskContainerSettings: A TaskContainerSettings object instantiated according
            to the specified input.

    Example:
        >>> mount_pairs = [
        ...     {"source": "data", "target": "/app/data"},
        ...     {"source": "output", "target": "/app/output"}
        ... ]
        >>> settings = get_container_settings(
        ...     "myregistry.azurecr.io/myapp:latest",
        ...     mount_pairs=mount_pairs,
        ...     additional_options="--env MODE=production"
        ... )
        >>> print(settings.image_name)
        'myregistry.azurecr.io/myapp:latest'
    """

    ctr_r_opts = additional_options

    # Guard against the default of None so the loop below is a no-op
    # when no mounts are requested.
    if mount_pairs is None:
        mount_pairs = []
    for pair in mount_pairs:
        ctr_r_opts += " " + create_bind_mount_string(
            az_mount_dir, pair["source"], pair["target"]
        )

    return TaskContainerSettings(
        image_name=container_image_name,
        working_directory=working_directory,
        container_run_options=ctr_r_opts,
        registry=registry,
        **kwargs,
    )
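
Since the function simply appends one bind-mount string per entry in ``mount_pairs`` to ``additional_options``, the assembled options can be inspected on the returned object. A small sketch of what to expect, using a placeholder image name and the default ``az_mount_dir``:

    >>> settings = get_container_settings(
    ...     "myregistry.azurecr.io/myapp:latest",
    ...     mount_pairs=[{"source": "data", "target": "/app/data"}],
    ...     additional_options="--env MODE=production"
    ... )
    >>> print(settings.container_run_options)
    --env MODE=production --mount type=bind,source=$AZ_BATCH_NODE_MOUNTS_DIR/data,target=/app/data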

get_task_config(task_id, base_call, container_settings=None, user_identity=None, log_blob_container=None, log_blob_account=None, log_subdir=None, log_file_pattern='../std*.txt', log_upload_condition='taskCompletion', log_compute_node_identity_reference=None, output_files=None, **kwargs)

Create a batch task with a given base call and set of container settings.

If the user_identity is not set, set it up automatically with sufficient permissions to read and write from mounted volumes.

Parameters:

    task_id (str): Alphanumeric identifier for the task. Required.
    base_call (str): The base command line call for the task, as a string. Required.
    container_settings (TaskContainerSettings): Container settings for the task.
        You can use the get_container_settings helper function to create a valid
        entry. Default: None.
    user_identity (UserIdentity): User identity under which to run the task.
        If None, create one automatically with admin privileges, if permitted.
        Default: None.
    log_blob_container (str): If provided, save the contents of the stderr and
        stdout buffers (default) and/or other specified log files from task
        execution to files named in the specified Azure blob storage container.
        If None, do not preserve the contents of those buffers. Default: None.
    log_blob_account (str): Azure Blob storage account in which to look for the
        storage container specified in log_blob_container. Ignored if
        log_blob_container is None. Default: None.
    log_subdir (str): Subdirectory of the Blob storage container given in
        log_blob_container in which to save the log .txt files. If None, save at
        the root of the Blob storage container. Ignored if log_blob_container is
        None. Default: None.
    log_file_pattern (str): File pattern for logs to persist. Defaults to
        "../std*.txt", which matches the .txt output files for the stdout and
        stderr buffers in a standard Azure Batch Linux task, which are stored one
        directory up from the task working directory. Ignored if
        log_blob_container is None.
    log_upload_condition (str): Condition under which to upload logs. Options are
        "taskCompletion" (always upload, the default), "taskFailure" (upload only
        for failed tasks), and "taskSuccess" (upload only for successful tasks).
        Passed as the upload_condition argument to OutputFileUploadOptions.
    log_compute_node_identity_reference (ComputeNodeIdentityReference):
        ComputeNodeIdentityReference to use when constructing an
        OutputFileBlobContainerDestination object for logging. If None (default),
        attempt to obtain one via get_compute_node_identity_reference. Ignored if
        log_blob_container is None.
    output_files (list[OutputFile] | OutputFile): OutputFile object or list of
        such objects specifying additional output files for the task beyond those
        auto-constructed for persisting logs to log_blob_container. Passed along
        with those autogenerated OutputFile objects as the output_files parameter
        to the TaskAddParameter constructor. Default: None.
    **kwargs: Additional keyword arguments passed to the TaskAddParameter
        constructor.

Returns:

    TaskAddParameter: The task configuration object.

Example

    >>> from azure.batch.models import TaskContainerSettings
    >>>
    >>> # Basic task without container
    >>> task = get_task_config(
    ...     task_id="my-task-001",
    ...     base_call="python /app/script.py --input data.txt"
    ... )
    >>>
    >>> # Task with container and logging
    >>> container_settings = TaskContainerSettings(
    ...     image_name="myregistry.azurecr.io/myapp:latest"
    ... )
    >>> task = get_task_config(
    ...     task_id="my-task-002",
    ...     base_call="python /app/script.py",
    ...     container_settings=container_settings,
    ...     log_blob_container="task-logs",
    ...     log_blob_account="mystorageaccount",
    ...     log_subdir="job-123"
    ... )
    >>> print(task.id)
    'my-task-002'

Source code in cfa/cloudops/task.py
def get_task_config(
    task_id: str,
    base_call: str,
    container_settings: TaskContainerSettings = None,
    user_identity: UserIdentity = None,
    log_blob_container: str = None,
    log_blob_account: str = None,
    log_subdir: str = None,
    log_file_pattern: str = "../std*.txt",
    log_upload_condition: str = "taskCompletion",
    log_compute_node_identity_reference: ComputeNodeIdentityReference = None,
    output_files: list[OutputFile] | OutputFile = None,
    **kwargs,
) -> TaskAddParameter:
    """Create a batch task with a given base call and set of container settings.

    If the ``user_identity`` is not set, set it up automatically with sufficient
    permissions to read and write from mounted volumes.

    Args:
        task_id: Alphanumeric identifier for the task.
        base_call: The base command line call for the task, as a string.
        container_settings: Container settings for the task. You can use the
            get_container_settings helper function to create a valid entry.
            Defaults to None.
        user_identity: User identity under which to run the task. If None, create
            one automatically with admin privileges, if permitted. Defaults to None.
        log_blob_container: If provided, save the contents of the stderr and stdout
            buffers (default) and/or other specified log files from task execution
            to files named in the specified Azure blob storage container. If None,
            do not preserve the contents of those buffers.
        log_blob_account: Azure Blob storage account in which to look for the storage
            container specified in ``log_blob_container``. Ignored if ``log_blob_container``
            is None. Defaults to None.
        log_subdir: Subdirectory of the Blob storage container given in
            ``log_blob_container`` in which to save the log ``.txt`` files.
            If None, save at the root of the Blob storage container. Ignored if
            ``log_blob_container`` is None.
        log_file_pattern: File pattern for logs to persist. Defaults to "../std*.txt",
            which matches the ``.txt`` output files for the stdout and stderr buffers
            in a standard Azure Batch Linux task, which are stored one directory up
            from the task working directory. Ignored if ``log_blob_container`` is None.
        log_upload_condition: Condition under which to upload logs. Options are
            "taskCompletion" (always upload, the default), "taskFailure" (upload only
            for failed tasks), and "taskSuccess" (upload only for successful tasks).
            Passed as the ``upload_condition`` argument to OutputFileUploadOptions.
        log_compute_node_identity_reference: ComputeNodeIdentityReference to use when
            constructing an OutputFileBlobContainerDestination object for logging.
            If None (default), attempt to obtain one via get_compute_node_identity_reference.
            Ignored if ``log_blob_container`` is None.
        output_files: OutputFile object or list of such objects specifying additional
            output files for the task beyond those auto-constructed for persisting logs
            to ``log_blob_container``. Passed along with those autogenerated OutputFile
            objects as the ``output_files`` parameter to the TaskAddParameter constructor.
        **kwargs: Additional keyword arguments passed to the TaskAddParameter constructor.

    Returns:
        TaskAddParameter: The task configuration object.

    Example:
        >>> from azure.batch.models import TaskContainerSettings
        >>>
        >>> # Basic task without container
        >>> task = get_task_config(
        ...     task_id="my-task-001",
        ...     base_call="python /app/script.py --input data.txt"
        ... )
        >>>
        >>> # Task with container and logging
        >>> container_settings = TaskContainerSettings(
        ...     image_name="myregistry.azurecr.io/myapp:latest"
        ... )
        >>> task = get_task_config(
        ...     task_id="my-task-002",
        ...     base_call="python /app/script.py",
        ...     container_settings=container_settings,
        ...     log_blob_container="task-logs",
        ...     log_blob_account="mystorageaccount",
        ...     log_subdir="job-123"
        ... )
        >>> print(task.id)
        'my-task-002'
    """
    if user_identity is None:
        user_identity = UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin,
            )
        )
    if output_files is None:
        output_files = []

    if log_blob_container is not None:
        if log_subdir is None:
            log_subdir = ""
        log_output_files = output_task_files_to_blob(
            file_pattern=log_file_pattern,
            blob_container=log_blob_container,
            blob_account=log_blob_account,
            path=Path(log_subdir, task_id).as_posix(),
            upload_condition=log_upload_condition,
            compute_node_identity_reference=log_compute_node_identity_reference,
        )
    else:
        log_output_files = []

    task_config = TaskAddParameter(
        id=task_id,
        command_line=base_call,
        container_settings=container_settings,
        user_identity=user_identity,
        output_files=ensure_listlike(output_files)
        + ensure_listlike(log_output_files),
        **kwargs,
    )

    return task_config
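
get_task_config only builds the TaskAddParameter; submitting it is a separate step. A minimal sketch, assuming an authenticated azure.batch.BatchServiceClient named ``client`` and an existing job with id "my-job" (created, for example, via create_job):

    >>> task = get_task_config(
    ...     task_id="my-task-001",
    ...     base_call="python /app/script.py"
    ... )
    >>> client.task.add(job_id="my-job", task=task)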

output_task_files_to_blob(file_pattern, blob_container, blob_account, path=None, upload_condition='taskCompletion', blob_endpoint_subdomain=default_azure_blob_storage_endpoint_subdomain, compute_node_identity_reference=None, **kwargs)

Get a properly configured OutputFile object for uploading files from a Batch task to Blob storage.

Parameters:

    file_pattern (str): File pattern to match when uploading. Passed as the
        file_pattern argument to OutputFile. Required.
    blob_container (str): Name of the Azure blob storage container to which to
        upload the files. Required.
    blob_account (str): Name of the Azure blob storage account in which to look
        for the Blob storage container specified in blob_container. Required.
    path (str): Path within the Blob storage container to which to upload the
        file(s). Passed as the path argument to the
        OutputFileBlobContainerDestination constructor. If None, upload to the
        root of the container. If file_pattern contains wildcards, path gives the
        subdirectory within the container to upload them with their original
        filenames and extensions. If file_pattern contains no wildcards, path is
        treated as the full file path including filename and extension (i.e. the
        file is renamed). See OutputFileBlobContainerDestination for details.
        Default: None.
    upload_condition (str): Condition under which to upload the file(s). Options
        are "taskCompletion" (always upload, the default), "taskFailure" (upload
        only for failed tasks), and "taskSuccess" (upload only for successful
        tasks). Passed as the upload_condition argument to OutputFileUploadOptions.
    blob_endpoint_subdomain (str): Azure Blob endpoint subdomains and domains that
        follow the account name. If None (default), use this package's
        default_azure_blob_storage_endpoint_subdomain.
        Default: default_azure_blob_storage_endpoint_subdomain.
    compute_node_identity_reference (ComputeNodeIdentityReference):
        ComputeNodeIdentityReference to use when constructing an
        OutputFileBlobContainerDestination object for logging. If None (default),
        attempt to obtain one via get_compute_node_identity_reference. Default: None.
    **kwargs: Additional keyword arguments passed to the OutputFile constructor.

Returns:

    OutputFile: An OutputFile object that can be used in constructing a batch
        task via get_task_config.

Raises:

    TypeError: If compute_node_identity_reference is not of the required type.

Example

    >>> output_file = output_task_files_to_blob(
    ...     file_pattern="*.log",
    ...     blob_container="task-outputs",
    ...     blob_account="mystorageaccount",
    ...     path="logs/task-123",
    ...     upload_condition="taskCompletion"
    ... )
    >>> print(output_file.file_pattern)
    '*.log'

Source code in cfa/cloudops/task.py
def output_task_files_to_blob(
    file_pattern: str,
    blob_container: str,
    blob_account: str,
    path: str = None,
    upload_condition: str = "taskCompletion",
    blob_endpoint_subdomain: str = default_azure_blob_storage_endpoint_subdomain,
    compute_node_identity_reference: ComputeNodeIdentityReference = None,
    **kwargs,
) -> OutputFile:
    """Get a properly configured OutputFile object for uploading files from a Batch task to Blob storage.

    Args:
        file_pattern: File pattern to match when uploading. Passed as the
            ``file_pattern`` argument to OutputFile.
        blob_container: Name of the Azure blob storage container to which
            to upload the files.
        blob_account: Name of the Azure blob storage account in which to look for
            the Blob storage container specified in ``blob_container``.
        path: Path within the Blob storage container to which to upload the file(s).
            Passed as the ``path`` argument to the OutputFileBlobContainerDestination
            constructor. If None, upload to the root of the container. If ``file_pattern``
            contains wildcards, ``path`` gives the subdirectory within the container to
            upload them with their original filenames and extensions. If ``file_pattern``
            contains no wildcards, ``path`` is treated as the full file path including
            filename and extension (i.e. the file is renamed). See
            OutputFileBlobContainerDestination for details.
        upload_condition: Condition under which to upload the file(s). Options are
            "taskCompletion" (always upload, the default), "taskFailure" (upload only
            for failed tasks), and "taskSuccess" (upload only for successful tasks).
            Passed as the ``upload_condition`` argument to OutputFileUploadOptions.
        blob_endpoint_subdomain: Azure Blob endpoint subdomains and domains that follow
            the account name. If None (default), use this package's
            default_azure_blob_storage_endpoint_subdomain.
        compute_node_identity_reference: ComputeNodeIdentityReference to use when
            constructing an OutputFileBlobContainerDestination object for logging.
            If None (default), attempt to obtain one via get_compute_node_identity_reference.
        **kwargs: Additional keyword arguments passed to the OutputFile constructor.

    Returns:
        OutputFile: An OutputFile object that can be used in constructing a
            batch task via get_task_config.

    Raises:
        TypeError: If ``compute_node_identity_reference`` is not of the required type.

    Example:
        >>> output_file = output_task_files_to_blob(
        ...     file_pattern="*.log",
        ...     blob_container="task-outputs",
        ...     blob_account="mystorageaccount",
        ...     path="logs/task-123",
        ...     upload_condition="taskCompletion"
        ... )
        >>> print(output_file.file_pattern)
        '*.log'
    """
    if compute_node_identity_reference is None:
        compute_node_identity_reference = get_compute_node_identity_reference()
    if not isinstance(
        compute_node_identity_reference, ComputeNodeIdentityReference
    ):
        raise TypeError(
            "compute_node_identity_reference "
            "must be an instance of "
            "ComputeNodeIdentityReference. "
            f"Got {type(compute_node_identity_reference)}."
        )
    container = OutputFileBlobContainerDestination(
        container_url=construct_blob_container_endpoint(
            blob_container,
            blob_account,
            blob_endpoint_subdomain,
        ),
        path=path,
        identity_reference=compute_node_identity_reference,
    )
    destination = OutputFileDestination(container=container)
    upload_options = OutputFileUploadOptions(upload_condition=upload_condition)

    return OutputFile(
        file_pattern=file_pattern,
        destination=destination,
        upload_options=upload_options,
        **kwargs,
    )
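
The returned OutputFile composes directly with get_task_config: pass it via that function's ``output_files`` parameter and it is combined with any auto-generated log-upload entries. A sketch under the assumption that ``identity_ref`` is a ComputeNodeIdentityReference obtained elsewhere (so no configuration lookup is triggered), with placeholder container and account names:

    >>> results_file = output_task_files_to_blob(
    ...     file_pattern="results/*.csv",
    ...     blob_container="task-outputs",
    ...     blob_account="mystorageaccount",
    ...     path="results/task-001",
    ...     compute_node_identity_reference=identity_ref
    ... )
    >>> task = get_task_config(
    ...     task_id="my-task-001",
    ...     base_call="python /app/script.py",
    ...     output_files=results_file
    ... )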

cfa.cloudops.util

Miscellaneous utilities for interacting with Azure.

ensure_listlike(x)

Ensure that an object behaves like a MutableSequence, or wrap it in a one-item list.

If the object is not a MutableSequence, return a one-item list containing the object. Useful for handling list-of-strings inputs alongside single strings.

Based on this StackOverflow approach: https://stackoverflow.com/a/66485952

Parameters:

    x (any): The item to ensure is list-like. Required.

Returns:

    MutableSequence: x if x is a MutableSequence, otherwise [x] (i.e. a one-item
        list containing x).

Example

    >>> # Single string becomes a list
    >>> result = ensure_listlike("hello")
    >>> print(result)
    ['hello']

    >>> # List stays a list
    >>> result = ensure_listlike(["hello", "world"])
    >>> print(result)
    ['hello', 'world']

    >>> # Works with other types too
    >>> result = ensure_listlike(42)
    >>> print(result)
    [42]

Source code in cfa/cloudops/util.py
def ensure_listlike(x: any) -> MutableSequence:
    """Ensure that an object either behaves like a MutableSequence or return a one-item list.

    If the object is not a MutableSequence, return a one-item list containing the object.
    Useful for handling list-of-strings inputs alongside single strings.

    Based on this `StackOverflow approach <https://stackoverflow.com/a/66485952>`_.

    Args:
        x: The item to ensure is list-like.

    Returns:
        MutableSequence: ``x`` if ``x`` is a MutableSequence, otherwise ``[x]``
            (i.e. a one-item list containing ``x``).

    Example:
        >>> # Single string becomes a list
        >>> result = ensure_listlike("hello")
        >>> print(result)
        ['hello']

        >>> # List stays a list
        >>> result = ensure_listlike(["hello", "world"])
        >>> print(result)
        ['hello', 'world']

        >>> # Works with other types too
        >>> result = ensure_listlike(42)
        >>> print(result)
        [42]
    """
    return x if isinstance(x, MutableSequence) else [x]
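
The check is deliberately against MutableSequence rather than Sequence: strings are Sequences but not MutableSequences, so a bare string gets wrapped in a list instead of being treated as a sequence of characters.

    >>> from collections.abc import MutableSequence, Sequence
    >>> isinstance("hello", Sequence)
    True
    >>> isinstance("hello", MutableSequence)
    False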

lookup_available_vm_skus_for_batch(client=None, config_dict=None, try_env=True, to_dict=True, **kwargs)

Look up available VM image SKUs for the given batch service.

Parameters:

    client (BatchManagementClient): BatchManagementClient to use when looking up
        the available images. If None, use the output of
        get_batch_management_client(). Default: None.
    config_dict (dict): Configuration dictionary. Passed as the config_dict
        argument to any internal config.get_config_val calls. See that function's
        documentation for additional details. Default: None.
    try_env (bool): Whether to look for configuration values in the available
        environment variables. Passed as the try_env argument to any internal
        config.get_config_val calls. See that function's documentation for
        additional details. Default: True.
    to_dict (bool): Apply sku_to_dict to the list of results? Defaults to True.
        If False, the result will be a list of SupportedSku objects.
    **kwargs: Additional keyword arguments passed to
        BatchManagementClient.location.list_supported_virtual_machine_skus.

Returns:

    list: Of supported SKUs, either as dictionaries of property names and values
        (default) or as raw SupportedSku objects (if to_dict is False).

Example

    >>> from azure.mgmt.batch import BatchManagementClient
    >>>
    >>> # Get SKUs as dictionaries (default)
    >>> skus = lookup_available_vm_skus_for_batch()
    >>> for sku in skus[:3]:  # Show first 3
    ...     print(f"Name: {sku['name']}, vCPUs: {sku.get('vCPUs', 'N/A')}")

    >>> # Get raw SupportedSku objects
    >>> raw_skus = lookup_available_vm_skus_for_batch(to_dict=False)
    >>> print(f"Found {len(raw_skus)} available VM SKUs")
    >>> print(f"First SKU: {raw_skus[0].name}")

Source code in cfa/cloudops/util.py
def lookup_available_vm_skus_for_batch(
    client: BatchManagementClient = None,
    config_dict: dict = None,
    try_env: bool = True,
    to_dict: bool = True,
    **kwargs,
):
    """Look up available VM image SKUs for the given batch service.

    Args:
        client: BatchManagementClient to use when looking up the available images.
            If None, use the output of ``get_batch_management_client()``. Defaults to None.
        config_dict: Configuration dictionary. Passed as the ``config_dict`` argument
            to any internal ``config.get_config_val`` calls. See that function's
            documentation for additional details.
        try_env: Whether to look for configuration values in the available environment
            variables. Passed as the ``try_env`` argument to any internal
            ``config.get_config_val`` calls. See that function's documentation for
            additional details.
        to_dict: Apply ``sku_to_dict`` to the list of results? Defaults to True.
            If False, the result will be a list of SupportedSku objects.
        **kwargs: Additional keyword arguments passed to
            ``BatchManagementClient.location.list_supported_virtual_machine_skus``.

    Returns:
        list: Of supported SKUs, either as dictionaries of property names and values
            (default) or as raw SupportedSku objects (if ``to_dict`` is False).

    Example:
        >>> from azure.mgmt.batch import BatchManagementClient
        >>>
        >>> # Get SKUs as dictionaries (default)
        >>> skus = lookup_available_vm_skus_for_batch()
        >>> for sku in skus[:3]:  # Show first 3
        ...     print(f"Name: {sku['name']}, vCPUs: {sku.get('vCPUs', 'N/A')}")

        >>> # Get raw SupportedSku objects
        >>> raw_skus = lookup_available_vm_skus_for_batch(to_dict=False)
        >>> print(f"Found {len(raw_skus)} available VM SKUs")
        >>> print(f"First SKU: {raw_skus[0].name}")
    """
    if client is None:
        from .client import get_batch_management_client

        client = get_batch_management_client(
            config_dict=config_dict, try_env=try_env
        )
    result = [
        item
        for item in client.location.list_supported_virtual_machine_skus(
            location_name=get_config_val(
                "azure_batch_location",
                config_dict=config_dict,
                try_env=try_env,
            ),
            **kwargs,
        )
    ]

    if to_dict:
        result = [sku_to_dict(x) for x in result]

    return result
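
Because ``to_dict=True`` flattens each SKU's capabilities into top-level keys (see sku_to_dict below), ad hoc filtering is straightforward. A hedged sketch: capability key names such as 'vCPUs' are reported by the service rather than fixed by this package, and their values arrive as strings:

    >>> skus = lookup_available_vm_skus_for_batch()
    >>> # Keep SKUs reporting at least 4 vCPUs ('vCPUs' is an assumed key)
    >>> big = [s for s in skus if int(s.get("vCPUs", 0)) >= 4]
    >>> print(f"{len(big)} SKUs with >= 4 vCPUs")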

lookup_service_principal(display_name)

Look up an Azure service principal from its display name.

Requires the Azure CLI.

Parameters:

    display_name (str): The display name of the service principal to look up.
        Required.

Returns:

    list: The results, if any, or an empty list if no match was found.

Raises:

    RuntimeError: If the Azure CLI command fails or is not available.

Example

    >>> # Look up a service principal by display name
    >>> sp_list = lookup_service_principal("my-service-principal")
    >>> if sp_list:
    ...     print(f"Found {len(sp_list)} service principal(s)")
    ...     print(f"App ID: {sp_list[0]['appId']}")
    ... else:
    ...     print("No service principal found")

Source code in cfa/cloudops/util.py
def lookup_service_principal(display_name: str) -> list:
    """Look up an Azure service principal from its display name.

    Requires the Azure CLI.

    Args:
        display_name: The display name of the service principal to look up.

    Returns:
        list: The results, if any, or an empty list if no match was found.

    Raises:
        RuntimeError: If the Azure CLI command fails or is not available.

    Example:
        >>> # Look up a service principal by display name
        >>> sp_list = lookup_service_principal("my-service-principal")
        >>> if sp_list:
        ...     print(f"Found {len(sp_list)} service principal(s)")
        ...     print(f"App ID: {sp_list[0]['appId']}")
        ... else:
        ...     print("No service principal found")
    """
    try:
        # Pass a single command string with shell=True; text=True decodes
        # the output (universal_newlines is a redundant alias for text).
        command = f"az ad sp list --display-name {display_name}"
        result = subprocess.check_output(command, shell=True, text=True)
    except Exception as e:
        raise RuntimeError(
            "Attempt to search available Azure "
            "service principals via the "
            "`az ad sp list` command produced an "
            "error. Check that you have the Azure "
            "command line interface (CLI) installed "
            "and in your PATH as `az`, and that you "
            "are authenticated via `az login`"
        ) from e
    parsed = json.loads(result)
    return parsed

sku_to_dict(sku)

Convert a SupportedSku object to a flat dictionary of property names and values.

Parameters:

    sku (SupportedSku): The SupportedSku object to convert. Required.

Returns:

    dict: A flat dictionary with keys 'name', 'family_name',
        'batch_support_end_of_life', 'additional_properties', as well as keys and
        values corresponding to any SkuCapability objects associated to the
        SupportedSku.

Example

    >>> from azure.mgmt.batch.models import SupportedSku
    >>> # Assuming we have a SupportedSku object
    >>> sku_dict = sku_to_dict(some_sku)
    >>> print(sku_dict['name'])  # e.g., 'Standard_D2s_v3'
    >>> print(sku_dict['family_name'])  # e.g., 'standardDSv3Family'
    >>> print(sku_dict.get('vCPUs'))  # e.g., '2' (from capabilities)

Source code in cfa/cloudops/util.py
def sku_to_dict(sku: SupportedSku):
    """Convert a SupportedSku object to a flat dictionary of property names and values.

    Args:
        sku: The SupportedSku object to convert.

    Returns:
        dict: A flat dictionary with keys 'name', 'family_name',
            'batch_support_end_of_life', 'additional_properties', as well as keys
            and values corresponding to any SkuCapability objects associated to
            the SupportedSku.

    Example:
        >>> from azure.mgmt.batch.models import SupportedSku
        >>> # Assuming we have a SupportedSku object
        >>> sku_dict = sku_to_dict(some_sku)
        >>> print(sku_dict['name'])  # e.g., 'Standard_D2s_v3'
        >>> print(sku_dict['family_name'])  # e.g., 'standardDSv3Family'
        >>> print(sku_dict.get('vCPUs'))  # e.g., '2' (from capabilities)
    """
    return dict(
        name=sku.name,
        family_name=sku.family_name,
        batch_support_end_of_life=sku.batch_support_end_of_life,
        additional_properties=sku.additional_properties,
        **{c.name: c.value for c in sku.capabilities},
    )
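
Since every flattened dictionary includes a batch_support_end_of_life key, one simple use is screening out SKUs that Batch has scheduled for retirement. A minimal sketch building on lookup_available_vm_skus_for_batch above; only keys documented for sku_to_dict are used:

    >>> skus = lookup_available_vm_skus_for_batch()
    >>> current = [s for s in skus if s["batch_support_end_of_life"] is None]
    >>> print(f"{len(current)} of {len(skus)} SKUs have no scheduled end of life")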