cfa-cloudops Modules API Reference

cfa.cloudops.auth

Helper functions for Azure authentication.

CredentialHandler dataclass

Data structure for Azure credentials.

Lazy and cached: credentials are retrieved from a keyvault only when needed and are cached thereafter.
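A minimal usage sketch (account names and IDs below are placeholders): attributes are set as plain dataclass fields, and derived values such as endpoint URLs and credential objects are computed, and cached where applicable, only when first accessed.

    >>> handler = CredentialHandler()
    >>> handler.azure_batch_account = "mybatchaccount"
    >>> handler.azure_batch_location = "eastus"
    >>> handler.azure_batch_endpoint_subdomain = "batch.azure.com"
    >>> handler.azure_batch_endpoint
    'https://mybatchaccount.eastus.batch.azure.com'
    >>> handler.require_attr(["azure_tenant_id"], goal="authentication")
    Traceback (most recent call last):
        ...
    AttributeError: A non-None value for attribute azure_tenant_id is required to obtain a value for authentication.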

Source code in cfa/cloudops/auth.py
@dataclass
class CredentialHandler:
    """Data structure for Azure credentials.

    Lazy and cached: credentials are retrieved from a keyvault only when needed
    and are cached thereafter.
    """

    azure_subscription_id: str = None
    azure_resource_group_name: str = None
    azure_user_assigned_identity: str = None
    azure_subnet_id: str = None

    azure_keyvault_endpoint: str = None
    azure_keyvault_sp_secret_id: str = None
    azure_tenant_id: str = None
    azure_client_id: str = None
    azure_batch_endpoint_subdomain: str = d.default_azure_batch_endpoint_subdomain
    azure_batch_account: str = None
    azure_batch_location: str = d.default_azure_batch_location
    azure_batch_resource_url: str = d.default_azure_batch_resource_url
    azure_blob_storage_endpoint_subdomain: str = (
        d.default_azure_blob_storage_endpoint_subdomain
    )
    azure_blob_storage_account: str = None

    azure_container_registry_account: str = None
    azure_container_registry_domain: str = d.default_azure_container_registry_domain
    method: str = None

    def require_attr(self, attributes: str | list[str], goal: str = None):
        """Check that attributes required for a given operation are defined.

        Raises an informative error message if the required attribute is not defined.

        Args:
            attributes: String or list of strings naming the required attribute(s).
            goal: String naming the value that the attributes are required for obtaining,
                to make error messages more informative. If None, use a more generic message.

        Raises:
            AttributeError: If any required ``attributes`` are None.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.require_attr(["azure_tenant_id"], "authentication")
            AttributeError: A non-None value for attribute azure_tenant_id is required...
        """
        attributes = ensure_listlike(attributes)
        err_msgs = []
        for attr in attributes:
            attr_val = getattr(self, attr)
            if attr_val is None:
                err_msg = (f"A non-None value for attribute {attr} is required ") + (
                    f"to obtain a value for {goal}."
                    if goal is not None
                    else "for this operation."
                )
                err_msgs.append(err_msg)
        if err_msgs:
            raise AttributeError("\n".join(err_msgs))

    @property
    def azure_batch_endpoint(self) -> str:
        """Azure batch endpoint URL.

        Constructed programmatically from account name, location, and subdomain.

        Returns:
            str: The endpoint URL.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_batch_account = "mybatchaccount"
            >>> handler.azure_batch_location = "eastus"
            >>> handler.azure_batch_endpoint_subdomain = "batch.azure.com"
            >>> handler.azure_batch_endpoint
            'https://mybatchaccount.eastus.batch.azure.com'
        """
        logger.debug("Constructing Azure Batch endpoint URL.")
        self.require_attr(
            [
                "azure_batch_account",
                "azure_batch_endpoint_subdomain",
            ],
            goal="Azure batch endpoint URL",
        )
        logger.debug(
            "All required attributes present for Azure Batch endpoint URL. Constructing..."
        )
        endpoint = construct_batch_endpoint(
            self.azure_batch_account,
            self.azure_batch_location,
            self.azure_batch_endpoint_subdomain,
        )
        logger.debug(f"Constructed Azure Batch endpoint URL: {endpoint}")
        return endpoint

    @property
    def azure_blob_storage_endpoint(self) -> str:
        """Azure blob storage endpoint URL.

        Constructed programmatically from the account name and endpoint subdomain.

        Returns:
            str: The endpoint URL.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_blob_storage_account = "mystorageaccount"
            >>> handler.azure_blob_storage_endpoint_subdomain = "blob.core.windows.net"
            >>> handler.azure_blob_storage_endpoint
            'https://mystorageaccount.blob.core.windows.net'
        """
        logger.debug("Constructing Azure Blob account endpoint URL.")
        self.require_attr(
            [
                "azure_blob_storage_account",
                "azure_blob_storage_endpoint_subdomain",
            ],
            goal="Azure blob storage endpoint URL",
        )
        logger.debug(
            "All required attributes present for Azure Blob endpoint URL. Constructing..."
        )
        endpoint = construct_blob_account_endpoint(
            self.azure_blob_storage_account,
            self.azure_blob_storage_endpoint_subdomain,
        )
        logger.debug(f"Constructed Azure Blob endpoint URL: {endpoint}")
        return endpoint

    @property
    def azure_container_registry_endpoint(self) -> str:
        """Azure container registry endpoint URL.

        Constructed programmatically from the account name and registry domain.

        Returns:
            str: The endpoint URL.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_container_registry_account = "myregistry"
            >>> handler.azure_container_registry_domain = "azurecr.io"
            >>> handler.azure_container_registry_endpoint
            'myregistry.azurecr.io'
        """
        logger.debug("Constructing Azure Container Registry endpoint URL.")
        self.require_attr(
            [
                "azure_container_registry_account",
                "azure_container_registry_domain",
            ],
            goal="Azure container registry endpoint URL",
        )
        logger.debug(
            "All required attributes present for Azure Container Registry endpoint URL. Constructing..."
        )
        registry_endpoint = construct_azure_container_registry_endpoint(
            self.azure_container_registry_account,
            self.azure_container_registry_domain,
        )
        logger.debug(
            f"Constructed Azure Container Registry endpoint URL: {registry_endpoint}"
        )
        return registry_endpoint

    @cached_property
    def user_credential(self) -> ManagedIdentityCredential:
        """Azure user credential.

        Returns:
            ManagedIdentityCredential: The Azure user credential using ManagedIdentityCredential.

        Example:
            >>> handler = CredentialHandler()
            >>> credential = handler.user_credential
            >>> # Use credential with Azure SDK clients
        """
        logger.debug("Creating ManagedIdentityCredential for user.")
        return ManagedIdentityCredential()

    @cached_property
    def service_principal_secret(self):
        """A service principal secret retrieved from Azure Key Vault.

        Returns:
            str: The secret value.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
            >>> handler.azure_keyvault_sp_secret_id = "my-secret"
            >>> secret = handler.service_principal_secret
        """
        logger.debug("Retrieving service principal secret from Azure Key Vault.")
        self.require_attr(
            ["azure_keyvault_endpoint", "azure_keyvault_sp_secret_id"],
            goal="service_principal_secret",
        )
        if self.method == "default":
            logger.debug(
                "Using default credential method for service principal secret."
            )
            cred = self.default_credential
        elif self.method == "sp":
            logger.debug(
                "Using service principal credential method for service principal secret."
            )
            return self.azure_client_secret
        else:
            logger.debug("Using user credential method for service principal secret.")
            cred = self.user_credential
        logger.debug(
            "All required attributes present for service principal secret. Retrieving..."
        )
        secret = get_sp_secret(
            self.azure_keyvault_endpoint,
            self.azure_keyvault_sp_secret_id,
            cred,
        )
        logger.debug("Retrieved service principal secret from Azure Key Vault.")
        logger.info(
            f"Retrieved secret '{self.azure_keyvault_sp_secret_id}' from Azure Key Vault."
        )
        return secret

    @cached_property
    def default_credential(self):
        logger.debug("Creating DefaultCredential.")
        return DefaultCredential()

    @cached_property
    def batch_service_principal_credentials(self):
        """Service Principal credentials for authenticating to Azure Batch.

        Returns:
            ServicePrincipalCredentials: The credentials configured for Azure Batch access.

        Example:
            >>> handler = CredentialHandler()
            >>> # Set required attributes...
            >>> credentials = handler.batch_service_principal_credentials
            >>> # Use with Azure Batch client
        """
        logger.debug("Creating ServicePrincipalCredentials for Azure Batch.")
        self.require_attr(
            [
                "azure_tenant_id",
                "azure_client_id",
                "azure_batch_resource_url",
            ],
            goal="batch_service_principal_credentials",
        )
        logger.debug(
            "All required attributes present for Azure Batch Service Principal credentials. Creating..."
        )
        spcred = ServicePrincipalCredentials(
            client_id=self.azure_client_id,
            tenant=self.azure_tenant_id,
            secret=self.service_principal_secret,
            resource=self.azure_batch_resource_url,
        )
        logger.debug("Created ServicePrincipalCredentials for Azure Batch.")
        return spcred

    @cached_property
    def client_secret_sp_credential(self):
        """A client secret credential created using the service principal secret.

        Returns:
            ClientSecretCredential: The credential configured with service principal details.

        Example:
            >>> handler = CredentialHandler()
            >>> # Set required attributes...
            >>> credential = handler.client_secret_sp_credential
            >>> # Use with Azure SDK clients
        """
        logger.debug("Creating ClientSecretCredential using service principal secret.")
        self.require_attr(["azure_tenant_id", "azure_client_id"])
        logger.debug(
            "All required attributes present for ClientSecretCredential. Creating..."
        )
        cscred = ClientSecretCredential(
            tenant_id=self.azure_tenant_id,
            client_secret=self.service_principal_secret,
            client_id=self.azure_client_id,
        )
        logger.debug("Created ClientSecretCredential using service principal secret.")
        return cscred

    @cached_property
    def client_secret_credential(self):
        """A client secret credential created using the azure_client_secret attribute.

        Returns:
            ClientSecretCredential: The credential configured with client secret details.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_tenant_id = "tenant-id"
            >>> handler.azure_client_id = "client-id"
            >>> handler.azure_client_secret = "client-secret" #pragma: allowlist secret
            >>> credential = handler.client_secret_credential
        """
        logger.debug("Creating ClientSecretCredential using azure_client_secret.")
        self.require_attr(
            [
                "azure_tenant_id",
                "azure_client_id",
                "azure_client_secret",
            ]
        )
        logger.debug(
            "All required attributes present for ClientSecretCredential. Creating..."
        )
        client_sec_cred = ClientSecretCredential(
            tenant_id=self.azure_tenant_id,
            client_secret=self.azure_client_secret,
            client_id=self.azure_client_id,
        )
        logger.debug("Created ClientSecretCredential using azure_client_secret.")
        return client_sec_cred

    @cached_property
    def compute_node_identity_reference(self):
        """An object defining a compute node identity reference.

        Specifically, a ComputeNodeIdentityReference object associated to the
        CredentialHandler's user-assigned identity.

        Returns:
            models.ComputeNodeIdentityReference: The identity reference.

        Example:
            >>> handler = CredentialHandler()
            >>> handler.azure_user_assigned_identity = "/subscriptions/.../resourceGroups/..."
            >>> identity_ref = handler.compute_node_identity_reference
        """
        logger.debug("Creating ComputeNodeIdentityReference.")
        self.require_attr(
            ["azure_user_assigned_identity"],
            goal="Compute node identity reference",
        )
        logger.debug(
            "All required attributes present for ComputeNodeIdentityReference. Creating..."
        )
        comp_id_ref = models.ComputeNodeIdentityReference(
            resource_id=self.azure_user_assigned_identity
        )
        logger.debug("Created ComputeNodeIdentityReference.")
        return comp_id_ref

    @cached_property
    def azure_container_registry(self):
        """An object pointing to an Azure Container Registry.

        Specifically, a ContainerRegistry instance corresponding to the particular
        Azure Container Registry account specified in the CredentialHandler, if any,
        with authentication via the compute_node_identity_reference defined by
        CredentialHandler, if any.

        Returns:
            models.ContainerRegistry: A properly instantiated ContainerRegistry object.

        Raises:
            ValueError: If the container registry endpoint is invalid.

        Example:
            >>> handler = CredentialHandler()
            >>> # Set required attributes...
            >>> registry = handler.azure_container_registry
        """
        logger.debug("Creating Azure Container Registry ContainerRegistry instance.")
        self.require_attr(
            [
                "azure_container_registry_account",
                "azure_container_registry_domain",
                "azure_user_assigned_identity",
            ],
            goal=("Azure Container Registry `ContainerRegistry` instance"),
        )
        logger.debug(
            "All required attributes present for Azure Container Registry. Validating endpoint..."
        )
        valid, msg = is_valid_acr_endpoint(self.azure_container_registry_endpoint)
        if not valid:
            logger.error(f"Invalid Azure Container Registry endpoint: {msg}")
            raise ValueError(msg)
        logger.debug(
            "Azure Container Registry endpoint is valid. Creating ContainerRegistry instance..."
        )
        cont_reg = models.ContainerRegistry(
            user_name=self.azure_container_registry_account,
            registry_server=self.azure_container_registry_endpoint,
            identity_reference=self.compute_node_identity_reference,
        )
        logger.debug("Created Azure Container Registry ContainerRegistry instance.")
        return cont_reg

azure_batch_endpoint property

Azure batch endpoint URL.

Constructed programmatically from account name, location, and subdomain.

Returns:

    str: The endpoint URL.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_batch_account = "mybatchaccount"
    >>> handler.azure_batch_location = "eastus"
    >>> handler.azure_batch_endpoint_subdomain = "batch.azure.com"
    >>> handler.azure_batch_endpoint
    'https://mybatchaccount.eastus.batch.azure.com'

azure_blob_storage_endpoint property

Azure blob storage endpoint URL.

Constructed programmatically from the account name and endpoint subdomain.

Returns:

    str: The endpoint URL.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_blob_storage_account = "mystorageaccount"
    >>> handler.azure_blob_storage_endpoint_subdomain = "blob.core.windows.net"
    >>> handler.azure_blob_storage_endpoint
    'https://mystorageaccount.blob.core.windows.net'

azure_container_registry cached property

An object pointing to an Azure Container Registry.

Specifically, a ContainerRegistry instance corresponding to the particular Azure Container Registry account specified in the CredentialHandler, if any, with authentication via the compute_node_identity_reference defined by CredentialHandler, if any.

Returns:

    models.ContainerRegistry: A properly instantiated ContainerRegistry object.

Raises:

    ValueError: If the container registry endpoint is invalid.

Example:

    >>> handler = CredentialHandler()
    >>> # Set required attributes...
    >>> registry = handler.azure_container_registry

azure_container_registry_endpoint property

Azure container registry endpoint URL.

Constructed programmatically from the account name and registry domain.

Returns:

    str: The endpoint URL.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_container_registry_account = "myregistry"
    >>> handler.azure_container_registry_domain = "azurecr.io"
    >>> handler.azure_container_registry_endpoint
    'myregistry.azurecr.io'

batch_service_principal_credentials cached property

Service Principal credentials for authenticating to Azure Batch.

Returns:

    ServicePrincipalCredentials: The credentials configured for Azure Batch access.

Example:

    >>> handler = CredentialHandler()
    >>> # Set required attributes...
    >>> credentials = handler.batch_service_principal_credentials
    >>> # Use with Azure Batch client

client_secret_credential cached property

A client secret credential created using the azure_client_secret attribute.

Returns:

    ClientSecretCredential: The credential configured with client secret details.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_tenant_id = "tenant-id"
    >>> handler.azure_client_id = "client-id"
    >>> handler.azure_client_secret = "client-secret"  # pragma: allowlist secret
    >>> credential = handler.client_secret_credential

client_secret_sp_credential cached property

A client secret credential created using the service principal secret.

Returns:

    ClientSecretCredential: The credential configured with service principal details.

Example:

    >>> handler = CredentialHandler()
    >>> # Set required attributes...
    >>> credential = handler.client_secret_sp_credential
    >>> # Use with Azure SDK clients

compute_node_identity_reference cached property

An object defining a compute node identity reference.

Specifically, a ComputeNodeIdentityReference object associated to the CredentialHandler's user-assigned identity.

Returns:

    models.ComputeNodeIdentityReference: The identity reference.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_user_assigned_identity = "/subscriptions/.../resourceGroups/..."
    >>> identity_ref = handler.compute_node_identity_reference

service_principal_secret cached property

A service principal secret retrieved from Azure Key Vault.

Returns:

    str: The secret value.

Example:

    >>> handler = CredentialHandler()
    >>> handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
    >>> handler.azure_keyvault_sp_secret_id = "my-secret"
    >>> secret = handler.service_principal_secret

user_credential cached property

Azure user credential.

Returns:

    ManagedIdentityCredential: The Azure user credential using ManagedIdentityCredential.

Example:

    >>> handler = CredentialHandler()
    >>> credential = handler.user_credential
    >>> # Use credential with Azure SDK clients

require_attr(attributes, goal=None)

Check that attributes required for a given operation are defined.

Raises an informative error message if the required attribute is not defined.

Parameters:

    attributes (str | list[str], required): String or list of strings naming the required attribute(s).
    goal (str, default None): String naming the value that the attributes are required for obtaining, to make error messages more informative. If None, use a more generic message.

Raises:

    AttributeError: If any required attributes are None.

Example:

    >>> handler = CredentialHandler()
    >>> handler.require_attr(["azure_tenant_id"], "authentication")
    AttributeError: A non-None value for attribute azure_tenant_id is required...

Source code in cfa/cloudops/auth.py
def require_attr(self, attributes: str | list[str], goal: str = None):
    """Check that attributes required for a given operation are defined.

    Raises an informative error message if the required attribute is not defined.

    Args:
        attributes: String or list of strings naming the required attribute(s).
        goal: String naming the value that the attributes are required for obtaining,
            to make error messages more informative. If None, use a more generic message.

    Raises:
        AttributeError: If any required ``attributes`` are None.

    Example:
        >>> handler = CredentialHandler()
        >>> handler.require_attr(["azure_tenant_id"], "authentication")
        AttributeError: A non-None value for attribute azure_tenant_id is required...
    """
    attributes = ensure_listlike(attributes)
    err_msgs = []
    for attr in attributes:
        attr_val = getattr(self, attr)
        if attr_val is None:
            err_msg = (f"A non-None value for attribute {attr} is required ") + (
                f"to obtain a value for {goal}."
                if goal is not None
                else "for this operation."
            )
            err_msgs.append(err_msg)
    if err_msgs:
        raise AttributeError("\n".join(err_msgs))

DefaultCredential

Bases: BasicTokenAuthentication
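Judging from the source below, this class wraps an azure-identity credential (a DefaultAzureCredential by default) in a BasicTokenAuthentication-compatible interface, using azure-core's BearerTokenCredentialPolicy to obtain and cache bearer tokens for the Azure Batch resource. A minimal sketch of how it might be used (mirroring the attribute access shown in the implementation; no Azure-specific configuration is implied):

    >>> cred = DefaultCredential()      # wraps DefaultAzureCredential() by default
    >>> cred.set_token()                # fetch a token via the bearer-token policy
    >>> cred.token["access_token"]      # raw bearer token for https://batch.core.windows.net
    '...'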

Source code in cfa/cloudops/auth.py
class DefaultCredential(BasicTokenAuthentication):
    def __init__(
        self,
        credential=None,
        resource_id="https://batch.core.windows.net/.default",
        **kwargs,
    ):
        logger.debug("Initializing DefaultCredential.")
        super(DefaultCredential, self).__init__(None)
        if credential is None:
            logger.debug("No credential provided, using DefaultAzureCredential.")
            credential = DefaultAzureCredential()
        self.credential = credential
        self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs)

    def _make_request(self):
        logger.debug("Making fake PipelineRequest to obtain token.")
        return PipelineRequest(
            HttpRequest("CredentialWrapper", "https://batch.core.windows.net"),
            PipelineContext(None),
        )

    def set_token(self):
        """Ask the azure-core BearerTokenCredentialPolicy policy to get a token.
        Using the policy gives us for free the caching system of azure-core.
        """
        logger.debug("Setting token using BearerTokenCredentialPolicy.")
        request = self._make_request()
        self._policy.on_request(request)
        # Read Authorization, and get the second part after Bearer
        token = request.http_request.headers["Authorization"].split(" ", 1)[1]
        self.token = {"access_token": token}
        logger.debug("Set the token.")

    def get_token(self, *scopes, **kwargs):
        # Pass get_token call to credential
        logger.debug("Getting token from underlying credential.")
        return self.credential.get_token(*scopes, **kwargs)

    def signed_session(self, session=None):
        logger.debug("Creating signed session with updated token.")
        self.set_token()
        return super(DefaultCredential, self).signed_session(session)

set_token()

Ask the azure-core BearerTokenCredentialPolicy policy to get a token. Using the policy gives us for free the caching system of azure-core.

Source code in cfa/cloudops/auth.py
def set_token(self):
    """Ask the azure-core BearerTokenCredentialPolicy policy to get a token.
    Using the policy gives us for free the caching system of azure-core.
    """
    logger.debug("Setting token using BearerTokenCredentialPolicy.")
    request = self._make_request()
    self._policy.on_request(request)
    # Read Authorization, and get the second part after Bearer
    token = request.http_request.headers["Authorization"].split(" ", 1)[1]
    self.token = {"access_token": token}
    logger.debug("Set the token.")

EnvCredentialHandler

Bases: CredentialHandler

Azure Credentials populated from available environment variables.

Subclass of CredentialHandler that populates attributes from environment variables at instantiation, with the opportunity to override those values via keyword arguments passed to the constructor.

Parameters:

    dotenv_path (str, default None): Path to .env file to load environment variables from. If None, uses default .env file discovery.
    **kwargs: Keyword arguments defining additional attributes or overriding those set in the environment variables. Passed as the config_dict argument to config.get_config_val.

Example:

    >>> # Load from environment variables
    >>> handler = EnvCredentialHandler()

    >>> # Override specific values
    >>> handler = EnvCredentialHandler(azure_tenant_id="custom-tenant-id")

    >>> # Load from custom .env file
    >>> handler = EnvCredentialHandler(dotenv_path="/path/to/.env")

Source code in cfa/cloudops/auth.py
class EnvCredentialHandler(CredentialHandler):
    """Azure Credentials populated from available environment variables.

    Subclass of CredentialHandler that populates attributes from environment
    variables at instantiation, with the opportunity to override those values
    via keyword arguments passed to the constructor.

    Args:
        dotenv_path (str, optional): Path to .env file to load environment variables from.
            If None, uses default .env file discovery.
        **kwargs: Keyword arguments defining additional attributes or overriding
            those set in the environment variables. Passed as the ``config_dict``
            argument to ``config.get_config_val``.

    Example:
        >>> # Load from environment variables
        >>> handler = EnvCredentialHandler()

        >>> # Override specific values
        >>> handler = EnvCredentialHandler(azure_tenant_id="custom-tenant-id")

        >>> # Load from custom .env file
        >>> handler = EnvCredentialHandler(dotenv_path="/path/to/.env")
    """

    def __init__(self, dotenv_path: str = None, **kwargs) -> None:
        """Initialize the EnvCredentialHandler.

        Loads environment variables from .env file and populates credential attributes from them.

        Args:
            dotenv_path (str, optional): Path to .env file to load environment variables from.
                If None, uses default .env file discovery.
            **kwargs: Additional keyword arguments to override specific credential attributes.
        """
        logger.debug("Initializing EnvCredentialHandler.")
        load_env_vars(dotenv_path=dotenv_path)
        get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

        for key in self.__dataclass_fields__.keys():
            self.__setattr__(key, get_conf(key))
        # set method to "env"
        self.__setattr__("method", "env")
        # check for azure batch location
        if self.__getattribute__("azure_batch_location") is None:
            self.__setattr__("azure_batch_location", d.default_azure_batch_location)

__init__(dotenv_path=None, **kwargs)

Initialize the EnvCredentialHandler.

Loads environment variables from .env file and populates credential attributes from them.

Parameters:

    dotenv_path (str, default None): Path to .env file to load environment variables from. If None, uses default .env file discovery.
    **kwargs: Additional keyword arguments to override specific credential attributes.

Source code in cfa/cloudops/auth.py
def __init__(self, dotenv_path: str = None, **kwargs) -> None:
    """Initialize the EnvCredentialHandler.

    Loads environment variables from .env file and populates credential attributes from them.

    Args:
        dotenv_path (str, optional): Path to .env file to load environment variables from.
            If None, uses default .env file discovery.
        **kwargs: Additional keyword arguments to override specific credential attributes.
    """
    logger.debug("Initializing EnvCredentialHandler.")
    load_env_vars(dotenv_path=dotenv_path)
    get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

    for key in self.__dataclass_fields__.keys():
        self.__setattr__(key, get_conf(key))
    # set method to "env"
    self.__setattr__("method", "env")
    # check for azure batch location
    if self.__getattribute__("azure_batch_location") is None:
        self.__setattr__("azure_batch_location", d.default_azure_batch_location)

SPCredentialHandler

Bases: CredentialHandler

Source code in cfa/cloudops/auth.py
class SPCredentialHandler(CredentialHandler):
    def __init__(
        self,
        azure_tenant_id: str = None,
        azure_subscription_id: str = None,
        azure_client_id: str = None,
        azure_client_secret: str = None,
        dotenv_path: str = None,
        **kwargs,
    ):
        """Initialize a Service Principal Credential Handler.

        Creates a credential handler that uses Azure Service Principal authentication
        for accessing Azure resources. Credentials can be provided directly as parameters
        or loaded from environment variables. If not provided directly, the handler will
        attempt to load credentials from environment variables or a .env file.

        Args:
            azure_tenant_id: Azure Active Directory tenant ID. If None, will attempt
                to load from AZURE_TENANT_ID environment variable.
            azure_subscription_id: Azure subscription ID. If None, will attempt
                to load from AZURE_SUBSCRIPTION_ID environment variable.
            azure_client_id: Azure Service Principal client ID (application ID).
                If None, will attempt to load from AZURE_CLIENT_ID environment variable.
            azure_client_secret: Azure Service Principal client secret. If None, will
                attempt to load from AZURE_CLIENT_SECRET environment variable.
            dotenv_path: Path to .env file to load environment variables from.
                If None, uses default .env file discovery.

        Raises:
            ValueError: If AZURE_TENANT_ID is not found in environment variables
                and not provided as parameter.
            ValueError: If AZURE_SUBSCRIPTION_ID is not found in environment variables
                and not provided as parameter.
            ValueError: If AZURE_CLIENT_ID is not found in environment variables
                and not provided as parameter.
            ValueError: If AZURE_CLIENT_SECRET is not found in environment variables
                and not provided as parameter.

        Example:
            >>> # Using direct parameters
            >>> handler = SPCredentialHandler(
            ...     azure_tenant_id="12345678-1234-1234-1234-123456789012",
            ...     azure_subscription_id="87654321-4321-4321-4321-210987654321",
            ...     azure_client_id="abcdef12-3456-7890-abcd-ef1234567890",
            ...     azure_client_secret="your-secret-here" #pragma: allowlist secret
            ... )

            >>> # Using environment variables
            >>> handler = SPCredentialHandler()  # Loads from env vars

            >>> # Using custom .env file
            >>> handler = SPCredentialHandler(dotenv_path="/path/to/.env")
        """
        logger.debug("Initializing SPCredentialHandler.")
        # load env vars, including client secret if available
        load_dotenv(dotenv_path=dotenv_path, override=True)

        mandatory_environment_variables = [
            "AZURE_TENANT_ID",
            "AZURE_SUBSCRIPTION_ID",
            "AZURE_CLIENT_ID",
            "AZURE_CLIENT_SECRET",
        ]
        for mandatory in mandatory_environment_variables:
            if mandatory not in os.environ:
                logger.warning(f"Environment variable {mandatory} was not provided")

        # check if tenant_id, client_id, subscription_id, and client_secret_id exist, else find in os env vars
        logger.debug(
            "Setting azure_tenant_id, azure_subscription_id, azure_client_id, and azure_client_secret."
        )
        self.azure_tenant_id = (
            azure_tenant_id
            if azure_tenant_id is not None
            else os.getenv("AZURE_TENANT_ID", None)
        )
        self.azure_subscription_id = (
            azure_subscription_id
            if azure_subscription_id is not None
            else os.getenv("AZURE_SUBSCRIPTION_ID", None)
        )
        self.azure_client_id = (
            azure_client_id
            if azure_client_id is not None
            else os.getenv("AZURE_CLIENT_ID", None)
        )
        self.azure_client_secret = (
            azure_client_secret
            if azure_client_secret is not None
            else os.getenv("AZURE_CLIENT_SECRET", None)
        )

        self.require_attr(
            [x.lower() for x in mandatory_environment_variables],
            goal="service principal credentials",
        )

        d.set_env_vars()

        get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

        for key in self.__dataclass_fields__.keys():
            self.__setattr__(key, get_conf(key))
        # set method to "sp"
        self.__setattr__("method", "sp")
        # check for azure batch location
        if self.__getattribute__("azure_batch_location") is None:
            self.__setattr__("azure_batch_location", d.default_azure_batch_location)

__init__(azure_tenant_id=None, azure_subscription_id=None, azure_client_id=None, azure_client_secret=None, dotenv_path=None, **kwargs)

Initialize a Service Principal Credential Handler.

Creates a credential handler that uses Azure Service Principal authentication for accessing Azure resources. Credentials can be provided directly as parameters or loaded from environment variables. If not provided directly, the handler will attempt to load credentials from environment variables or a .env file.

Parameters:

    azure_tenant_id (str, default None): Azure Active Directory tenant ID. If None, will attempt to load from AZURE_TENANT_ID environment variable.
    azure_subscription_id (str, default None): Azure subscription ID. If None, will attempt to load from AZURE_SUBSCRIPTION_ID environment variable.
    azure_client_id (str, default None): Azure Service Principal client ID (application ID). If None, will attempt to load from AZURE_CLIENT_ID environment variable.
    azure_client_secret (str, default None): Azure Service Principal client secret. If None, will attempt to load from AZURE_CLIENT_SECRET environment variable.
    dotenv_path (str, default None): Path to .env file to load environment variables from. If None, uses default .env file discovery.

Raises:

    ValueError: If AZURE_TENANT_ID is not found in environment variables and not provided as parameter.
    ValueError: If AZURE_SUBSCRIPTION_ID is not found in environment variables and not provided as parameter.
    ValueError: If AZURE_CLIENT_ID is not found in environment variables and not provided as parameter.
    ValueError: If AZURE_CLIENT_SECRET is not found in environment variables and not provided as parameter.

Example:

    >>> # Using direct parameters
    >>> handler = SPCredentialHandler(
    ...     azure_tenant_id="12345678-1234-1234-1234-123456789012",
    ...     azure_subscription_id="87654321-4321-4321-4321-210987654321",
    ...     azure_client_id="abcdef12-3456-7890-abcd-ef1234567890",
    ...     azure_client_secret="your-secret-here"  # pragma: allowlist secret
    ... )

    >>> # Using environment variables
    >>> handler = SPCredentialHandler()  # Loads from env vars

    >>> # Using custom .env file
    >>> handler = SPCredentialHandler(dotenv_path="/path/to/.env")

Source code in cfa/cloudops/auth.py
def __init__(
    self,
    azure_tenant_id: str = None,
    azure_subscription_id: str = None,
    azure_client_id: str = None,
    azure_client_secret: str = None,
    dotenv_path: str = None,
    **kwargs,
):
    """Initialize a Service Principal Credential Handler.

    Creates a credential handler that uses Azure Service Principal authentication
    for accessing Azure resources. Credentials can be provided directly as parameters
    or loaded from environment variables. If not provided directly, the handler will
    attempt to load credentials from environment variables or a .env file.

    Args:
        azure_tenant_id: Azure Active Directory tenant ID. If None, will attempt
            to load from AZURE_TENANT_ID environment variable.
        azure_subscription_id: Azure subscription ID. If None, will attempt
            to load from AZURE_SUBSCRIPTION_ID environment variable.
        azure_client_id: Azure Service Principal client ID (application ID).
            If None, will attempt to load from AZURE_CLIENT_ID environment variable.
        azure_client_secret: Azure Service Principal client secret. If None, will
            attempt to load from AZURE_CLIENT_SECRET environment variable.
        dotenv_path: Path to .env file to load environment variables from.
            If None, uses default .env file discovery.

    Raises:
        ValueError: If AZURE_TENANT_ID is not found in environment variables
            and not provided as parameter.
        ValueError: If AZURE_SUBSCRIPTION_ID is not found in environment variables
            and not provided as parameter.
        ValueError: If AZURE_CLIENT_ID is not found in environment variables
            and not provided as parameter.
        ValueError: If AZURE_CLIENT_SECRET is not found in environment variables
            and not provided as parameter.

    Example:
        >>> # Using direct parameters
        >>> handler = SPCredentialHandler(
        ...     azure_tenant_id="12345678-1234-1234-1234-123456789012",
        ...     azure_subscription_id="87654321-4321-4321-4321-210987654321",
        ...     azure_client_id="abcdef12-3456-7890-abcd-ef1234567890",
        ...     azure_client_secret="your-secret-here" #pragma: allowlist secret
        ... )

        >>> # Using environment variables
        >>> handler = SPCredentialHandler()  # Loads from env vars

        >>> # Using custom .env file
        >>> handler = SPCredentialHandler(dotenv_path="/path/to/.env")
    """
    logger.debug("Initializing SPCredentialHandler.")
    # load env vars, including client secret if available
    load_dotenv(dotenv_path=dotenv_path, override=True)

    mandatory_environment_variables = [
        "AZURE_TENANT_ID",
        "AZURE_SUBSCRIPTION_ID",
        "AZURE_CLIENT_ID",
        "AZURE_CLIENT_SECRET",
    ]
    for mandatory in mandatory_environment_variables:
        if mandatory not in os.environ:
            logger.warning(f"Environment variable {mandatory} was not provided")

    # check if tenant_id, client_id, subscription_id, and client_secret_id exist, else find in os env vars
    logger.debug(
        "Setting azure_tenant_id, azure_subscription_id, azure_client_id, and azure_client_secret."
    )
    self.azure_tenant_id = (
        azure_tenant_id
        if azure_tenant_id is not None
        else os.getenv("AZURE_TENANT_ID", None)
    )
    self.azure_subscription_id = (
        azure_subscription_id
        if azure_subscription_id is not None
        else os.getenv("AZURE_SUBSCRIPTION_ID", None)
    )
    self.azure_client_id = (
        azure_client_id
        if azure_client_id is not None
        else os.getenv("AZURE_CLIENT_ID", None)
    )
    self.azure_client_secret = (
        azure_client_secret
        if azure_client_secret is not None
        else os.getenv("AZURE_CLIENT_SECRET", None)
    )

    self.require_attr(
        [x.lower() for x in mandatory_environment_variables],
        goal="service principal credentials",
    )

    d.set_env_vars()

    get_conf = partial(get_config_val, config_dict=kwargs, try_env=True)

    for key in self.__dataclass_fields__.keys():
        self.__setattr__(key, get_conf(key))
    # set method to "sp"
    self.__setattr__("method", "sp")
    # check for azure batch location
    if self.__getattribute__("azure_batch_location") is None:
        self.__setattr__("azure_batch_location", d.default_azure_batch_location)

get_client_secret_sp_credential(vault_url, vault_sp_secret_id, tenant_id, application_id, user_credential=None)

Get a ClientSecretCredential for a given Azure service principal.

Parameters:

    vault_url (str, required): URL for the Azure keyvault to access.
    vault_sp_secret_id (str, required): Service principal secret ID within the keyvault.
    tenant_id (str, required): Tenant ID for the service principal credential.
    application_id (str, required): Application ID for the service principal credential.
    user_credential (default None): User credential for the Azure user, as an azure-identity credential class instance. Passed to get_sp_secret. If None, get_sp_secret will use a ManagedIdentityCredential instantiated at runtime. See its documentation for more.

Returns:

    ClientSecretCredential: A ClientSecretCredential for the given service principal.

Example:

    >>> credential = get_client_secret_sp_credential(
    ...     "https://myvault.vault.azure.net/",
    ...     "my-secret-id",
    ...     "tenant-id",
    ...     "application-id"
    ... )

Source code in cfa/cloudops/auth.py
def get_client_secret_sp_credential(
    vault_url: str,
    vault_sp_secret_id: str,
    tenant_id: str,
    application_id: str,
    user_credential=None,
) -> ClientSecretCredential:
    """Get a ClientSecretCredential for a given Azure service principal.

    Args:
        vault_url: URL for the Azure keyvault to access.
        vault_sp_secret_id: Service principal secret ID within the keyvault.
        tenant_id: Tenant ID for the service principal credential.
        application_id: Application ID for the service principal credential.
        user_credential: User credential for the Azure user, as an azure-identity
            credential class instance. Passed to ``get_sp_secret``. If None,
            ``get_sp_secret`` will use a ManagedIdentityCredential instantiated
            at runtime. See its documentation for more.

    Returns:
        ClientSecretCredential: A ClientSecretCredential for the given service principal.

    Example:
        >>> credential = get_client_secret_sp_credential(
        ...     "https://myvault.vault.azure.net/",
        ...     "my-secret-id",
        ...     "tenant-id",
        ...     "application-id"
        ... )
    """
    logger.debug("Getting SP secret for service principal.")
    sp_secret = get_sp_secret(
        vault_url, vault_sp_secret_id, user_credential=user_credential
    )
    logger.debug("Creating ClientSecretCredential for service principal using secret.")
    sp_credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=application_id,
        client_secret=sp_secret,
    )
    logger.debug("Created ClientSecretCredential for service principal.")
    return sp_credential

get_compute_node_identity_reference(credential_handler=None)

Get a valid ComputeNodeIdentityReference using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

    credential_handler (CredentialHandler, default None): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).

Returns:

    models.ComputeNodeIdentityReference: A ComputeNodeIdentityReference created according to the specified configuration.

Example:

    >>> # Using default environment-based handler
    >>> identity_ref = get_compute_node_identity_reference()

    >>> # Using custom handler
    >>> handler = CredentialHandler()
    >>> identity_ref = get_compute_node_identity_reference(handler)

Source code in cfa/cloudops/auth.py
def get_compute_node_identity_reference(
    credential_handler: CredentialHandler = None,
) -> models.ComputeNodeIdentityReference:
    """Get a valid ComputeNodeIdentityReference using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).

    Returns:
        models.ComputeNodeIdentityReference: A ComputeNodeIdentityReference created
            according to the specified configuration.

    Example:
        >>> # Using default environment-based handler
        >>> identity_ref = get_compute_node_identity_reference()

        >>> # Using custom handler
        >>> handler = CredentialHandler()
        >>> identity_ref = get_compute_node_identity_reference(handler)
    """
    logger.debug("Getting ComputeNodeIdentityReference from CredentialHandler.")
    ch = credential_handler
    if ch is None:
        logger.debug("No CredentialHandler provided, using EnvCredentialHandler.")
        ch = EnvCredentialHandler()
    logger.debug("Retrieving compute_node_identity_reference from CredentialHandler.")
    return ch.compute_node_identity_reference

get_service_principal_credentials(vault_url, vault_sp_secret_id, tenant_id, application_id, resource_url=d.default_azure_batch_resource_url, user_credential=None)

Get a ServicePrincipalCredentials object for a given Azure service principal.

Parameters:

    vault_url (str, required): URL for the Azure keyvault to access.
    vault_sp_secret_id (str, required): Service principal secret ID within the keyvault.
    tenant_id (str, required): Tenant ID for the service principal credential.
    application_id (str, required): Application ID for the service principal credential.
    resource_url (str, default defaults.default_azure_batch_resource_url): URL of the Azure resource.
    user_credential (default None): User credential for the Azure user, as an azure-identity credential class instance. Passed to get_sp_secret. If None, get_sp_secret will use a ManagedIdentityCredential instantiated at runtime. See the get_sp_secret documentation for details.

Returns:

    ServicePrincipalCredentials: A ServicePrincipalCredentials object for the service principal.

Example:

    >>> credentials = get_service_principal_credentials(
    ...     "https://myvault.vault.azure.net/",
    ...     "my-secret-id",
    ...     "tenant-id",
    ...     "application-id"
    ... )

Source code in cfa/cloudops/auth.py
def get_service_principal_credentials(
    vault_url: str,
    vault_sp_secret_id: str,
    tenant_id: str,
    application_id: str,
    resource_url: str = d.default_azure_batch_resource_url,
    user_credential=None,
) -> ServicePrincipalCredentials:
    """Get a ServicePrincipalCredentials object for a given Azure service principal.

    Args:
        vault_url: URL for the Azure keyvault to access.
        vault_sp_secret_id: Service principal secret ID within the keyvault.
        tenant_id: Tenant ID for the service principal credential.
        application_id: Application ID for the service principal credential.
        resource_url: URL of the Azure resource. Defaults to the value of
            ``defaults.default_azure_batch_resource_url``.
        user_credential: User credential for the Azure user, as an azure-identity
            credential class instance. Passed to ``get_sp_secret``. If None,
            ``get_sp_secret`` will use a ManagedIdentityCredential instantiated
            at runtime. See the ``get_sp_secret`` documentation for details.

    Returns:
        ServicePrincipalCredentials: A ServicePrincipalCredentials object for the
            service principal.

    Example:
        >>> credentials = get_service_principal_credentials(
        ...     "https://myvault.vault.azure.net/",
        ...     "my-secret-id",
        ...     "tenant-id",
        ...     "application-id"
        ... )
    """
    logger.debug("Getting SP secret for service principal.")
    sp_secret = get_sp_secret(
        vault_url, vault_sp_secret_id, user_credential=user_credential
    )
    logger.debug(
        "Creating ServicePrincipalCredentials for service principal using secret."
    )
    sp_credential = ServicePrincipalCredentials(
        tenant=tenant_id,
        client_id=application_id,
        secret=sp_secret,
        resource=resource_url,
    )
    logger.debug("Created ServicePrincipalCredentials for service principal.")

    return sp_credential

get_sp_secret(vault_url, vault_sp_secret_id, user_credential=None)

Get a service principal secret from an Azure keyvault.

Parameters:

    vault_url (str, required): URL for the Azure keyvault to access.
    vault_sp_secret_id (str, required): Service principal secret ID within the keyvault.
    user_credential (default None): User credential for the Azure user, as an azure-identity credential class instance. If None, will use a ManagedIdentityCredential instantiated at runtime.

Returns:

    str: The retrieved value of the service principal secret.

Example:

    >>> secret = get_sp_secret(
    ...     "https://myvault.vault.azure.net/",
    ...     "my-secret-id"
    ... )

Source code in cfa/cloudops/auth.py
def get_sp_secret(
    vault_url: str,
    vault_sp_secret_id: str,
    user_credential=None,
) -> str:
    """Get a service principal secret from an Azure keyvault.

    Args:
        vault_url: URL for the Azure keyvault to access.
        vault_sp_secret_id: Service principal secret ID within the keyvault.
        user_credential: User credential for the Azure user, as an azure-identity
            credential class instance. If None, will use a ManagedIdentityCredential
            instantiated at runtime.

    Returns:
        str: The retrieved value of the service principal secret.

    Example:
        >>> secret = get_sp_secret(
        ...     "https://myvault.vault.azure.net/",
        ...     "my-secret-id"
        ... )
    """
    if user_credential is None:
        logger.debug("No user_credential provided, using ManagedIdentityCredential.")
        user_credential = ManagedIdentityCredential()

    secret_client = SecretClient(vault_url=vault_url, credential=user_credential)
    sp_secret = secret_client.get_secret(vault_sp_secret_id).value
    logger.debug("Retrieved service principal secret from Azure Key Vault.")

    return sp_secret

load_env_vars(dotenv_path=None)

Load environment variables and Azure subscription information.

Loads variables from a .env file (if specified), retrieves Azure subscription information using ManagedIdentityCredential, and sets default environment variables.

Parameters:

    dotenv_path (default None): Path to .env file to load. If None, uses default .env file discovery.

Example:

    >>> load_env_vars()  # Load from default .env
    >>> load_env_vars("/path/to/.env")  # Load from specific file

Source code in cfa/cloudops/auth.py
def load_env_vars(dotenv_path=None):
    """Load environment variables and Azure subscription information.

    Loads variables from a .env file (if specified), retrieves Azure subscription
    information using ManagedIdentityCredential, and sets default environment variables.

    Args:
        dotenv_path: Path to .env file to load. If None, uses default .env file discovery.

    Example:
        >>> load_env_vars()  # Load from default .env
        >>> load_env_vars("/path/to/.env")  # Load from specific file
    """
    logger.debug("Loading environment variables.")
    load_dotenv(dotenv_path=dotenv_path, override=True)
    # get ManagedIdentityCredential to pull SubscriptionClient
    mid_cred = ManagedIdentityCredential()
    sub_c = SubscriptionClient(mid_cred)
    # pull in account info and save to environment vars
    account_info = list(sub_c.subscriptions.list())[0]
    os.environ["AZURE_SUBSCRIPTION_ID"] = account_info.subscription_id
    os.environ["AZURE_TENANT_ID"] = account_info.tenant_id
    os.environ["AZURE_RESOURCE_GROUP_NAME"] = account_info.display_name
    # save default values
    d.set_env_vars()

cfa.cloudops.automation

run_experiment(exp_config, dotenv_path=None, **kwargs)

Run jobs and tasks automatically based on the provided experiment config.

Parameters:

    exp_config (str, required): Path to the experiment config file (TOML).
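Based on the configuration keys read in the source below, a minimal experiment config might look like the following sketch. The section contents, file names, pool name, and job name are placeholders, and the full schema supports additional keys (for example save_logs_to_blob, logs_folder, task_retries, container, and location_in_blob):

    # experiment.toml (hypothetical example)
    [job]
    pool_name = "my-existing-pool"   # must already exist in the Azure environment
    job_name = "my-job"

    [upload]                         # optional: content to upload before the job runs
    container_name = "input"
    folders = ["data"]

    [experiment]
    base_cmd = "python run.py"       # base command used when adding tasks (per the source below)
    exp_yaml = "scenarios.yaml"      # YAML file from which tasks are added (per the source below)

It would then be run with:

    >>> from cfa.cloudops.automation import run_experiment
    >>> run_experiment("experiment.toml", dotenv_path=".env")
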
Source code in cfa/cloudops/automation.py
def run_experiment(exp_config: str, dotenv_path: str | None = None, **kwargs):
    """Run jobs and tasks automatically based on the provided experiment config.

    Args:
        exp_config (str): path to experiment config file (toml)
    """
    logger.info(f"Starting experiment execution with config: {exp_config}")
    logger.debug(f"Using dotenv_path: {dotenv_path}")

    # read files
    logger.debug(f"Loading experiment configuration from: {exp_config}")
    exp_toml = toml.load(exp_config)
    logger.debug(
        f"Successfully loaded experiment config with sections: {list(exp_toml.keys())}"
    )

    try:
        logger.debug("Creating CloudClient instance")
        client = CloudClient(dotenv_path=dotenv_path)
        logger.debug("CloudClient created successfully")
    except Exception as e:
        logger.error(f"Failed to create CloudClient object: {e}")
        print("could not create CloudClient object.")
        return None

    # check pool included in exp_toml and exists in azure
    logger.debug("Validating pool configuration")
    if "pool_name" in exp_toml["job"].keys():
        pool_name = exp_toml["job"]["pool_name"]
        logger.debug(f"Checking if pool '{pool_name}' exists in Azure")
        if not batch_helpers.check_pool_exists(
            resource_group_name=client.cred.azure_resource_group_name,
            account_name=client.cred.azure_batch_account,
            pool_name=pool_name,
            batch_mgmt_client=client.batch_mgmt_client,
        ):
            logger.error(f"Pool '{pool_name}' does not exist in the Azure environment")
            print(
                f"pool name {exp_toml['job']['pool_name']} does not exist in the Azure environment."
            )
            return None
        logger.debug(f"Pool '{pool_name}' validated successfully")
    else:
        logger.error(
            "Missing required 'pool_name' key in job section of experiment config"
        )
        print("could not find 'pool_name' key in 'setup' section of exp toml.")
        print("please specify a pool name to use.")
        return None

    # upload files if the section exists
    if "upload" in exp_toml.keys():
        logger.debug("Processing upload section from experiment config")
        container_name = exp_toml["upload"]["container_name"]
        logger.debug(f"Target container: {container_name}")
        if "location_in_blob" in exp_toml["upload"].keys():
            location_in_blob = exp_toml["upload"]["location_in_blob"]
        else:
            location_in_blob = ""
        logger.debug(f"Upload location in blob: '{location_in_blob}'")

        if "folders" in exp_toml["upload"].keys():
            folders = exp_toml["upload"]["folders"]
            logger.debug(f"Uploading folders: {folders}")
            client.upload_folders(
                folder_names=folders,
                location_in_blob=location_in_blob,
                container_name=container_name,
            )
            logger.info(f"Uploaded folders: {folders} to container: {container_name}")
            logger.debug("Folder upload completed")
        if "files" in exp_toml["upload"].keys():
            files = exp_toml["upload"]["files"]
            logger.debug(f"Uploading files: {files}")
            client.upload_files(
                files=files,
                location_in_blob=location_in_blob,
                container_name=container_name,
            )
            logger.info(f"Uploaded files: {files} to container: {container_name}")
            logger.debug("File upload completed")
    else:
        logger.debug("No upload section found in experiment config")

    # create the job
    job_name = exp_toml["job"]["job_name"]
    logger.debug(f"Creating job: {job_name}")

    if "save_logs_to_blob" in exp_toml["job"].keys():
        save_logs_to_blob = exp_toml["job"]["save_logs_to_blob"]
    else:
        save_logs_to_blob = None
    if "logs_folder" in exp_toml["job"].keys():
        logs_folder = exp_toml["job"]["logs_folder"]
    else:
        logs_folder = None
    if "task_retries" in exp_toml["job"].keys():
        task_retries = exp_toml["job"]["task_retries"]
    else:
        task_retries = 0

    logger.debug(
        f"Job config - save_logs_to_blob: {save_logs_to_blob}, logs_folder: {logs_folder}, task_retries: {task_retries}"
    )

    client.create_job(
        job_name=job_name,
        pool_name=pool_name,
        save_logs_to_blob=save_logs_to_blob,
        logs_folder=logs_folder,
        task_retries=task_retries,
    )
    logger.info(f"Job '{job_name}' created successfully.")

    # create the tasks for the experiment
    logger.debug("Creating tasks for experiment")
    # get the container to use if necessary
    if "container" in exp_toml["job"].keys():
        container = exp_toml["job"]["container"]
        logger.debug(f"Using container: {container}")
    else:
        container = None
        logger.debug("No container specified for tasks")

    # submit the experiment tasks
    ex = exp_toml["experiment"]
    logger.debug(f"Processing experiment section with keys: {list(ex.keys())}")
    if "exp_yaml" in ex.keys():
        logger.debug(
            f"Adding tasks from YAML file: {ex['exp_yaml']} with base command: {ex['base_cmd']}"
        )
        client.add_tasks_from_yaml(
            job_name=job_name,
            base_cmd=ex["base_cmd"],
            file_path=ex["exp_yaml"],
        )
        logger.info("Tasks added from YAML successfully.")
        logger.debug("Tasks added from YAML successfully")
    else:
        logger.debug("Processing experiment tasks with parameter combinations")
        var_list = [key for key in ex.keys() if key != "base_cmd"]
        logger.debug(f"Variable list for combinations: {var_list}")
        var_values = []
        for var in var_list:
            var_values.append(ex[var])
        logger.debug(f"Variable values: {var_values}")
        v_v = list(itertools.product(*var_values))
        logger.debug(f"Generated {len(v_v)} parameter combinations")

        for i, params in enumerate(v_v):
            j = {}
            for idx, value in enumerate(params):
                j.update({var_list[idx]: value})
            command_line = ex["base_cmd"].format(**j)
            logger.debug(f"Adding task {i + 1}/{len(v_v)} with command: {command_line}")
            client.add_task(
                job_name=job_name,
                command_line=command_line,
                container_image_name=container,
            )
        logger.info(f"Successfully added {len(v_v)} experiment tasks")
        logger.debug(f"Successfully added {len(v_v)} experiment tasks")

    if "monitor_job" in exp_toml["job"].keys():
        if exp_toml["job"]["monitor_job"] is True:
            logger.debug(f"Starting job monitoring for: {job_name}")
            client.monitor_job(job_name)
            logger.debug(f"Completed monitoring job: {job_name}")
        else:
            logger.debug("Job monitoring disabled in configuration")
    else:
        logger.debug("No monitor_job setting found in configuration")

    logger.debug(f"Experiment execution completed for job: {job_name}")

run_tasks(task_config, dotenv_path=None, **kwargs)

Run jobs and tasks automatically based on the provided task config.

Parameters:

Name Type Description Default
task_config str

path to task config file (toml)

required

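A minimal usage sketch, based on the config keys this function reads; the file name and values below are illustrative:

# tasks.toml (illustrative)
# [job]
# pool_name = "my-batch-pool"
# job_name = "etl-pipeline"
#
# [[task]]
# name = "prep"
# cmd = "python prep_data.py"
#
# [[task]]
# name = "fit"
# cmd = "python fit_model.py"
# depends_on = ["prep"]

from cfa.cloudops.automation import run_tasks

# Submits each [[task]] entry in order; "depends_on" names are resolved
# to the task IDs returned when the named tasks were submitted.
run_tasks(task_config="tasks.toml", dotenv_path=".env")
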
Source code in cfa/cloudops/automation.py
def run_tasks(task_config: str, dotenv_path: str | None = None, **kwargs) -> None:
    """Run jobs and tasks automatically based on the provided task config.
    Args:
        task_config (str): path to task config file (toml)
    """
    logger.debug(f"Starting task execution with config: {task_config}")
    logger.debug(f"Using dotenv_path: {dotenv_path}")

    # read files
    logger.debug(f"Loading task configuration from: {task_config}")
    task_toml = toml.load(task_config)
    logger.debug(
        f"Successfully loaded task config with sections: {list(task_toml.keys())}"
    )

    try:
        logger.debug("Creating CloudClient instance")
        client = CloudClient(dotenv_path=dotenv_path)
        logger.debug("CloudClient created successfully")
    except Exception as e:
        logger.error(f"Failed to create CloudClient object: {e}")
        print("could not create AzureClient object.")
        return None

    # check pool included in task_toml and exists in azure
    logger.debug("Validating pool configuration")
    if "pool_name" in task_toml["job"].keys():
        pool_name = task_toml["job"]["pool_name"]
        logger.debug(f"Checking if pool '{pool_name}' exists in Azure")
        if not batch_helpers.check_pool_exists(
            resource_group_name=client.cred.azure_resource_group_name,
            account_name=client.cred.azure_batch_account,
            pool_name=pool_name,
            batch_mgmt_client=client.batch_mgmt_client,
        ):
            logger.error(f"Pool '{pool_name}' does not exist in the Azure environment")
            print(
                f"pool name {task_toml['job']['pool_name']} does not exist in the Azure environment."
            )
            return None
        logger.debug(f"Pool '{pool_name}' validated successfully")
    else:
        logger.error("Missing required 'pool_name' key in job section of task config")
        print("could not find 'pool_name' key in 'setup' section of task config toml.")
        print("please specify a pool name to use.")
        return None

    # upload files if the section exists
    if "upload" in task_toml.keys():
        logger.debug("Processing upload section from task config")
        container_name = task_toml["upload"]["container_name"]
        logger.debug(f"Target container: {container_name}")
        if "location_in_blob" in task_toml["upload"].keys():
            location_in_blob = task_toml["upload"]["location_in_blob"]
        else:
            location_in_blob = ""
        logger.debug(f"Upload location in blob: '{location_in_blob}'")

        if "folders" in task_toml["upload"].keys():
            folders = task_toml["upload"]["folders"]
            logger.debug(f"Uploading folders: {folders}")
            client.upload_folders(
                folder_names=folders,
                location_in_blob=location_in_blob,
                container_name=container_name,
            )
            logger.info(f"Uploaded folders: {folders} to container: {container_name}")
            logger.debug("Folder upload completed")
        if "files" in task_toml["upload"].keys():
            files = task_toml["upload"]["files"]
            logger.debug(f"Uploading files: {files}")
            client.upload_files(
                files=files,
                location_in_blob=location_in_blob,
                container_name=container_name,
            )
            logger.info(f"Uploaded files: {files} to container: {container_name}")
            logger.debug("File upload completed")
    else:
        logger.debug("No upload section found in task config")

    # create the job
    job_name = task_toml["job"]["job_name"]
    logger.debug(f"Creating job: {job_name}")

    if "save_logs_to_blob" in task_toml["job"].keys():
        save_logs_to_blob = task_toml["job"]["save_logs_to_blob"]
    else:
        save_logs_to_blob = None
    if "logs_folder" in task_toml["job"].keys():
        logs_folder = task_toml["job"]["logs_folder"]
    else:
        logs_folder = None
    if "task_retries" in task_toml["job"].keys():
        task_retries = task_toml["job"]["task_retries"]
    else:
        task_retries = 0

    logger.debug(
        f"Job config - save_logs_to_blob: {save_logs_to_blob}, logs_folder: {logs_folder}, task_retries: {task_retries}"
    )

    client.create_job(
        job_name=job_name,
        pool_name=pool_name,
        save_logs_to_blob=save_logs_to_blob,
        logs_folder=logs_folder,
        task_retries=task_retries,
    )
    logger.debug(f"Job '{job_name}' created successfully")

    # create the tasks for the experiment
    logger.debug("Creating tasks for job")
    # get the container to use if necessary
    if "container" in task_toml["job"].keys():
        container = task_toml["job"]["container"]
        logger.debug(f"Using container: {container}")
    else:
        container = None
        logger.debug("No container specified for tasks")

    # submit the tasks
    tt = task_toml["task"]
    logger.debug(f"Processing {len(tt)} tasks from configuration")
    df = pd.json_normalize(tt)
    df.insert(0, "task_id", pd.Series("", index=range(df.shape[0])))
    logger.debug("Created task tracking dataframe")
    # when kicking off a task we save the taskid to the row in df
    for i, item in enumerate(tt):
        task_name = item.get("name", f"task_{i}")
        logger.debug(f"Processing task {i + 1}/{len(tt)}: {task_name}")

        if "depends_on" in item.keys():
            d_list = []
            logger.debug(f"Task has dependencies: {item['depends_on']}")
            for d in item["depends_on"]:
                d_task = df[df["name"] == d]["task_id"].values[0]
                d_list.append(d_task)
            logger.debug(f"Resolved dependency task IDs: {d_list}")
        else:
            d_list = None
            logger.debug("Task has no dependencies")

        # check for other attributes
        if "run_dependent_tasks_on_fail" in item.keys():
            run_dependent_tasks_on_fail = item["run_dependent_tasks_on_fail"]
        else:
            run_dependent_tasks_on_fail = False
        logger.debug(f"run_dependent_tasks_on_fail: {run_dependent_tasks_on_fail}")

        # submit the task
        logger.debug(f"Submitting task with command: {item['cmd']}")
        tid = client.add_task(
            job_name=job_name,
            command_line=item["cmd"],
            depends_on=d_list,
            run_dependent_tasks_on_fail=run_dependent_tasks_on_fail,
            container_image_name=container,
        )
        df.loc[i, "task_id"] = tid
        logger.debug(f"Task submitted successfully with ID: {tid}")

    if "monitor_job" in task_toml["job"].keys():
        if task_toml["job"]["monitor_job"] is True:
            logger.debug(f"Starting job monitoring for: {job_name}")
            client.monitor_job(job_name)
        else:
            logger.debug("Job monitoring disabled in configuration")
    else:
        logger.debug("No monitor_job setting found in configuration")

    logger.debug(f"Task execution completed for job: {job_name}")
    return None

cfa.cloudops.autoscale

cfa.cloudops.blob

Functions for interacting with Azure Blob Storage.

async_download_blob_folder(container_name, local_folder, storage_account_url, name_starts_with=None, include_extensions=None, exclude_extensions=None, check_size=True, max_concurrent_downloads=20, credential=None)

Downloads blobs from an Azure container to a local folder asynchronously.

This is the main entry point for downloading blobs. It sets up Azure credentials, creates the necessary clients, and runs the async download process.

Parameters:

Name Type Description Default
container_name str

Name of the Azure Storage container to download from.

required
local_folder Path

Local directory path where blobs will be downloaded.

required
storage_account_url str

URL of the Azure Storage account (e.g., "https://<account_name>.blob.core.windows.net").

required
name_starts_with str

Filter blobs to only those with names starting with this prefix.

None
include_extensions str or list

File extensions to include (e.g., ".txt", [".json", ".csv"]).

None
exclude_extensions str or list

File extensions to exclude (e.g., ".log", [".tmp", ".bak"]).

None
check_size bool

If True, prompts user if total download size exceeds 2 GB. Defaults to True.

True
max_concurrent_downloads int

Maximum number of simultaneous downloads allowed. Defaults to 20.

20
credential any

Azure credential object. If None, ManagedIdentityCredential is used.

None

Raises:

Type Description
KeyboardInterrupt

If the user cancels the download operation.

Exception

For any Azure SDK or network-related errors during download.

Notes

  • Uses ManagedIdentityCredential for authentication.
  • Preserves blob folder structure in the local directory.
  • Handles cleanup of Azure credentials automatically.

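A brief usage sketch (the account URL, container name, and prefix below are placeholders):

from cfa.cloudops.blob import async_download_blob_folder

# Download every .csv blob under the "results/" prefix into ./outputs,
# preserving the blob folder structure locally.
async_download_blob_folder(
    container_name="my-container",
    local_folder="outputs",
    storage_account_url="https://<account_name>.blob.core.windows.net",
    name_starts_with="results/",
    include_extensions=".csv",
)
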
Source code in cfa/cloudops/blob.py
def async_download_blob_folder(
    container_name: str,
    local_folder: Path,
    storage_account_url: str,
    name_starts_with: str | None = None,
    include_extensions: str | list | None = None,
    exclude_extensions: str | list | None = None,
    check_size: bool = True,
    max_concurrent_downloads: int = 20,
    credential: any = None,
) -> Path:
    """
    Downloads blobs from an Azure container to a local folder asynchronously.

    This is the main entry point for downloading blobs. It sets up Azure credentials, creates the necessary clients, and runs the async download process.

    Args:
        container_name (str): Name of the Azure Storage container to download from.
        local_folder (Path): Local directory path where blobs will be downloaded.
        storage_account_url (str): URL of the Azure Storage account (e.g., "https://<account_name>.blob.core.windows.net").
        name_starts_with (str, optional): Filter blobs to only those with names starting with this prefix.
        include_extensions (str or list, optional): File extensions to include (e.g., ".txt", [".json", ".csv"]).
        exclude_extensions (str or list, optional): File extensions to exclude (e.g., ".log", [".tmp", ".bak"]).
        check_size (bool, optional): If True, prompts user if total download size exceeds 2 GB. Defaults to True.
        max_concurrent_downloads (int, optional): Maximum number of simultaneous downloads allowed. Defaults to 20.
        credential (any, optional): Azure credential object. If None, ManagedIdentityCredential is used.

    Raises:
        KeyboardInterrupt: If the user cancels the download operation.
        Exception: For any Azure SDK or network-related errors during download.

    Notes:
        Uses ManagedIdentityCredential for authentication.
        Preserves blob folder structure in the local directory.
        Handles cleanup of Azure credentials automatically.
    """
    logger.debug(
        f"Starting async blob folder download from container '{container_name}'"
    )
    logger.debug(f"Storage account URL: {storage_account_url}")
    logger.debug(f"Local folder: {local_folder}")
    logger.debug(f"Max concurrent downloads: {max_concurrent_downloads}")

    async def _runner(credential) -> None:
        if credential is None:
            logger.debug("No credential provided, using ManagedIdentityCredential")
            credential = ManagedIdentityCredential()
        else:
            logger.debug("Using provided credential")

        try:
            logger.debug("Creating blob service client")
            with BlobServiceClient(
                account_url=storage_account_url,
                credential=credential,
            ) as blob_service_client:
                logger.debug(f"Creating container client for '{container_name}'")
                container_client = blob_service_client.get_container_client(
                    container_name
                )

                logger.debug("Starting async blob folder download operation")
                await _async_download_blob_folder(
                    container_client=container_client,
                    local_folder=anyio.Path(local_folder),
                    name_starts_with=name_starts_with,
                    include_extensions=include_extensions,
                    exclude_extensions=exclude_extensions,
                    max_concurrent_downloads=max_concurrent_downloads,
                    check_size=check_size,
                )
        except Exception as e:
            logger.error(f"Error during download: {e}")
            raise

    try:
        anyio.run(_runner, credential)

    except KeyboardInterrupt:
        logger.error("Download cancelled by user.")
    except Exception as e:
        logger.error(f"Failed to download blob folder: {e}")
    logger.info(
        f"Completed async download of folder '{local_folder}' from container '{container_name}'."
    )
    return local_folder

async_upload_folder(folder, container_name, storage_account_url, include_extensions=None, exclude_extensions=None, location_in_blob='.', max_concurrent_uploads=20, credential=None)

Upload all files from a local folder to an Azure blob container asynchronously.

This is the main entry point for uploading files. It sets up Azure credentials, creates the necessary clients, and runs the async upload process.

Parameters:

Name Type Description Default
folder str

Local directory path whose files will be uploaded.

required
container_name str

Name of the Azure Storage container to upload to.

required
storage_account_url str

URL of the Azure Storage account (e.g., "https://<account_name>.blob.core.windows.net").

required
include_extensions str or list

File extensions to include (e.g., ".txt", [".json", ".csv"]).

None
exclude_extensions str or list

File extensions to exclude (e.g., ".log", [".tmp", ".bak"]).

None
location_in_blob str

Path within the blob container where files will be uploaded. Defaults to "." (root of the container).

'.'
max_concurrent_uploads int

Maximum number of simultaneous uploads allowed. Defaults to 20.

20
credential any

Azure credential object. If None, ManagedIdentityCredential is used.

None

Raises:

Type Description
KeyboardInterrupt

If the user cancels the upload operation.

Exception

For any Azure SDK or network-related errors during upload.

Notes
  • Uses ManagedIdentityCredential for authentication.
  • Preserves folder structure in the blob container.
  • Handles cleanup of Azure credentials automatically.
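
A brief usage sketch (the account URL and container name below are placeholders):

from cfa.cloudops.blob import async_upload_folder

# Upload everything under ./inputs (except .log files) to the container,
# placing files under the "data" prefix.
async_upload_folder(
    folder="inputs",
    container_name="my-container",
    storage_account_url="https://<account_name>.blob.core.windows.net",
    exclude_extensions=".log",
    location_in_blob="data",
)
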
Source code in cfa/cloudops/blob.py
def async_upload_folder(
    folder: str,
    container_name: str,
    storage_account_url: str,
    include_extensions: str | list | None = None,
    exclude_extensions: str | list | None = None,
    location_in_blob: str = ".",
    max_concurrent_uploads: int = 20,
    credential: any = None,
) -> str:
    """
    Upload all files from a local folder to an Azure blob container asynchronously.

    This is the main entry point for uploading files. It sets up Azure credentials,
    creates the necessary clients, and runs the async upload process.

    Args:
        folder (str): Local directory path whose files will be uploaded.
        container_name (str): Name of the Azure Storage container to upload to.
        storage_account_url (str): URL of the Azure Storage account (e.g., "https://<account_name>.blob.core.windows.net").
        include_extensions (str or list, optional): File extensions to include (e.g., ".txt", [".json", ".csv"]).
        exclude_extensions (str or list, optional): File extensions to exclude (e.g., ".log", [".tmp", ".bak"]).
        location_in_blob (str, optional): Path within the blob container where files will be uploaded. Defaults to "." (root of the container).
        max_concurrent_uploads (int, optional): Maximum number of simultaneous uploads allowed. Defaults to 20.
        credential (any, optional): Azure credential object. If None, ManagedIdentityCredential is used.

    Raises:
        KeyboardInterrupt: If the user cancels the upload operation.
        Exception: For any Azure SDK or network-related errors during upload.

    Notes:
        - Uses ManagedIdentityCredential for authentication.
        - Preserves folder structure in the blob container.
        - Handles cleanup of Azure credentials automatically.
    """
    logger.debug(
        f"Starting async folder upload from '{folder}' to container '{container_name}'"
    )
    logger.debug(f"Storage account URL: {storage_account_url}")
    logger.debug(f"Target location in blob: '{location_in_blob}'")
    logger.debug(f"Max concurrent uploads: {max_concurrent_uploads}")
    logger.debug(f"Include extensions: {include_extensions}")
    logger.debug(f"Exclude extensions: {exclude_extensions}")

    async def _runner(credential):
        if credential is None:
            logger.debug("No credential provided, using ManagedIdentityCredential")
            credential = ManagedIdentityCredential()
        else:
            logger.debug("Using provided credential")

        try:
            logger.debug(f"Resolved upload folder path: {folder}")
            logger.debug(f"Target container name: {container_name}")

            logger.debug("Creating blob service client")
            blob_service_client = BlobServiceClient(
                account_url=storage_account_url,
                credential=credential,
            )
            if blob_service_client is None:
                logger.error(
                    "Failed to create BlobServiceClient. Check your storage_account_url and credentials."
                )
                raise RuntimeError("Failed to create BlobServiceClient.")

            logger.debug(f"Creating container client for '{container_name}'")
            container_client = blob_service_client.get_container_client(container_name)
            if container_client is None:
                logger.error(
                    f"Failed to get container client for container: {container_name}"
                )
                raise RuntimeError(
                    f"Failed to get container client for container: {container_name}"
                )
            await _async_upload_blob_folder(
                container_client=container_client,
                folder=anyio.Path(folder),
                location_in_blob=location_in_blob,
                include_extensions=include_extensions,
                exclude_extensions=exclude_extensions,
                max_concurrent_uploads=max_concurrent_uploads,
            )
        except Exception as e:
            logger.error(f"Error during upload: {e}")
            raise

    try:
        anyio.run(_runner, credential)
    except KeyboardInterrupt:
        logger.error("Upload cancelled by user.")
    except Exception as e:
        logger.error(f"Failed to upload blob folder: {e}")
    logger.info(
        f"Completed async upload of folder '{folder}' to container '{container_name}'."
    )
    return folder

create_storage_container_if_not_exists(blob_storage_container_name, blob_service_client)

Create an Azure blob storage container if it does not already exist.

Parameters:

Name Type Description Default
blob_storage_container_name str

Name of the storage container.

required
blob_service_client BlobServiceClient

The blob service client to use when looking for and potentially creating the storage container.

required
Example

from azure.storage.blob import BlobServiceClient
client = BlobServiceClient(account_url="...", credential="...")
create_storage_container_if_not_exists("my-container", client)
Container [my-container] created.

Source code in cfa/cloudops/blob.py
def create_storage_container_if_not_exists(
    blob_storage_container_name: str, blob_service_client: BlobServiceClient
) -> None:
    """Create an Azure blob storage container if it does not already exist.

    Args:
        blob_storage_container_name: Name of the storage container.
        blob_service_client: The blob service client to use when looking
            for and potentially creating the storage container.

    Example:
        >>> from azure.storage.blob import BlobServiceClient
        >>> client = BlobServiceClient(account_url="...", credential="...")
        >>> create_storage_container_if_not_exists("my-container", client)
        Container [my-container] created.
    """
    logger.debug(f"Checking if container '{blob_storage_container_name}' exists")

    container_client = blob_service_client.get_container_client(
        container=blob_storage_container_name
    )

    logger.debug("Container client created, checking container existence")
    if not container_client.exists():
        logger.debug(
            f"Container '{blob_storage_container_name}' does not exist, creating it"
        )
        container_client.create_container()
        logger.info(f"Container '{blob_storage_container_name}' created.")
        print("Container [{}] created.".format(blob_storage_container_name))
    else:
        logger.info(f"Container '{blob_storage_container_name}' already exists.")
        print("Container [{}] already exists.".format(blob_storage_container_name))

download_from_storage_container(file_paths, blob_storage_container_name, blob_service_client=None, local_root_dir='.', remote_root_dir='.', **kwargs)

Download a list of files from an Azure blob storage container.

Preserves relative directory structure.

Parameters:

Name Type Description Default
file_paths str | list[str]

File or list of files to download, as string paths relative to remote_root_dir. A single string will be coerced to a length-one list.

required
blob_storage_container_name str

Name of the blob storage container from which to download the files. Must already exist.

required
blob_service_client BlobServiceClient

BlobServiceClient to use when downloading. If None, attempt to create one via client.get_blob_service_client using provided **kwargs, if any.

None
local_root_dir str

Root directory for the relative file paths in local storage. Defaults to "." (use the local working directory).

'.'
remote_root_dir str

Root directory for the relative file paths within the blob storage container. Defaults to "." (start at the blob storage container root).

'.'
**kwargs

Keyword arguments passed to client.get_blob_service_client.

{}

Raises:

Type Description
Exception

If the blob storage container does not exist.

Example

from azure.storage.blob import BlobServiceClient
client = BlobServiceClient(account_url="...", credential="...")
download_from_storage_container(
    ["file1.txt", "subdir/file2.txt"],
    "my-container",
    client,
    local_root_dir="/local/path",
    remote_root_dir="uploads"
)
Downloading file 0 of 2
Downloaded 2 files from blob storage container

Source code in cfa/cloudops/blob.py
def download_from_storage_container(
    file_paths: str | list[str],
    blob_storage_container_name: str,
    blob_service_client: BlobServiceClient = None,
    local_root_dir: str = ".",
    remote_root_dir: str = ".",
    **kwargs,
) -> None:
    """Download a list of files from an Azure blob storage container.

    Preserves relative directory structure.

    Args:
        file_paths: File or list of files to download, as string paths relative to
            ``remote_root_dir``. A single string will be coerced to a length-one list.
        blob_storage_container_name: Name of the blob storage container from which
            to download the files. Must already exist.
        blob_service_client: BlobServiceClient to use when downloading.
            If None, attempt to create one via ``client.get_blob_service_client``
            using provided ``**kwargs``, if any.
        local_root_dir: Root directory for the relative file paths in local storage.
            Defaults to "." (use the local working directory).
        remote_root_dir: Root directory for the relative file paths within the blob
            storage container. Defaults to "." (start at the blob storage container root).
        **kwargs: Keyword arguments passed to ``client.get_blob_service_client``.

    Raises:
        Exception: If the blob storage container does not exist.

    Example:
        >>> from azure.storage.blob import BlobServiceClient
        >>> client = BlobServiceClient(account_url="...", credential="...")
        >>> download_from_storage_container(
        ...     ["file1.txt", "subdir/file2.txt"],
        ...     "my-container",
        ...     client,
        ...     local_root_dir="/local/path",
        ...     remote_root_dir="uploads"
        ... )
        Downloading file 0 of 2
        Downloaded 2 files from blob storage container
    """
    logger.debug(f"Starting download from container '{blob_storage_container_name}'")
    logger.debug(f"Local root: '{local_root_dir}', Remote root: '{remote_root_dir}'")

    file_paths = ensure_listlike(file_paths)
    n_total_files = len(file_paths)

    logger.debug(f"Downloading {n_total_files} files from blob storage")

    if blob_service_client is None:
        logger.debug("No blob service client provided, creating one")
        blob_service_client = get_blob_service_client(**kwargs)
        logger.debug("Blob service client created successfully")

    for i_file, file_path in enumerate(file_paths):
        if i_file % (1 + int(n_total_files / 10)) == 0:
            print(f"Downloading file {i_file} of {n_total_files}")
            logger.debug(f"Download progress: {i_file}/{n_total_files} files completed")

        local_file_path = os.path.join(local_root_dir, file_path)
        remote_file_path = os.path.join(remote_root_dir, file_path)

        logger.debug(f"Downloading '{remote_file_path}' -> '{local_file_path}'")

        blob_client = blob_service_client.get_blob_client(
            container=blob_storage_container_name, blob=remote_file_path
        )

        logger.debug(f"Created blob client for '{remote_file_path}'")

        with open(local_file_path, "wb") as target_file:
            download_stream = blob_client.download_blob()
            target_file.write(download_stream.readall())
            logger.debug(f"Successfully downloaded '{file_path}'")

    logger.info(
        f"Downloaded {n_total_files} file(s) from container '{blob_storage_container_name}'."
    )
    print(f"Downloaded {n_total_files} files from blob storage container")

format_extensions(extension)

Formats file extensions to include leading periods.

Ensures that file extensions have the correct format with leading periods. Accepts both single extensions and lists of extensions, with or without leading periods.

Parameters:

Name Type Description Default
extension str or list

File extension(s) to format. Can be a single extension string or a list of extension strings. Leading periods are optional (e.g., "txt" or ".txt" both work).

required

Returns:

Name Type Description
list

List of properly formatted extensions with leading periods.

Examples:

Format a single extension:
    formatted = format_extensions("txt")  # Returns: [".txt"]

Format multiple extensions:
    formatted = format_extensions(["py", ".js", "csv"])  # Returns: [".py", ".js", ".csv"]

Handle mixed formats:
    formatted = format_extensions([".pdf", "docx"])  # Returns: [".pdf", ".docx"]

Source code in cfa/cloudops/blob.py
def format_extensions(extension):
    """
    Formats file extensions to include leading periods.

    Ensures that file extensions have the correct format with leading periods. Accepts both single extensions and lists of extensions, with or without leading periods.

    Args:
        extension (str or list): File extension(s) to format. Can be a single extension string or a list of extension strings. Leading periods are optional (e.g., "txt" or ".txt" both work).

    Returns:
        list: List of properly formatted extensions with leading periods.

    Examples:
        Format a single extension:
            formatted = format_extensions("txt")
            # Returns: [".txt"]

        Format multiple extensions:
            formatted = format_extensions(["py", ".js", "csv"])
            # Returns: [".py", ".js", ".csv"]

        Handle mixed formats:
            formatted = format_extensions([".pdf", "docx"])
            # Returns: [".pdf", ".docx"]
    """
    logger.debug(
        f"Formatting extensions: {extension} (type: {type(extension).__name__})"
    )

    if isinstance(extension, str):
        extension = [extension]
        logger.debug("Converted single extension string to list")

    ext = []
    for _ext in extension:
        if _ext.startswith("."):
            ext.append(_ext)
            logger.debug(f"Extension '{_ext}' already has leading period")
        else:
            formatted_ext = "." + _ext
            ext.append(formatted_ext)
            logger.debug(
                f"Added leading period to extension: '{_ext}' -> '{formatted_ext}'"
            )

    logger.debug(f"Final formatted extensions: {ext}")
    return ext

get_node_mount_config(storage_containers, account_names, identity_references, shared_relative_mount_path='', mount_names=None, blobfuse_options='', cache_blobfuse=False, **kwargs)

Get configuration for mounting Azure Blob Storage containers to Azure Batch nodes via blobfuse.

Parameters:

Name Type Description Default
storage_containers str | list[str]

Name(s) of the Azure Blob storage container(s) to mount.

required
account_names str | list[str]

Name(s) of the Azure Blob storage account(s) in which to look for the storage container(s). If a single value, look for all storage containers within the same storage account. Otherwise, look for each container within the corresponding account. The function will raise an error if there is more than one account_names value but a different number of storage_containers, as then the mapping is ambiguous.

required
identity_references ComputeNodeIdentityReference | list[ComputeNodeIdentityReference]

Valid ComputeNodeIdentityReference objects for the node to use when connecting to the storage_containers, or an iterable of such objects with one object for each of the storage_containers.

required
shared_relative_mount_path str

Path relative to the fsmounts directory within the running node at which to mount the storage containers. Defaults to "" (mount within fsmounts itself).

''
mount_names list[str]

Iterable of names (or paths) for the individual mounted storage containers relative to the shared_relative_mount_path. If None, use the storage container names given in storage_containers as the mount_names.

None
blobfuse_options str | list[str]

Additional options passed to blobfuse. Defaults to "".

''
cache_blobfuse bool

Whether to cache Blob storage. Defaults to False.

False
**kwargs

Additional keyword arguments passed to the models.AzureBlobFileSystemConfiguration constructor.

{}

Returns:

Type Description
list[MountConfiguration]

list[models.MountConfiguration]: A list of instantiated MountConfiguration objects describing the desired storage container mounts.

Raises:

Type Description
ValueError

If the number of mount_names doesn't match storage_containers, or if the number of account_names or identity_references doesn't match storage_containers and isn't exactly 1.

Example

from azure.batch import models
identity_ref = models.ComputeNodeIdentityReference(
    resource_id="/subscriptions/.../resourceGroups/.../providers/..."
)
mount_configs = get_node_mount_config(
    storage_containers=["container1", "container2"],
    account_names="mystorageaccount",
    identity_references=identity_ref,
    shared_relative_mount_path="data",
    cache_blobfuse=True
)
len(mount_configs)
2

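For containers that live in different storage accounts, a hedged sketch (account, container, and mount names are illustrative) passing one account and one mount name per container:

mount_configs = get_node_mount_config(
    storage_containers=["input-data", "model-output"],
    account_names=["inputaccount", "outputaccount"],
    identity_references=identity_ref,  # e.g. the identity_ref from the example above
    shared_relative_mount_path="data",
    mount_names=["inputs", "outputs"],
)
# Each container is mounted on the node at fsmounts/data/<mount_name>.
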
Source code in cfa/cloudops/blob.py
def get_node_mount_config(
    storage_containers: str | list[str],
    account_names: str | list[str],
    identity_references: (
        models.ComputeNodeIdentityReference | list[models.ComputeNodeIdentityReference]
    ),
    shared_relative_mount_path: str = "",
    mount_names: list[str] = None,
    blobfuse_options: str | list[str] = "",
    cache_blobfuse: bool = False,
    **kwargs,
) -> list[models.MountConfiguration]:
    """Get configuration for mounting Azure Blob Storage containers to Azure Batch nodes via blobfuse.

    Args:
        storage_containers: Name(s) of the Azure Blob storage container(s) to mount.
        account_names: Name(s) of the Azure Blob storage account(s) in which to look
            for the storage container(s). If a single value, look for all storage
            containers within the same storage account. Otherwise, look for each
            container within the corresponding account. The function will raise an
            error if there is more than one ``account_names`` value but a different
            number of ``storage_containers``, as then the mapping is ambiguous.
        identity_references: Valid ComputeNodeIdentityReference objects for the node
            to use when connecting to the ``storage_containers``, or an iterable of
            such objects with one object for each of the ``storage_containers``.
        shared_relative_mount_path: Path relative to the ``fsmounts`` directory within
            the running node at which to mount the storage containers. Defaults to ""
            (mount within ``fsmounts`` itself).
        mount_names: Iterable of names (or paths) for the individual mounted storage
            containers relative to the ``shared_relative_mount_path``. If None, use
            the storage container names given in ``storage_containers`` as the
            ``mount_names``.
        blobfuse_options: Additional options passed to blobfuse. Defaults to "".
        cache_blobfuse: Whether to cache Blob storage. Defaults to False.
        **kwargs: Additional keyword arguments passed to the
            ``models.AzureBlobFileSystemConfiguration`` constructor.

    Returns:
        list[models.MountConfiguration]: A list of instantiated MountConfiguration
            objects describing the desired storage container mounts.

    Raises:
        ValueError: If the number of mount_names doesn't match storage_containers,
            or if the number of account_names or identity_references doesn't match
            storage_containers and isn't exactly 1.

    Example:
        >>> from azure.batch import models
        >>> identity_ref = models.ComputeNodeIdentityReference(
        ...     resource_id="/subscriptions/.../resourceGroups/.../providers/..."
        ... )
        >>> mount_configs = get_node_mount_config(
        ...     storage_containers=["container1", "container2"],
        ...     account_names="mystorageaccount",
        ...     identity_references=identity_ref,
        ...     shared_relative_mount_path="data",
        ...     cache_blobfuse=True
        ... )
        >>> len(mount_configs)
        2
    """
    logger.debug(
        f"Creating node mount configuration for containers: {storage_containers}"
    )
    logger.debug(f"Account names: {account_names}")
    logger.debug(f"Shared relative mount path: '{shared_relative_mount_path}'")
    logger.debug(f"Cache blobfuse: {cache_blobfuse}")

    storage_containers = ensure_listlike(storage_containers)
    account_names = ensure_listlike(account_names)
    identity_references = ensure_listlike(identity_references)

    n_containers = len(storage_containers)
    n_accounts = len(account_names)
    n_identity_refs = len(identity_references)

    logger.debug(
        f"Processing {n_containers} containers, {n_accounts} accounts, {n_identity_refs} identity references"
    )

    if mount_names is None:
        mount_names = storage_containers
        logger.debug("Using container names as mount names")
    else:
        mount_names = ensure_listlike(mount_names)
        logger.debug(f"Using custom mount names: {mount_names}")
    n_mount_names = len(mount_names)

    if n_mount_names != n_containers:
        raise ValueError(
            "Must provide exactly as many "
            "`mount_names` as `storage_containers` "
            "to mount, or set `mount_names=None`, "
            "in which case the storage container "
            "names in `storage_containers` will "
            "be used as the names for mounting "
            "the containers. Got "
            f"{n_mount_names} `mount_names` and "
            f"{n_containers} `storage_containers`."
        )

    if n_containers != n_accounts:
        if n_accounts == 1:
            account_names *= n_containers
        else:
            raise ValueError(
                "Must either provide a single `account_names`"
                "value (as a string or a length-1 list) "
                "to cover all `storage_containers` values "
                "or provide one `account_names` value for "
                "each `storage_containers` value. Got "
                f"{n_accounts} `account_names` and "
                f"{n_containers} `storage_containers`."
            )

    if n_containers != n_identity_refs:
        if n_identity_refs == 1:
            identity_references *= n_containers
        else:
            raise ValueError(
                "Must either provide a single `identity_references`"
                "value (as a single ComputeNodeIdentityReference "
                "object or a length-1 list containing a "
                "ComputeNodeIdentityReference object) "
                "to cover all `storage_containers` values "
                "or provide one `identity_references` value for "
                "each `storage_containers` value. Got "
                f"{n_identity_refs} `identity_references` and "
                f"{n_containers} `storage_containers`."
            )

    relative_mount_paths = [
        os.path.join(shared_relative_mount_path, mount_name)
        for mount_name in mount_names
    ]
    logger.debug(f"Generated relative mount paths: {relative_mount_paths}")

    if cache_blobfuse:
        blob_str = ""
        logger.debug("Caching enabled - no direct_io option")
    else:
        blob_str = " -o direct_io"
        logger.debug("Caching disabled - adding direct_io option")

    mount_configs = []
    for account_name, container_name, relative_mount_path, identity_reference in zip(
        account_names, storage_containers, relative_mount_paths, identity_references
    ):
        logger.debug(
            f"Creating mount config: container '{container_name}' from account '{account_name}' -> '{relative_mount_path}'"
        )

        mount_config = models.MountConfiguration(
            azure_blob_file_system_configuration=(
                models.AzureBlobFileSystemConfiguration(
                    account_name=account_name,
                    container_name=container_name,
                    relative_mount_path=relative_mount_path,
                    blobfuse_options=blobfuse_options + blob_str,
                    identity_reference=identity_reference,
                    **kwargs,
                )
            )
        )
        mount_configs.append(mount_config)

    logger.debug(f"Created {len(mount_configs)} mount configurations")
    return mount_configs

upload_to_storage_container(file_paths, blob_storage_container_name, blob_service_client, local_root_dir='.', remote_root_dir='.')

Upload a file or list of files to an Azure blob storage container.

This function preserves relative directory structure among the uploaded files within the storage container.

Parameters:

Name Type Description Default
file_paths str | list[str]

File or list of files to upload, as string paths relative to local_root_dir. A single string will be coerced to a length-one list.

required
blob_storage_container_name str

Name of the blob storage container to which to upload the files. Must already exist.

required
blob_service_client BlobServiceClient

BlobServiceClient to use when uploading.

required
local_root_dir str

Root directory for the relative file paths in local storage. Defaults to "." (use the local working directory).

'.'
remote_root_dir str

Root directory for the relative file paths within the blob storage container. Defaults to "." (start at the blob storage container root).

'.'

Raises:

Type Description
Exception

If the blob storage container does not exist.

Example

from azure.storage.blob import BlobServiceClient
client = BlobServiceClient(account_url="...", credential="...")
upload_to_storage_container(
    ["file1.txt", "subdir/file2.txt"],
    "my-container",
    client,
    local_root_dir="/local/path",
    remote_root_dir="uploads"
)
Uploading file 0 of 2
Uploaded 2 files to blob storage container

Source code in cfa/cloudops/blob.py
def upload_to_storage_container(
    file_paths: str | list[str],
    blob_storage_container_name: str,
    blob_service_client: BlobServiceClient,
    local_root_dir: str = ".",
    remote_root_dir: str = ".",
) -> None:
    """Upload a file or list of files to an Azure blob storage container.

    This function preserves relative directory structure among the
    uploaded files within the storage container.

    Args:
        file_paths: File or list of files to upload, as string paths relative to
            ``local_root_dir``. A single string will be coerced to a length-one list.
        blob_storage_container_name: Name of the blob storage container to which
            to upload the files. Must already exist.
        blob_service_client: BlobServiceClient to use when uploading.
        local_root_dir: Root directory for the relative file paths in local storage.
            Defaults to "." (use the local working directory).
        remote_root_dir: Root directory for the relative file paths within the blob
            storage container. Defaults to "." (start at the blob storage container root).

    Raises:
        Exception: If the blob storage container does not exist.

    Example:
        >>> from azure.storage.blob import BlobServiceClient
        >>> client = BlobServiceClient(account_url="...", credential="...")
        >>> upload_to_storage_container(
        ...     ["file1.txt", "subdir/file2.txt"],
        ...     "my-container",
        ...     client,
        ...     local_root_dir="/local/path",
        ...     remote_root_dir="uploads"
        ... )
        Uploading file 0 of 2
        Uploaded 2 files to blob storage container
    """
    logger.debug(f"Starting upload to container '{blob_storage_container_name}'")
    logger.debug(f"Local root: '{local_root_dir}', Remote root: '{remote_root_dir}'")

    file_paths = ensure_listlike(file_paths)
    n_total_files = len(file_paths)

    logger.debug(f"Uploading {n_total_files} files to blob storage")

    for i_file, file_path in enumerate(file_paths):
        if i_file % (1 + int(n_total_files / 10)) == 0:
            print("Uploading file {} of {}".format(i_file, n_total_files))
            logger.debug(f"Upload progress: {i_file}/{n_total_files} files completed")

        local_file_path = os.path.join(local_root_dir, file_path)
        remote_file_path = os.path.join(remote_root_dir, file_path)

        logger.debug(f"Uploading '{local_file_path}' -> '{remote_file_path}'")

        blob_client = blob_service_client.get_blob_client(
            container=blob_storage_container_name, blob=remote_file_path
        )

        logger.debug(f"Created blob client for '{remote_file_path}'")

        with open(local_file_path, "rb") as upload_data:
            blob_client.upload_blob(upload_data, overwrite=True)
            logger.debug(f"Successfully uploaded '{file_path}'")

    logger.info(
        f"Uploaded {n_total_files} file(s) to container '{blob_storage_container_name}'."
    )
    print("Uploaded {} files to blob storage container".format(n_total_files))

cfa.cloudops.client

Helper functions for setting up valid Azure clients.

get_batch_management_client(credential_handler=None, **kwargs)

Get an Azure Batch management client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

Name Type Description Default
credential_handler CredentialHandler

Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).

None
**kwargs

Additional keyword arguments passed to the BatchManagementClient constructor.

{}

Returns:

Name Type Description
BatchManagementClient BatchManagementClient

A client instantiated according to the specified configuration.

Example

Using default environment-based credentials

client = get_batch_management_client()

Using custom credential handler

handler = CredentialHandler()
client = get_batch_management_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_batch_management_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> BatchManagementClient:
    """Get an Azure Batch management client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the BatchManagementClient constructor.

    Returns:
        BatchManagementClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_batch_management_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_batch_management_client(credential_handler=handler)
    """
    logger.debug(
        f"Creating BatchManagementClient with credential handler: {type(credential_handler).__name__ if credential_handler else 'None'}"
    )

    ch = credential_handler
    if ch is None:
        logger.debug("No credential handler provided, creating EnvCredentialHandler")
        ch = EnvCredentialHandler()

    logger.debug(f"Selected authentication method: '{ch.method}'")

    if ch.method == "sp":
        logger.debug("Using service principal credentials for BatchManagementClient")
        client = BatchManagementClient(
            credential=ch.client_secret_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    elif ch.method == "default":
        logger.debug("Using default credentials for BatchManagementClient")
        client = BatchManagementClient(
            credential=ch.client_secret_sp_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    else:
        logger.debug("Using user credentials for BatchManagementClient")
        client = BatchManagementClient(
            credential=ch.user_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )

    logger.info(f"BatchManagementClient created using '{ch.method}' authentication.")
    return client

get_batch_service_client(credential_handler=None, **kwargs)

Get an Azure batch service client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

Name Type Description Default
credential_handler CredentialHandler

Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details).

None
**kwargs

Additional keyword arguments passed to the BatchServiceClient constructor.

{}

Returns:

Name Type Description
BatchServiceClient BatchServiceClient

A client instantiated according to the specified configuration.

Example

Using default environment-based credentials

client = get_batch_service_client()

Using custom credential handler

handler = CredentialHandler()
client = get_batch_service_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_batch_service_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> BatchServiceClient:
    """Get an Azure batch service client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the BatchServiceClient constructor.

    Returns:
        BatchServiceClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_batch_service_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_batch_service_client(credential_handler=handler)
    """
    logger.debug(
        f"Creating BatchServiceClient with credential handler: {type(credential_handler).__name__ if credential_handler else 'None'}"
    )

    ch = credential_handler
    if ch is None:
        logger.debug("No credential handler provided, creating EnvCredentialHandler")
        ch = EnvCredentialHandler()

    logger.debug(f"Selected authentication method: '{ch.method}'")
    logger.debug(f"Using batch endpoint: {ch.azure_batch_endpoint}")

    if ch.method == "sp":
        logger.info("Using service principal credentials for BatchServiceClient")
        logger.debug("Creating BatchServiceClient with service principal credentials")
        client = BatchServiceClient(
            credentials=ch.batch_service_principal_credentials,
            batch_url=ch.azure_batch_endpoint,
            **kwargs,
        )
    elif ch.method == "default":
        logger.info("Using default credentials for BatchServiceClient")
        logger.debug("Creating BatchServiceClient with default credentials")
        client = BatchServiceClient(
            credentials=ch.batch_service_principal_credentials,
            batch_url=ch.azure_batch_endpoint,
            **kwargs,
        )
    else:
        logger.info("Using user credentials for BatchServiceClient")
        logger.debug("Creating BatchServiceClient with user credentials")
        client = BatchServiceClient(
            credentials=ch.batch_service_principal_credentials,
            batch_url=ch.azure_batch_endpoint,
            **kwargs,
        )

    logger.debug("BatchServiceClient created successfully")
    return client

get_blob_service_client(credential_handler=None, **kwargs)

Get an Azure blob service client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

credential_handler (CredentialHandler): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). Default: None.

**kwargs: Additional keyword arguments passed to the BlobServiceClient constructor. Default: {}.

Returns:

BlobServiceClient: A client instantiated according to the specified configuration.

Example

Using default environment-based credentials

client = get_blob_service_client()

Using custom credential handler

handler = CredentialHandler()
client = get_blob_service_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_blob_service_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> BlobServiceClient:
    """Get an Azure blob service client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the BlobServiceClient constructor.

    Returns:
        BlobServiceClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_blob_service_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_blob_service_client(credential_handler=handler)
    """
    logger.debug(
        f"Creating BlobServiceClient with credential handler: {type(credential_handler).__name__ if credential_handler else 'None'}"
    )

    ch = credential_handler
    if ch is None:
        logger.debug("No credential handler provided, creating EnvCredentialHandler")
        ch = EnvCredentialHandler()

    logger.debug(f"Selected authentication method: '{ch.method}'")
    logger.debug(f"Using blob storage endpoint: {ch.azure_blob_storage_endpoint}")

    if ch.method == "sp":
        logger.debug("Using service principal credentials for BlobServiceClient")
        client = BlobServiceClient(
            account_url=ch.azure_blob_storage_endpoint,
            credential=ch.client_secret_credential,
            **kwargs,
        )
    elif ch.method == "default":
        logger.debug("Using default credentials for BlobServiceClient")
        client = BlobServiceClient(
            credential=ch.client_secret_sp_credential,
            account_url=ch.azure_blob_storage_endpoint,
            **kwargs,
        )
    else:
        logger.debug("Using user credentials for BlobServiceClient")
        client = BlobServiceClient(
            account_url=ch.azure_blob_storage_endpoint,
            credential=ch.user_credential,
            **kwargs,
        )

    logger.info(
        f"BlobServiceClient created using '{ch.method}' authentication at endpoint '{ch.azure_blob_storage_endpoint}'."
    )
    return client

get_compute_management_client(credential_handler=None, **kwargs)

Get an Azure compute management client using credentials from a CredentialHandler.

Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.

Parameters:

credential_handler (CredentialHandler): Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). Default: None.

**kwargs: Additional keyword arguments passed to the ComputeManagementClient constructor. Default: {}.

Returns:

ComputeManagementClient: A client instantiated according to the specified configuration.

Example

Using default environment-based credentials

client = get_compute_management_client()

Using custom credential handler

handler = CredentialHandler()
client = get_compute_management_client(credential_handler=handler)

Source code in cfa/cloudops/client.py
def get_compute_management_client(
    credential_handler: CredentialHandler = None, **kwargs
) -> ComputeManagementClient:
    """Get an Azure compute management client using credentials from a CredentialHandler.

    Uses credentials obtained via a CredentialHandler: either a user-provided one
    or a default based on environment variables.

    Args:
        credential_handler: Credential handler for connecting and authenticating to
            Azure resources. If None, create a blank EnvCredentialHandler, which
            attempts to obtain needed credentials using information available in
            local environment variables (see its documentation for details).
        **kwargs: Additional keyword arguments passed to the ComputeManagementClient constructor.

    Returns:
        ComputeManagementClient: A client instantiated according to the specified configuration.

    Example:
        >>> # Using default environment-based credentials
        >>> client = get_compute_management_client()

        >>> # Using custom credential handler
        >>> handler = CredentialHandler()
        >>> client = get_compute_management_client(credential_handler=handler)
    """
    logger.debug(
        f"Creating ComputeManagementClient with credential handler: {type(credential_handler).__name__ if credential_handler else 'None'}"
    )

    ch = credential_handler
    if ch is None:
        logger.debug("No credential handler provided, creating EnvCredentialHandler")
        ch = EnvCredentialHandler()

    logger.debug(f"Selected authentication method: '{ch.method}'")

    if ch.method == "sp":
        logger.debug("Using service principal credentials for ComputeManagementClient")
        client = ComputeManagementClient(
            credential=ch.client_secret_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    elif ch.method == "default":
        logger.debug("Using default credentials for ComputeManagementClient")
        client = ComputeManagementClient(
            credential=ch.client_secret_sp_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )
    else:
        logger.debug("Using user credentials for ComputeManagementClient")
        client = ComputeManagementClient(
            credential=ch.user_credential,
            subscription_id=ch.azure_subscription_id,
            **kwargs,
        )

    logger.info(f"ComputeManagementClient created using '{ch.method}' authentication.")
    return client
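
All three factory functions above take the same credential_handler argument, so a single handler can back Batch, Blob, and compute management clients in one workflow. A minimal sketch, assuming EnvCredentialHandler can be imported from cfa.cloudops.auth and that the environment variables it reads are already set:

from cfa.cloudops.auth import EnvCredentialHandler
from cfa.cloudops.client import (
    get_batch_service_client,
    get_blob_service_client,
    get_compute_management_client,
)

# One handler object; each factory pulls the endpoints and credentials it
# needs from it. (Import path for EnvCredentialHandler is assumed here.)
ch = EnvCredentialHandler()

batch_client = get_batch_service_client(credential_handler=ch)
blob_client = get_blob_service_client(credential_handler=ch)
compute_client = get_compute_management_client(credential_handler=ch)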

cfa.cloudops.defaults

Default configurations for Azure resources.

assign_container_config(pool_config, container_config)

Assign a container configuration to a Pool object (in place).

Parameters:

pool_config (Pool): Pool configuration object to modify. Required.

container_config (ContainerConfiguration): ContainerConfiguration object to add to the Pool configuration object. Required.

Returns:

models.Pool: The modified Pool object.

Example

from azure.mgmt.batch import models
pool = get_default_pool_config("test", "subnet", "identity")
container_config = models.ContainerConfiguration(type="dockerCompatible")
modified_pool = assign_container_config(pool, container_config)

Pool is modified in place and returned

assert modified_pool is pool

Source code in cfa/cloudops/defaults.py
def assign_container_config(
    pool_config: models.Pool, container_config: models.ContainerConfiguration
) -> models.Pool:
    """Assign a container configuration to a Pool object (in place).

    Args:
        pool_config: Pool configuration object to modify.
        container_config: ContainerConfiguration object to add to the Pool
            configuration object.

    Returns:
        models.Pool: The modified Pool object.

    Example:
        >>> from azure.mgmt.batch import models
        >>> pool = get_default_pool_config("test", "subnet", "identity")
        >>> container_config = models.ContainerConfiguration(type="dockerCompatible")
        >>> modified_pool = assign_container_config(pool, container_config)
        >>> # Pool is modified in place and returned
        >>> assert modified_pool is pool
    """
    logger.debug(
        f"Assigning container configuration to pool: {getattr(pool_config, 'display_name', 'unknown')}"
    )
    logger.debug(
        f"Container configuration type: {getattr(container_config, 'type', 'unknown')}"
    )

    logger.debug(
        "Accessing pool deployment configuration virtual machine configuration"
    )

    (
        pool_config.deployment_configuration.virtual_machine_configuration.container_configuration
    ) = container_config

    logger.debug(
        "Successfully assigned container configuration to pool (in-place modification)"
    )
    logger.debug("Pool object modified and will be returned")

    return pool_config
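
A common pattern is to build a default pool and immediately attach a container configuration that prefetches the task image. A sketch, assuming azure-mgmt-batch's ContainerConfiguration accepts a container_image_names list; the registry and image names are illustrative:

from azure.mgmt.batch import models
from cfa.cloudops.defaults import assign_container_config, get_default_pool_config

pool = get_default_pool_config(
    pool_name="my-batch-pool",
    subnet_id="/subscriptions/.../subnets/default",
    user_assigned_identity="/subscriptions/.../resourceGroups/...",
)

# Prefetch a (hypothetical) image onto pool nodes; type="dockerCompatible"
# matches the docstring example above.
container_config = models.ContainerConfiguration(
    type="dockerCompatible",
    container_image_names=["myregistry.azurecr.io/myapp:latest"],
)

assign_container_config(pool, container_config)  # modifies pool in place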

get_default_pool_config(pool_name, subnet_id, user_assigned_identity, **kwargs)

Instantiate a Pool instance with default configuration.

Creates a Pool with the given pool name and subnet id, the default pool identity given by get_default_pool_identity, and other defaults specified in default_pool_config_dict and default_network_config_dict.

Parameters:

pool_name (str): Name for the pool. Passed as the display_name argument to the Pool constructor. Required.

subnet_id (str): Subnet id for the pool, as a string. Should typically be obtained from a configuration file or an environment variable, often via a CredentialHandler instance. Required.

user_assigned_identity (str): User-assigned identity for the pool, as a string. Passed to get_default_pool_identity. Required.

**kwargs: Additional keyword arguments passed to the Pool constructor, potentially overriding settings from default_pool_config_dict. Default: {}.

Returns:

models.Pool: The instantiated Pool object.

Example

pool = get_default_pool_config(
    pool_name="my-batch-pool",
    subnet_id="/subscriptions/.../subnets/default",
    user_assigned_identity="/subscriptions/.../resourceGroups/..."
)
print(pool.display_name)  # 'my-batch-pool'
print(pool.vm_size)  # 'standard_d4s_v3'

Source code in cfa/cloudops/defaults.py
def get_default_pool_config(
    pool_name: str, subnet_id: str, user_assigned_identity: str, **kwargs
) -> models.Pool:
    """Instantiate a Pool instance with default configuration.

    Creates a Pool with the given pool name and subnet id, the default pool identity
    given by get_default_pool_identity, and other defaults specified in
    default_pool_config_dict and default_network_config_dict.

    Args:
        pool_name: Name for the pool. Passed as the ``display_name`` argument
            to the Pool constructor.
        subnet_id: Subnet id for the pool, as a string. Should typically be obtained
            from a configuration file or an environment variable, often via a
            CredentialHandler instance.
        user_assigned_identity: User-assigned identity for the pool, as a string.
            Passed to get_default_pool_identity.
        **kwargs: Additional keyword arguments passed to the Pool constructor,
            potentially overriding settings from default_pool_config_dict.

    Returns:
        models.Pool: The instantiated Pool object.

    Example:
        >>> pool = get_default_pool_config(
        ...     pool_name="my-batch-pool",
        ...     subnet_id="/subscriptions/.../subnets/default",
        ...     user_assigned_identity="/subscriptions/.../resourceGroups/..."
        ... )
        >>> print(pool.display_name)
        'my-batch-pool'
        >>> print(pool.vm_size)
        'standard_d4s_v3'
    """
    logger.debug(f"Creating default pool configuration with name: '{pool_name}'")
    logger.debug(
        f"Additional kwargs provided: {list(kwargs.keys()) if kwargs else 'None'}"
    )

    logger.debug("Getting default pool identity")
    pool_identity = get_default_pool_identity(user_assigned_identity)

    logger.debug(
        f"Using default pool config with: {list(default_pool_config_dict.keys())}"
    )

    # Merge configurations: defaults first, then kwargs overrides
    merged_config = {**default_pool_config_dict, **kwargs}
    logger.debug(f"Final configuration keys: {list(merged_config.keys())}")

    pool = models.Pool(
        identity=pool_identity,
        display_name=pool_name,
        network_configuration=models.NetworkConfiguration(
            subnet_id=subnet_id, **default_network_config_dict
        ),
        **merged_config,
    )

    logger.debug(
        f"Successfully created Pool with display_name: '{pool.display_name}', vm_size: {getattr(pool, 'vm_size', 'unknown')}"
    )

    return pool
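
Because keyword arguments are merged over default_pool_config_dict, any default (such as the VM size shown in the example above) can be overridden at call time. A short sketch; the specific VM size is only illustrative:

from cfa.cloudops.defaults import get_default_pool_config

pool = get_default_pool_config(
    pool_name="small-pool",
    subnet_id="/subscriptions/.../subnets/default",
    user_assigned_identity="/subscriptions/.../resourceGroups/...",
    # kwargs take precedence over default_pool_config_dict entries
    vm_size="standard_d2s_v3",
)
print(pool.vm_size)  # 'standard_d2s_v3'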

get_default_pool_identity(user_assigned_identity)

Get the default BatchPoolIdentity instance for azuretools.

Associates a blank UserAssignedIdentities instance to the provided user_assigned_identity string.

Parameters:

user_assigned_identity (str): User-assigned identity, as a string. Required.

Returns:

models.BatchPoolIdentity: Instantiated BatchPoolIdentity instance using the provided user-assigned identity.

Example

identity = get_default_pool_identity(
    "/subscriptions/.../resourceGroups/.../providers/..."
)
print(identity.type)  # <PoolIdentityType.user_assigned: 'UserAssigned'>

Source code in cfa/cloudops/defaults.py
def get_default_pool_identity(
    user_assigned_identity: str,
) -> models.BatchPoolIdentity:
    """Get the default BatchPoolIdentity instance for azuretools.

    Associates a blank UserAssignedIdentities instance to the provided
    user_assigned_identity string.

    Args:
        user_assigned_identity: User-assigned identity, as a string.

    Returns:
        models.BatchPoolIdentity: Instantiated BatchPoolIdentity instance
            using the provided user-assigned identity.

    Example:
        >>> identity = get_default_pool_identity(
        ...     "/subscriptions/.../resourceGroups/.../providers/..."
        ... )
        >>> print(identity.type)
        <PoolIdentityType.user_assigned: 'UserAssigned'>
    """
    logger.debug(
        f"Creating default pool identity for user-assigned identity: {user_assigned_identity}"
    )

    logger.debug("Setting pool identity type to UserAssigned")
    logger.debug("Creating UserAssignedIdentities object for the provided identity")

    pool_identity = models.BatchPoolIdentity(
        type=models.PoolIdentityType.user_assigned,
        user_assigned_identities={
            user_assigned_identity: models.UserAssignedIdentities()
        },
    )

    logger.debug(
        f"Successfully created BatchPoolIdentity with type: {pool_identity.type}"
    )

    return pool_identity

remaining_task_autoscale_formula(task_sample_interval_minutes=15, max_number_vms=10)

Get an autoscaling formula that rescales pools based on the remaining task count.

Parameters:

task_sample_interval_minutes (int): Task sampling interval, in minutes, as an integer. Defaults to 15.

max_number_vms (int): Maximum number of virtual machines to spin up, regardless of the number of remaining tasks. Defaults to 10.

Returns:

str: The autoscale formula, as a string.

Example

Default settings (15 min interval, max 10 VMs)

formula = remaining_task_autoscale_formula()
print(type(formula))  # <class 'str'>

Custom settings

formula = remaining_task_autoscale_formula(
    task_sample_interval_minutes=30,
    max_number_vms=20
)
print("cappedPoolSize = 20" in formula)  # True

Source code in cfa/cloudops/defaults.py
def remaining_task_autoscale_formula(
    task_sample_interval_minutes: int = 15,
    max_number_vms: int = 10,
):
    """Get an autoscaling formula that rescales pools based on the remaining task count.

    Args:
        task_sample_interval_minutes: Task sampling interval, in minutes, as an integer.
            Defaults to 15.
        max_number_vms: Maximum number of virtual machines to spin up, regardless of
            the number of remaining tasks. Defaults to 10.

    Returns:
        str: The autoscale formula, as a string.

    Example:
        >>> # Default settings (15 min interval, max 10 VMs)
        >>> formula = remaining_task_autoscale_formula()
        >>> print(type(formula))  # <class 'str'>

        >>> # Custom settings
        >>> formula = remaining_task_autoscale_formula(
        ...     task_sample_interval_minutes=30,
        ...     max_number_vms=20
        ... )
        >>> print("cappedPoolSize = 20" in formula)  # True
    """
    logger.debug(
        f"Generating autoscale formula with parameters: task_sample_interval_minutes={task_sample_interval_minutes}, max_number_vms={max_number_vms}"
    )

    autoscale_formula_template = """// In this example, the pool size
    // is adjusted based on the number of tasks in the queue.
    // Note that both comments and line breaks are acceptable in formula strings.

    // Get pending tasks for the past 15 minutes.
    $samples = $ActiveTasks.GetSamplePercent(TimeInterval_Minute * {task_sample_interval_minutes});
    // If we have fewer than 70 percent data points, we use the last sample point, otherwise we use the maximum of last sample point and the history average.
    $tasks = $samples < 70 ? max(0, $ActiveTasks.GetSample(1)) :
    max( $ActiveTasks.GetSample(1), avg($ActiveTasks.GetSample(TimeInterval_Minute * {task_sample_interval_minutes})));
    // If number of pending tasks is not 0, set targetVM to pending tasks, otherwise half of current dedicated.
    $targetVMs = $tasks > 0 ? $tasks : max(0, $TargetDedicatedNodes / 2);
    // The pool size is capped at {max_number_vms}, if target VM value is more than that, set it to {max_number_vms}.
    cappedPoolSize = {max_number_vms};
    $TargetDedicatedNodes = max(0, min($targetVMs, cappedPoolSize));
    // Set node deallocation mode - keep nodes active only until tasks finish
    $NodeDeallocationOption = taskcompletion;"""

    logger.debug("Formatting autoscale formula template with provided parameters")
    autoscale_formula = autoscale_formula_template.format(
        task_sample_interval_minutes=task_sample_interval_minutes,
        max_number_vms=max_number_vms,
    )

    logger.debug(
        f"Generated autoscale formula with {len(autoscale_formula)} characters, capped at {max_number_vms} VMs with {task_sample_interval_minutes}min intervals"
    )

    return autoscale_formula
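
The returned string is meant to be attached to a pool's scale settings. A sketch, assuming the azure-mgmt-batch ScaleSettings/AutoScaleSettings models are used for the pool definition; the evaluation interval here is illustrative:

import datetime
from azure.mgmt.batch import models
from cfa.cloudops.defaults import remaining_task_autoscale_formula

formula = remaining_task_autoscale_formula(max_number_vms=20)

# Re-evaluate the formula periodically so the pool grows and shrinks
# with the remaining task count.
scale_settings = models.ScaleSettings(
    auto_scale=models.AutoScaleSettings(
        formula=formula,
        evaluation_interval=datetime.timedelta(minutes=15),
    )
)

Depending on how the pool is defined, scale_settings can then be passed to the Pool constructor or assigned to a pool built with get_default_pool_config.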

set_env_vars()

Set default Azure environment variables.

Sets default values for Azure service endpoints and creates new variables as a function of existing environment variables.

Example

import os
set_env_vars()
print(os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"])  # 'batch.azure.com/'
print(os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"])  # 'azurecr.io'

Source code in cfa/cloudops/defaults.py
def set_env_vars():
    """Set default Azure environment variables.

    Sets default values for Azure service endpoints and creates new variables
    as a function of existing environment variables.

    Example:
        >>> import os
        >>> set_env_vars()
        >>> print(os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"])
        'batch.azure.com/'
        >>> print(os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"])
        'azurecr.io'
    """
    logger.debug("Setting default Azure environment variables")

    # save default values
    logger.debug("Setting Azure service endpoint subdomains and URLs")
    os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"] = "batch.azure.com/"
    logger.debug(
        f"Set AZURE_BATCH_ENDPOINT_SUBDOMAIN = {os.environ['AZURE_BATCH_ENDPOINT_SUBDOMAIN']}"
    )

    os.environ["AZURE_BATCH_RESOURCE_URL"] = "https://batch.core.windows.net/"
    logger.debug(
        f"Set AZURE_BATCH_RESOURCE_URL = {os.environ['AZURE_BATCH_RESOURCE_URL']}"
    )

    os.environ["AZURE_KEYVAULT_ENDPOINT_SUBDOMAIN"] = "vault.azure.net"
    logger.debug(
        f"Set AZURE_KEYVAULT_ENDPOINT_SUBDOMAIN = {os.environ['AZURE_KEYVAULT_ENDPOINT_SUBDOMAIN']}"
    )

    os.environ["AZURE_BLOB_STORAGE_ENDPOINT_SUBDOMAIN"] = "blob.core.windows.net/"
    logger.debug(
        f"Set AZURE_BLOB_STORAGE_ENDPOINT_SUBDOMAIN = {os.environ['AZURE_BLOB_STORAGE_ENDPOINT_SUBDOMAIN']}"
    )

    os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"] = "azurecr.io"
    logger.debug(
        f"Set AZURE_CONTAINER_REGISTRY_DOMAIN = {os.environ['AZURE_CONTAINER_REGISTRY_DOMAIN']}"
    )

    # create new variables as a function of env vars
    logger.debug(
        "Creating derived environment variables from existing Azure account settings"
    )

    batch_account = os.getenv("AZURE_BATCH_ACCOUNT")
    batch_location = os.getenv("AZURE_BATCH_LOCATION")
    os.environ["AZURE_BATCH_ENDPOINT"] = (
        f"https://{batch_account}.{batch_location}.{default_azure_batch_endpoint_subdomain}"
    )
    logger.debug(
        f"Set AZURE_BATCH_ENDPOINT = {os.environ['AZURE_BATCH_ENDPOINT']} (from account: {batch_account}, location: {batch_location})"
    )

    keyvault_name = os.getenv("AZURE_KEYVAULT_NAME")
    os.environ["AZURE_KEYVAULT_ENDPOINT"] = (
        f"https://{keyvault_name}.{default_azure_keyvault_endpoint_subdomain}"
    )
    logger.debug(
        f"Set AZURE_KEYVAULT_ENDPOINT = {os.environ['AZURE_KEYVAULT_ENDPOINT']} (from keyvault: {keyvault_name})"
    )

    blob_account = os.getenv("AZURE_BLOB_STORAGE_ACCOUNT")
    os.environ["AZURE_BLOB_STORAGE_ENDPOINT"] = (
        f"https://{blob_account}.{default_azure_blob_storage_endpoint_subdomain}"
    )
    logger.debug(
        f"Set AZURE_BLOB_STORAGE_ENDPOINT = {os.environ['AZURE_BLOB_STORAGE_ENDPOINT']} (from account: {blob_account})"
    )

    registry_account = os.getenv("AZURE_CONTAINER_REGISTRY_ACCOUNT")
    os.environ["ACR_TAG_PREFIX"] = (
        f"{registry_account}.{default_azure_container_registry_domain}/"
    )
    logger.debug(
        f"Set ACR_TAG_PREFIX = {os.environ['ACR_TAG_PREFIX']} (from registry: {registry_account})"
    )

    logger.debug("Completed setting all default Azure environment variables")

cfa.cloudops.endpoints

Helper functions for constructing Azure endpoint URLs.

construct_azure_container_registry_endpoint(azure_container_registry_account, azure_container_registry_domain=d.default_azure_container_registry_domain)

Construct an Azure container registry endpoint URL from the account name and domain.

Parameters:

azure_container_registry_account (str): Name of the Azure container registry account. Required.

azure_container_registry_domain (str): Domain for the Azure container registry. Typically "azurecr.io", the default. Default: default_azure_container_registry_domain.

Returns:

str: The registry endpoint URL.

Example

url = construct_azure_container_registry_endpoint("myregistry")
print(url)  # 'https://myregistry.azurecr.io'

url = construct_azure_container_registry_endpoint("myregistry", "custom.domain.io")
print(url)  # 'https://myregistry.custom.domain.io'

Source code in cfa/cloudops/endpoints.py
def construct_azure_container_registry_endpoint(
    azure_container_registry_account: str,
    azure_container_registry_domain: str = d.default_azure_container_registry_domain,
) -> str:
    """Construct an Azure container registry endpoint URL from the account name and domain.

    Args:
        azure_container_registry_account: Name of the Azure container registry account.
        azure_container_registry_domain: Domain for the Azure container registry.
            Typically "azurecr.io", the default.

    Returns:
        str: The registry endpoint URL.

    Example:
        >>> url = construct_azure_container_registry_endpoint("myregistry")
        >>> print(url)
        'https://myregistry.azurecr.io'

        >>> url = construct_azure_container_registry_endpoint("myregistry", "custom.domain.io")
        >>> print(url)
        'https://myregistry.custom.domain.io'
    """
    logger.debug(
        f"Constructing Azure Container Registry endpoint: account='{azure_container_registry_account}', domain='{azure_container_registry_domain}'"
    )

    is_default_domain = (
        azure_container_registry_domain == d.default_azure_container_registry_domain
    )
    logger.debug(
        f"Using {'default' if is_default_domain else 'custom'} container registry domain"
    )

    netloc = f"{azure_container_registry_account}.{azure_container_registry_domain}"
    logger.debug(f"Assembled container registry netloc: '{netloc}'")

    endpoint_url = _construct_https_url(netloc)
    logger.debug(
        f"Successfully constructed Azure Container Registry endpoint: '{endpoint_url}'"
    )

    return endpoint_url

construct_batch_endpoint(batch_account, batch_location, batch_endpoint_subdomain=d.default_azure_batch_endpoint_subdomain)

Construct an Azure Batch endpoint URL from the account name, location, and subdomain.

Parameters:

batch_account (str): Name of the Azure batch account. Required.

batch_location (str): Location of the Azure batch servers, e.g. "eastus". Required.

batch_endpoint_subdomain (str): Azure batch endpoint subdomains and domains that follow the account and location, e.g. "batch.azure.com/", the default. Default: default_azure_batch_endpoint_subdomain.

Returns:

str: The endpoint URL.

Example

url = construct_batch_endpoint("mybatch", "eastus") print(url) 'https://mybatch.eastus.batch.azure.com/'

url = construct_batch_endpoint("mybatch", "westus", "custom.domain.com/") print(url) 'https://mybatch.westus.custom.domain.com/'

Source code in cfa/cloudops/endpoints.py
def construct_batch_endpoint(
    batch_account: str,
    batch_location: str,
    batch_endpoint_subdomain: str = d.default_azure_batch_endpoint_subdomain,
) -> str:
    """Construct an Azure Batch endpoint URL from the account name, location, and subdomain.

    Args:
        batch_account: Name of the Azure batch account.
        batch_location: Location of the Azure batch servers, e.g. "eastus".
        batch_endpoint_subdomain: Azure batch endpoint subdomains and domains
            that follow the account and location, e.g. "batch.azure.com/", the default.

    Returns:
        str: The endpoint URL.

    Example:
        >>> url = construct_batch_endpoint("mybatch", "eastus")
        >>> print(url)
        'https://mybatch.eastus.batch.azure.com/'

        >>> url = construct_batch_endpoint("mybatch", "westus", "custom.domain.com/")
        >>> print(url)
        'https://mybatch.westus.custom.domain.com/'
    """

    is_default_subdomain = (
        batch_endpoint_subdomain == d.default_azure_batch_endpoint_subdomain
    )
    logger.debug(
        f"Using {'default' if is_default_subdomain else 'custom'} batch endpoint subdomain"
    )

    netloc = f"{batch_account}.{batch_location}.{batch_endpoint_subdomain}"
    logger.debug(f"Assembled batch endpoint netloc: '{netloc}'")

    endpoint_url = _construct_https_url(netloc)
    logger.debug(f"Successfully constructed Azure Batch endpoint: '{endpoint_url}'")

    return endpoint_url

construct_blob_account_endpoint(blob_account, blob_endpoint_subdomain=d.default_azure_blob_storage_endpoint_subdomain)

Construct an Azure blob storage account endpoint URL.

Parameters:

blob_account (str): Name of the Azure blob storage account. Required.

blob_endpoint_subdomain (str): Azure blob endpoint subdomains and domains that follow the account, e.g. "blob.core.windows.net/", the default. Default: default_azure_blob_storage_endpoint_subdomain.

Returns:

str: The endpoint URL.

Example

url = construct_blob_account_endpoint("mystorageaccount") print(url) 'https://mystorageaccount.blob.core.windows.net/'

url = construct_blob_account_endpoint("mystorageaccount", "custom.blob.domain/") print(url) 'https://mystorageaccount.custom.blob.domain/'

Source code in cfa/cloudops/endpoints.py
def construct_blob_account_endpoint(
    blob_account: str,
    blob_endpoint_subdomain: str = d.default_azure_blob_storage_endpoint_subdomain,
) -> str:
    """Construct an Azure blob storage account endpoint URL.

    Args:
        blob_account: Name of the Azure blob storage account.
        blob_endpoint_subdomain: Azure blob endpoint subdomains and domains
            that follow the account, e.g. "blob.core.windows.net/", the default.

    Returns:
        str: The endpoint URL.

    Example:
        >>> url = construct_blob_account_endpoint("mystorageaccount")
        >>> print(url)
        'https://mystorageaccount.blob.core.windows.net/'

        >>> url = construct_blob_account_endpoint("mystorageaccount", "custom.blob.domain/")
        >>> print(url)
        'https://mystorageaccount.custom.blob.domain/'
    """
    logger.debug(
        f"Constructing Azure Blob account endpoint: account='{blob_account}', subdomain='{blob_endpoint_subdomain}'"
    )

    is_default_subdomain = (
        blob_endpoint_subdomain == d.default_azure_blob_storage_endpoint_subdomain
    )
    logger.debug(
        f"Using {'default' if is_default_subdomain else 'custom'} blob storage subdomain"
    )

    netloc = f"{blob_account}.{blob_endpoint_subdomain}"
    logger.debug(f"Assembled blob account netloc: '{netloc}'")

    endpoint_url = _construct_https_url(netloc)
    logger.debug(
        f"Successfully constructed Azure Blob account endpoint: '{endpoint_url}'"
    )

    return endpoint_url

construct_blob_container_endpoint(blob_container, blob_account, blob_endpoint_subdomain=d.default_azure_blob_storage_endpoint_subdomain)

Construct an endpoint URL for a blob storage container.

Constructs the URL from the container name, account name, and endpoint subdomain.

Parameters:

blob_container (str): Name of the blob storage container. Required.

blob_account (str): Name of the Azure blob storage account. Required.

blob_endpoint_subdomain (str): Azure Blob endpoint subdomains and domains that follow the account name, e.g. "blob.core.windows.net/", the default. Default: default_azure_blob_storage_endpoint_subdomain.

Returns:

str: The endpoint URL.

Example

url = construct_blob_container_endpoint("mycontainer", "mystorageaccount") print(url) 'https://mystorageaccount.blob.core.windows.net/mycontainer'

url = construct_blob_container_endpoint("data", "storage", "custom.blob.domain/") print(url) 'https://storage.custom.blob.domain/data'

Source code in cfa/cloudops/endpoints.py
def construct_blob_container_endpoint(
    blob_container: str,
    blob_account: str,
    blob_endpoint_subdomain: str = d.default_azure_blob_storage_endpoint_subdomain,
) -> str:
    """Construct an endpoint URL for a blob storage container.

    Constructs the URL from the container name, account name, and endpoint subdomain.

    Args:
        blob_container: Name of the blob storage container.
        blob_account: Name of the Azure blob storage account.
        blob_endpoint_subdomain: Azure Blob endpoint subdomains and domains
            that follow the account name, e.g. "blob.core.windows.net/", the default.

    Returns:
        str: The endpoint URL.

    Example:
        >>> url = construct_blob_container_endpoint("mycontainer", "mystorageaccount")
        >>> print(url)
        'https://mystorageaccount.blob.core.windows.net/mycontainer'

        >>> url = construct_blob_container_endpoint("data", "storage", "custom.blob.domain/")
        >>> print(url)
        'https://storage.custom.blob.domain/data'
    """
    logger.debug(
        f"Constructing Azure Blob container endpoint: container='{blob_container}', account='{blob_account}', subdomain='{blob_endpoint_subdomain}'"
    )

    logger.debug("Getting blob account endpoint for container URL construction")
    account_endpoint = construct_blob_account_endpoint(
        blob_account, blob_endpoint_subdomain
    )
    logger.debug(f"Blob account endpoint: '{account_endpoint}'")

    quoted_container = quote(blob_container)
    logger.debug(f"URL-encoded container name: '{quoted_container}'")

    container_endpoint = urljoin(account_endpoint, quoted_container)
    logger.debug(
        f"Successfully constructed Azure Blob container endpoint: '{container_endpoint}'"
    )

    return container_endpoint
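
The account- and container-level helpers compose: the container endpoint is the account endpoint plus the URL-encoded container name, and the result can be handed to an SDK client that accepts a container URL. A sketch; the ContainerClient usage assumes azure-storage-blob and a real credential in practice:

from azure.storage.blob import ContainerClient
from cfa.cloudops.endpoints import (
    construct_blob_account_endpoint,
    construct_blob_container_endpoint,
)

account_url = construct_blob_account_endpoint("mystorageaccount")
container_url = construct_blob_container_endpoint("mycontainer", "mystorageaccount")
print(account_url)    # https://mystorageaccount.blob.core.windows.net/
print(container_url)  # https://mystorageaccount.blob.core.windows.net/mycontainer

# credential=None means anonymous access; pass an azure-identity credential
# (or a handler-provided credential) for authenticated access.
container_client = ContainerClient.from_container_url(container_url, credential=None)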

is_valid_acr_endpoint(endpoint)

Check whether an Azure container registry endpoint is valid given CFA ACR configurations.

Parameters:

endpoint (str): Azure Container Registry endpoint to validate. Required.

Returns:

tuple[bool, str | None]: First entry: True if validation passes, else False. Second entry: None if validation passes, else a string indicating what failed validation.

Example

valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io") print(valid) # True print(error) # None

valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io/") print(valid) # False print("trailing slash" in error) # True

valid, error = is_valid_acr_endpoint("https://azurecr.io") print(valid) # False print("subdomain" in error) # True

Source code in cfa/cloudops/endpoints.py
def is_valid_acr_endpoint(endpoint: str) -> tuple[bool, str | None]:
    """Check whether an Azure container registry endpoint is valid given CFA ACR configurations.

    Args:
        endpoint: Azure Container Registry endpoint to validate.

    Returns:
        tuple[bool, str | None]: First entry: True if validation passes, else False.
            Second entry: None if validation passes, else a string indicating
            what failed validation.

    Example:
        >>> valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io")
        >>> print(valid)  # True
        >>> print(error)  # None

        >>> valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io/")
        >>> print(valid)  # False
        >>> print("trailing slash" in error)  # True

        >>> valid, error = is_valid_acr_endpoint("https://azurecr.io")
        >>> print(valid)  # False
        >>> print("subdomain" in error)  # True
    """
    logger.debug(f"Validating Azure Container Registry endpoint: '{endpoint}'")

    logger.debug("Checking for trailing slash in ACR endpoint")
    if endpoint.endswith("/"):
        error_msg = (
            "Azure Container Registry URLs "
            "must not end with a trailing "
            "slash, as this can hamper DNS "
            "lookups of the private registry endpoint. "
            f"Got {endpoint}"
        )
        logger.debug(f"ACR validation failed: trailing slash found - {error_msg}")
        return (False, error_msg)

    logger.debug("Parsing URL to extract domain information")
    domain = urlparse(endpoint).netloc
    logger.debug(f"Extracted domain: '{domain}'")

    logger.debug("Checking if domain ends with 'azurecr.io'")
    if not domain.endswith("azurecr.io"):
        error_msg = (
            "Azure Container Registry URLs "
            "must have the domain "
            f"`azurecr.io`. Got `{domain}`."
        )
        logger.debug(f"ACR validation failed: invalid domain - {error_msg}")
        return (False, error_msg)

    logger.debug("Checking for required subdomain in ACR URL")
    if domain.startswith("azurecr.io"):
        error_msg = (
            "Azure container registry URLs "
            "must have a subdomain, typically "
            "corresponding to the particular "
            "private registry name."
            f"Got {endpoint}"
        )
        logger.debug(f"ACR validation failed: missing subdomain - {error_msg}")
        return (False, error_msg)

    logger.debug(f"ACR endpoint validation passed: '{endpoint}' is valid")
    return (True, None)
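
A natural place for this check is right before wiring a registry endpoint into pool or task configuration, failing fast with the explanatory message instead of a DNS or authentication error later. A short sketch using a hypothetical endpoint value:

from cfa.cloudops.endpoints import is_valid_acr_endpoint

endpoint = "https://myregistry.azurecr.io"  # hypothetical registry endpoint

valid, reason = is_valid_acr_endpoint(endpoint)
if not valid:
    # reason names which rule failed (trailing slash, domain, or missing subdomain)
    raise ValueError(f"Bad ACR endpoint {endpoint!r}: {reason}")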

cfa.cloudops.job

Utilities for working with Azure Batch jobs.

create_job(client, job, verify_pool=True, exist_ok=False, verbose=False, **kwargs)

Create an Azure Batch job if it does not already exist.

Returns True if the job was created successfully. By default, verifies that the Azure Batch pool specified for the job exists, erroring if the pool cannot be found.

If the job itself already exists, errors by default but can also be configured to proceed without modifying or deleting the existing job.

Parameters:

client (BatchServiceClient): BatchServiceClient to use when creating the job. Required.

job (JobAddParameter): JobAddParameter instance defining the job to add. Required.

verify_pool (bool): Verify that the specified pool for the job exists before attempting to create the job, and error if it cannot be found. Defaults to True.

exist_ok (bool): Proceed if the job already exists (without attempting to update/modify/overwrite it)? Defaults to False (error if the job already exists).

verbose (bool): Message to stdout on success or failure due to job already existing? Defaults to False.

**kwargs: Additional keyword arguments passed to azure.batch.BatchServiceClient.job.add. Default: {}.

Returns:

bool: True if the job is successfully created. False if the job already exists and exist_ok is set to True.

Raises:

ValueError: If the pool for the job cannot be found and verify_pool is True.

BatchErrorException: If the job exists and exist_ok is not True.

Example

from azure.batch import BatchServiceClient, models
client = BatchServiceClient(credentials=..., batch_url=...)
job = models.JobAddParameter(
    id="my-job",
    pool_info=models.PoolInformation(pool_id="my-pool")
)

Create job with pool verification

success = create_job(client, job)
print(success)  # True if created, False if already exists with exist_ok=True

Create job allowing existing jobs

success = create_job(client, job, exist_ok=True, verbose=True)
Job my-job exists.

Source code in cfa/cloudops/job.py
def create_job(
    client: BatchServiceClient,
    job: models.JobAddParameter,
    verify_pool: bool = True,
    exist_ok: bool = False,
    verbose: bool = False,
    **kwargs,
) -> bool:
    """Create an Azure Batch job if it does not already exist.

    Returns True if the job was created successfully. By default, verifies that the
    Azure Batch pool specified for the job exists, erroring if the pool cannot be found.

    If the job itself already exists, errors by default but can also be configured to
    proceed without modifying or deleting the existing job.

    Args:
        client: BatchServiceClient to use when creating the job.
        job: JobAddParameter instance defining the job to add.
        verify_pool: Verify that the specified pool for the job exists before
            attempting to create the job, and error if it cannot be found.
            Defaults to True.
        exist_ok: Proceed if the job already exists (without attempting to
            update/modify/overwrite it)? Defaults to False (error if the job already exists).
        verbose: Message to stdout on success or failure due to job already existing?
            Defaults to False.
        **kwargs: Additional keyword arguments passed to
            ``azure.batch.BatchServiceClient.job.add``.

    Returns:
        bool: True if the job is successfully created. False if the job already
            exists and ``exist_ok`` is set to True.

    Raises:
        ValueError: If the pool for the job cannot be found and ``verify_pool`` is True.
        models.BatchErrorException: If the job exists and ``exist_ok`` is not True.

    Example:
        >>> from azure.batch import BatchServiceClient, models
        >>> client = BatchServiceClient(credentials=..., batch_url=...)
        >>> job = models.JobAddParameter(
        ...     id="my-job",
        ...     pool_info=models.PoolInformation(pool_id="my-pool")
        ... )
        >>>
        >>> # Create job with pool verification
        >>> success = create_job(client, job)
        >>> print(success)  # True if created, False if already exists with exist_ok=True
        >>>
        >>> # Create job allowing existing jobs
        >>> success = create_job(client, job, exist_ok=True, verbose=True)
        Job my-job exists.
    """
    logger.debug(f"Starting create_job for job ID: '{job.id}'")
    logger.debug(
        f"Parameters: verify_pool={verify_pool}, exist_ok={exist_ok}, verbose={verbose}"
    )

    pool_id = job.pool_info.pool_id
    logger.debug(f"Job '{job.id}' configured to use pool: '{pool_id}'")

    if verify_pool:
        logger.debug(f"Pool verification enabled, checking if pool '{pool_id}' exists")
        pool_exists = client.pool.exists(pool_id)
        logger.debug(f"Pool '{pool_id}' exists: {pool_exists}")

        if not pool_exists:
            error_msg = (
                f"Attempt to create job {job.id} on "
                f"pool {pool_id}, but could not find "
                "the requested pool. Check that this "
                "pool id is correct and that a pool "
                "with that id exists"
            )
            logger.debug(f"Pool verification failed: {error_msg}")
            raise ValueError(error_msg)
        else:
            logger.debug(f"Pool verification successful for pool '{pool_id}'")
    else:
        logger.debug("Pool verification disabled, skipping pool existence check")

    logger.debug(f"Attempting to create job '{job.id}' on Azure Batch service")
    if kwargs:
        logger.debug(f"Additional kwargs provided: {list(kwargs.keys())}")
    else:
        logger.debug("No additional kwargs provided")

    try:
        logger.debug(f"Calling client.job.add() for job '{job.id}'")
        client.job.add(job, **kwargs)
        logger.debug(f"Successfully created job '{job.id}' on pool '{pool_id}'")

        if verbose:
            print(f"Created job {job.id}.")

        logger.debug("Job creation completed successfully, returning True")
        return True

    except models.BatchErrorException as e:
        logger.debug(f"BatchErrorException caught: error code = '{e.error.code}'")
        logger.debug(
            f"Job exists check: error code is 'JobExists' = {e.error.code == 'JobExists'}, exist_ok = {exist_ok}"
        )

        if not (e.error.code == "JobExists" and exist_ok):
            logger.debug(
                f"Re-raising BatchErrorException for job '{job.id}': {e.error.code}"
            )
            raise e

        logger.debug(
            f"Job '{job.id}' already exists and exist_ok=True, proceeding without error"
        )
        if verbose:
            print(f"Job {job.id} exists.")

        logger.debug("Job already exists scenario, returning False")
        return False
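
In pipelines that may be re-run, the boolean return value pairs naturally with exist_ok=True: the job is created on the first run and quietly reused afterwards. A short sketch, assuming client and job are constructed as in the example above:

created = create_job(client, job, exist_ok=True)
if created:
    print(f"Submitted new job {job.id}")
else:
    print(f"Reusing existing job {job.id}")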

create_job_schedule(client, cloud_job_schedule, verify_pool=True, exist_ok=False, verbose=False, **kwargs)

Create an Azure Batch job schedule if it does not already exist.

Returns True if the job schedule was created successfully. By default, verifies that the Azure Batch pool specified for the job schedule exists, erroring if the pool cannot be found.

Parameters:

client (BatchServiceClient): BatchServiceClient to use when creating the job. Required.

cloud_job_schedule (JobScheduleAddParameter): JobScheduleAddParameter instance defining the job schedule to add. Required.

verify_pool (bool): Verify that the specified pool for the job exists before attempting to create the job schedule, and error if it cannot be found. Defaults to True.

exist_ok (bool): Proceed if the job schedule already exists (without attempting to update/modify/overwrite it)? Defaults to False (error if the job schedule already exists).

verbose (bool): Message to stdout on success or failure due to job already existing? Defaults to False.

**kwargs: Additional keyword arguments passed to azure.batch.BatchServiceClient.job_schedule.add. Default: {}.

Returns:

bool: True if the job schedule is successfully created. False if the job schedule already exists and exist_ok is set to True.

Raises:

ValueError: If the pool for the job cannot be found and verify_pool is True.

BatchErrorException: If the job exists and exist_ok is not True.

Example

from azure.batch import BatchServiceClient, models
import datetime
client = BatchServiceClient(credentials=..., batch_url=...)
schedule = models.Schedule(
    recurrence_interval=datetime.timedelta(hours=1),
    do_not_run_until=datetime.datetime.strptime("2025-01-01 08:00:00", "%Y-%m-%d %H:%M:%S"),
    do_not_run_after=datetime.datetime.strptime("2025-01-01 17:00:00", "%Y-%m-%d %H:%M:%S")
)
job_manager_task = models.JobManagerTask(
    id="my-job-manager-task",
    command_line="/bin/bash -c 'printenv; echo Job manager task starting.'",
    authentication_token_settings=models.AuthenticationTokenSettings(
        access="job"
    )
)
job_specification = models.JobSpecification(
    pool_info=models.PoolInformation(pool_id="my-pool"),
    job_manager_task=job_manager_task
)
job_schedule_add_param = models.JobScheduleAddParameter(
    id="my-job-schedule",
    display_name="My Job Schedule",
    schedule=schedule,
    job_specification=job_specification,
)

Create job schedule with pool verification

success = create_job_schedule(client, job_schedule_add_param)
print(success)  # True if created, False if already exists with exist_ok=True

Create job schedule allowing an existing job schedule

success = create_job_schedule(client, job_schedule_add_param, exist_ok=True, verbose=True)
Job schedule my-job-schedule exists.

Source code in cfa/cloudops/job.py
def create_job_schedule(
    client: BatchServiceClient,
    cloud_job_schedule: models.JobScheduleAddParameter,
    verify_pool: bool = True,
    exist_ok: bool = False,
    verbose: bool = False,
    **kwargs,
) -> bool:
    """Create an Azure Batch job schedule if it does not already exist.

    Returns True if the job schedule was created successfully. By default, verifies that the
    Azure Batch pool specified for the job schedule exists, erroring if the pool cannot be found.

    Args:
        client: BatchServiceClient to use when creating the job.
        cloud_job_schedule: JobScheduleAddParameter instance defining the job schedule to add.
        verify_pool: Verify that the specified pool for the job exists before
            attempting to create the job schedule, and error if it cannot be found.
            Defaults to True.
        exist_ok: Proceed if the job schedule already exists (without attempting to
            update/modify/overwrite it)? Defaults to False (error if the job schedule already exists).
        verbose: Message to stdout on success or failure due to job already existing?
            Defaults to False.
        **kwargs: Additional keyword arguments passed to
            ``azure.batch.BatchServiceClient.job_schedule.add``.

    Returns:
        bool: True if the job schedule is successfully created. False if the job schedule already
            exists and ``exist_ok`` is set to True.

    Raises:
        ValueError: If the pool for the job cannot be found and ``verify_pool`` is True.
        models.BatchErrorException: If the job exists and ``exist_ok`` is not True.

    Example:
        >>> from azure.batch import BatchServiceClient, models
        >>> client = BatchServiceClient(credentials=..., batch_url=...)
        >>> schedule = models.Schedule(
        ...     recurrence_interval=datetime.timedelta(hours=1),
        ...     do_not_run_until=datetime.datetime.strptime("2025-01-01 08:00:00", "%Y-%m-%d %H:%M:%S"),
        ...     do_not_run_after=datetime.datetime.strptime("2025-01-01 17:00:00", "%Y-%m-%d %H:%M:%S")
        ... )
        >>> job_manager_task = models.JobManagerTask(
        ...     id="my-job-manager-task",
        ...     command_line="/bin/bash -c 'printenv; echo Job manager task starting.'",
        ...     authentication_token_settings=models.AuthenticationTokenSettings(
        ...         access="job"
        ...     )
        ... )
        >>> job_specification = models.JobSpecification(
        ...     pool_info=models.PoolInformation(pool_id="my-pool"),
        ...     job_manager_task=job_manager_task
        ... )
        >>> job_schedule_add_param = models.JobScheduleAddParameter(
        ...     id="my-job-schedule",
        ...     display_name="My Job Schedule",
        ...     schedule=schedule,
        ...     job_specification=job_specification,
        ... )
        >>>
        >>> # Create job with pool verification
        >>> success = create_job_schedule(client, job_schedule_add_param)
        >>> print(success)  # True if created, False if already exists with exist_ok=True
        >>>
        >>> # Create job allowing existing job schedule
        >>> success = create_job_schedule(client, job_schedule_add_param, exist_ok=True, verbose=True)
        Job schedule my-job-schedule" exists.
    """
    if verify_pool:
        pool_id = cloud_job_schedule.job_specification.pool_info.pool_id
        if not client.pool.exists(pool_id):
            raise ValueError(
                f"Attempt to create job schedule {cloud_job_schedule.id} on "
                f"pool {pool_id}, but could not find "
                "the requested pool. Check that this "
                "pool id is correct and that a pool "
                "with that id exists"
            )
    try:
        client.job_schedule.add(cloud_job_schedule, **kwargs)
        if verbose:
            print(f"Created job schedule {cloud_job_schedule.id}.")
        return True
    except models.BatchErrorException as e:
        if not exist_ok:
            raise e
        if verbose:
            print(f"Job schedule {cloud_job_schedule.id} exists.")
        return False

cfa.cloudops.task

Functions for manipulating tasks within an Azure batch job.

create_bind_mount_string(az_mount_dir, source_path, target_path)

Create a valid OCI bind mount string for an OCI container running in Azure batch.

Creates a bind mount string for mounting things from Azure blob storage.

Parameters:

az_mount_dir (str): Directory in which to look for directories or volumes to mount. Required.

source_path (str): Path relative to az_mount_dir to use as the source. Required.

target_path (str): Absolute path within the container to bind to the source path. Required.

Returns:

str: A properly formatted OCI --mount type=bind command, as a string.

Example

mount_str = create_bind_mount_string(
    "/mnt/batch/tasks/fsmounts",
    "data",
    "/app/data"
)
print(mount_str)
# '--mount type=bind,source=/mnt/batch/tasks/fsmounts/data,target=/app/data'

Source code in cfa/cloudops/task.py
def create_bind_mount_string(
    az_mount_dir: str, source_path: str, target_path: str
) -> str:
    """Create a valid OCI bind mount string for an OCI container running in Azure batch.

    Creates a bind mount string for mounting things from Azure blob storage.

    Args:
        az_mount_dir: Directory in which to look for directories or volumes to mount.
        source_path: Path relative to ``az_mount_dir`` to use as the source.
        target_path: Absolute path within the container to bind to the source path.

    Returns:
        str: A properly formatted OCI --mount type=bind command, as a string.

    Example:
        >>> mount_str = create_bind_mount_string(
        ...     "/mnt/batch/tasks/fsmounts",
        ...     "data",
        ...     "/app/data"
        ... )
        >>> print(mount_str)
        '--mount type=bind,source=/mnt/batch/tasks/fsmounts/data,target=/app/data'
    """
    logger.debug(
        f"Creating bind mount string: az_mount_dir='{az_mount_dir}', source_path='{source_path}', target_path='{target_path}'"
    )

    mount_template = "--mount type=bind,source={}/{},target={}"
    logger.debug(f"Using mount template: '{mount_template}'")

    mount_string = mount_template.format(az_mount_dir, source_path, target_path)
    logger.debug(f"Generated bind mount string: '{mount_string}'")

    return mount_string

get_container_settings(container_image_name, az_mount_dir='$AZ_BATCH_NODE_MOUNTS_DIR', working_directory=None, mount_pairs=None, additional_options='', registry=None, **kwargs)

Create a valid set of container settings with bind mounts for an OCI container.

Creates container settings with bind mounts specified in mount_pairs, for an OCI container run in an Azure batch task.

Parameters:

container_image_name (str): Name of the OCI container image to use. Required.

az_mount_dir (str): Directory in which to look for directories or volumes to mount. Default: '$AZ_BATCH_NODE_MOUNTS_DIR'.

working_directory (str | ContainerWorkingDirectory): Working directory for the task within the container, passed as the working_directory parameter to the TaskContainerSettings constructor. If None (the default), then defer to the Azure batch default (note that this will not typically be the same as the container image's own WORKDIR). Otherwise specify it with a ContainerWorkingDirectory value or use the string "containerImageDefault" to use the container's own WORKDIR. See the documentation for TaskContainerSettings for more details. Default: None.

mount_pairs (list[dict]): Pairs of 'source' and 'target' directories to mount when the container is run, as a list of dictionaries with 'source' and 'target' keys. Default: None.

additional_options (str): Additional flags and options to pass to the container run command, as a string. Defaults to "".

registry (ContainerRegistry): ContainerRegistry instance specifying a private container registry from which to fetch task containers. Defaults to None.

**kwargs: Additional keyword arguments passed to the TaskContainerSettings constructor. Default: {}.

Returns:

TaskContainerSettings: A TaskContainerSettings object instantiated according to the specified input.

Example

mount_pairs = [
    {"source": "data", "target": "/app/data"},
    {"source": "output", "target": "/app/output"}
]
settings = get_container_settings(
    "myregistry.azurecr.io/myapp:latest",
    mount_pairs=mount_pairs,
    additional_options="--env MODE=production"
)
print(settings.image_name)  # 'myregistry.azurecr.io/myapp:latest'

Source code in cfa/cloudops/task.py
def get_container_settings(
    container_image_name: str,
    az_mount_dir: str = "$AZ_BATCH_NODE_MOUNTS_DIR",
    working_directory: str | ContainerWorkingDirectory = None,
    mount_pairs: list[dict] = None,
    additional_options: str = "",
    registry: ContainerRegistry = None,
    **kwargs,
) -> TaskContainerSettings:
    """Create a valid set of container settings with bind mounts for an OCI container.

    Creates container settings with bind mounts specified in mount_pairs,
    for an OCI container run in an Azure batch task.

    Args:
        container_image_name: Name of the OCI container image to use.
        az_mount_dir: Directory in which to look for directories or volumes to mount.
        working_directory: Working directory for the task within the container, passed
            as the working_directory parameter to the TaskContainerSettings constructor.
            If None (the default), then defer to the Azure batch default (note that this
            will _not_ typically be the same as the container image's own WORKDIR).
            Otherwise specify it with a TaskWorkingDirectory instance or use the string
            "containerImageDefault" to use the container's own WORKDIR. See the
            documentation for TaskContainerSettings for more details.
        mount_pairs: Pairs of 'source' and 'target' directories to mount when the
            container is run, as a list of dictionaries with 'source' and 'target' keys.
        additional_options: Additional flags and options to pass to the container
            run command, as a string. Defaults to "".
        registry: ContainerRegistry instance specifying a private container registry
            from which to fetch task containers. Defaults to None.
        **kwargs: Additional keyword arguments passed to the TaskContainerSettings constructor.

    Returns:
        TaskContainerSettings: A TaskContainerSettings object instantiated according
            to the specified input.

    Example:
        >>> mount_pairs = [
        ...     {"source": "data", "target": "/app/data"},
        ...     {"source": "output", "target": "/app/output"}
        ... ]
        >>> settings = get_container_settings(
        ...     "myregistry.azurecr.io/myapp:latest",
        ...     mount_pairs=mount_pairs,
        ...     additional_options="--env MODE=production"
        ... )
        >>> print(settings.image_name)
        'myregistry.azurecr.io/myapp:latest'
    """
    logger.debug(f"Creating container settings for image: '{container_image_name}'")
    logger.debug(
        f"Parameters: az_mount_dir='{az_mount_dir}', working_directory={working_directory}"
    )
    logger.debug(
        f"Mount pairs: {mount_pairs}, additional_options='{additional_options}'"
    )

    if registry:
        logger.debug(f"Using private container registry: {registry.registry_server}")
    else:
        logger.debug("No private container registry specified, using default registry")

    ctr_r_opts = additional_options
    logger.debug(f"Starting with base container run options: '{ctr_r_opts}'")

    if mount_pairs:
        logger.debug(f"Processing {len(mount_pairs)} mount pairs")
        for i, pair in enumerate(mount_pairs):
            logger.debug(
                f"Processing mount pair {i + 1}: source='{pair['source']}', target='{pair['target']}'"
            )
            mount_string = create_bind_mount_string(
                az_mount_dir, pair["source"], pair["target"]
            )
            ctr_r_opts += " " + mount_string
            logger.debug(f"Updated container run options: '{ctr_r_opts}'")
    else:
        logger.debug("No mount pairs to process")

    logger.debug(f"Final container run options: '{ctr_r_opts}'")

    container_settings = TaskContainerSettings(
        image_name=container_image_name,
        working_directory=working_directory,
        container_run_options=ctr_r_opts,
        registry=registry,
        **kwargs,
    )

    logger.debug(
        f"Created TaskContainerSettings with image '{container_image_name}' and {len(ctr_r_opts.split()) if ctr_r_opts else 0} run options"
    )

    return container_settings

get_task_config(task_id, base_call, container_settings=None, user_identity=None, log_blob_container=None, log_blob_account=None, log_subdir=None, log_file_pattern='../std*.txt', log_upload_condition='taskCompletion', log_compute_node_identity_reference=None, output_files=None, **kwargs)

Create a batch task with a given base call and set of container settings.

If the user_identity is not set, set it up automatically with sufficient permissions to read and write from mounted volumes.

Parameters:

Name Type Description Default
task_id str

Alphanumeric identifier for the task.

required
base_call str

The base command line call for the task, as a string.

required
container_settings TaskContainerSettings

Container settings for the task. You can use the get_container_settings helper function to create a valid entry. Defaults to None.

None
user_identity UserIdentity

User identity under which to run the task. If None, create one automatically with admin privileges, if permitted. Defaults to None.

None
log_blob_container str

If provided, save the contents of the stderr and stdout buffers (default) and/or other specified log files from task execution to files named in the specified Azure blob storage container. If None, do not preserve the contents of those buffers.

None
log_blob_account str

Azure Blob storage account in which to look for the storage container specified in log_blob_container. Ignored if log_blob_container is None. Defaults to None.

None
log_subdir str

Subdirectory of the Blob storage container given in log_blob_container in which to save the log .txt files. If None, save at the root of the Blob storage container. Ignored if log_blob_container is None.

None
log_file_pattern str

File pattern for logs to persist. Defaults to "../std*.txt", which matches the .txt output files for the stdout and stderr buffers in a standard Azure Batch Linux task, which are stored one directory up from the task working directory. Ignored if log_blob_container is None.

'../std*.txt'
log_upload_condition str

Condition under which to upload logs. Options are "taskCompletion" (always upload, the default), "taskFailure" (upload only for failed tasks), and "taskSuccess" (upload only for successful tasks). Passed as the upload_condition argument to OutputFileUploadOptions.

'taskCompletion'
log_compute_node_identity_reference ComputeNodeIdentityReference

ComputeNodeIdentityReference to use when constructing an OutputFileBlobContainerDestination object for logging. If None (default), attempt to obtain one via get_compute_node_identity_reference. Ignored if log_blob_container is None.

None
output_files list[OutputFile] | OutputFile

OutputFile object or list of such objects specifying additional output files for the task beyond those auto-constructed for persisting logs to log_blob_container. Passed along with those autogenerated OutputFile objects as the output_files parameter to the TaskAddParameter constructor.

None
**kwargs

Additional keyword arguments passed to the TaskAddParameter constructor.

{}

Returns:

Name Type Description
TaskAddParameter TaskAddParameter

The task configuration object.

Example

>>> from azure.batch.models import TaskContainerSettings
>>>
>>> # Basic task without container
>>> task = get_task_config(
...     task_id="my-task-001",
...     base_call="python /app/script.py --input data.txt"
... )
>>>
>>> # Task with container and logging
>>> container_settings = TaskContainerSettings(
...     image_name="myregistry.azurecr.io/myapp:latest"
... )
>>> task = get_task_config(
...     task_id="my-task-002",
...     base_call="python /app/script.py",
...     container_settings=container_settings,
...     log_blob_container="task-logs",
...     log_blob_account="mystorageaccount",
...     log_subdir="job-123"
... )
>>> print(task.id)
'my-task-002'

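Additional output files can be persisted alongside the automatically generated log uploads by passing output_files. The following is a minimal sketch rather than an official example: the container and storage account names are placeholders, and it assumes the surrounding configuration lets output_task_files_to_blob obtain a default compute node identity reference.

from cfa.cloudops.task import get_task_config, output_task_files_to_blob

# Hypothetical blob container and storage account names.
results_upload = output_task_files_to_blob(
    file_pattern="outputs/*.csv",
    blob_container="task-outputs",
    blob_account="mystorageaccount",
    path="results/my-task-003",
)

task = get_task_config(
    task_id="my-task-003",
    base_call="python /app/script.py",
    output_files=results_upload,         # a single OutputFile or a list of them
    log_blob_container="task-logs",      # stdout/stderr logs are also persisted
    log_blob_account="mystorageaccount",
    log_subdir="job-123",
)
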
Source code in cfa/cloudops/task.py
def get_task_config(
    task_id: str,
    base_call: str,
    container_settings: TaskContainerSettings = None,
    user_identity: UserIdentity = None,
    log_blob_container: str = None,
    log_blob_account: str = None,
    log_subdir: str = None,
    log_file_pattern: str = "../std*.txt",
    log_upload_condition: str = "taskCompletion",
    log_compute_node_identity_reference: ComputeNodeIdentityReference = None,
    output_files: list[OutputFile] | OutputFile = None,
    **kwargs,
) -> TaskAddParameter:
    """Create a batch task with a given base call and set of container settings.

    If the ``user_identity`` is not set, set it up automatically with sufficient
    permissions to read and write from mounted volumes.

    Args:
        task_id: Alphanumeric identifier for the task.
        base_call: The base command line call for the task, as a string.
        container_settings: Container settings for the task. You can use the
            get_container_settings helper function to create a valid entry.
            Defaults to None.
        user_identity: User identity under which to run the task. If None, create
            one automatically with admin privileges, if permitted. Defaults to None.
        log_blob_container: If provided, save the contents of the stderr and stdout
            buffers (default) and/or other specified log files from task execution
            to files named in the specified Azure blob storage container. If None,
            do not preserve the contents of those buffers.
        log_blob_account: Azure Blob storage account in which to look for the storage
            container specified in ``log_blob_container``. Ignored if ``log_blob_container``
            is None. Defaults to None.
        log_subdir: Subdirectory of the Blob storage container given in
            ``log_blob_container`` in which to save the log ``.txt`` files.
            If None, save at the root of the Blob storage container. Ignored if
            ``log_blob_container`` is None.
        log_file_pattern: File pattern for logs to persist. Defaults to "../std*.txt",
            which matches the ``.txt`` output files for the stdout and stderr buffers
            in a standard Azure Batch Linux task, which are stored one directory up
            from the task working directory. Ignored if ``log_blob_container`` is None.
        log_upload_condition: Condition under which to upload logs. Options are
            "taskCompletion" (always upload, the default), "taskFailure" (upload only
            for failed tasks), and "taskSuccess" (upload only for successful tasks).
            Passed as the ``upload_condition`` argument to OutputFileUploadOptions.
        log_compute_node_identity_reference: ComputeNodeIdentityReference to use when
            constructing an OutputFileBlobContainerDestination object for logging.
            If None (default), attempt to obtain one via get_compute_node_identity_reference.
            Ignored if ``log_blob_container`` is None.
        output_files: OutputFile object or list of such objects specifying additional
            output files for the task beyond those auto-constructed for persisting logs
            to ``log_blob_container``. Passed along with those autogenerated OutputFile
            objects as the ``output_files`` parameter to the TaskAddParameter constructor.
        **kwargs: Additional keyword arguments passed to the TaskAddParameter constructor.

    Returns:
        TaskAddParameter: The task configuration object.

    Example:
        >>> from azure.batch.models import TaskContainerSettings
        >>>
        >>> # Basic task without container
        >>> task = get_task_config(
        ...     task_id="my-task-001",
        ...     base_call="python /app/script.py --input data.txt"
        ... )
        >>>
        >>> # Task with container and logging
        >>> container_settings = TaskContainerSettings(
        ...     image_name="myregistry.azurecr.io/myapp:latest"
        ... )
        >>> task = get_task_config(
        ...     task_id="my-task-002",
        ...     base_call="python /app/script.py",
        ...     container_settings=container_settings,
        ...     log_blob_container="task-logs",
        ...     log_blob_account="mystorageaccount",
        ...     log_subdir="job-123"
        ... )
        >>> print(task.id)
        'my-task-002'
    """
    logger.debug(f"Creating task configuration for task ID: '{task_id}'")
    logger.debug(f"Base command line: '{base_call}'")

    if container_settings:
        logger.debug(
            f"Container settings provided: image='{container_settings.image_name}'"
        )
        if hasattr(container_settings, "registry") and container_settings.registry:
            logger.debug(
                f"Using private registry: {container_settings.registry.registry_server}"
            )
    else:
        logger.debug("No container settings provided, task will run on host")

    if user_identity is None:
        logger.debug(
            "No user identity provided, creating automatic admin user identity"
        )
        user_identity = UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin,
            )
        )
        logger.debug(
            "Created automatic user identity with pool scope and admin elevation"
        )
    else:
        logger.debug("Using provided user identity")

    if output_files is None:
        output_files = []
        logger.debug("No output files provided, initializing empty list")
    else:
        logger.debug(
            f"Output files provided: {len(ensure_listlike(output_files))} files"
        )

    if log_blob_container is not None:
        logger.debug(
            f"Log blob container specified: '{log_blob_container}' in account '{log_blob_account}'"
        )
        logger.debug(
            f"Log configuration: subdir='{log_subdir}', pattern='{log_file_pattern}', condition='{log_upload_condition}'"
        )

        if log_subdir is None:
            log_subdir = ""
            logger.debug("No log subdirectory specified, using container root")

        log_path = Path(log_subdir, task_id).as_posix()
        logger.debug(f"Log files will be saved to path: '{log_path}'")

        log_output_files = output_task_files_to_blob(
            file_pattern=log_file_pattern,
            blob_container=log_blob_container,
            blob_account=log_blob_account,
            path=log_path,
            upload_condition=log_upload_condition,
            compute_node_identity_reference=log_compute_node_identity_reference,
        )
        logger.debug("Successfully created log output file configuration")
    else:
        log_output_files = []
        logger.debug(
            "No log blob container specified, task logs will not be persisted to blob storage"
        )

    total_output_files = ensure_listlike(output_files) + ensure_listlike(
        log_output_files
    )
    logger.debug(
        f"Total output files configured: {len(total_output_files)} ({len(ensure_listlike(output_files))} custom + {len(ensure_listlike(log_output_files))} log files)"
    )

    if kwargs:
        logger.debug(f"Additional TaskAddParameter kwargs: {list(kwargs.keys())}")

    task_config = TaskAddParameter(
        id=task_id,
        command_line=base_call,
        container_settings=container_settings,
        user_identity=user_identity,
        output_files=total_output_files,
        **kwargs,
    )

    logger.debug(
        f"Successfully created TaskAddParameter for task '{task_id}' with {len(total_output_files)} output files"
    )

    return task_config

output_task_files_to_blob(file_pattern, blob_container, blob_account, path=None, upload_condition='taskCompletion', blob_endpoint_subdomain=default_azure_blob_storage_endpoint_subdomain, compute_node_identity_reference=None, **kwargs)

Get a properly configured OutputFile object for uploading files from a Batch task to Blob storage.

Parameters:

Name Type Description Default
file_pattern str

File pattern to match when uploading. Passed as the file_pattern argument to OutputFile.

required
blob_container str

Name of the Azure blob storage container to which to upload the files.

required
blob_account str

Name of the Azure blob storage account in which to look for the Blob storage container specified in blob_container.

required
path str

Path within the Blob storage container to which to upload the file(s). Passed as the path argument to the OutputFileBlobContainerDestination constructor. If None, upload to the root of the container. If file_pattern contains wildcards, path gives the subdirectory within the container to which to upload the matching files, keeping their original filenames and extensions. If file_pattern contains no wildcards, path is treated as the full destination file path, including filename and extension (i.e. the file is renamed). See OutputFileBlobContainerDestination for details.

None
upload_condition str

Condition under which to upload the file(s). Options are "taskCompletion" (always upload, the default), "taskFailure" (upload only for failed tasks), and "taskSuccess" (upload only for successful tasks). Passed as the upload_condition argument to OutputFileUploadOptions.

'taskCompletion'
blob_endpoint_subdomain str

Azure Blob endpoint subdomain and domain that follow the account name. Defaults to this package's default_azure_blob_storage_endpoint_subdomain.

default_azure_blob_storage_endpoint_subdomain
compute_node_identity_reference ComputeNodeIdentityReference

ComputeNodeIdentityReference to use when constructing an OutputFileBlobContainerDestination object. If None (default), attempt to obtain one via get_compute_node_identity_reference.

None
**kwargs

Additional keyword arguments passed to the OutputFile constructor.

{}

Returns:

Name Type Description
OutputFile OutputFile

An OutputFile object that can be used in constructing a batch task via get_task_config.

Raises:

Type Description
TypeError

If compute_node_identity_reference is not of the required type.

Example

>>> output_file = output_task_files_to_blob(
...     file_pattern="*.log",
...     blob_container="task-outputs",
...     blob_account="mystorageaccount",
...     path="logs/task-123",
...     upload_condition="taskCompletion"
... )
>>> print(output_file.file_pattern)
'*.log'

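The path argument behaves differently depending on whether file_pattern contains wildcards. The following is a minimal sketch with placeholder container and account names, relying on the default compute node identity reference lookup described above.

from cfa.cloudops.task import output_task_files_to_blob

# Wildcard pattern: matching files keep their names under the given subdirectory.
logs = output_task_files_to_blob(
    file_pattern="*.log",
    blob_container="task-outputs",
    blob_account="mystorageaccount",
    path="logs/task-123",
)

# No wildcards: path is the full destination path, so the uploaded file is renamed.
summary = output_task_files_to_blob(
    file_pattern="outputs/summary.json",
    blob_container="task-outputs",
    blob_account="mystorageaccount",
    path="results/task-123/final-summary.json",
)
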
Source code in cfa/cloudops/task.py
def output_task_files_to_blob(
    file_pattern: str,
    blob_container: str,
    blob_account: str,
    path: str = None,
    upload_condition: str = "taskCompletion",
    blob_endpoint_subdomain: str = default_azure_blob_storage_endpoint_subdomain,
    compute_node_identity_reference: ComputeNodeIdentityReference = None,
    **kwargs,
) -> OutputFile:
    """Get a properly configured OutputFile object for uploading files from a Batch task to Blob storage.

    Args:
        file_pattern: File pattern to match when uploading. Passed as the
            ``file_pattern`` argument to OutputFile.
        blob_container: Name of the Azure blob storage container to which
            to upload the files.
        blob_account: Name of the Azure blob storage account in which to look for
            the Blob storage container specified in ``blob_container``.
        path: Path within the Blob storage container to which to upload the file(s).
            Passed as the ``path`` argument to the OutputFileBlobContainerDestination
            constructor. If None, upload to the root of the container. If ``file_pattern``
            contains wildcards, ``path`` gives the subdirectory within the container to
            upload them with their original filenames and extensions. If ``file_pattern``
            contains no wildcards, ``path`` is treated as the full file path including
            filename and extension (i.e. the file is renamed). See
            OutputFileBlobContainerDestination for details.
        upload_condition: Condition under which to upload the file(s). Options are
            "taskCompletion" (always upload, the default), "taskFailure" (upload only
            for failed tasks), and "taskSuccess" (upload only for successful tasks).
            Passed as the ``upload_condition`` argument to OutputFileUploadOptions.
        blob_endpoint_subdomain: Azure Blob endpoint subdomain and domain that follow
            the account name. Defaults to this package's
            default_azure_blob_storage_endpoint_subdomain.
        compute_node_identity_reference: ComputeNodeIdentityReference to use when
            constructing an OutputFileBlobContainerDestination object.
            If None (default), attempt to obtain one via get_compute_node_identity_reference.
        **kwargs: Additional keyword arguments passed to the OutputFile constructor.

    Returns:
        OutputFile: An OutputFile object that can be used in constructing a
            batch task via get_task_config.

    Raises:
        TypeError: If ``compute_node_identity_reference`` is not of the required type.

    Example:
        >>> output_file = output_task_files_to_blob(
        ...     file_pattern="*.log",
        ...     blob_container="task-outputs",
        ...     blob_account="mystorageaccount",
        ...     path="logs/task-123",
        ...     upload_condition="taskCompletion"
        ... )
        >>> print(output_file.file_pattern)
        '*.log'
    """
    logger.debug(f"Creating output file configuration for pattern: '{file_pattern}'")
    logger.debug(
        f"Target blob container: '{blob_container}' in account: '{blob_account}'"
    )
    logger.debug(f"Upload path: '{path}', upload condition: '{upload_condition}'")
    logger.debug(f"Blob endpoint subdomain: '{blob_endpoint_subdomain}'")

    if compute_node_identity_reference is None:
        logger.debug("No compute node identity reference provided, obtaining default")
        compute_node_identity_reference = get_compute_node_identity_reference()
        logger.debug("Successfully obtained default compute node identity reference")
    else:
        logger.debug("Using provided compute node identity reference")

    logger.debug(
        f"Validating compute node identity reference type: {type(compute_node_identity_reference)}"
    )
    if not isinstance(compute_node_identity_reference, ComputeNodeIdentityReference):
        error_msg = (
            "compute_node_identity_reference "
            "must be an instance of "
            "ComputeNodeIdentityReference. "
            f"Got {type(compute_node_identity_reference)}."
        )
        logger.debug(f"Type validation failed: {error_msg}")
        raise TypeError(error_msg)

    logger.debug("Compute node identity reference validation successful")

    container_url = construct_blob_container_endpoint(
        blob_container,
        blob_account,
        blob_endpoint_subdomain,
    )
    logger.debug(f"Constructed container URL: '{container_url}'")

    container = OutputFileBlobContainerDestination(
        container_url=container_url,
        path=path,
        identity_reference=compute_node_identity_reference,
    )
    logger.debug(f"Created OutputFileBlobContainerDestination with path: '{path}'")

    destination = OutputFileDestination(container=container)
    logger.debug("Created OutputFileDestination wrapper")

    upload_options = OutputFileUploadOptions(upload_condition=upload_condition)
    logger.debug(f"Created upload options with condition: '{upload_condition}'")

    output_file = OutputFile(
        file_pattern=file_pattern,
        destination=destination,
        upload_options=upload_options,
        **kwargs,
    )

    logger.debug(
        f"Successfully created OutputFile for pattern '{file_pattern}' -> '{blob_container}/{path or ''}'"
    )

    return output_file

cfa.cloudops.util

Miscellaneous utilities for interacting with Azure.

ensure_listlike(x)

Ensure that an object behaves like a MutableSequence, returning a one-item list if it does not.

If the object is not a MutableSequence, return a one-item list containing the object. Useful for handling list-of-strings inputs alongside single strings.

Based on this StackOverflow approach: https://stackoverflow.com/a/66485952.

Parameters:

Name Type Description Default
x any

The item to ensure is list-like.

required

Returns:

Name Type Description
MutableSequence MutableSequence

x if x is a MutableSequence, otherwise [x] (i.e. a one-item list containing x).

Example

>>> # Single string becomes a list
>>> result = ensure_listlike("hello")
>>> print(result)
['hello']

>>> # List stays a list
>>> result = ensure_listlike(["hello", "world"])
>>> print(result)
['hello', 'world']

>>> # Works with other types too
>>> result = ensure_listlike(42)
>>> print(result)
[42]

Source code in cfa/cloudops/util.py
def ensure_listlike(x: any) -> MutableSequence:
    """Ensure that an object either behaves like a MutableSequence or return a one-item list.

    If the object is not a MutableSequence, return a one-item list containing the object.
    Useful for handling list-of-strings inputs alongside single strings.

    Based on this `StackOverflow approach <https://stackoverflow.com/a/66485952>`_.

    Args:
        x: The item to ensure is list-like.

    Returns:
        MutableSequence: ``x`` if ``x`` is a MutableSequence, otherwise ``[x]``
            (i.e. a one-item list containing ``x``).

    Example:
        >>> # Single string becomes a list
        >>> result = ensure_listlike("hello")
        >>> print(result)
        ['hello']

        >>> # List stays a list
        >>> result = ensure_listlike(["hello", "world"])
        >>> print(result)
        ['hello', 'world']

        >>> # Works with other types too
        >>> result = ensure_listlike(42)
        >>> print(result)
        [42]
    """
    logger.debug(f"Ensuring input is list-like: type={type(x)}, value={repr(x)}")

    is_mutable_sequence = isinstance(x, MutableSequence)
    logger.debug(f"Input is MutableSequence: {is_mutable_sequence}")

    if is_mutable_sequence:
        logger.debug(f"Returning original MutableSequence with {len(x)} items")
        return x
    else:
        logger.debug("Converting single item to list")
        result = [x]
        logger.debug(f"Created list with 1 item: {repr(result)}")
        return result

lookup_available_vm_skus_for_batch(client=None, config_dict=None, try_env=True, to_dict=True, **kwargs)

Look up available VM image SKUs for the given batch service.

Parameters:

Name Type Description Default
client BatchManagementClient

BatchManagementClient to use when looking up the available images. If None, use the output of get_batch_management_client(). Defaults to None.

None
config_dict dict

Configuration dictionary. Passed as the config_dict argument to any internal config.get_config_val calls. See that function's documentation for additional details.

None
try_env bool

Whether to look for configuration values in the available environment variables. Passed as the try_env argument to any internal config.get_config_val calls. See that function's documentation for additional details.

True
to_dict bool

Apply sku_to_dict to the list of results? Defaults to True. If False, the result will be a list of SupportedSku objects.

True
**kwargs

Additional keyword arguments passed to BatchManagementClient.location.list_supported_virtual_machine_skus.

{}

Returns:

Name Type Description
list

Of supported SKUs, either as dictionaries of property names and values (default) or as raw SupportedSku objects (if to_dict is False).

Example

>>> from azure.mgmt.batch import BatchManagementClient
>>>
>>> # Get SKUs as dictionaries (default)
>>> skus = lookup_available_vm_skus_for_batch()
>>> for sku in skus[:3]:  # Show first 3
...     print(f"Name: {sku['name']}, vCPUs: {sku.get('vCPUs', 'N/A')}")

>>> # Get raw SupportedSku objects
>>> raw_skus = lookup_available_vm_skus_for_batch(to_dict=False)
>>> print(f"Found {len(raw_skus)} available VM SKUs")
>>> print(f"First SKU: {raw_skus[0].name}")

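Because the dictionary form flattens each SKU's capabilities into top-level keys (see sku_to_dict below), the results can be filtered directly. The following is a minimal sketch, assuming a 'vCPUs' capability key is present on the SKUs of interest and that capability values are reported as strings.

from cfa.cloudops.util import lookup_available_vm_skus_for_batch

skus = lookup_available_vm_skus_for_batch()

# Keep only SKUs reporting four or fewer vCPUs; capability values arrive as strings.
small_skus = [
    sku for sku in skus
    if "vCPUs" in sku and int(sku["vCPUs"]) <= 4
]
print(f"{len(small_skus)} SKUs with 4 or fewer vCPUs")
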
Source code in cfa/cloudops/util.py
def lookup_available_vm_skus_for_batch(
    client: BatchManagementClient = None,
    config_dict: dict = None,
    try_env: bool = True,
    to_dict: bool = True,
    **kwargs,
):
    """Look up available VM image SKUs for the given batch service.

    Args:
        client: BatchManagementClient to use when looking up the available images.
            If None, use the output of ``get_batch_management_client()``. Defaults to None.
        config_dict: Configuration dictionary. Passed as the ``config_dict`` argument
            to any internal ``config.get_config_val`` calls. See that function's
            documentation for additional details.
        try_env: Whether to look for configuration values in the available environment
            variables. Passed as the ``try_env`` argument to any internal
            ``config.get_config_val`` calls. See that function's documentation for
            additional details.
        to_dict: Apply ``sku_to_dict`` to the list of results? Defaults to True.
            If False, the result will be a list of SupportedSku objects.
        **kwargs: Additional keyword arguments passed to
            ``BatchManagementClient.location.list_supported_virtual_machine_skus``.

    Returns:
        list: Of supported SKUs, either as dictionaries of property names and values
            (default) or as raw SupportedSku objects (if ``to_dict`` is False).

    Example:
        >>> from azure.mgmt.batch import BatchManagementClient
        >>>
        >>> # Get SKUs as dictionaries (default)
        >>> skus = lookup_available_vm_skus_for_batch()
        >>> for sku in skus[:3]:  # Show first 3
        ...     print(f"Name: {sku['name']}, vCPUs: {sku.get('vCPUs', 'N/A')}")

        >>> # Get raw SupportedSku objects
        >>> raw_skus = lookup_available_vm_skus_for_batch(to_dict=False)
        >>> print(f"Found {len(raw_skus)} available VM SKUs")
        >>> print(f"First SKU: {raw_skus[0].name}")
    """
    logger.debug("Looking up available VM SKUs for batch service")
    logger.debug(
        f"Parameters: client={client is not None}, to_dict={to_dict}, try_env={try_env}"
    )

    if kwargs:
        logger.debug(f"Additional kwargs provided: {list(kwargs.keys())}")

    if client is None:
        logger.debug("No client provided, creating BatchManagementClient")
        from .client import get_batch_management_client

        client = get_batch_management_client(config_dict=config_dict, try_env=try_env)
        logger.debug("Successfully created BatchManagementClient")
    else:
        logger.debug("Using provided BatchManagementClient")

    batch_location = get_config_val(
        "azure_batch_location",
        config_dict=config_dict,
        try_env=try_env,
    )
    logger.debug(f"Using Azure Batch location: '{batch_location}'")

    logger.debug("Calling Azure API to list supported virtual machine SKUs")
    try:
        sku_iterator = client.location.list_supported_virtual_machine_skus(
            location_name=batch_location,
            **kwargs,
        )
        result = [item for item in sku_iterator]
        logger.debug(f"Successfully retrieved {len(result)} VM SKUs from Azure API")

    except Exception as e:
        logger.debug(f"Failed to retrieve VM SKUs from Azure API: {str(e)}")
        raise

    if result:
        sample_sku_names = [sku.name for sku in result[:3]]
        logger.debug(f"Sample SKU names: {sample_sku_names}")
    else:
        logger.debug("No VM SKUs returned from Azure API")

    if to_dict:
        logger.debug("Converting SupportedSku objects to dictionaries")
        result = [sku_to_dict(x) for x in result]
        logger.debug(f"Successfully converted {len(result)} SKUs to dictionaries")
    else:
        logger.debug("Returning raw SupportedSku objects")

    logger.debug(
        f"Returning {len(result)} VM SKUs (as {'dictionaries' if to_dict else 'SupportedSku objects'})"
    )

    return result

lookup_service_principal(display_name)

Look up an Azure service principal from its display name.

Requires the Azure CLI.

Parameters:

Name Type Description Default
display_name str

The display name of the service principal to look up.

required

Returns:

Name Type Description
list list

The results, if any, or an empty list if no match was found.

Raises:

Type Description
RuntimeError

If the Azure CLI command fails or is not available.

Example

>>> # Look up a service principal by display name
>>> sp_list = lookup_service_principal("my-service-principal")
>>> if sp_list:
...     print(f"Found {len(sp_list)} service principal(s)")
...     print(f"App ID: {sp_list[0]['appId']}")
... else:
...     print("No service principal found")

Source code in cfa/cloudops/util.py
def lookup_service_principal(display_name: str) -> list:
    """Look up an Azure service principal from its display name.

    Requires the Azure CLI.

    Args:
        display_name: The display name of the service principal to look up.

    Returns:
        list: The results, if any, or an empty list if no match was found.

    Raises:
        RuntimeError: If the Azure CLI command fails or is not available.

    Example:
        >>> # Look up a service principal by display name
        >>> sp_list = lookup_service_principal("my-service-principal")
        >>> if sp_list:
        ...     print(f"Found {len(sp_list)} service principal(s)")
        ...     print(f"App ID: {sp_list[0]['appId']}")
        ... else:
        ...     print("No service principal found")
    """
    logger.debug(
        f"Looking up Azure service principal with display name: '{display_name}'"
    )

    try:
        command = [f"az ad sp list --display-name {display_name}"]
        logger.debug(f"Executing Azure CLI command: {command[0]}")

        result = subprocess.check_output(
            command, shell=True, universal_newlines=True, text=True
        )
        logger.debug(
            f"Azure CLI command executed successfully, result length: {len(result)} characters"
        )

    except Exception as e:
        error_msg = (
            "Attempt to search available Azure "
            "service principals via the "
            "`az ad sp list` command produced an "
            "error. Check that you have the Azure "
            "command line interface (CLI) installed "
            "and in your PATH as `az`, and that you "
            "are authenticated via `az login`"
        )
        logger.debug(f"Azure CLI command failed: {str(e)}")
        raise RuntimeError(error_msg) from e

    logger.debug("Parsing JSON response from Azure CLI")
    parsed = json.loads(result)
    logger.debug(f"Successfully parsed JSON, found {len(parsed)} service principal(s)")

    if parsed:
        logger.debug(
            f"Service principal details: {[sp.get('appId', 'No appId') for sp in parsed]}"
        )
    else:
        logger.debug("No service principals found matching the display name")

    return parsed

sku_to_dict(sku)

Convert a SupportedSku object to a flat dictionary of property names and values.

Parameters:

Name Type Description Default
sku SupportedSku

The SupportedSku object to convert.

required

Returns:

Name Type Description
dict

A flat dictionary with keys 'name', 'family_name', 'batch_support_end_of_life', 'additional_properties', as well as keys and values corresponding to any SkuCapability objects associated to the SupportedSku.

Example

>>> from azure.mgmt.batch.models import SupportedSku
>>> # Assuming we have a SupportedSku object
>>> sku_dict = sku_to_dict(some_sku)
>>> print(sku_dict['name'])  # e.g., 'Standard_D2s_v3'
>>> print(sku_dict['family_name'])  # e.g., 'standardDSv3Family'
>>> print(sku_dict.get('vCPUs'))  # e.g., '2' (from capabilities)

Source code in cfa/cloudops/util.py
def sku_to_dict(sku: SupportedSku):
    """Convert a SupportedSku object to a flat dictionary of property names and values.

    Args:
        sku: The SupportedSku object to convert.

    Returns:
        dict: A flat dictionary with keys 'name', 'family_name',
            'batch_support_end_of_life', 'additional_properties', as well as keys
            and values corresponding to any SkuCapability objects associated to
            the SupportedSku.

    Example:
        >>> from azure.mgmt.batch.models import SupportedSku
        >>> # Assuming we have a SupportedSku object
        >>> sku_dict = sku_to_dict(some_sku)
        >>> print(sku_dict['name'])  # e.g., 'Standard_D2s_v3'
        >>> print(sku_dict['family_name'])  # e.g., 'standardDSv3Family'
        >>> print(sku_dict.get('vCPUs'))  # e.g., '2' (from capabilities)
    """
    logger.debug(
        f"Converting SupportedSku to dictionary: name='{sku.name}', family='{sku.family_name}'"
    )
    logger.debug(f"SKU batch support end of life: {sku.batch_support_end_of_life}")

    if hasattr(sku, "capabilities") and sku.capabilities:
        capabilities_count = len(sku.capabilities)
        logger.debug(f"Processing {capabilities_count} SKU capabilities")
        capabilities_dict = {c.name: c.value for c in sku.capabilities}
        logger.debug(f"SKU capabilities: {list(capabilities_dict.keys())}")
    else:
        logger.debug("No SKU capabilities found")
        capabilities_dict = {}

    if hasattr(sku, "additional_properties") and sku.additional_properties:
        logger.debug(
            f"Additional properties present: {list(sku.additional_properties.keys())}"
        )
    else:
        logger.debug("No additional properties found")

    result_dict = dict(
        name=sku.name,
        family_name=sku.family_name,
        batch_support_end_of_life=sku.batch_support_end_of_life,
        additional_properties=sku.additional_properties,
        **capabilities_dict,
    )

    logger.debug(
        f"Successfully converted SKU to dictionary with {len(result_dict)} keys"
    )

    return result_dict