cfa-cloudops Modules API Reference¶
cfa.cloudops.auth
¶
Helper functions for Azure authentication.
CredentialHandler
dataclass
¶
Data structure for Azure credentials.
Lazy and cached: credentials are retrieved from a keyvault only when needed and are cached thereafter.
Source code in cfa/cloudops/auth.py
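Because the credentials are lazy and cached, a handler can be configured attribute by attribute and only reaches out to Azure when a derived value is first read. A minimal sketch (the attribute values are placeholders; the Key Vault call happens on first access to service_principal_secret and the result is then reused):

```python
from cfa.cloudops.auth import CredentialHandler

handler = CredentialHandler()
handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
handler.azure_keyvault_sp_secret_id = "my-secret"

# No network call has been made yet; the secret is fetched lazily here...
secret = handler.service_principal_secret
# ...and cached, so later reads reuse the stored value instead of calling Key Vault again.
secret_again = handler.service_principal_secret
```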
azure_batch_endpoint
property
¶
Azure batch endpoint URL.
Constructed programmatically from account name, location, and subdomain.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The endpoint URL. |

Example

```python
handler = CredentialHandler()
handler.azure_batch_account = "mybatchaccount"
handler.azure_batch_location = "eastus"
handler.azure_batch_endpoint_subdomain = "batch.azure.com"
handler.azure_batch_endpoint
# 'https://mybatchaccount.eastus.batch.azure.com'
```
azure_blob_storage_endpoint
property
¶
Azure blob storage endpoint URL.
Constructed programmatically from the account name and endpoint subdomain.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The endpoint URL. |

Example

```python
handler = CredentialHandler()
handler.azure_blob_storage_account = "mystorageaccount"
handler.azure_blob_storage_endpoint_subdomain = "blob.core.windows.net"
handler.azure_blob_storage_endpoint
# 'https://mystorageaccount.blob.core.windows.net'
```
azure_container_registry
cached
property
¶
An object pointing to an Azure Container Registry.
Specifically, a ContainerRegistry instance corresponding to the particular Azure Container Registry account specified in the CredentialHandler, if any, with authentication via the compute_node_identity_reference defined by CredentialHandler, if any.
Returns:
| Type | Description |
|---|---|
| models.ContainerRegistry | A properly instantiated ContainerRegistry object. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the container registry endpoint is invalid. |

Example

```python
handler = CredentialHandler()
# Set required attributes...
registry = handler.azure_container_registry
```
azure_container_registry_endpoint
property
¶
Azure container registry endpoint URL.
Constructed programmatically from the account name and registry domain.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The endpoint URL. |

Example

```python
handler = CredentialHandler()
handler.azure_container_registry_account = "myregistry"
handler.azure_container_registry_domain = "azurecr.io"
handler.azure_container_registry_endpoint
# 'myregistry.azurecr.io'
```
batch_service_principal_credentials
cached
property
¶
Service Principal credentials for authenticating to Azure Batch.
Returns:
| Name | Type | Description |
|---|---|---|
| ServicePrincipalCredentials | ServicePrincipalCredentials | The credentials configured for Azure Batch access. |
client_secret_credential
cached
property
¶
A client secret credential created using the azure_client_secret attribute.
Returns:
| Name | Type | Description |
|---|---|---|
| ClientSecretCredential | ClientSecretCredential | The credential configured with client secret details. |

Example

```python
handler = CredentialHandler()
handler.azure_tenant_id = "tenant-id"
handler.azure_client_id = "client-id"
handler.azure_client_secret = "client-secret"  # pragma: allowlist secret
credential = handler.client_secret_credential
```
client_secret_sp_credential
cached
property
¶
A client secret credential created using the service principal secret.
Returns:
| Name | Type | Description |
|---|---|---|
| ClientSecretCredential | ClientSecretCredential | The credential configured with service principal details. |
compute_node_identity_reference
cached
property
¶
An object defining a compute node identity reference.
Specifically, a ComputeNodeIdentityReference object associated to the CredentialHandler's user-assigned identity.
Returns:
| Type | Description |
|---|---|
| models.ComputeNodeIdentityReference | The identity reference. |

Example

```python
handler = CredentialHandler()
handler.azure_user_assigned_identity = "/subscriptions/.../resourceGroups/..."
identity_ref = handler.compute_node_identity_reference
```
service_principal_secret
cached
property
¶
A service principal secret retrieved from Azure Key Vault.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The secret value. |

Example

```python
handler = CredentialHandler()
handler.azure_keyvault_endpoint = "https://myvault.vault.azure.net/"
handler.azure_keyvault_sp_secret_id = "my-secret"
secret = handler.service_principal_secret
```
user_credential
cached
property
¶
Azure user credential.
Returns:
| Name | Type | Description |
|---|---|---|
| ManagedIdentityCredential | ManagedIdentityCredential | The Azure user credential using ManagedIdentityCredential. |

Example

```python
handler = CredentialHandler()
credential = handler.user_credential
# Use credential with Azure SDK clients
```
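Building on the example above, the returned ManagedIdentityCredential can be handed to any Azure SDK client that accepts an azure-identity credential. A sketch, assuming azure-storage-blob is installed and the handler's blob storage attributes are set:

```python
from azure.storage.blob import BlobServiceClient

handler = CredentialHandler()
handler.azure_blob_storage_account = "mystorageaccount"
handler.azure_blob_storage_endpoint_subdomain = "blob.core.windows.net"

# The endpoint property and the cached user credential plug straight into the SDK client.
blob_client = BlobServiceClient(
    account_url=handler.azure_blob_storage_endpoint,
    credential=handler.user_credential,
)
```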
require_attr(attributes, goal=None)
¶
Check that attributes required for a given operation are defined.
Raises an informative error message if the required attribute is not defined.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| attributes | str \| list[str] | String or list of strings naming the required attribute(s). | required |
| goal | str | String naming the value that the attributes are required for obtaining, to make error messages more informative. If None, use a more generic message. | None |

Raises:

| Type | Description |
|---|---|
| AttributeError | If any required attribute is not defined. |

Example

```python
handler = CredentialHandler()
handler.require_attr(["azure_tenant_id"], "authentication")
# AttributeError: A non-None value for attribute azure_tenant_id is required...
```
Source code in cfa/cloudops/auth.py
DefaultCredential
¶
Bases: BasicTokenAuthentication
Source code in cfa/cloudops/auth.py
set_token()
¶
Ask the azure-core BearerTokenCredentialPolicy to get a token. Using the policy gives us azure-core's caching system for free.
Source code in cfa/cloudops/auth.py
EnvCredentialHandler
¶
Bases: CredentialHandler
Azure Credentials populated from available environment variables.
Subclass of CredentialHandler that populates attributes from environment variables at instantiation, with the opportunity to override those values via keyword arguments passed to the constructor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dotenv_path | str | Path to .env file to load environment variables from. If None, uses default .env file discovery. | None |
| **kwargs |  | Keyword arguments defining additional attributes or overriding those set in the environment variables. Passed as the … | {} |

Example

```python
# Load from environment variables
handler = EnvCredentialHandler()

# Override specific values
handler = EnvCredentialHandler(azure_tenant_id="custom-tenant-id")

# Load from custom .env file
handler = EnvCredentialHandler(dotenv_path="/path/to/.env")
```
Source code in cfa/cloudops/auth.py
__init__(dotenv_path=None, **kwargs)
¶
Initialize the EnvCredentialHandler.
Loads environment variables from .env file and populates credential attributes from them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dotenv_path | str | Path to .env file to load environment variables from. If None, uses default .env file discovery. | None |
| **kwargs |  | Additional keyword arguments to override specific credential attributes. | {} |
Source code in cfa/cloudops/auth.py
SPCredentialHandler
¶
Bases: CredentialHandler
Source code in cfa/cloudops/auth.py
__init__(azure_tenant_id=None, azure_subscription_id=None, azure_client_id=None, azure_client_secret=None, dotenv_path=None, **kwargs)
¶
Initialize a Service Principal Credential Handler.
Creates a credential handler that uses Azure Service Principal authentication for accessing Azure resources. Credentials can be provided directly as parameters or loaded from environment variables. If not provided directly, the handler will attempt to load credentials from environment variables or a .env file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| azure_tenant_id | str | Azure Active Directory tenant ID. If None, will attempt to load from AZURE_TENANT_ID environment variable. | None |
| azure_subscription_id | str | Azure subscription ID. If None, will attempt to load from AZURE_SUBSCRIPTION_ID environment variable. | None |
| azure_client_id | str | Azure Service Principal client ID (application ID). If None, will attempt to load from AZURE_CLIENT_ID environment variable. | None |
| azure_client_secret | str | Azure Service Principal client secret. If None, will attempt to load from AZURE_CLIENT_SECRET environment variable. | None |
| dotenv_path | str | Path to .env file to load environment variables from. If None, uses default .env file discovery. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If AZURE_TENANT_ID is not found in environment variables and not provided as parameter. |
| ValueError | If AZURE_SUBSCRIPTION_ID is not found in environment variables and not provided as parameter. |
| ValueError | If AZURE_CLIENT_ID is not found in environment variables and not provided as parameter. |
| ValueError | If AZURE_CLIENT_SECRET is not found in environment variables and not provided as parameter. |

Example

```python
# Using direct parameters
handler = SPCredentialHandler(
    azure_tenant_id="12345678-1234-1234-1234-123456789012",
    azure_subscription_id="87654321-4321-4321-4321-210987654321",
    azure_client_id="abcdef12-3456-7890-abcd-ef1234567890",
    azure_client_secret="your-secret-here",  # pragma: allowlist secret
)

# Using environment variables
handler = SPCredentialHandler()  # Loads from env vars

# Using custom .env file
handler = SPCredentialHandler(dotenv_path="/path/to/.env")
```
Source code in cfa/cloudops/auth.py
get_client_secret_sp_credential(vault_url, vault_sp_secret_id, tenant_id, application_id, user_credential=None)
¶
Get a ClientSecretCredential for a given Azure service principal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| vault_url | str | URL for the Azure keyvault to access. | required |
| vault_sp_secret_id | str | Service principal secret ID within the keyvault. | required |
| tenant_id | str | Tenant ID for the service principal credential. | required |
| application_id | str | Application ID for the service principal credential. | required |
| user_credential |  | User credential for the Azure user, as an azure-identity credential class instance. Passed to … | None |

Returns:

| Name | Type | Description |
|---|---|---|
| ClientSecretCredential | ClientSecretCredential | A ClientSecretCredential for the given service principal. |

Example

```python
credential = get_client_secret_sp_credential(
    "https://myvault.vault.azure.net/",
    "my-secret-id",
    "tenant-id",
    "application-id",
)
```
Source code in cfa/cloudops/auth.py
get_compute_node_identity_reference(credential_handler=None)
¶
Get a valid ComputeNodeIdentityReference using credentials from a CredentialHandler.
Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| credential_handler | CredentialHandler | Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). | None |

Returns:

| Type | Description |
|---|---|
| models.ComputeNodeIdentityReference | A ComputeNodeIdentityReference created according to the specified configuration. |

Example

```python
# Using default environment-based handler
identity_ref = get_compute_node_identity_reference()

# Using custom handler
handler = CredentialHandler()
identity_ref = get_compute_node_identity_reference(handler)
```
Source code in cfa/cloudops/auth.py
get_service_principal_credentials(vault_url, vault_sp_secret_id, tenant_id, application_id, resource_url=d.default_azure_batch_resource_url, user_credential=None)
¶
Get a ServicePrincipalCredentials object for a given Azure service principal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| vault_url | str | URL for the Azure keyvault to access. | required |
| vault_sp_secret_id | str | Service principal secret ID within the keyvault. | required |
| tenant_id | str | Tenant ID for the service principal credential. | required |
| application_id | str | Application ID for the service principal credential. | required |
| resource_url | str | URL of the Azure resource. Defaults to d.default_azure_batch_resource_url. | default_azure_batch_resource_url |
| user_credential |  | User credential for the Azure user, as an azure-identity credential class instance. Passed to … | None |

Returns:

| Name | Type | Description |
|---|---|---|
| ServicePrincipalCredentials | ServicePrincipalCredentials | A ServicePrincipalCredentials object for the service principal. |

Example

```python
credentials = get_service_principal_credentials(
    "https://myvault.vault.azure.net/",
    "my-secret-id",
    "tenant-id",
    "application-id",
)
```
Source code in cfa/cloudops/auth.py
get_sp_secret(vault_url, vault_sp_secret_id, user_credential=None)
¶
Get a service principal secret from an Azure keyvault.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| vault_url | str | URL for the Azure keyvault to access. | required |
| vault_sp_secret_id | str | Service principal secret ID within the keyvault. | required |
| user_credential |  | User credential for the Azure user, as an azure-identity credential class instance. If None, will use a ManagedIdentityCredential instantiated at runtime. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The retrieved value of the service principal secret. |

Example

```python
secret = get_sp_secret(
    "https://myvault.vault.azure.net/",
    "my-secret-id",
)
```
Source code in cfa/cloudops/auth.py
load_env_vars(dotenv_path=None)
¶
Load environment variables and Azure subscription information.
Loads variables from a .env file (if specified), retrieves Azure subscription information using ManagedIdentityCredential, and sets default environment variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dotenv_path |  | Path to .env file to load. If None, uses default .env file discovery. | None |

Example

```python
load_env_vars()                 # Load from default .env
load_env_vars("/path/to/.env")  # Load from specific file
```
Source code in cfa/cloudops/auth.py
cfa.cloudops.automation
¶
run_experiment(exp_config, dotenv_path=None, **kwargs)
¶
Run jobs and tasks automatically based on the provided experiment config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exp_config | str | path to experiment config file (toml) | required |
Source code in cfa/cloudops/automation.py
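No example is given above; a minimal usage sketch, assuming a TOML experiment config named experiment.toml (a hypothetical file name) and Azure credentials available in the environment or a .env file:

```python
from cfa.cloudops.automation import run_experiment

# exp_config is the only required argument; dotenv_path and **kwargs are optional.
run_experiment("experiment.toml", dotenv_path=".env")
```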
run_tasks(task_config, dotenv_path=None, **kwargs)
¶
Run jobs and tasks automatically based on the provided task config.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task_config | str | path to task config file (toml) | required |
Source code in cfa/cloudops/automation.py
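As with run_experiment, a minimal usage sketch (the tasks.toml file name is hypothetical):

```python
from cfa.cloudops.automation import run_tasks

# task_config points at the TOML task config file.
run_tasks("tasks.toml", dotenv_path=".env")
```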
cfa.cloudops.autoscale
¶
cfa.cloudops.blob
¶
Functions for interacting with Azure Blob Storage.
async_download_blob_folder(container_name, local_folder, storage_account_url, name_starts_with=None, include_extensions=None, exclude_extensions=None, check_size=True, max_concurrent_downloads=20, credential=None)
¶
Downloads blobs from an Azure container to a local folder asynchronously.
This is the main entry point for downloading blobs. It sets up Azure credentials, creates the necessary clients, and runs the async download process.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| container_name | str | Name of the Azure Storage container to download from. | required |
| local_folder | Path | Local directory path where blobs will be downloaded. | required |
| storage_account_url | str | URL of the Azure Storage account (e.g., "https://<account_name>.blob.core.windows.net"). | required |
| name_starts_with | str | Filter blobs to only those with names starting with this prefix. | None |
| include_extensions | str or list | File extensions to include (e.g., ".txt", [".json", ".csv"]). | None |
| exclude_extensions | str or list | File extensions to exclude (e.g., ".log", [".tmp", ".bak"]). | None |
| check_size | bool | If True, prompts user if total download size exceeds 2 GB. Defaults to True. | True |
| max_concurrent_downloads | int | Maximum number of simultaneous downloads allowed. Defaults to 20. | 20 |
| credential | any | Azure credential object. If None, ManagedIdentityCredential is used. | None |

Raises:

| Type | Description |
|---|---|
| KeyboardInterrupt | If the user cancels the download operation. |
| Exception | For any Azure SDK or network-related errors during download. |

Notes

- Uses ManagedIdentityCredential for authentication.
- Preserves blob folder structure in the local directory.
- Handles cleanup of Azure credentials automatically.
Source code in cfa/cloudops/blob.py
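A usage sketch with placeholder container, prefix, and storage account names, assuming a managed identity is available (credential=None falls back to ManagedIdentityCredential):

```python
from pathlib import Path

from cfa.cloudops.blob import async_download_blob_folder

async_download_blob_folder(
    container_name="input-data",
    local_folder=Path("./downloads"),
    storage_account_url="https://mystorageaccount.blob.core.windows.net",
    name_starts_with="run-2025/",
    include_extensions=[".csv", ".json"],
    max_concurrent_downloads=10,
)
```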
async_upload_folder(folder, container_name, storage_account_url, include_extensions=None, exclude_extensions=None, location_in_blob='.', max_concurrent_uploads=20, credential=None)
¶
Upload all files from a local folder to an Azure blob container asynchronously.
This is the main entry point for uploading files. It sets up Azure credentials, creates the necessary clients, and runs the async upload process.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| folder | str | Local directory path whose files will be uploaded. | required |
| container_name | str | Name of the Azure Storage container to upload to. | required |
| storage_account_url | str | URL of the Azure Storage account (e.g., "https://<account_name>.blob.core.windows.net"). | required |
| include_extensions | str or list | File extensions to include (e.g., ".txt", [".json", ".csv"]). | None |
| exclude_extensions | str or list | File extensions to exclude (e.g., ".log", [".tmp", ".bak"]). | None |
| location_in_blob | str | Path within the blob container where files will be uploaded. Defaults to "." (root of the container). | '.' |
| max_concurrent_uploads | int | Maximum number of simultaneous uploads allowed. Defaults to 20. | 20 |
| credential | any | Azure credential object. If None, ManagedIdentityCredential is used. | None |

Raises:

| Type | Description |
|---|---|
| KeyboardInterrupt | If the user cancels the upload operation. |
| Exception | For any Azure SDK or network-related errors during upload. |
Notes
- Uses ManagedIdentityCredential for authentication.
- Preserves folder structure in the blob container.
- Handles cleanup of Azure credentials automatically.
Source code in cfa/cloudops/blob.py
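A usage sketch with placeholder names, mirroring the download example above:

```python
from cfa.cloudops.blob import async_upload_folder

async_upload_folder(
    folder="./results",
    container_name="output-data",
    storage_account_url="https://mystorageaccount.blob.core.windows.net",
    exclude_extensions=[".tmp", ".log"],
    location_in_blob="run-2025",
)
```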
create_storage_container_if_not_exists(blob_storage_container_name, blob_service_client)
¶
Create an Azure blob storage container if it does not already exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| blob_storage_container_name | str | Name of the storage container. | required |
| blob_service_client | BlobServiceClient | The blob service client to use when looking for and potentially creating the storage container. | required |

Example

```python
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient(account_url="...", credential="...")
create_storage_container_if_not_exists("my-container", client)
# Container [my-container] created.
```
Source code in cfa/cloudops/blob.py
download_from_storage_container(file_paths, blob_storage_container_name, blob_service_client=None, local_root_dir='.', remote_root_dir='.', **kwargs)
¶
Download a list of files from an Azure blob storage container.
Preserves relative directory structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_paths | str \| list[str] | File or list of files to download, as string paths relative to … | required |
| blob_storage_container_name | str | Name of the blob storage container from which to download the files. Must already exist. | required |
| blob_service_client | BlobServiceClient | BlobServiceClient to use when downloading. If None, attempt to create one via … | None |
| local_root_dir | str | Root directory for the relative file paths in local storage. Defaults to "." (use the local working directory). | '.' |
| remote_root_dir | str | Root directory for the relative file paths within the blob storage container. Defaults to "." (start at the blob storage container root). | '.' |
| **kwargs |  | Keyword arguments passed to … | {} |

Raises:

| Type | Description |
|---|---|
| Exception | If the blob storage container does not exist. |

Example

```python
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient(account_url="...", credential="...")
download_from_storage_container(
    ["file1.txt", "subdir/file2.txt"],
    "my-container",
    client,
    local_root_dir="/local/path",
    remote_root_dir="uploads",
)
# Downloading file 0 of 2
# Downloaded 2 files from blob storage container
```
Source code in cfa/cloudops/blob.py
format_extensions(extension)
¶
Formats file extensions to include leading periods.
Ensures that file extensions have the correct format with leading periods. Accepts both single extensions and lists of extensions, with or without leading periods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| extension | str or list | File extension(s) to format. Can be a single extension string or a list of extension strings. Leading periods are optional (e.g., "txt" or ".txt" both work). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| list | list | List of properly formatted extensions with leading periods. |

Examples:

```python
# Format a single extension
formatted = format_extensions("txt")
# Returns: [".txt"]

# Format multiple extensions
formatted = format_extensions(["py", ".js", "csv"])
# Returns: [".py", ".js", ".csv"]

# Handle mixed formats
formatted = format_extensions([".pdf", "docx"])
# Returns: [".pdf", ".docx"]
```
Source code in cfa/cloudops/blob.py
get_node_mount_config(storage_containers, account_names, identity_references, shared_relative_mount_path='', mount_names=None, blobfuse_options='', cache_blobfuse=False, **kwargs)
¶
Get configuration for mounting Azure Blob Storage containers to Azure Batch nodes via blobfuse.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage_containers | str \| list[str] | Name(s) of the Azure Blob storage container(s) to mount. | required |
| account_names | str \| list[str] | Name(s) of the Azure Blob storage account(s) in which to look for the storage container(s). If a single value, look for all storage containers within the same storage account. Otherwise, look for each container within the corresponding account. The function will raise an error if there is more than one account name and the count does not match the number of storage containers (see Raises). | required |
| identity_references | ComputeNodeIdentityReference \| list[ComputeNodeIdentityReference] | Valid ComputeNodeIdentityReference objects for the node to use when connecting to the storage account(s). | required |
| shared_relative_mount_path | str | Path relative to the … | '' |
| mount_names | list[str] | Iterable of names (or paths) for the individual mounted storage containers relative to the … | None |
| blobfuse_options | str \| list[str] | Additional options passed to blobfuse. Defaults to "". | '' |
| cache_blobfuse | bool | Whether to cache Blob storage. Defaults to False. | False |
| **kwargs |  | Additional keyword arguments passed to the … | {} |

Returns:

| Type | Description |
|---|---|
| list[models.MountConfiguration] | A list of instantiated MountConfiguration objects describing the desired storage container mounts. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the number of mount_names doesn't match storage_containers, or if the number of account_names or identity_references doesn't match storage_containers and isn't exactly 1. |

Example

```python
from azure.batch import models

identity_ref = models.ComputeNodeIdentityReference(
    resource_id="/subscriptions/.../resourceGroups/.../providers/..."
)
mount_configs = get_node_mount_config(
    storage_containers=["container1", "container2"],
    account_names="mystorageaccount",
    identity_references=identity_ref,
    shared_relative_mount_path="data",
    cache_blobfuse=True,
)
len(mount_configs)
# 2
```
Source code in cfa/cloudops/blob.py
upload_to_storage_container(file_paths, blob_storage_container_name, blob_service_client, local_root_dir='.', remote_root_dir='.')
¶
Upload a file or list of files to an Azure blob storage container.
This function preserves relative directory structure among the uploaded files within the storage container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_paths | str \| list[str] | File or list of files to upload, as string paths relative to … | required |
| blob_storage_container_name | str | Name of the blob storage container to which to upload the files. Must already exist. | required |
| blob_service_client | BlobServiceClient | BlobServiceClient to use when uploading. | required |
| local_root_dir | str | Root directory for the relative file paths in local storage. Defaults to "." (use the local working directory). | '.' |
| remote_root_dir | str | Root directory for the relative file paths within the blob storage container. Defaults to "." (start at the blob storage container root). | '.' |

Raises:

| Type | Description |
|---|---|
| Exception | If the blob storage container does not exist. |

Example

```python
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient(account_url="...", credential="...")
upload_to_storage_container(
    ["file1.txt", "subdir/file2.txt"],
    "my-container",
    client,
    local_root_dir="/local/path",
    remote_root_dir="uploads",
)
# Uploading file 0 of 2
# Uploaded 2 files to blob storage container
```
Source code in cfa/cloudops/blob.py
cfa.cloudops.client
¶
Helper functions for setting up valid Azure clients.
get_batch_management_client(credential_handler=None, **kwargs)
¶
Get an Azure Batch management client using credentials from a CredentialHandler.
Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| credential_handler | CredentialHandler | Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). | None |
| **kwargs |  | Additional keyword arguments passed to the BatchManagementClient constructor. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| BatchManagementClient | BatchManagementClient | A client instantiated according to the specified configuration. |

Example

```python
# Using default environment-based credentials
client = get_batch_management_client()

# Using custom credential handler
handler = CredentialHandler()
client = get_batch_management_client(credential_handler=handler)
```
Source code in cfa/cloudops/client.py
get_batch_service_client(credential_handler=None, **kwargs)
¶
Get an Azure batch service client using credentials from a CredentialHandler.
Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| credential_handler | CredentialHandler | Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). | None |
| **kwargs |  | Additional keyword arguments passed to the BatchServiceClient constructor. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| BatchServiceClient | BatchServiceClient | A client instantiated according to the specified configuration. |

Example

```python
# Using default environment-based credentials
client = get_batch_service_client()

# Using custom credential handler
handler = CredentialHandler()
client = get_batch_service_client(credential_handler=handler)
```
Source code in cfa/cloudops/client.py
get_blob_service_client(credential_handler=None, **kwargs)
¶
Get an Azure blob service client using credentials from a CredentialHandler.
Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| credential_handler | CredentialHandler | Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). | None |
| **kwargs |  | Additional keyword arguments passed to the BlobServiceClient constructor. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| BlobServiceClient | BlobServiceClient | A client instantiated according to the specified configuration. |

Example

```python
# Using default environment-based credentials
client = get_blob_service_client()

# Using custom credential handler
handler = CredentialHandler()
client = get_blob_service_client(credential_handler=handler)
```
Source code in cfa/cloudops/client.py
get_compute_management_client(credential_handler=None, **kwargs)
¶
Get an Azure compute management client using credentials from a CredentialHandler.
Uses credentials obtained via a CredentialHandler: either a user-provided one or a default based on environment variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| credential_handler | CredentialHandler | Credential handler for connecting and authenticating to Azure resources. If None, create a blank EnvCredentialHandler, which attempts to obtain needed credentials using information available in local environment variables (see its documentation for details). | None |
| **kwargs |  | Additional keyword arguments passed to the ComputeManagementClient constructor. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| ComputeManagementClient | ComputeManagementClient | A client instantiated according to the specified configuration. |

Example

```python
# Using default environment-based credentials
client = get_compute_management_client()

# Using custom credential handler
handler = CredentialHandler()
client = get_compute_management_client(credential_handler=handler)
```
Source code in cfa/cloudops/client.py
cfa.cloudops.defaults
¶
Default configurations for Azure resources.
assign_container_config(pool_config, container_config)
¶
Assign a container configuration to a Pool object (in place).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool_config | Pool | Pool configuration object to modify. | required |
| container_config | ContainerConfiguration | ContainerConfiguration object to add to the Pool configuration object. | required |

Returns:

| Type | Description |
|---|---|
| models.Pool | The modified Pool object. |

Example

```python
from azure.mgmt.batch import models

pool = get_default_pool_config("test", "subnet", "identity")
container_config = models.ContainerConfiguration(type="dockerCompatible")
modified_pool = assign_container_config(pool, container_config)

# Pool is modified in place and returned
assert modified_pool is pool
```
Source code in cfa/cloudops/defaults.py
get_default_pool_config(pool_name, subnet_id, user_assigned_identity, **kwargs)
¶
Instantiate a Pool instance with default configuration.
Creates a Pool with the given pool name and subnet id, the default pool identity given by get_default_pool_identity, and other defaults specified in default_pool_config_dict and default_network_config_dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool_name | str | Name for the pool. Passed as the … | required |
| subnet_id | str | Subnet id for the pool, as a string. Should typically be obtained from a configuration file or an environment variable, often via a CredentialHandler instance. | required |
| user_assigned_identity | str | User-assigned identity for the pool, as a string. Passed to get_default_pool_identity. | required |
| **kwargs |  | Additional keyword arguments passed to the Pool constructor, potentially overriding settings from default_pool_config_dict. | {} |

Returns:

| Type | Description |
|---|---|
| models.Pool | The instantiated Pool object. |

Example

```python
pool = get_default_pool_config(
    pool_name="my-batch-pool",
    subnet_id="/subscriptions/.../subnets/default",
    user_assigned_identity="/subscriptions/.../resourceGroups/...",
)
print(pool.display_name)
# 'my-batch-pool'
print(pool.vm_size)
# 'standard_d4s_v3'
```
Source code in cfa/cloudops/defaults.py
get_default_pool_identity(user_assigned_identity)
¶
Get the default BatchPoolIdentity instance for azuretools.
Associates a blank UserAssignedIdentities instance to the provided user_assigned_identity string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_assigned_identity | str | User-assigned identity, as a string. | required |

Returns:

| Type | Description |
|---|---|
| models.BatchPoolIdentity | Instantiated BatchPoolIdentity instance using the provided user-assigned identity. |

Example

```python
identity = get_default_pool_identity(
    "/subscriptions/.../resourceGroups/.../providers/..."
)
print(identity.type)
```
Source code in cfa/cloudops/defaults.py
remaining_task_autoscale_formula(task_sample_interval_minutes=15, max_number_vms=10)
¶
Get an autoscaling formula that rescales pools based on the remaining task count.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_sample_interval_minutes | int | Task sampling interval, in minutes, as an integer. Defaults to 15. | 15 |
| max_number_vms | int | Maximum number of virtual machines to spin up, regardless of the number of remaining tasks. Defaults to 10. | 10 |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The autoscale formula, as a string. |

Example

```python
# Default settings (15 min interval, max 10 VMs)
formula = remaining_task_autoscale_formula()
print(type(formula))
# <class 'str'>

# Custom settings
formula = remaining_task_autoscale_formula(
    task_sample_interval_minutes=30,
    max_number_vms=20,
)
print("cappedPoolSize = 20" in formula)
# True
```
Source code in cfa/cloudops/defaults.py
set_env_vars()
¶
Set default Azure environment variables.
Sets default values for Azure service endpoints and creates new variables as a function of existing environment variables.
Example
```python
import os

set_env_vars()
print(os.environ["AZURE_BATCH_ENDPOINT_SUBDOMAIN"])
# 'batch.azure.com/'
print(os.environ["AZURE_CONTAINER_REGISTRY_DOMAIN"])
# 'azurecr.io'
```
Source code in cfa/cloudops/defaults.py
cfa.cloudops.endpoints
¶
Helper functions for constructing Azure endpoint URLs.
construct_azure_container_registry_endpoint(azure_container_registry_account, azure_container_registry_domain=d.default_azure_container_registry_domain)
¶
Construct an Azure container registry endpoint URL from the account name and domain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| azure_container_registry_account | str | Name of the Azure container registry account. | required |
| azure_container_registry_domain | str | Domain for the Azure container registry. Typically "azurecr.io", the default. | default_azure_container_registry_domain |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The registry endpoint URL. |

Example

```python
url = construct_azure_container_registry_endpoint("myregistry")
print(url)
# 'https://myregistry.azurecr.io'

url = construct_azure_container_registry_endpoint("myregistry", "custom.domain.io")
print(url)
# 'https://myregistry.custom.domain.io'
```
Source code in cfa/cloudops/endpoints.py
construct_batch_endpoint(batch_account, batch_location, batch_endpoint_subdomain=d.default_azure_batch_endpoint_subdomain)
¶
Construct an Azure Batch endpoint URL from the account name, location, and subdomain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_account | str | Name of the Azure batch account. | required |
| batch_location | str | Location of the Azure batch servers, e.g. "eastus". | required |
| batch_endpoint_subdomain | str | Azure batch endpoint subdomains and domains that follow the account and location, e.g. "batch.azure.com/", the default. | default_azure_batch_endpoint_subdomain |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The endpoint URL. |

Example

```python
url = construct_batch_endpoint("mybatch", "eastus")
print(url)
# 'https://mybatch.eastus.batch.azure.com/'

url = construct_batch_endpoint("mybatch", "westus", "custom.domain.com/")
print(url)
# 'https://mybatch.westus.custom.domain.com/'
```
Source code in cfa/cloudops/endpoints.py
construct_blob_account_endpoint(blob_account, blob_endpoint_subdomain=d.default_azure_blob_storage_endpoint_subdomain)
¶
Construct an Azure blob storage account endpoint URL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| blob_account | str | Name of the Azure blob storage account. | required |
| blob_endpoint_subdomain | str | Azure blob endpoint subdomains and domains that follow the account, e.g. "blob.core.windows.net/", the default. | default_azure_blob_storage_endpoint_subdomain |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The endpoint URL. |

Example

```python
url = construct_blob_account_endpoint("mystorageaccount")
print(url)
# 'https://mystorageaccount.blob.core.windows.net/'

url = construct_blob_account_endpoint("mystorageaccount", "custom.blob.domain/")
print(url)
# 'https://mystorageaccount.custom.blob.domain/'
```
Source code in cfa/cloudops/endpoints.py
construct_blob_container_endpoint(blob_container, blob_account, blob_endpoint_subdomain=d.default_azure_blob_storage_endpoint_subdomain)
¶
Construct an endpoint URL for a blob storage container.
Constructs the URL from the container name, account name, and endpoint subdomain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| blob_container | str | Name of the blob storage container. | required |
| blob_account | str | Name of the Azure blob storage account. | required |
| blob_endpoint_subdomain | str | Azure Blob endpoint subdomains and domains that follow the account name, e.g. "blob.core.windows.net/", the default. | default_azure_blob_storage_endpoint_subdomain |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The endpoint URL. |

Example

```python
url = construct_blob_container_endpoint("mycontainer", "mystorageaccount")
print(url)
# 'https://mystorageaccount.blob.core.windows.net/mycontainer'

url = construct_blob_container_endpoint("data", "storage", "custom.blob.domain/")
print(url)
# 'https://storage.custom.blob.domain/data'
```
Source code in cfa/cloudops/endpoints.py
is_valid_acr_endpoint(endpoint)
¶
Check whether an Azure container registry endpoint is valid given CFA ACR configurations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | Azure Container Registry endpoint to validate. | required |

Returns:

| Type | Description |
|---|---|
| tuple[bool, str or None] | First entry: True if validation passes, else False. Second entry: None if validation passes, else a string indicating what failed validation. |

Example

```python
valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io")
print(valid)  # True
print(error)  # None

valid, error = is_valid_acr_endpoint("https://myregistry.azurecr.io/")
print(valid)                      # False
print("trailing slash" in error)  # True

valid, error = is_valid_acr_endpoint("https://azurecr.io")
print(valid)                 # False
print("subdomain" in error)  # True
```
Source code in cfa/cloudops/endpoints.py
cfa.cloudops.job
¶
Utilities for working with Azure Batch jobs.
create_job(client, job, verify_pool=True, exist_ok=False, verbose=False, **kwargs)
¶
Create an Azure Batch job if it does not already exist.
Returns True if the job was created successfully. By default, verifies that the Azure Batch pool specified for the job exists, erroring if the pool cannot be found.
If the job itself already exists, errors by default but can also be configured to proceed without modifying or deleting the existing job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | BatchServiceClient | BatchServiceClient to use when creating the job. | required |
| job | JobAddParameter | JobAddParameter instance defining the job to add. | required |
| verify_pool | bool | Verify that the specified pool for the job exists before attempting to create the job, and error if it cannot be found. Defaults to True. | True |
| exist_ok | bool | Proceed if the job already exists (without attempting to update/modify/overwrite it)? Defaults to False (error if the job already exists). | False |
| verbose | bool | Message to stdout on success or failure due to job already existing? Defaults to False. | False |
| **kwargs |  | Additional keyword arguments passed to … | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| bool | bool | True if the job is successfully created. False if the job already exists and exist_ok is True. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the pool for the job cannot be found and verify_pool is True. |
| BatchErrorException | If the job exists and exist_ok is False. |

Example

```python
from azure.batch import BatchServiceClient, models

client = BatchServiceClient(credentials=..., batch_url=...)
job = models.JobAddParameter(
    id="my-job",
    pool_info=models.PoolInformation(pool_id="my-pool"),
)

# Create job with pool verification
success = create_job(client, job)
print(success)  # True if created, False if already exists with exist_ok=True

# Create job allowing existing jobs
success = create_job(client, job, exist_ok=True, verbose=True)
# Job my-job exists.
```
Source code in cfa/cloudops/job.py
create_job_schedule(client, cloud_job_schedule, verify_pool=True, exist_ok=False, verbose=False, **kwargs)
¶
Create an Azure Batch job schedule if it does not already exist.
Returns True if the job schedule was created successfully. By default, verifies that the Azure Batch pool specified for the job schedule exists, erroring if the pool cannot be found.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | BatchServiceClient | BatchServiceClient to use when creating the job. | required |
| cloud_job_schedule | JobScheduleAddParameter | JobScheduleAddParameter instance defining the job schedule to add. | required |
| verify_pool | bool | Verify that the specified pool for the job exists before attempting to create the job schedule, and error if it cannot be found. Defaults to True. | True |
| exist_ok | bool | Proceed if the job schedule already exists (without attempting to update/modify/overwrite it)? Defaults to False (error if the job schedule already exists). | False |
| verbose | bool | Message to stdout on success or failure due to job already existing? Defaults to False. | False |
| **kwargs |  | Additional keyword arguments passed to … | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| bool | bool | True if the job schedule is successfully created. False if the job schedule already exists and exist_ok is True. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the pool for the job cannot be found and verify_pool is True. |
| BatchErrorException | If the job schedule exists and exist_ok is False. |

Example

```python
import datetime

from azure.batch import BatchServiceClient, models

client = BatchServiceClient(credentials=..., batch_url=...)
schedule = models.Schedule(
    recurrence_interval=datetime.timedelta(hours=1),
    do_not_run_until=datetime.datetime.strptime("2025-01-01 08:00:00", "%Y-%m-%d %H:%M:%S"),
    do_not_run_after=datetime.datetime.strptime("2025-01-01 17:00:00", "%Y-%m-%d %H:%M:%S"),
)
job_manager_task = models.JobManagerTask(
    id="my-job-manager-task",
    command_line="/bin/bash -c 'printenv; echo Job manager task starting.'",
    authentication_token_settings=models.AuthenticationTokenSettings(
        access="job"
    ),
)
job_specification = models.JobSpecification(
    pool_info=models.PoolInformation(pool_id="my-pool"),
    job_manager_task=job_manager_task,
)
job_schedule_add_param = models.JobScheduleAddParameter(
    id="my-job-schedule",
    display_name="My Job Schedule",
    schedule=schedule,
    job_specification=job_specification,
)

# Create job schedule with pool verification
success = create_job_schedule(client, job_schedule_add_param)
print(success)  # True if created, False if already exists with exist_ok=True

# Create job allowing an existing job schedule
success = create_job_schedule(client, job_schedule_add_param, exist_ok=True, verbose=True)
# Job schedule my-job-schedule exists.
```
Source code in cfa/cloudops/job.py
cfa.cloudops.task
¶
Functions for manipulating tasks within an Azure batch job.
create_bind_mount_string(az_mount_dir, source_path, target_path)
¶
Create a valid OCI bind mount string for an OCI container running in Azure batch.
Creates a bind mount string for mounting things from Azure blob storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| az_mount_dir | str | Directory in which to look for directories or volumes to mount. | required |
| source_path | str | Path, relative to az_mount_dir, of the source directory or volume to mount. | required |
| target_path | str | Absolute path within the container to bind to the source path. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | A properly formatted OCI --mount type=bind command, as a string. |

Example

```python
mount_str = create_bind_mount_string(
    "/mnt/batch/tasks/fsmounts",
    "data",
    "/app/data",
)
print(mount_str)
# '--mount type=bind,source=/mnt/batch/tasks/fsmounts/data,target=/app/data'
```
Source code in cfa/cloudops/task.py
get_container_settings(container_image_name, az_mount_dir='$AZ_BATCH_NODE_MOUNTS_DIR', working_directory=None, mount_pairs=None, additional_options='', registry=None, **kwargs)
¶
Create a valid set of container settings with bind mounts for an OCI container.
Creates container settings with bind mounts specified in mount_pairs, for an OCI container run in an Azure batch task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| container_image_name | str | Name of the OCI container image to use. | required |
| az_mount_dir | str | Directory in which to look for directories or volumes to mount. | '$AZ_BATCH_NODE_MOUNTS_DIR' |
| working_directory | str \| ContainerWorkingDirectory | Working directory for the task within the container, passed as the working_directory parameter to the TaskContainerSettings constructor. If None (the default), then defer to the Azure batch default (note that this will not typically be the same as the container image's own WORKDIR). Otherwise specify it with a TaskWorkingDirectory instance or use the string "containerImageDefault" to use the container's own WORKDIR. See the documentation for TaskContainerSettings for more details. | None |
| mount_pairs | list[dict] | Pairs of 'source' and 'target' directories to mount when the container is run, as a list of dictionaries with 'source' and 'target' keys. | None |
| additional_options | str | Additional flags and options to pass to the container run command, as a string. Defaults to "". | '' |
| registry | ContainerRegistry | ContainerRegistry instance specifying a private container registry from which to fetch task containers. Defaults to None. | None |
| **kwargs |  | Additional keyword arguments passed to the TaskContainerSettings constructor. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| TaskContainerSettings | TaskContainerSettings | A TaskContainerSettings object instantiated according to the specified input. |

Example

```python
mount_pairs = [
    {"source": "data", "target": "/app/data"},
    {"source": "output", "target": "/app/output"},
]
settings = get_container_settings(
    "myregistry.azurecr.io/myapp:latest",
    mount_pairs=mount_pairs,
    additional_options="--env MODE=production",
)
print(settings.image_name)
# 'myregistry.azurecr.io/myapp:latest'
```
Source code in cfa/cloudops/task.py
get_task_config(task_id, base_call, container_settings=None, user_identity=None, log_blob_container=None, log_blob_account=None, log_subdir=None, log_file_pattern='../std*.txt', log_upload_condition='taskCompletion', log_compute_node_identity_reference=None, output_files=None, **kwargs)
¶
Create a batch task with a given base call and set of container settings.
If the user_identity is not set, set it up automatically with sufficient
permissions to read and write from mounted volumes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str | Alphanumeric identifier for the task. | required |
| base_call | str | The base command line call for the task, as a string. | required |
| container_settings | TaskContainerSettings | Container settings for the task. You can use the get_container_settings helper function to create a valid entry. Defaults to None. | None |
| user_identity | UserIdentity | User identity under which to run the task. If None, create one automatically with admin privileges, if permitted. Defaults to None. | None |
| log_blob_container | str | If provided, save the contents of the stderr and stdout buffers (default) and/or other specified log files from task execution to files in the specified Azure Blob storage container. If None, do not preserve the contents of those buffers. | None |
| log_blob_account | str | Azure Blob storage account in which to look for the storage container specified in log_blob_container. | None |
| log_subdir | str | Subdirectory of the Blob storage container given in log_blob_container in which to save the log files. | None |
| log_file_pattern | str | File pattern for logs to persist. Defaults to `"../std*.txt"`, which matches the task's standard stdout and stderr log files. | `'../std*.txt'` |
| log_upload_condition | str | Condition under which to upload logs. Options are "taskCompletion" (always upload, the default), "taskFailure" (upload only for failed tasks), and "taskSuccess" (upload only for successful tasks). Passed as the upload_condition argument to output_task_files_to_blob. | 'taskCompletion' |
| log_compute_node_identity_reference | ComputeNodeIdentityReference | ComputeNodeIdentityReference to use when constructing an OutputFileBlobContainerDestination object for logging. If None (default), attempt to obtain one via get_compute_node_identity_reference. Ignored if log_blob_container is None. | None |
| output_files | list[OutputFile] \| OutputFile | OutputFile object or list of such objects specifying additional output files for the task, beyond those auto-constructed for persisting logs to log_blob_container. | None |
| **kwargs | | Additional keyword arguments passed to the TaskAddParameter constructor. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| TaskAddParameter | TaskAddParameter | The task configuration object. |
Example
```python
>>> from azure.batch.models import TaskContainerSettings

>>> # Basic task without container
>>> task = get_task_config(
...     task_id="my-task-001",
...     base_call="python /app/script.py --input data.txt"
... )

>>> # Task with container and logging
>>> container_settings = TaskContainerSettings(
...     image_name="myregistry.azurecr.io/myapp:latest"
... )
>>> task = get_task_config(
...     task_id="my-task-002",
...     base_call="python /app/script.py",
...     container_settings=container_settings,
...     log_blob_container="task-logs",
...     log_blob_account="mystorageaccount",
...     log_subdir="job-123"
... )
>>> print(task.id)
'my-task-002'
```
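The returned TaskAddParameter still has to be submitted to a job. A minimal sketch of that step, assuming an authenticated azure.batch.BatchServiceClient (shared-key auth is shown purely for brevity; the account details are placeholders) and an existing job named "my-job":

```python
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials

# Placeholder account details; any authenticated BatchServiceClient
# works the same way here.
credentials = SharedKeyCredentials("mybatchaccount", "<account-key>")
batch_client = BatchServiceClient(
    credentials, batch_url="https://mybatchaccount.eastus.batch.azure.com"
)

task = get_task_config(
    task_id="my-task-003",
    base_call="python /app/script.py",
)

# Add the task to an existing job; "my-job" is assumed to exist already.
batch_client.task.add(job_id="my-job", task=task)
```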
Source code in cfa/cloudops/task.py
output_task_files_to_blob(file_pattern, blob_container, blob_account, path=None, upload_condition='taskCompletion', blob_endpoint_subdomain=default_azure_blob_storage_endpoint_subdomain, compute_node_identity_reference=None, **kwargs)
¶
Get a properly configured OutputFile object for uploading files from a Batch task to Blob storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_pattern | str | File pattern to match when uploading. Passed as the file_pattern argument to the OutputFile constructor. | required |
| blob_container | str | Name of the Azure Blob storage container to which to upload the files. | required |
| blob_account | str | Name of the Azure Blob storage account in which to look for the Blob storage container specified in blob_container. | required |
| path | str | Path within the Blob storage container to which to upload the file(s). Passed as the path argument to the OutputFileBlobContainerDestination constructor. | None |
| upload_condition | str | Condition under which to upload the file(s). Options are "taskCompletion" (always upload, the default), "taskFailure" (upload only for failed tasks), and "taskSuccess" (upload only for successful tasks). Passed as the upload_condition argument to the OutputFileUploadOptions constructor. | 'taskCompletion' |
| blob_endpoint_subdomain | str | Azure Blob endpoint subdomains and domains that follow the account name. If None (default), use this package's default_azure_blob_storage_endpoint_subdomain. | default_azure_blob_storage_endpoint_subdomain |
| compute_node_identity_reference | ComputeNodeIdentityReference | ComputeNodeIdentityReference to use when constructing an OutputFileBlobContainerDestination object for logging. If None (default), attempt to obtain one via get_compute_node_identity_reference. | None |
| **kwargs | | Additional keyword arguments passed to the OutputFile constructor. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| OutputFile | OutputFile | An OutputFile object that can be used in constructing a batch task via get_task_config. |
Raises:
| Type | Description |
|---|---|
| TypeError | If compute_node_identity_reference is not a ComputeNodeIdentityReference. |
Example
```python
>>> output_file = output_task_files_to_blob(
...     file_pattern="*.log",
...     blob_container="task-outputs",
...     blob_account="mystorageaccount",
...     path="logs/task-123",
...     upload_condition="taskCompletion"
... )
>>> print(output_file.file_pattern)
'*.log'
```
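An OutputFile produced this way is typically handed to get_task_config via its output_files parameter, alongside any automatically constructed log uploads. A rough sketch; the container, account, path, and task names are illustrative:

```python
# Persist model outputs in addition to the automatically constructed
# log uploads. All names below are placeholders.
results_to_blob = output_task_files_to_blob(
    file_pattern="outputs/*.csv",
    blob_container="task-outputs",
    blob_account="mystorageaccount",
    path="results/job-123",
)

task = get_task_config(
    task_id="my-task-004",
    base_call="python /app/model.py",
    log_blob_container="task-logs",
    log_blob_account="mystorageaccount",
    output_files=results_to_blob,
)
```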
Source code in cfa/cloudops/task.py
cfa.cloudops.util
¶
Miscellaneous utilities for interacting with Azure.
ensure_listlike(x)
¶
Ensure that an object behaves like a MutableSequence, or wrap it in a one-item list.
If the object is not a MutableSequence, return a one-item list containing the object. Useful for handling list-of-strings inputs alongside single strings.
Based on [this StackOverflow approach](https://stackoverflow.com/a/66485952).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | any | The item to ensure is list-like. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| MutableSequence | MutableSequence | The object itself if it already behaves like a MutableSequence; otherwise a one-item list containing the object. |
Example
```python
>>> # Single string becomes a list
>>> result = ensure_listlike("hello")
>>> print(result)
['hello']

>>> # List stays a list
>>> result = ensure_listlike(["hello", "world"])
>>> print(result)
['hello', 'world']

>>> # Works with other types too
>>> result = ensure_listlike(42)
>>> print(result)
[42]
```
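A typical use is at the top of a function that accepts either a single string or a list of strings, as in this illustrative helper (not part of cfa.cloudops):

```python
# Illustrative helper only: accept a single blob name or a list of blob
# names and normalize to a list before iterating.
def delete_blobs(blob_names):
    blob_names = ensure_listlike(blob_names)
    for name in blob_names:
        print(f"would delete {name}")

delete_blobs("report.csv")        # single string
delete_blobs(["a.csv", "b.csv"])  # list of strings
```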
Source code in cfa/cloudops/util.py
lookup_available_vm_skus_for_batch(client=None, config_dict=None, try_env=True, to_dict=True, **kwargs)
¶
Look up available VM image SKUs for the given batch service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | BatchManagementClient | BatchManagementClient to use when looking up the available images. If None, construct one from the given configuration. Defaults to None. | None |
| config_dict | dict | Configuration dictionary. Passed as the corresponding argument when constructing a client, if no client is provided. Defaults to None. | None |
| try_env | bool | Whether to look for configuration values in the available environment variables. Passed as the corresponding argument when constructing a client, if no client is provided. Defaults to True. | True |
| to_dict | bool | Apply sku_to_dict to each result, so that SKUs are returned as dictionaries of property names and values rather than as raw SupportedSku objects. Defaults to True. | True |
| **kwargs | | Additional keyword arguments passed to the underlying SKU-listing call. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| list | | Of supported SKUs, either as dictionaries of property names and values (the default) or as raw SupportedSku objects (if to_dict=False). |
Example
```python
>>> from azure.mgmt.batch import BatchManagementClient

>>> # Get SKUs as dictionaries (default)
>>> skus = lookup_available_vm_skus_for_batch()
>>> for sku in skus[:3]:  # Show first 3
...     print(f"Name: {sku['name']}, vCPUs: {sku.get('vCPUs', 'N/A')}")

>>> # Get raw SupportedSku objects
>>> raw_skus = lookup_available_vm_skus_for_batch(to_dict=False)
>>> print(f"Found {len(raw_skus)} available VM SKUs")
>>> print(f"First SKU: {raw_skus[0].name}")
```
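Because the dictionary form flattens each SKU's capabilities into top-level keys, the results are easy to filter. A small sketch, assuming the SKUs of interest report a 'vCPUs' capability (capability values come back as strings):

```python
# Keep only SKU names with at least 8 vCPUs. Capability values are
# strings, so cast before comparing; SKUs without a 'vCPUs' key are
# treated as having zero.
skus = lookup_available_vm_skus_for_batch()
big_skus = [
    sku["name"]
    for sku in skus
    if int(sku.get("vCPUs", "0")) >= 8
]
print(sorted(big_skus)[:5])
```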
Source code in cfa/cloudops/util.py
lookup_service_principal(display_name)
¶
Look up an Azure service principal from its display name.
Requires the Azure CLI.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| display_name | str | The display name of the service principal to look up. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| list | list | The results, if any, or an empty list if no match was found. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the Azure CLI command fails or is not available. |
Example
```python
>>> # Look up a service principal by display name
>>> sp_list = lookup_service_principal("my-service-principal")
>>> if sp_list:
...     print(f"Found {len(sp_list)} service principal(s)")
...     print(f"App ID: {sp_list[0]['appId']}")
... else:
...     print("No service principal found")
```
Source code in cfa/cloudops/util.py
sku_to_dict(sku)
¶
Convert a SupportedSku object to a flat dictionary of property names and values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sku | SupportedSku | The SupportedSku object to convert. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| dict | | A flat dictionary with keys 'name', 'family_name', 'batch_support_end_of_life', and 'additional_properties', as well as keys and values corresponding to any SkuCapability objects associated with the SupportedSku. |
Example
```python
>>> from azure.mgmt.batch.models import SupportedSku

>>> # Assuming we have a SupportedSku object
>>> sku_dict = sku_to_dict(some_sku)
>>> print(sku_dict['name'])         # e.g., 'Standard_D2s_v3'
>>> print(sku_dict['family_name'])  # e.g., 'standardDSv3Family'
>>> print(sku_dict.get('vCPUs'))    # e.g., '2' (from capabilities)
```
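Since each converted SKU is a flat dictionary, a list of them drops straight into a tabular structure for comparison. A rough sketch, assuming pandas is installed (it is not required by this module) and reusing lookup_available_vm_skus_for_batch from above:

```python
import pandas as pd  # optional dependency, used here only for display

# Flatten raw SupportedSku objects into one row per SKU; capability
# columns such as 'vCPUs' appear only when Azure reports them.
raw_skus = lookup_available_vm_skus_for_batch(to_dict=False)
sku_table = pd.DataFrame(sku_to_dict(sku) for sku in raw_skus)
print(sku_table[["name", "family_name", "vCPUs"]].head())
```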