Skip to content

API reference

cfa.dataops.catalog.get_data(name, version='latest', type='transformed', output='pandas')

Gets the data from blob storage based on provided parameters

Parameters:

Name Type Description Default
name str

name of dataset

required
version str

version of dataset. Defaults to "latest".

'latest'
type str

type of data, either 'raw' or 'transformed'. Defaults to "transformed".

'transformed'
output str

dataframe output type, either 'pandas' or 'polars'. Defaults to "pandas".

'pandas'

Returns:

Type Description
pd.DataFrame | pl.DataFrame

pd.DataFrame | pl.DataFrame: pandas or polars dataframe

Source code in cfa/dataops/catalog.py
def get_data(
    name: str,
    version: str = "latest",
    type: str = "transformed",
    output: str = "pandas",
) -> pd.DataFrame | pl.DataFrame:
    """
    Gets the data from blob storage based on provided parameters

    Args:
        name (str): name of dataset
        version (str, optional): version of dataset. Defaults to "latest".
        type (str, optional): type of data, either 'raw' or 'transformed'.
            Defaults to "transformed".
        output (str, optional): dataframe output type, either 'pandas' or
            'polars'. Defaults to "pandas".

    Returns:
        pd.DataFrame | pl.DataFrame: pandas or polars dataframe

    Raises:
        ValueError: if the dataset name, version, type, or output is not
            valid, or if a raw dataset has an unsupported file extension.
    """
    from operator import attrgetter

    # check data exists -- attrgetter walks dotted names such as
    # "scenarios.covid19vax_trends" without the code-injection risk of eval()
    try:
        config = attrgetter(name)(datacat)
    except AttributeError as e:
        raise ValueError(
            f"{name} not in available datasets."
            f" Available datasets: {list_datasets()}"
        ) from e

    # validate type, raise error if not raw or transformed
    if type not in ("raw", "transformed"):
        raise ValueError(f"Type {type} needs to be 'raw' or 'transformed'.")

    # validate output, raise error if not pandas or polars
    if output not in ("pandas", "polars", "pd", "pl"):
        raise ValueError(
            f"Output {output} needs to be 'pandas', 'polars', 'pd', or 'pl'."
        )

    # raw data lives behind the extract endpoint, transformed behind load
    blob_endpoint = config.extract if type == "raw" else config.load

    # check version exists; the available versions go into the exception
    # message (not just stdout) so callers can see them too
    if version != "latest":
        v_list = blob_endpoint.get_versions()
        if version not in v_list:
            raise ValueError(
                f"Version {version} not in available versions."
                f" Available versions: {v_list}"
            )

    if type == "raw":
        file_ext = blob_endpoint.get_file_ext()
        blobs = blob_endpoint.read_blobs()
        if file_ext == "csv":
            if output in ("pandas", "pd"):
                return pd.concat([pd.read_csv(blob) for blob in blobs])
            return pl.concat(
                [pl.read_csv(blob.content_as_bytes()) for blob in blobs],
                how="vertical_relaxed",
            )
        elif file_ext == "json":
            if output in ("pandas", "pd"):
                return pd.concat(
                    [pd.read_json(blob.content_as_bytes()) for blob in blobs]
                )
            return pl.concat(
                [pl.read_json(blob.content_as_bytes()) for blob in blobs],
            )
        # previously this fell through and implicitly returned None
        raise ValueError(
            f"Unsupported raw file extension: {file_ext!r}."
            " Expected 'csv' or 'json'."
        )

    # transformed data is stored as parquet
    blobs = blob_endpoint.read_blobs()
    pq_files = [BytesIO(blob.content_as_bytes()) for blob in blobs]
    if output in ("pandas", "pd"):
        return pd.concat([pd.read_parquet(pq_file) for pq_file in pq_files])
    return pl.concat(
        [pl.read_parquet(pq_file) for pq_file in pq_files],
        how="vertical_relaxed",
    )

cfa.dataops.catalog.list_datasets()

Lists all available datasets in the catalog

Returns:

Type Description
list[str]

list[str]: list of dataset names

Examples:

>>> datasets = list_datasets()
>>> 'scenarios.covid19vax_trends' in datasets
True
Source code in cfa/dataops/catalog.py
def list_datasets() -> list[str]:
    """
    List every dataset name registered in the catalog.

    Returns:
        list[str]: list of dataset names

    Examples:
        >>> datasets = list_datasets()
        >>> 'scenarios.covid19vax_trends' in datasets
        True

    """
    # name_paths is the module-level registry of dotted dataset names
    return name_paths

cfa.dataops.reporting.catalog.get_report_catalog()

Get the report catalog as a SimpleNamespace object.

Returns:

Name Type Description
SimpleNamespace SimpleNamespace

The report catalog.

Examples:

>>> reportcat = get_report_catalog()
>>> reportcat.examples.basics_ipynb.print_params()
{'intercept': {'default': '0.5',
               'help': 'y-intercept of the line',
               'inferred_type_name': 'float',
               'name': 'intercept'},
 'slope': {'default': '1.2',
           'help': 'adding help text can be achieved with in-line comments',
           'inferred_type_name': 'float',
           'name': 'slope'},
 'step_size': {'default': '0.5',
               'help': 'step size for generating x values',
               'inferred_type_name': 'float',
               'name': 'step_size'},
 'x_range': {'default': '(-5, 5)',
             'help': 'range of x values to consider',
             'inferred_type_name': 'tuple',
             'name': 'x_range'}}
Source code in cfa/dataops/reporting/catalog.py
def get_report_catalog() -> SimpleNamespace:
    """Build the report catalog as a nested SimpleNamespace object.

    Returns:
        SimpleNamespace: The report catalog.

    Examples:

        >>> reportcat = get_report_catalog()
        >>> reportcat.examples.basics_ipynb.print_params()
        {'intercept': {'default': '0.5',
                       'help': 'y-intercept of the line',
                       'inferred_type_name': 'float',
                       'name': 'intercept'},
         'slope': {'default': '1.2',
                   'help': 'adding help text can be achieved with in-line comments',
                   'inferred_type_name': 'float',
                   'name': 'slope'},
         'step_size': {'default': '0.5',
                       'help': 'step size for generating x values',
                       'inferred_type_name': 'float',
                       'name': 'step_size'},
         'x_range': {'default': '(-5, 5)',
                     'help': 'range of x values to consider',
                     'inferred_type_name': 'tuple',
                     'name': 'x_range'}}
    """
    # report_ns_map is the module-level mapping of report namespaces;
    # report_dict_to_sn converts the nested dict into attribute access.
    return report_dict_to_sn(report_ns_map)