Skip to content

API

src.cfa_subgroup_imputer.groups

Submodule for broad-sense handling of supergroups and subgroups.

Group

A class to represent a super or subgroup.

Source code in src/cfa_subgroup_imputer/groups.py
class Group:
    """
    A class to represent a super or subgroup.
    """

    def __init__(
        self,
        name: Hashable,
        attributes: Iterable[Attribute] = [],
        filter_on: Iterable[str] | None = None,
    ):
        """
        Group constructor.

        Parameters
        ----------
        name
            Name defining the group.
        attributes
            Attributes currently attached to the group.
        filter_on
            Keys used to identify this group in tabular JSON-like data.
        """
        self.name = name
        self.attributes = tuple(attributes)
        self.filter_on = filter_on
        self._validate()

    def __eq__(self, x: Self):
        if self.name != x.name:
            return False

        my_attr = set(a.name for a in self.attributes)
        their_attr = set(a.name for a in x.attributes)

        if not my_attr == their_attr:
            return False

        return all(
            self.get_attribute(a) == x.get_attribute(a) for a in my_attr
        )

    def __repr__(self):
        return f"Group(name={self.name}, attributes={[a for a in self.attributes]})"

    def _validate(self):
        assert all([isinstance(a, Attribute) for a in self.attributes]), (
            "All attributes must be of class Attribute"
        )
        measurement_names = [a.name for a in self.attributes]
        assert len(set(measurement_names)) == len(measurement_names), (
            f"Found multiple measurements for same attribute when constructing group named {self.name}: {measurement_names}"
        )
        to_impute = set(
            a.name for a in self.attributes if a.impute_action == "impute"
        )
        imputable = set(
            a.name
            for a in self.attributes
            if isinstance(a, ImputableAttribute)
        )
        assert to_impute.issubset(imputable), (
            f"The following attributes are requested to be imputed but are not imputable: {to_impute.difference(imputable)}"
        )

    def add_attribute(self, attribute: Attribute) -> Self:
        """
        Return a new group with one additional attribute.

        Parameters
        ----------
        attribute
            Attribute to append.

        Returns
        -------
        Group
            A new group containing all existing attributes plus `attribute`.
        """
        assert attribute.name not in [a.name for a in self.attributes], (
            f"Cannot add measurement {attribute} to group {self.name} which already has {self.get_attribute(attribute.name)}"
        )
        return type(self)(
            name=self.name, attributes=self.attributes + (attribute,)
        )

    def disaggregate_one_subgroup(
        self,
        subgroup: Self,
        prop: float,
        size_from: Hashable = "size",
        subgroup_size_from: Hashable = "size",
    ) -> Self:
        assert 0.0 <= prop <= 1.0, (
            f"Cannot disaggregate proportion {prop} of {self}."
        )
        disagg_attributes = list(subgroup.attributes)
        for attr in self.rate_to_count(size_from).attributes:
            if attr.impute_action == "copy":
                disagg_attributes.append(attr)
            elif attr.impute_action == "impute":
                assert isinstance(attr, ImputableAttribute)
                disagg_attributes.append(attr * prop)
        return type(self)(subgroup.name, disagg_attributes).restore_rates(
            subgroup_size_from
        )

    def filter(
        self, data: Iterable[dict[str, Any]], assert_unique: bool = True
    ) -> list[dict]:
        assert self.filter_on is not None, f"{self} has nothing to filter on."
        assert all(isinstance(fo, str) for fo in self.filter_on), (
            f"{self} has non-str elements in `filter_on`."
        )

        filtered_data = list(
            filter(
                lambda row: all(
                    row[filter_key]
                    == self.get_attribute(filter_key).json_value
                    for filter_key in self.filter_on  # pyright: ignore[reportOptionalIterable]
                ),
                data,
            )
        )

        if assert_unique:
            assert len(filtered_data) == 1, (
                f"{data} contains multiple rows for {self}"
            )

        return filtered_data

    def _get_attribute(self, name: Hashable) -> Attribute | None:
        """
        Get a named attribute if present.

        Parameters
        ----------
        name
            Attribute name to retrieve.

        Returns
        -------
        Attribute or None
            The matching attribute, or `None` if not found.
        """
        name_matched = [a for a in self.attributes if a.name == name]
        if len(name_matched) == 0:
            return None
        assert len(name_matched) == 1, (
            f"Malformed group {self} has multiple attributes {name}"
        )
        return name_matched[0]

    def get_attribute(self, name: Hashable) -> Attribute:
        """
        Retrieve a named attribute.

        Parameters
        ----------
        name
            Attribute name to retrieve.

        Returns
        -------
        Attribute
            The matching attribute.
        """
        attr = self._get_attribute(name)
        assert attr is not None, f"{self} has no attribute {name}"
        return attr

    def get_attributes(self, names: Iterable[Hashable]) -> Iterable[Attribute]:
        """
        Retrieve multiple named attributes.

        Parameters
        ----------
        names
            Attribute names to retrieve.

        Returns
        -------
        list
            Attributes in the same order as `names`.
        """
        return [self.get_attribute(name) for name in names]

    def rate_to_count(self, size_from: Hashable = "size") -> Self:
        """
        Convert imputable rate-like attributes into count-like attributes.

        Parameters
        ----------
        size_from
            Name of the attribute containing group size.

        Returns
        -------
        Group
            A new group with converted measurement types where applicable.
        """

        size = self.get_attribute(size_from).value
        assert size > 0
        attributes = [
            a.to_count(size)
            if a.impute_action == "impute"
            and isinstance(a, ImputableAttribute)
            and a.measurement_type in get_args(RateMeasurementType)
            else a
            for a in self.attributes
        ]
        return type(self)(name=self.name, attributes=attributes)

    def restore_rates(self, size_from: Hashable = "size") -> Self:
        """
        Convert imputable count-from-rate attributes back to rates.

        Parameters
        ----------
        size_from
            Name of the attribute containing group size.

        Returns
        -------
        Group
            A new group with restored rate-like attributes where applicable.
        """
        size = self.get_attribute(size_from).value
        assert size > 0
        attributes = [
            a.to_rate(size)
            if a.impute_action == "impute"
            and isinstance(a, ImputableAttribute)
            and a.measurement_type == "count_from_rate"
            else a
            for a in self.attributes
        ]
        return type(self)(name=self.name, attributes=attributes)

    def to_dict(self, use_json_values=False) -> dict[Hashable, Any]:
        assert self.attributes, (
            f"Cannot call to_dict() on {self} which has no attributes."
        )
        if use_json_values:
            return {attr.name: attr.json_value for attr in self.attributes}
        else:
            return {attr.name: attr.value for attr in self.attributes}

    def to_json_dict(self) -> dict[str, Any]:
        for attr in self.attributes:
            attr._assert_jsonable()

        as_dict = self.to_dict(use_json_values=True)

        return as_dict  # pyright: ignore[reportReturnType]

__init__(name, attributes=[], filter_on=None)

Group constructor.

Parameters:

  • name (Hashable) –

    Name defining the group.

  • attributes (Iterable[Attribute], default: [] ) –

    Attributes currently attached to the group.

  • filter_on (Iterable[str] | None, default: None ) –

    Keys used to identify this group in tabular JSON-like data.

Source code in src/cfa_subgroup_imputer/groups.py
def __init__(
    self,
    name: Hashable,
    attributes: Iterable[Attribute] = (),
    filter_on: Iterable[str] | None = None,
):
    """
    Group constructor.

    Parameters
    ----------
    name
        Name defining the group.
    attributes
        Attributes currently attached to the group. Defaults to an
        empty tuple (an immutable default avoids the shared
        mutable-default pitfall of `[]`).
    filter_on
        Keys used to identify this group in tabular JSON-like data.
    """
    self.name = name
    # Copy to a tuple so external mutation of the input iterable
    # cannot affect the group.
    self.attributes = tuple(attributes)
    self.filter_on = filter_on
    self._validate()

add_attribute(attribute)

Return a new group with one additional attribute.

Parameters:

  • attribute (Attribute) –

    Attribute to append.

Returns:

  • Group

    A new group containing all existing attributes plus attribute.

Source code in src/cfa_subgroup_imputer/groups.py
def add_attribute(self, attribute: Attribute) -> Self:
    """
    Return a new group with one additional attribute.

    Parameters
    ----------
    attribute
        Attribute to append.

    Returns
    -------
    Group
        A new group containing all existing attributes plus `attribute`.
    """
    assert attribute.name not in [a.name for a in self.attributes], (
        f"Cannot add measurement {attribute} to group {self.name} which already has {self.get_attribute(attribute.name)}"
    )
    return type(self)(
        name=self.name, attributes=self.attributes + (attribute,)
    )

get_attribute(name)

Retrieve a named attribute.

Parameters:

  • name (Hashable) –

    Attribute name to retrieve.

Returns:

  • Attribute

    The matching attribute.

Source code in src/cfa_subgroup_imputer/groups.py
def get_attribute(self, name: Hashable) -> Attribute:
    """
    Retrieve a named attribute.

    Parameters
    ----------
    name
        Attribute name to retrieve.

    Returns
    -------
    Attribute
        The matching attribute.

    Raises
    ------
    AssertionError
        If no attribute with this name exists.
    """
    found = self._get_attribute(name)
    assert found is not None, f"{self} has no attribute {name}"
    return found

get_attributes(names)

Retrieve multiple named attributes.

Parameters:

  • names (Iterable[Hashable]) –

    Attribute names to retrieve.

Returns:

  • list

    Attributes in the same order as names.

Source code in src/cfa_subgroup_imputer/groups.py
def get_attributes(self, names: Iterable[Hashable]) -> Iterable[Attribute]:
    """
    Retrieve multiple named attributes.

    Parameters
    ----------
    names
        Attribute names to retrieve.

    Returns
    -------
    list
        Attributes in the same order as `names`.
    """
    result = []
    for attr_name in names:
        result.append(self.get_attribute(attr_name))
    return result

rate_to_count(size_from='size')

Convert imputable rate-like attributes into count-like attributes.

Parameters:

  • size_from (Hashable, default: 'size' ) –

    Name of the attribute containing group size.

Returns:

  • Group

    A new group with converted measurement types where applicable.

Source code in src/cfa_subgroup_imputer/groups.py
def rate_to_count(self, size_from: Hashable = "size") -> Self:
    """
    Convert imputable rate-like attributes into count-like attributes.

    Parameters
    ----------
    size_from
        Name of the attribute containing group size.

    Returns
    -------
    Group
        A new group with converted measurement types where applicable.
    """

    size = self.get_attribute(size_from).value
    assert size > 0
    attributes = [
        a.to_count(size)
        if a.impute_action == "impute"
        and isinstance(a, ImputableAttribute)
        and a.measurement_type in get_args(RateMeasurementType)
        else a
        for a in self.attributes
    ]
    return type(self)(name=self.name, attributes=attributes)

restore_rates(size_from='size')

Convert imputable count-from-rate attributes back to rates.

Parameters:

  • size_from (Hashable, default: 'size' ) –

    Name of the attribute containing group size.

Returns:

  • Group

    A new group with restored rate-like attributes where applicable.

Source code in src/cfa_subgroup_imputer/groups.py
def restore_rates(self, size_from: Hashable = "size") -> Self:
    """
    Convert imputable count-from-rate attributes back to rates.

    Parameters
    ----------
    size_from
        Name of the attribute containing group size.

    Returns
    -------
    Group
        A new group with restored rate-like attributes where applicable.
    """
    size = self.get_attribute(size_from).value
    assert size > 0
    attributes = [
        a.to_rate(size)
        if a.impute_action == "impute"
        and isinstance(a, ImputableAttribute)
        and a.measurement_type == "count_from_rate"
        else a
        for a in self.attributes
    ]
    return type(self)(name=self.name, attributes=attributes)

GroupMap

A class that binds supergroups and subgroups together.

Source code in src/cfa_subgroup_imputer/groups.py
class GroupMap:
    """
    A class that binds supergroups and subgroups together.
    """

    def __init__(
        self,
        sub_to_super: Mapping[Hashable, Hashable],
        groups: Iterable[Group] | None,
    ):
        """
        Default constructor, takes in a subgroup : supergroup dict, and, optionally, groups.

        If no groups are provided, empty groups are created.
        """
        # Should probably store one dict of group name to Group, then sub<>super dicts as dict[str, str]
        self.sub_to_super = sub_to_super
        self.super_to_sub = GroupMap.make_one_to_many(sub_to_super)
        if groups is None:
            group_names = set(sub_to_super.values()).union(sub_to_super.keys())
            groups = [Group(name) for name in group_names]
        self.groups = {group.name: group for group in groups}
        self._validate()

    @classmethod
    def from_supergroups(
        cls,
        super_to_sub: dict[Hashable, Iterable[Hashable]],
        groups: Iterable[Group] | None,
    ) -> Self:
        """
        Alternative constructor, takes in a supergroup : [subgroups] dict.
        """
        sub_to_super = GroupMap.make_many_to_one(super_to_sub)
        return cls(sub_to_super, groups)

    def _validate(self):
        # Groups in mapping are in self.groups
        for group in self.sub_to_super.keys():
            assert group in self.groups, (
                f"Subgroup {group} is present in self.sub_to_super but not in self.groups"
            )
        for group in set(self.sub_to_super.values()):
            assert group in self.groups, (
                f"Supergroup {group} is present in self.sub_to_super but not in self.groups"
            )
        # Groups in self.groups are in mapping
        for group in self.groups.keys():
            in_sub = group in self.sub_to_super.keys()
            in_super = group in self.sub_to_super.values()
            assert in_sub or in_super, (
                f"Group {group} is present in self.groups but not in self.sub_to_super"
            )
            if in_sub and in_super:
                assert Counter(self.sub_to_super.items())[group] == 1, (
                    "Group is both a supergroup and a subgroup but is not 1:1."
                )

    def add_attribute(
        self,
        group_type: GroupType,
        attribute_name: Hashable,
        attribute_values: dict[Hashable, Any],
        impute_action: ImputeAction,
        attribute_class: type[Attribute] | type[ImputableAttribute],
        measurement_type: MeasurementType | None = None,
        attribute_json_values: dict[Hashable, Any] | None = None,
    ) -> None:
        """
        Bulk addition of attributes to all sub or supergroups.

        Parameters
        ----------
        group_type : GroupType
            Should the attribute be added to supergroups or subgroups?
        attribute_name : Hashable
            The name of the attribute to be added.
        attribute_values : dict[Hashable, object]
            For all groups of the specified type, the values of the attribute to be added.
        impute_action : ImputeAction
            The impute_action for the attribute to be added.
        attribute_class : type[Attribute] | type[ImputableAttribute]
            The class of the attribute to be added.
        measurement_type : MeasurementType | None
            The measurement type of the attribute to be added, if it is an ImputableAttribute.
        attribute_json_values : dict[Hashable, object] | None
            If the `attribute_values` are not something recorded directly in the json,
            this specifies how the values will be compared against json
            values and how they will be exported to json. None means to use the `attribute_values`.
        """
        if group_type == "supergroup":
            group_names = self.supergroup_names
        elif group_type == "subgroup":
            group_names = [
                k for k in self.groups if k not in self.supergroup_names
            ]
        else:
            raise ValueError(f"Unknown group_type: {group_type}")
        assert set(group_names).issubset(attribute_values.keys()), (
            f"Cannot add attribute {attribute_name} to groups {set(group_names).difference(attribute_values.keys())} which are not found in `attr_values`."
        )
        if attribute_json_values is not None:
            assert set(attribute_json_values.keys()).issubset(
                attribute_values.keys()
            ), (
                "If providing distinct filtering values from values, must provide one per group in `attribute_values`."
            )
        kwargs = {"name": attribute_name, "impute_action": impute_action}
        if attribute_class is ImputableAttribute:
            kwargs |= {"measurement_type": measurement_type}
        for group_name in group_names:
            attr = attribute_class(
                **(
                    kwargs
                    | {
                        "value": attribute_values[group_name],
                        "json_value": None
                        if attribute_json_values is None
                        else attribute_json_values[group_name],
                    }
                )
            )  # pyright: ignore[reportCallIssue]
            self.groups[group_name] = self.groups[group_name].add_attribute(
                attr
            )

    def add_filters(self, group_type: GroupType, filters: Iterable[str]):
        if group_type == "subgroup":
            group_names = self.subgroup_names()
        elif group_type == "supergroup":
            group_names = self.supergroup_names
        else:
            raise RuntimeError(f"Unknown group type {group_type}")
        for grp_name in group_names:
            self.group(grp_name).filter_on = filters

    def group(self, name: Hashable) -> Group:
        return self.groups[name]

    def to_dicts(self, group_type: GroupType) -> list[dict]:
        """
        Creates a list of dicts of the measurements in either the supergroups or subgroups.
        """
        if group_type == "subgroup":
            group_names = self.subgroup_names()
        elif group_type == "supergroup":
            group_names = self.supergroup_names
        else:
            raise RuntimeError(f"Unknown group type {group_type}")

        return [
            self.group(grp_name).to_json_dict() for grp_name in group_names
        ]

    def data_from_dicts(
        self,
        data: Iterable[dict],
        group_type: GroupType,
        exclude: Container[str],
        count: Container[str],
        copy: Container[str],
        rate: Container[str],
    ):
        """
        Populates measurements and attributes for groups found in the data.
        """
        if group_type == "subgroup":
            group_names = self.subgroup_names()
        elif group_type == "supergroup":
            group_names = self.supergroup_names
        else:
            raise RuntimeError(f"Unknown group type {group_type}")

        filters = self.get_filters(group_type)
        assert filters is not None

        data_list = list(data)

        keys = [
            key
            for key in get_json_keys(data)
            if ((key not in exclude) and (key not in filters))
        ]

        # We can do better than O(n^2)
        all_grps_all_vals: dict[Hashable, dict[str, Any]] = {
            grp_name: self.group(grp_name).filter(
                data_list, assert_unique=True
            )[0]
            for grp_name in group_names
        }

        for key in keys:
            vals = {
                grp_name: all_grps_all_vals[grp_name][key]
                for grp_name in group_names
            }
            impute_action = "copy" if key in copy else "ignore"
            measurement_type = None
            attribute_class = Attribute
            if key in count or key in rate:
                impute_action = "impute"
                measurement_type = "count" if key in count else "rate"
                attribute_class = ImputableAttribute
            self.add_attribute(
                group_type=group_type,
                attribute_name=key,
                attribute_values=vals,
                impute_action=impute_action,
                attribute_class=attribute_class,
                measurement_type=measurement_type,
            )

    def get_filters(self, group_type: GroupType) -> Iterable[str]:
        if group_type == "subgroup":
            group_names = self.subgroup_names()
        elif group_type == "supergroup":
            group_names = self.supergroup_names
        else:
            raise RuntimeError(f"Unknown group type {group_type}")

        all_filters = []
        for grp_name in group_names:
            grp_filters = self.group(grp_name).filter_on
            assert grp_filters is not None, (
                f"Group named {grp_name} has no filter"
            )
            all_filters.append(tuple(grp_filters))

        assert len(set(all_filters)) == 1, (
            f"Not all {group_type}s have same filters."
        )

        return all_filters.pop()

    @staticmethod
    def make_many_to_one(
        super_to_sub: Mapping[Hashable, Iterable[Hashable]],
    ) -> Mapping[Hashable, Hashable]:
        """
        Inverts a supergroup : [subgroups] one to one dict to a subgroup : supergroup one to many dict
        """
        return {v: k for k, v_list in super_to_sub.items() for v in v_list}

    @staticmethod
    def make_one_to_many(
        sub_to_super: Mapping[Hashable, Hashable],
    ) -> Mapping[Hashable, list[Hashable]]:
        """
        Inverts a subgroup : supergroup one to one dict to a supergroup : [subgroups] one to many dict
        """
        super_to_sub = {}
        for k, v in sub_to_super.items():
            if v in super_to_sub:
                super_to_sub[v].append(k)
            else:
                super_to_sub[v] = [k]
        return super_to_sub

    def subgroup_names(self, name: Hashable | None = None) -> list[Hashable]:
        """
        Get names of subgroups this supergroup contains
        """
        if name is None:
            group_names = []
            for supergrp in self.supergroup_names:
                group_names = group_names + self.subgroup_names(supergrp)

            return group_names

        assert name in self.super_to_sub.keys()
        return self.super_to_sub[name]

    @property
    def supergroup_names(self) -> list[Hashable]:
        """
        Get all supergroup names.
        """
        return list(self.super_to_sub.keys())

supergroup_names property

Get all supergroup names.

__init__(sub_to_super, groups)

Default constructor, takes in a subgroup : supergroup dict, and, optionally, groups.

If no groups are provided, empty groups are created.

Source code in src/cfa_subgroup_imputer/groups.py
def __init__(
    self,
    sub_to_super: Mapping[Hashable, Hashable],
    groups: Iterable[Group] | None,
):
    """
    Default constructor, takes in a subgroup : supergroup dict, and, optionally, groups.

    If no groups are provided, empty groups are created.
    """
    # TODO(review): consider storing a single name -> Group dict plus
    # plain sub<->super name maps instead of this split representation.
    self.sub_to_super = sub_to_super
    self.super_to_sub = GroupMap.make_one_to_many(sub_to_super)
    if groups is None:
        # Every name appearing on either side of the mapping gets an
        # empty Group.
        every_name = set(sub_to_super.values()).union(sub_to_super.keys())
        groups = [Group(group_name) for group_name in every_name]
    self.groups = {group.name: group for group in groups}
    self._validate()

add_attribute(group_type, attribute_name, attribute_values, impute_action, attribute_class, measurement_type=None, attribute_json_values=None)

Bulk addition of attributes to all sub or supergroups.

Parameters:

  • group_type (GroupType) –

    Should the attribute be added to supergroups or subgroups?

  • attribute_name (Hashable) –

    The name of the attribute to be added.

  • attribute_values (dict[Hashable, object]) –

    For all groups of the specified type, the values of the attribute to be added.

  • impute_action (ImputeAction) –

    The impute_action for the attribute to be added.

  • attribute_class (type[Attribute] | type[ImputableAttribute]) –

    The class of the attribute to be added.

  • measurement_type (MeasurementType | None, default: None ) –

    The measurement type of the attribute to be added, if it is an ImputableAttribute.

  • attribute_json_values (dict[Hashable, object] | None, default: None ) –

    If the attribute_values are not something recorded directly in the json, this specifies how the values will be compared against json values and how they will be exported to json. None means to use the attribute_values.

Source code in src/cfa_subgroup_imputer/groups.py
def add_attribute(
    self,
    group_type: GroupType,
    attribute_name: Hashable,
    attribute_values: dict[Hashable, Any],
    impute_action: ImputeAction,
    attribute_class: type[Attribute] | type[ImputableAttribute],
    measurement_type: MeasurementType | None = None,
    attribute_json_values: dict[Hashable, Any] | None = None,
) -> None:
    """
    Bulk addition of attributes to all sub or supergroups.

    Parameters
    ----------
    group_type : GroupType
        Should the attribute be added to supergroups or subgroups?
    attribute_name : Hashable
        The name of the attribute to be added.
    attribute_values : dict[Hashable, object]
        For all groups of the specified type, the values of the attribute to be added.
    impute_action : ImputeAction
        The impute_action for the attribute to be added.
    attribute_class : type[Attribute] | type[ImputableAttribute]
        The class of the attribute to be added.
    measurement_type : MeasurementType | None
        The measurement type of the attribute to be added, if it is an ImputableAttribute.
    attribute_json_values : dict[Hashable, object] | None
        If the `attribute_values` are not something recorded directly in the json,
        this specifies how the values will be compared against json
        values and how they will be exported to json. None means to use the `attribute_values`.
    """
    # Resolve which group names receive the new attribute; subgroups
    # are taken as the complement of the supergroup names here.
    if group_type == "supergroup":
        group_names = self.supergroup_names
    elif group_type == "subgroup":
        group_names = [
            k for k in self.groups if k not in self.supergroup_names
        ]
    else:
        raise ValueError(f"Unknown group_type: {group_type}")
    # Every targeted group must have a value supplied for it.
    assert set(group_names).issubset(attribute_values.keys()), (
        f"Cannot add attribute {attribute_name} to groups {set(group_names).difference(attribute_values.keys())} which are not found in `attr_values`."
    )
    if attribute_json_values is not None:
        assert set(attribute_json_values.keys()).issubset(
            attribute_values.keys()
        ), (
            "If providing distinct filtering values from values, must provide one per group in `attribute_values`."
        )
    # Constructor kwargs shared by every group; measurement_type is
    # only passed for ImputableAttribute.
    kwargs = {"name": attribute_name, "impute_action": impute_action}
    if attribute_class is ImputableAttribute:
        kwargs |= {"measurement_type": measurement_type}
    for group_name in group_names:
        # Per-group kwargs (value/json_value) are merged over the
        # shared ones, then passed to the chosen attribute class.
        attr = attribute_class(
            **(
                kwargs
                | {
                    "value": attribute_values[group_name],
                    "json_value": None
                    if attribute_json_values is None
                    else attribute_json_values[group_name],
                }
            )
        )  # pyright: ignore[reportCallIssue]
        # Groups are replaced, not mutated: Group.add_attribute
        # returns a new Group.
        self.groups[group_name] = self.groups[group_name].add_attribute(
            attr
        )

data_from_dicts(data, group_type, exclude, count, copy, rate)

Populates measurements and attributes for groups found in the data.

Source code in src/cfa_subgroup_imputer/groups.py
def data_from_dicts(
    self,
    data: Iterable[dict],
    group_type: GroupType,
    exclude: Container[str],
    count: Container[str],
    copy: Container[str],
    rate: Container[str],
):
    """
    Populates measurements and attributes for groups found in the data.

    Parameters
    ----------
    data
        JSON-like rows, one per group of the given type.
    group_type
        Whether to populate subgroups or supergroups.
    exclude
        Keys to skip entirely.
    count, rate
        Keys to be treated as imputable counts / rates.
    copy
        Keys to be copied unchanged during imputation.
    """
    if group_type == "subgroup":
        group_names = self.subgroup_names()
    elif group_type == "supergroup":
        group_names = self.supergroup_names
    else:
        raise RuntimeError(f"Unknown group type {group_type}")

    filters = self.get_filters(group_type)
    assert filters is not None

    data_list = list(data)

    # BUG FIX: read keys from the materialized list; `data` may be a
    # one-shot iterator that list(data) above has already exhausted.
    keys = [
        key
        for key in get_json_keys(data_list)
        if ((key not in exclude) and (key not in filters))
    ]

    # We can do better than O(n^2)
    all_grps_all_vals: dict[Hashable, dict[str, Any]] = {
        grp_name: self.group(grp_name).filter(
            data_list, assert_unique=True
        )[0]
        for grp_name in group_names
    }

    for key in keys:
        vals = {
            grp_name: all_grps_all_vals[grp_name][key]
            for grp_name in group_names
        }
        impute_action = "copy" if key in copy else "ignore"
        measurement_type = None
        attribute_class = Attribute
        if key in count or key in rate:
            impute_action = "impute"
            measurement_type = "count" if key in count else "rate"
            attribute_class = ImputableAttribute
        self.add_attribute(
            group_type=group_type,
            attribute_name=key,
            attribute_values=vals,
            impute_action=impute_action,
            attribute_class=attribute_class,
            measurement_type=measurement_type,
        )

from_supergroups(super_to_sub, groups) classmethod

Alternative constructor, takes in a supergroup : [subgroups] dict.

Source code in src/cfa_subgroup_imputer/groups.py
@classmethod
def from_supergroups(
    cls,
    super_to_sub: dict[Hashable, Iterable[Hashable]],
    groups: Iterable[Group] | None,
) -> Self:
    """
    Alternative constructor, takes in a supergroup : [subgroups] dict.
    """
    # Invert to the canonical subgroup -> supergroup form, then
    # delegate to the default constructor.
    inverted = GroupMap.make_many_to_one(super_to_sub)
    return cls(inverted, groups)

make_many_to_one(super_to_sub) staticmethod

Inverts a supergroup : [subgroups] one-to-many dict to a subgroup : supergroup many-to-one dict

Source code in src/cfa_subgroup_imputer/groups.py
@staticmethod
def make_many_to_one(
    super_to_sub: Mapping[Hashable, Iterable[Hashable]],
) -> Mapping[Hashable, Hashable]:
    """
    Inverts a supergroup : [subgroups] one-to-many dict into a
    subgroup : supergroup many-to-one dict.
    """
    inverted = {}
    for supergroup, subgroups in super_to_sub.items():
        for subgroup in subgroups:
            inverted[subgroup] = supergroup
    return inverted

make_one_to_many(sub_to_super) staticmethod

Inverts a subgroup : supergroup many-to-one dict into a supergroup : [subgroups] one-to-many dict

Source code in src/cfa_subgroup_imputer/groups.py
@staticmethod
def make_one_to_many(
    sub_to_super: Mapping[Hashable, Hashable],
) -> Mapping[Hashable, list[Hashable]]:
    """
    Inverts a subgroup : supergroup many-to-one dict into a
    supergroup : [subgroups] one-to-many dict.
    """
    grouped: dict = {}
    for subgroup, supergroup in sub_to_super.items():
        # First appearance of a supergroup creates its (ordered) list.
        grouped.setdefault(supergroup, []).append(subgroup)
    return grouped

subgroup_names(name=None)

Get names of subgroups this supergroup contains

Source code in src/cfa_subgroup_imputer/groups.py
def subgroup_names(self, name: Hashable | None = None) -> list[Hashable]:
    """
    Get names of subgroups this supergroup contains.

    When `name` is None, the subgroups of every supergroup are returned,
    concatenated in supergroup order.
    """
    if name is not None:
        assert name in self.super_to_sub.keys()
        return self.super_to_sub[name]

    collected: list[Hashable] = []
    for supergroup in self.supergroup_names:
        collected.extend(self.subgroup_names(supergroup))
    return collected

to_dicts(group_type)

Creates a list of dicts of the measurements in either the supergroups or subgroups.

Source code in src/cfa_subgroup_imputer/groups.py
def to_dicts(self, group_type: GroupType) -> list[dict]:
    """
    Creates a list of dicts of the measurements in either the supergroups or subgroups.
    """
    if group_type == "supergroup":
        names = self.supergroup_names
    elif group_type == "subgroup":
        names = self.subgroup_names()
    else:
        raise RuntimeError(f"Unknown group type {group_type}")

    dicts = []
    for nm in names:
        dicts.append(self.group(nm).to_json_dict())
    return dicts

src.cfa_subgroup_imputer.imputer

Module for imputation machinery.

Aggregator

A class which aggregates subgroups.

Source code in src/cfa_subgroup_imputer/imputer.py
class Aggregator:
    """
    A class which aggregates subgroups.

    When called on a GroupMap, sums each imputable attribute of the
    subgroups into their supergroup and returns a new GroupMap.
    """

    def __init__(self, size_from: Hashable):
        # Name of the group-size attribute, forwarded to
        # `Group.restore_rates()` after counts have been summed.
        self.size_from = size_from

    def __call__(self, map: GroupMap) -> GroupMap:
        """
        Impute and aggregate the given group map.

        Parameters
        ----------
        map
            Group map whose subgroups carry the measurements to aggregate.

        Returns
        -------
        GroupMap
            New map containing aggregated supergroups plus the original
            (unmodified) subgroups.
        """

        sub_to_super = map.sub_to_super
        groups = []

        for supergroup_name in map.supergroup_names:
            supergroup = map.group(supergroup_name)
            # Convert rate measurements to counts so they can be summed
            # (see the measurement-type asserts in _aggregate_one_attribute).
            subgroups = [
                map.group(nm).rate_to_count()
                for nm in map.subgroup_names(supergroup_name)
            ]
            # Attribute set is taken from the first subgroup; all subgroups
            # are expected to share the same attributes.
            attribute_names = [a.name for a in subgroups[0].attributes]

            for nm in attribute_names:
                supergroup = self._aggregate_one_attribute(
                    nm, supergroup, subgroups
                )

            # NOTE(review): restore_rates presumably converts summed
            # counts-from-rates back to rates using `size_from` — confirm
            # against Group in groups.py.
            groups.append(supergroup.restore_rates(self.size_from))
            # The output map keeps the original, unconverted subgroups.
            for nm in map.subgroup_names(supergroup_name):
                groups.append(map.group(nm))

        return GroupMap(sub_to_super, groups)

    def _aggregate_one_attribute(
        self,
        attribute_name: Hashable,
        supergroup: Group,
        subgroups: Iterable[Group],
    ) -> Group:
        """
        Aggregate a single attribute from subgroups to supergroup.

        The first subgroup's `impute_action` for the attribute decides what
        happens: "copy" propagates a value that must be identical across all
        subgroups, "impute" sums count-type values, and "ignore" leaves the
        supergroup unchanged.
        """
        subgroups = list(subgroups)
        assert len(subgroups) > 0, "Cannot aggregate non-existent subgroups."
        attr0 = subgroups[0].get_attribute(attribute_name)
        act0 = attr0.impute_action

        if act0 == "copy":
            # Copied attributes must agree across every subgroup.
            vals = set(
                [grp.get_attribute(attribute_name) for grp in subgroups]
            )
            assert len(vals) == 1, (
                f"Found multiple incompatible values for attribute named {attribute_name} in subgroups: {vals}"
            )
            return supergroup.add_attribute(attr0)
        elif act0 == "impute":
            cmt = get_args(CountMeasurementType)
            assert isinstance(attr0, ImputableAttribute)
            assert attr0.measurement_type in cmt, (
                "All subgroups must have been pre-processed with `.rate_to_count()`"
            )
            final_type = attr0.measurement_type
            val = attr0.value
            for grp in subgroups[1:]:
                attr = grp.get_attribute(attribute_name)
                assert isinstance(attr, ImputableAttribute)
                assert attr.measurement_type in cmt, (
                    "All subgroups must have been pre-processed with `.rate_to_count()`"
                )
                # If any contribution originated as a rate, mark the sum as
                # "count_from_rate" so it can be restored to a rate later.
                if attr.measurement_type == "count_from_rate":
                    final_type = "count_from_rate"
                val += attr.value

            return supergroup.add_attribute(
                ImputableAttribute(
                    value=val,
                    name=attribute_name,
                    impute_action="impute",
                    measurement_type=final_type,
                )
            )
        elif act0 == "ignore":
            return supergroup
        else:
            raise RuntimeError(f"{act0} is not a valid ImputeAction.")

__call__(map)

Impute and aggregate the given group map.

Source code in src/cfa_subgroup_imputer/imputer.py
def __call__(self, map: GroupMap) -> GroupMap:
    """
    Impute and aggregate the given group map.

    Parameters
    ----------
    map
        Group map whose subgroups carry the measurements to aggregate.

    Returns
    -------
    GroupMap
        New map containing aggregated supergroups plus the original
        (unmodified) subgroups.
    """

    sub_to_super = map.sub_to_super
    groups = []

    for supergroup_name in map.supergroup_names:
        supergroup = map.group(supergroup_name)
        # Convert rate measurements to counts so they can be summed.
        subgroups = [
            map.group(nm).rate_to_count()
            for nm in map.subgroup_names(supergroup_name)
        ]
        # Attribute set is taken from the first subgroup; all subgroups
        # are expected to share the same attributes.
        attribute_names = [a.name for a in subgroups[0].attributes]

        for nm in attribute_names:
            supergroup = self._aggregate_one_attribute(
                nm, supergroup, subgroups
            )

        # NOTE(review): restore_rates presumably converts summed
        # counts-from-rates back to rates via `size_from` — confirm.
        groups.append(supergroup.restore_rates(self.size_from))
        # The output map keeps the original, unconverted subgroups.
        for nm in map.subgroup_names(supergroup_name):
            groups.append(map.group(nm))

    return GroupMap(sub_to_super, groups)

Disaggregator

A class which imputes and disaggregates subgroups.

Source code in src/cfa_subgroup_imputer/imputer.py
class Disaggregator:
    """
    A class which imputes and disaggregates subgroups.
    """

    def __init__(self, proportion_calculator: ProportionCalculator):
        # Strategy object that yields each subgroup's share of its
        # supergroup's measurements.
        self.proportion_calculator = proportion_calculator

    def __call__(self, map: GroupMap) -> GroupMap:
        """
        Impute and disaggregate the given group map.
        """
        out_groups = []

        for super_name in map.supergroup_names:
            parent = map.group(super_name)
            out_groups.append(parent)
            proportions = self.proportion_calculator.calculate(
                super_name, map
            )
            out_groups.extend(
                parent.disaggregate_one_subgroup(
                    map.group(child), proportions[child]
                )
                for child in map.subgroup_names(super_name)
            )

        return GroupMap(map.sub_to_super, out_groups)

__call__(map)

Impute and disaggregate the given group map.

Source code in src/cfa_subgroup_imputer/imputer.py
def __call__(self, map: GroupMap) -> GroupMap:
    """
    Impute and disaggregate the given group map.
    """
    result = []

    for parent_name in map.supergroup_names:
        parent = map.group(parent_name)
        result.append(parent)
        # Per-subgroup shares used to split the parent's measurements.
        shares = self.proportion_calculator.calculate(parent_name, map)
        for child_name in map.subgroup_names(parent_name):
            child = map.group(child_name)
            result.append(
                parent.disaggregate_one_subgroup(child, shares[child_name])
            )

    return GroupMap(map.sub_to_super, result)

src.cfa_subgroup_imputer.json

Module for interfacing with JSON-style inputs.

aggregate(supergroup_data, subgroup_data, subgroup_to_supergroup, supergroups_from, subgroups_from, group_type, loop_over=[], rate=[], count=[], exclude=[], size_from='size', **kwargs)

Wrapper for impute with action="aggregate".

Source code in src/cfa_subgroup_imputer/json.py
def aggregate(
    # TODO: we should perhaps let this be just a list of values for aggregating on age, or some simple categorical cases
    supergroup_data: Iterable[dict[str, Any]],
    subgroup_data: Iterable[dict[str, Any]],
    subgroup_to_supergroup: Iterable[dict[str, Any]] | None,
    supergroups_from: str,
    subgroups_from: str,
    group_type: GroupableTypes | None,
    loop_over: Collection[str] = [],
    rate: Collection[str] = [],
    count: Collection[str] = [],
    exclude: Collection[str] = [],
    size_from: str = "size",
    **kwargs,
) -> list[dict[str, Any]]:
    """
    Wrapper for `impute` with `action="aggregate"`.
    """
    # Pure pass-through: the leading positional arguments follow
    # `impute`'s parameter order exactly.
    return impute(
        "aggregate",
        supergroup_data,
        subgroup_data,
        subgroup_to_supergroup,
        supergroups_from,
        subgroups_from,
        group_type,
        loop_over=loop_over,
        rate=rate,
        count=count,
        exclude=exclude,
        size_from=size_from,
        **kwargs,
    )

create_group_map(supergroup_data, subgroup_data, subgroup_to_supergroup, supergroups_from, subgroups_from, group_type, **kwargs)

GroupMap construction utility for disaggregate. See there for more details.

Source code in src/cfa_subgroup_imputer/json.py
def create_group_map(
    supergroup_data: Iterable[dict[str, Any]] | None,
    subgroup_data: Iterable[dict[str, Any]] | None,
    subgroup_to_supergroup: Iterable[dict[str, Any]] | None,
    supergroups_from: str,
    subgroups_from: str,
    group_type: GroupableTypes | None,
    **kwargs,
) -> GroupMap:
    """
    GroupMap construction utility for `disaggregate`. See there for more details.
    """
    # An explicit subgroup -> supergroup table takes precedence over
    # inferring the mapping from the group labels themselves.
    if subgroup_to_supergroup is not None:
        pairs = [
            (row[subgroups_from], row[supergroups_from])
            for row in subgroup_to_supergroup
        ]
        return RaggedOuterProductSubgroupHandler().construct_group_map(
            category_combinations=pairs,
            variable_names=[subgroups_from, supergroups_from],
        )

    assert supergroup_data is not None, (
        "If not supplying `subgroup_to_supergroup`, must supply `supergroup_data`."
    )
    assert subgroup_data is not None, (
        "If not supplying `subgroup_to_supergroup`, must supply `subgroup_data`."
    )

    # Distinct, deterministically ordered category labels.
    supergroup_cats = sorted(
        set(row[supergroups_from] for row in supergroup_data)
    )
    subgroup_cats = sorted(set(row[subgroups_from] for row in subgroup_data))

    if group_type == "categorical":
        return OuterProductSubgroupHandler().construct_group_map(
            supergroup_categories=supergroup_cats,
            subgroup_categories=[subgroup_cats],
            supergroup_variable_name=supergroups_from,
            subgroup_variable_names=[subgroups_from],
            **kwargs,
        )

    if group_type == "age":
        # TODO: we could rename this ourselves, instead of erroring out
        #       though then we'd have to tweak the written output at the end too
        assert supergroups_from == subgroups_from, (
            "Age groups must be named identically in super and subgroup data"
        )
        return AgeGroupHandler(
            age_max=kwargs.get("age_max", 100)
        ).construct_group_map(
            supergroups=supergroup_cats,
            subgroups=subgroup_cats,
            continuous_var_name=subgroups_from,
            **kwargs,
        )

    raise RuntimeError(f"Unknown grouping variable type {group_type}")

disaggregate(supergroup_data, subgroup_data, subgroup_to_supergroup, supergroups_from, subgroups_from, group_type, loop_over=[], rate=[], count=[], exclude=[], size_from='size', **kwargs)

Wrapper for impute with action="disaggregate".

Source code in src/cfa_subgroup_imputer/json.py
def disaggregate(
    supergroup_data: Iterable[dict[str, Any]],
    # TODO: we should perhaps let this be just a list of values for splitting on age
    subgroup_data: Iterable[dict[str, Any]],
    subgroup_to_supergroup: Iterable[dict[str, Any]] | None,
    supergroups_from: str,
    subgroups_from: str,
    group_type: GroupableTypes | None,
    loop_over: Collection[str] = [],
    rate: Collection[str] = [],
    count: Collection[str] = [],
    exclude: Collection[str] = [],
    size_from: str = "size",
    **kwargs,
) -> list[dict[str, Any]]:
    """
    Wrapper for `impute` with `action="disaggregate"`.
    """
    # Pure pass-through: the leading positional arguments follow
    # `impute`'s parameter order exactly.
    return impute(
        "disaggregate",
        supergroup_data,
        subgroup_data,
        subgroup_to_supergroup,
        supergroups_from,
        subgroups_from,
        group_type,
        loop_over=loop_over,
        rate=rate,
        count=count,
        exclude=exclude,
        size_from=size_from,
        **kwargs,
    )

impute(action, supergroup_data, subgroup_data, subgroup_to_supergroup, supergroups_from, subgroups_from, group_type, loop_over=[], rate=[], count=[], exclude=[], size_from='size', **kwargs)

Takes in data for supergroups/subgroups, imputes and returns values for the subgroups/supergroups.

Parameters:

  • action (Literal['aggregate', 'disaggregate']) –

    Whether to aggregate or disaggregate.

  • supergroup_data (Iterable[dict[str, Any]]) –

    Information defining supergroups, including any data to disaggregate.

  • subgroup_data (Iterable[dict[str, Any]]) –

    Information defining the subgroups, including any data to aggregate.

  • subgroup_to_supergroup (Iterable[dict[str, Any]] | None) –

    Optional mapping defining all subgroup : supergroup.

  • supergroups_from (str) –

    Name of key in supergroup_data defining supergroups.

  • subgroups_from (str) –

    Name of key in subgroup_data defining subgroups.

  • group_type (GroupableTypes | None) –

    What kind of groups are these, categorical or age? Can only be None if providing subgroup_to_supergroup.

  • loop_over (Collection[str], default: [] ) –

    A collection of covariates, within each combination of which we will separately disaggregate. For example, if we wanted to disaggregate age groups separately in every state and county in a dataset, this would be ["state", "county"].

  • rate (Collection[str], default: [] ) –

    A list of the keys in supergroup_data which define rate measurements.

  • count (Collection[str], default: [] ) –

    A list of the keys in supergroup_data which define count measurements.

  • exclude (Collection[str], default: [] ) –

    A list of the keys in supergroup_data which define variables which are to be excluded from imputation and which will not be present in the output.

  • **kwargs

    Passed to internals.

Returns:

  • list[dict[str, Any]]

    Data with measurements imputed for the subgroups.

Source code in src/cfa_subgroup_imputer/json.py
def impute(
    action: Literal["aggregate", "disaggregate"],
    supergroup_data: Iterable[dict[str, Any]],
    subgroup_data: Iterable[dict[str, Any]],
    subgroup_to_supergroup: Iterable[dict[str, Any]] | None,
    supergroups_from: str,
    subgroups_from: str,
    group_type: GroupableTypes | None,
    loop_over: Collection[str] = [],
    rate: Collection[str] = [],
    count: Collection[str] = [],
    exclude: Collection[str] = [],
    size_from: str = "size",
    **kwargs,
) -> list[dict[str, Any]]:
    """
    Takes in data for supergroups/subgroups, imputes and returns values for the subgroups/supergroups.

    Parameters
    ----------
    action: Literal["aggregate", "disaggregate"]
        Whether to aggregate or disaggregate.
    supergroup_data: Iterable[dict[str, Any]]
        Information defining supergroups, including any data to disaggregate.
    subgroup_data: Iterable[dict[str, Any]]
        Information defining the subgroups, including any data to aggregate.
    subgroup_to_supergroup: Iterable[dict[str, Any]] | None
        Optional mapping defining all subgroup : supergroup.
    supergroups_from: str
        Name of key in `supergroup_data` defining supergroups.
    subgroups_from: str
        Name of key in `subgroup_data` defining subgroups.
    group_type: GroupableTypes | None
        What kind of groups are these, categorical or age? Can only
        be None if providing `subgroup_to_supergroup`.
    loop_over: Collection[str] = []
        A collection of covariates, within each combination of which
        we will separately disaggregate. For example, if we wanted
        to disaggregate age groups separately in every state and county
        in a dataset, this would be ["state", "county"].
    rate: Collection[str] = []
        A list of the keys in `supergroup_data` which define rate measurements.
    count: Collection[str] = []
        A list of the keys in `supergroup_data` which define count measurements.
    exclude: Collection[str] = []
        A list of the keys in `supergroup_data` which define variables
        which are to be excluded from imputation and which will not
        be present in the output.
    size_from: str = "size"
        Name of the key giving group sizes; passed to the `Aggregator`
        or the `ProportionsFromCategories` proportion calculator.
    **kwargs
        Passed to internals.

    Returns
    -------
    list[dict[str, Any]]
        Data with measurements imputed for the subgroups.
    """

    group_map = create_group_map(
        supergroup_data=supergroup_data,
        subgroup_data=subgroup_data,
        subgroup_to_supergroup=subgroup_to_supergroup,
        supergroups_from=supergroups_from,
        subgroups_from=subgroups_from,
        group_type=group_type,
        **kwargs,
    )

    # Choose the imputation engine and which level of the map to emit.
    if action == "aggregate":
        imputer = Aggregator(size_from)
        output_level = "supergroup"
    elif action == "disaggregate":
        if subgroup_to_supergroup is not None or group_type == "categorical":
            prop_calc = ProportionsFromCategories(size_from=size_from)
        elif group_type == "age":
            # TODO: as above we could rename this ourselves
            assert supergroups_from == subgroups_from, (
                "Age groups must be named identically in super and subgroup data"
            )
            prop_calc = ProportionsFromContinuous(
                continuous_var_name=subgroups_from
            )
        else:
            raise RuntimeError(f"Unknown grouping variable type {group_type}")
        imputer = Disaggregator(proportion_calculator=prop_calc)
        output_level = "subgroup"
    else:
        raise ValueError(f"Unknown action {action}")

    # Add a dummy variable to loop over if none are provided
    if not loop_over:
        safe_loop_over = ["dummy"]
        supergroup_data = [d | {"dummy": "dummy"} for d in supergroup_data]
        subgroup_data = [d | {"dummy": "dummy"} for d in subgroup_data]
    else:
        safe_loop_over = list(loop_over)
        # Materialize so we can sort in place below without mutating input.
        supergroup_data = [d for d in supergroup_data]
        subgroup_data = [d for d in subgroup_data]

    # Validate that the looping variables exist in both datasets and that
    # loop variables + group-defining variables uniquely key each row.
    for grp_type, grp_info in {
        "supergroup": {
            "data": supergroup_data,
            "groups_from": [supergroups_from],
            "n_groups": len(group_map.supergroup_names),
        },
        "subgroup": {
            "data": subgroup_data,
            "groups_from": [subgroups_from] + [supergroups_from]
            if group_type == "categorical"
            else [subgroups_from],
            "n_groups": len(group_map.subgroup_names()),
        },
    }.items():
        assert (
            missing := set(safe_loop_over).difference(
                get_json_keys(grp_info["data"])
            )
        ) == set(), (
            f"Looping variables are missing from {grp_type} data: {missing}"
        )

        assert len(
            unique(
                select(
                    grp_info["data"], safe_loop_over + grp_info["groups_from"]
                )
            )
        ) == len(grp_info["data"]), (
            f"Provided data has multiple entries for at least one combination of group-defining variables ({grp_info['groups_from']}) and variables to loop over ({loop_over}).\n{grp_info['data']}"
        )

    # itertools.groupby below requires data pre-sorted by the grouping key.
    supergroup_data.sort(key=itemgetter(*safe_loop_over))
    subgroup_data.sort(key=itemgetter(*safe_loop_over))

    # If we're not told what to do with the column, and it's not being used to compute proportions, copy it
    if subgroup_to_supergroup is not None or group_type == "categorical":
        ignore = [size_from]
    elif group_type == "age":
        ignore = [kwargs.get("continuous_var_name", "age")]
    else:
        ignore = []

    # The side the imputed values flow *from* determines which keys may
    # be copied through unchanged.
    if action == "aggregate":
        copy_from = subgroup_data
        groups_from = subgroups_from
    else:
        copy_from = supergroup_data
        groups_from = supergroups_from

    copy = (
        set(get_json_keys(copy_from))
        .difference(safe_loop_over)
        .difference(exclude)
        .difference(rate)
        .difference(count)
        .difference(ignore)
        # TODO: this is somewhat redundant with data_from_json knowing not to copy group-defining variables
        .difference([groups_from])
    )
    imputed_comp = []

    super_grouper = groupby(supergroup_data, key=itemgetter(*safe_loop_over))
    sub_grouper = groupby(subgroup_data, key=itemgetter(*safe_loop_over))

    # NOTE(review): zipping the two groupbys relies on both datasets
    # containing identical loop-variable combinations — confirm upstream.
    for (super_key, super_grp), (sub_key, sub_grp) in zip(
        super_grouper, sub_grouper
    ):
        assert super_key == sub_key, (
            "Mismatch in looping variables between supergroup and subgroup data"
        )
        # Fresh copy so attributes attached for one loop slice do not
        # accumulate across iterations.
        grp_map = deepcopy(group_map)

        grp_map.data_from_dicts(
            list(super_grp),
            "supergroup",
            copy=copy,
            exclude=exclude,
            count=count,
            rate=rate,
        )
        grp_map.data_from_dicts(
            list(sub_grp),
            "subgroup",
            copy=copy,
            exclude=exclude,
            count=count,
            rate=rate,
        )

        imputed_map = imputer(grp_map)
        imputed_comp.extend(imputed_map.to_dicts(output_level))

    # Remove dummy variable if it was added
    # NOTE(review): assumes every output row carries the "dummy" key —
    # confirm to_dicts always emits copied covariates.
    if not loop_over:
        for row in imputed_comp:
            del row["dummy"]

    return imputed_comp

src.cfa_subgroup_imputer.mapping

Submodule for enumerating subgroup and supergroup maps.

AgeGroupHandler

A class for working with age groups.

Implements: cfa_subgroup_imputer.enumerator.Mapper

Source code in src/cfa_subgroup_imputer/mapping.py
class AgeGroupHandler:
    """
    A class for working with age groups.

    Parses textual age-range labels (e.g. "1-<4 years", "6 months-4 years")
    into numeric ranges and builds subgroup : supergroup maps from them.

    Implements:
        cfa_subgroup_imputer.enumerator.Mapper
    """

    STR_AGE_RANGE_CONVERTERS = (
        (
            re.compile(r"^(\d+) years*"),
            lambda x: (
                float(x[0]),
                float(x[0]) + 1.0,
            ),
        ),
        (
            re.compile(r"^(\d+)\+ years"),
            lambda x: (
                float(x[0]),
                inf,
            ),
        ),
        (
            re.compile(r"^(\d+)-(\d+) years"),
            lambda x: (
                float(x[0]),
                float(x[1]) + 1.0,
            ),
        ),
        (
            re.compile(r"^(\d+)-<(\d+) years*"),
            lambda x: (
                float(x[0]),
                float(x[1]),
            ),
        ),
        (
            re.compile(r"^(\d+) months*-(\d+) years*"),
            lambda x: (
                float(x[0]) / 12.0,
                float(x[1]) + 1.0,
            ),
        ),
        (
            re.compile(r"^(\d+) months*-<(\d+) years*"),
            lambda x: (
                float(x[0]) / 12.0,
                float(x[1]),
            ),
        ),
        (
            re.compile(r"^(\d+)-(\d+) months*"),
            lambda x: (
                float(x[0]) / 12.0,
                (float(x[1]) + 1.0) / 12.0,
            ),
        ),
        (
            re.compile(r"^(\d+)-<(\d+) months*"),
            lambda x: (
                float(x[0]) / 12.0,
                float(x[1]) / 12.0,
            ),
        ),
    )
    """
    The master list of age ranges we can convert.

    Each element is a tuple of
    - A regex which can extract the single age or the low/high ages and
    - A function that returns a `(low, high)` tuple for ages in years
    """

    def __init__(self, age_max: float | None = None):
        # Upper bound substituted for open-ended ranges like "65+ years";
        # defaults to infinity.
        self.age_max = age_max if age_max is not None else inf

    def age_range_from_str(self, x: str) -> Range:
        """
        Parse an age-group string into a lower and upper bound in years.

        Raises
        ------
        RuntimeError
            If no known pattern matches `x`.
        """
        for sarc in AgeGroupHandler.STR_AGE_RANGE_CONVERTERS:
            if ages := sarc[0].fullmatch(x):
                low, high = sarc[1](ages.groups())
                # Cap open-ended ranges at the configured maximum age.
                if high == inf:
                    high = self.age_max
                return Range(low, high)
        raise RuntimeError(f"Cannot process age range {x}")

    def age_ranges_equivalent(self, x: str, y: str) -> bool:
        """
        True if the age ranges encode the same values, else False.

        E.g., 1-3 years and 1-<4 years imply age group of 1, 2, and 3 year olds.
        """
        return self.age_range_from_str(x) == self.age_range_from_str(y)

    def construct_group_map(
        self,
        supergroups: Iterable[str],
        subgroups: Iterable[str],
        **kwargs,
    ) -> GroupMap:
        """
        Construct a group map by assigning each subgroup to a containing supergroup.

        Parameters
        ----------
        supergroups
            Supergroup labels.
        subgroups
            Subgroup labels.
        **kwargs
            Optional options, including `continuous_var_name` and `missing_option`.

        Returns
        -------
        GroupMap
            Group mapping with age-range attributes and filters populated.

        Raises
        ------
        RuntimeError
            If a subgroup matches zero supergroups (unless
            `missing_option="add_one_to_one"`) or more than one supergroup.
        """
        age_varname = kwargs.get("continuous_var_name", "age")
        missing_option = kwargs.get("missing_option", "error")

        # Brute force attribution
        super_dict = {grp: self.age_range_from_str(grp) for grp in supergroups}
        sub_to_super = {}
        for sub in subgroups:
            sub_range = self.age_range_from_str(sub)
            super = [
                super_name
                for super_name, super_range in super_dict.items()
                if sub_range in super_range
            ]
            if len(super) == 1:
                sub_to_super[sub] = super[0]
            elif len(super) == 0:
                if missing_option == "add_one_to_one":
                    # NOTE(review): the unmatched subgroup becomes its own
                    # supergroup but is not entered into sub_to_super —
                    # confirm this pass-through behavior is intended.
                    super_dict[sub] = sub_range
                else:
                    raise RuntimeError(
                        f"Subgroup {sub} has no corresponding supergroup in {supergroups}"
                    )
            else:
                raise RuntimeError(
                    f"Subgroup {sub} is contained by multiple supergroups: {super}"
                )

        grp_map = GroupMap(sub_to_super=sub_to_super, groups=None)
        grp_map.add_attribute(
            group_type="subgroup",
            attribute_name=age_varname,
            attribute_values={
                subgrp: self.age_range_from_str(subgrp) for subgrp in subgroups
            },
            attribute_json_values={subgrp: subgrp for subgrp in subgroups},
            impute_action="ignore",
            attribute_class=Attribute,
        )
        grp_map.add_attribute(
            group_type="supergroup",
            attribute_name=age_varname,
            attribute_values={
                supergrp: self.age_range_from_str(supergrp)
                for supergrp in supergroups
            },
            attribute_json_values={
                supergrp: supergrp for supergrp in supergroups
            },
            impute_action="ignore",
            attribute_class=Attribute,
        )
        self.assert_no_missing_subgroups(grp_map, age_varname)
        # The supergroups together must tile their overall range with no
        # gaps or overlaps.
        sorted_super_ranges = sorted(super_dict.values())
        assert_range_spanned_exactly(
            Range(sorted_super_ranges[0].lower, sorted_super_ranges[-1].upper),
            sorted_super_ranges,
        )

        grp_map.add_filters("supergroup", [age_varname])
        grp_map.add_filters("subgroup", [age_varname])

        return grp_map

    def is_valid_age_group(self, x: str) -> bool:
        """
        True if `x` parses as a supported age-range string, else False.
        """
        try:
            _ = self.age_range_from_str(x)
            return True
        except RuntimeError as e:
            # BUGFIX: the parse-failure message is "Cannot process age
            # range {x}" (it includes the offending string), so the old
            # `re.fullmatch("Cannot process age range", str(e))` could
            # never match and unparseable input re-raised instead of
            # returning False. Match on the message prefix instead.
            if str(e).startswith("Cannot process age range"):
                return False
            else:
                raise e

    def assert_no_missing_subgroups(self, group_map: GroupMap, age_varname):
        """
        Assert each supergroup's age range is exactly tiled by its subgroups.
        """
        for supergrp_nm in group_map.supergroup_names:
            supergrp = group_map.group(supergrp_nm)
            supergrp_range = supergrp.get_attribute(age_varname).value
            subgrp_ranges = [
                group_map.group(nm).get_attribute(age_varname).value
                for nm in group_map.subgroup_names(supergrp_nm)
            ]
            assert_range_spanned_exactly(supergrp_range, subgrp_ranges)

STR_AGE_RANGE_CONVERTERS = ((re.compile('^(\\d+) years*'), lambda x: (float(x[0]), float(x[0]) + 1.0)), (re.compile('^(\\d+)\\+ years'), lambda x: (float(x[0]), inf)), (re.compile('^(\\d+)-(\\d+) years'), lambda x: (float(x[0]), float(x[1]) + 1.0)), (re.compile('^(\\d+)-<(\\d+) years*'), lambda x: (float(x[0]), float(x[1]))), (re.compile('^(\\d+) months*-(\\d+) years*'), lambda x: (float(x[0]) / 12.0, float(x[1]) + 1.0)), (re.compile('^(\\d+) months*-<(\\d+) years*'), lambda x: (float(x[0]) / 12.0, float(x[1]))), (re.compile('^(\\d+)-(\\d+) months*'), lambda x: (float(x[0]) / 12.0, (float(x[1]) + 1.0) / 12.0)), (re.compile('^(\\d+)-<(\\d+) months*'), lambda x: (float(x[0]) / 12.0, float(x[1]) / 12.0))) class-attribute instance-attribute

The master list of age ranges we can convert.

Each element is a tuple of - A regex which can extract the single age or the low/high ages and - A function that returns a (low, high) tuple for ages in years

age_range_from_str(x)

Parse an age-group string into a lower and upper bound in years.

Source code in src/cfa_subgroup_imputer/mapping.py
def age_range_from_str(self, x: str) -> Range:
    """
    Parse an age-group string into a lower and upper bound in years.
    """
    for pattern, to_bounds in AgeGroupHandler.STR_AGE_RANGE_CONVERTERS:
        match = pattern.fullmatch(x)
        if match is None:
            continue
        low, high = to_bounds(match.groups())
        # Open-ended upper bounds are capped at the configured maximum age.
        if high == inf:
            high = self.age_max
        return Range(low, high)
    raise RuntimeError(f"Cannot process age range {x}")

age_ranges_equivalent(x, y)

True if the age ranges encode the same values, else False.

E.g., 1-3 years and 1-<4 years imply age group of 1, 2, and 3 year olds.

Source code in src/cfa_subgroup_imputer/mapping.py
def age_ranges_equivalent(self, x: str, y: str) -> bool:
    """
    True if the age ranges encode the same values, else False.

    E.g., 1-3 years and 1-<4 years imply age group of 1, 2, and 3 year olds.
    """
    range_x = self.age_range_from_str(x)
    range_y = self.age_range_from_str(y)
    return range_x == range_y

construct_group_map(supergroups, subgroups, **kwargs)

Construct a group map by assigning each subgroup to a containing supergroup.

Parameters:

  • supergroups (Iterable[str]) –

    Supergroup labels.

  • subgroups (Iterable[str]) –

    Subgroup labels.

  • **kwargs

    Optional options, including continuous_var_name and missing_option.

Returns:

  • GroupMap

    Group mapping with age-range attributes and filters populated.

Source code in src/cfa_subgroup_imputer/mapping.py
def construct_group_map(
    self,
    supergroups: Iterable[str],
    subgroups: Iterable[str],
    **kwargs,
) -> GroupMap:
    """
    Construct a group map by assigning each subgroup to a containing supergroup.

    Parameters
    ----------
    supergroups
        Supergroup labels.
    subgroups
        Subgroup labels.
    **kwargs
        Optional options, including `continuous_var_name` (default "age")
        and `missing_option` (default "error"; "add_one_to_one" promotes an
        unmatched subgroup to a supergroup of its own).

    Returns
    -------
    GroupMap
        Group mapping with age-range attributes and filters populated.

    Raises
    ------
    RuntimeError
        If a subgroup matches no supergroup (and missing_option is "error"),
        or matches more than one supergroup.
    """
    age_varname = kwargs.get("continuous_var_name", "age")
    missing_option = kwargs.get("missing_option", "error")

    # Materialize once: both label collections are iterated several times
    # below, which would silently misbehave for one-shot generators.
    supergroups = list(supergroups)
    subgroups = list(subgroups)

    # Brute force attribution
    super_dict = {grp: self.age_range_from_str(grp) for grp in supergroups}
    sub_to_super = {}
    for sub in subgroups:
        sub_range = self.age_range_from_str(sub)
        # `containing` (not `super`) to avoid shadowing the builtin.
        containing = [
            super_name
            for super_name, super_range in super_dict.items()
            if sub_range in super_range
        ]
        if len(containing) == 1:
            sub_to_super[sub] = containing[0]
        elif len(containing) == 0:
            if missing_option == "add_one_to_one":
                # NOTE(review): the orphan subgroup is recorded as a new
                # supergroup here, but `sub_to_super` is not updated and the
                # attribute loops below cover only the original
                # `supergroups` — confirm this path is exercised/intended.
                super_dict[sub] = sub_range
            else:
                raise RuntimeError(
                    f"Subgroup {sub} has no corresponding supergroup in {supergroups}"
                )
        else:
            raise RuntimeError(
                f"Subgroup {sub} is contained by multiple supergroups: {containing}"
            )

    grp_map = GroupMap(sub_to_super=sub_to_super, groups=None)
    # Attach the parsed age range to each group; the raw label is kept as
    # the JSON-facing value.
    grp_map.add_attribute(
        group_type="subgroup",
        attribute_name=age_varname,
        attribute_values={
            subgrp: self.age_range_from_str(subgrp) for subgrp in subgroups
        },
        attribute_json_values={subgrp: subgrp for subgrp in subgroups},
        impute_action="ignore",
        attribute_class=Attribute,
    )
    grp_map.add_attribute(
        group_type="supergroup",
        attribute_name=age_varname,
        attribute_values={
            supergrp: self.age_range_from_str(supergrp)
            for supergrp in supergroups
        },
        attribute_json_values={
            supergrp: supergrp for supergrp in supergroups
        },
        impute_action="ignore",
        attribute_class=Attribute,
    )
    self.assert_no_missing_subgroups(grp_map, age_varname)
    # The supergroups must tile their overall span with no gaps or overlaps.
    sorted_super_ranges = sorted(super_dict.values())
    assert_range_spanned_exactly(
        Range(sorted_super_ranges[0].lower, sorted_super_ranges[-1].upper),
        sorted_super_ranges,
    )

    grp_map.add_filters("supergroup", [age_varname])
    grp_map.add_filters("subgroup", [age_varname])

    return grp_map

Mapper

Bases: Protocol

A class that assists in making sub to supergroup maps for an underlying axis defined by a continuous variable, such as age.

E.g., something that takes you from "my age subgroups are... and my age supergroups are..." to a sub : super group name/string dict.

Source code in src/cfa_subgroup_imputer/mapping.py
@runtime_checkable
class Mapper(Protocol):
    """
    Protocol for helpers that build sub-to-supergroup maps along an axis
    defined by a continuous variable, such as age.

    E.g., something that takes you from "my age subgroups are... and my age
    supergroups are..." to a sub : super group name/string dict.
    """

    def construct_group_map(self, **kwargs) -> GroupMap:
        """
        Makes a GroupMap.
        """
        ...

construct_group_map(**kwargs)

Makes a GroupMap.

Source code in src/cfa_subgroup_imputer/mapping.py
def construct_group_map(self, **kwargs) -> GroupMap:
    """
    Makes a GroupMap.

    Abstract protocol signature: implementations consume keyword options
    and return the constructed sub-to-supergroup mapping. This body is
    intentionally empty.
    """
    ...

OuterProductSubgroupHandler

A class for handling subgroups based on a categorical variable, where all categories (levels) of the subgrouping variable are found in all supergroups.

For example, if we have age-based supergroups [0-17 years, 18-64 years, 65+ years] and want [low, moderate, high]-risk subgroups, this class makes and handles creating all "0-17 years, low risk", ..., "65+ years, high risk" subgroups and mapping them to the supergroups.

Source code in src/cfa_subgroup_imputer/mapping.py
class OuterProductSubgroupHandler:
    """
    A class for handling subgroups based on a categorical variable, where all
    categories (levels) of the subgrouping variable are found in all
    supergroups.

    For example, if we have age-based supergroups [0-17 years, 18-64 years,
    65+ years] and want [low, moderate, high]-risk subgroups, this class
    makes and handles creating all "0-17 years, low risk", ...,
    "65+ years, high risk" subgroups and mapping them to the supergroups.
    """

    def construct_group_map(
        self,
        supergroup_categories: Sequence[Hashable],
        subgroup_categories: Sequence[Sequence[Hashable]],
        supergroup_variable_name: str,
        subgroup_variable_names: Sequence,
        **kwargs,
    ) -> GroupMap:
        """
        Constructs a GroupMap from all subgroups defined by the categories
        of the subgroup and supergroup variables.

        Parameters
        ----------
        supergroup_categories
            The categories of the variable defining the supergroups.
        subgroup_categories
            For each variable defining subgroups, the categories it can take.
        supergroup_variable_name
            What is the variable that defines the supergroup?
        subgroup_variable_names
            What are the variables that define the subgroups?
        """
        # Input validation: every category collection must be a hashable
        # sequence.
        assert_hashable_sequence(supergroup_categories)
        assert isinstance(subgroup_categories, Sequence)
        for categories in subgroup_categories:
            assert_hashable_sequence(categories)

        # Full outer product, with the supergroup category placed last in
        # each combination.
        combinations = itertools.product(
            *subgroup_categories, list(supergroup_categories)
        )

        assert isinstance(subgroup_variable_names, Sequence)
        assert len(subgroup_variable_names) == len(subgroup_categories)
        assert all(
            isinstance(var_name, str) for var_name in subgroup_variable_names
        )

        # Delegate to the ragged handler, appending the supergroup variable
        # as the final (supergroup-defining) variable.
        all_variable_names = [
            *subgroup_variable_names,
            supergroup_variable_name,
        ]
        return RaggedOuterProductSubgroupHandler().construct_group_map(
            category_combinations=list(combinations),
            variable_names=all_variable_names,
        )

construct_group_map(supergroup_categories, subgroup_categories, supergroup_variable_name, subgroup_variable_names, **kwargs)

Constructs a GroupMap from all subgroups defined by the categories of subgroup and supergroup variables.

Parameters:

  • supergroup_categories (Sequence[Hashable]) –

    The categories of the variable defining the supergroups.

  • subgroup_categories (Sequence[Sequence[Hashable]]) –

    For each variable defining subgroups, the categories it can take.

  • supergroup_variable_name (str) –

    What is the variable that defines the supergroup?

  • subgroup_variable_names (Sequence) –

    What are the variables that define the subgroups?

Source code in src/cfa_subgroup_imputer/mapping.py
def construct_group_map(
    self,
    supergroup_categories: Sequence[Hashable],
    subgroup_categories: Sequence[Sequence[Hashable]],
    supergroup_variable_name: str,
    subgroup_variable_names: Sequence,
    **kwargs,
) -> GroupMap:
    """
    Constructs a GroupMap from all subgroups defined by the categories of subgroup
    and supergroup variables.

    Parameters
    ----------
    supergroup_categories
        The categories of the variable defining the supergroups.
    subgroup_categories
        For each variable defining subgroups, the categories it can take.
    supergroup_variable_name
        What is the variable that defines the supergroup?
    subgroup_variable_names
        What are the variables that define the subgroups?
    """
    # Input validation: every category collection must be a hashable sequence.
    assert_hashable_sequence(supergroup_categories)
    assert isinstance(subgroup_categories, Sequence)
    for sc in subgroup_categories:
        assert_hashable_sequence(sc)

    # Full outer product: every subgroup-category combination crossed with
    # every supergroup category (supergroup category placed last).
    sub_super = itertools.product(
        *[*subgroup_categories, list(supergroup_categories)]
    )

    assert isinstance(subgroup_variable_names, Sequence)
    assert len(subgroup_variable_names) == len(subgroup_categories)
    assert all(
        isinstance(var_name, str) for var_name in subgroup_variable_names
    )

    # Delegate to the ragged handler with the supergroup variable appended
    # as the final (supergroup-defining) variable.
    return RaggedOuterProductSubgroupHandler().construct_group_map(
        category_combinations=list(sub_super),
        variable_names=list(subgroup_variable_names)
        + [supergroup_variable_name],
    )

RaggedOuterProductSubgroupHandler

Bases: ABC

Source code in src/cfa_subgroup_imputer/mapping.py
class RaggedOuterProductSubgroupHandler(ABC):
    """
    Builds a GroupMap from explicitly-listed category combinations, which
    allows "ragged" products where not every combination need be present.
    """

    def construct_group_map(self, **kwargs) -> GroupMap:
        """
        Uses category combinations to construct a GroupMap. Each inner combination
        defines, in order, the category in each variable that defines a group. The last variable
        is taken to be the one which defines the supergroup.

        E.g., [["low risk", "child"], ["high risk", "child"], ["low risk", "adult"],]
        defines two supergroups, "child" and "adult", and three subgroups,
        "low risk child", "high risk child", and "low risk adult".

        If provided, `variable_names` is used
        when populating the group attributes.
        """
        assert "category_combinations" in kwargs
        cat_combs = kwargs.get("category_combinations")

        assert isinstance(cat_combs, Sequence)
        # Fail with a clear message rather than an IndexError below.
        assert len(cat_combs) > 0, (
            "At least one category combination is required"
        )
        lens = []
        for cat_comb in cat_combs:
            assert_hashable_sequence(cat_comb)
            lens.append(len(cat_comb))
        nvar = lens[0]
        # `n_cats` (the original used `len`, shadowing the builtin).
        assert all(n_cats == nvar for n_cats in lens)

        # Subgroups are the full combinations; the final element names the
        # supergroup each belongs to.
        subgroups = [tuple(cat_comb) for cat_comb in cat_combs]
        supergroups = [cat_comb[-1] for cat_comb in cat_combs]

        group_map = GroupMap(
            sub_to_super={
                subgrp: supergrp
                for subgrp, supergrp in zip(subgroups, supergroups)
            },
            groups=None,
        )

        if "variable_names" in kwargs:
            variable_names = kwargs.get("variable_names")
            assert isinstance(variable_names, Sequence)
            assert len(variable_names) == nvar
            assert all(
                isinstance(var_name, str) for var_name in variable_names
            )
        else:
            # Default to positional names when none are supplied.
            variable_names = [f"variable_{i}" for i in range(nvar)]

        group_map.add_attribute(
            group_type="supergroup",
            attribute_name=variable_names[-1],
            attribute_values={supergrp: supergrp for supergrp in supergroups},
            impute_action="ignore",
            attribute_class=Attribute,
            measurement_type=None,
        )

        # zip(*cat_combs) transposes: one tuple of categories per variable.
        for varname, cats in zip(variable_names, zip(*cat_combs)):
            group_map.add_attribute(
                group_type="subgroup",
                attribute_name=varname,
                attribute_values={
                    subgrp: cat for subgrp, cat in zip(subgroups, cats)
                },
                impute_action="ignore",
                attribute_class=Attribute,
                measurement_type=None,
            )

        group_map.add_filters("supergroup", [variable_names[-1]])
        group_map.add_filters("subgroup", variable_names)

        return group_map

construct_group_map(**kwargs)

Uses category combinations to construct a GroupMap. Each inner combination defines, in order, the category in each variable that defines a group. The last variable is taken to be the one which defines the supergroup.

E.g., [["low risk", "child"], ["high risk", "child"], ["low risk", "adult"]] defines two supergroups, "child" and "adult", and three subgroups, "low risk child", "high risk child", and "low risk adult".

If provided, variable_names is used when populating the group attributes.

Source code in src/cfa_subgroup_imputer/mapping.py
def construct_group_map(self, **kwargs) -> GroupMap:
    """
    Uses category combinations to construct a GroupMap. Each inner combination
    defines, in order, the category in each variable that defines a group. The last variable
    is taken to be the one which defines the supergroup.

    E.g., [["low risk", "child"], ["high risk", "child"], ["low risk", "adult"],]
    defines two supergroups, "child" and "adult", and three subgroups,
    "low risk child", "high risk child", and "low risk adult".

    If provided, `variable_names` is used
    when populating the group attributes.
    """
    assert "category_combinations" in kwargs
    cat_combs = kwargs.get("category_combinations")

    assert isinstance(cat_combs, Sequence)
    # Fail with a clear message rather than an IndexError below.
    assert len(cat_combs) > 0, (
        "At least one category combination is required"
    )
    lens = []
    for cat_comb in cat_combs:
        assert_hashable_sequence(cat_comb)
        lens.append(len(cat_comb))
    nvar = lens[0]
    # `n_cats` (the original used `len`, shadowing the builtin).
    assert all(n_cats == nvar for n_cats in lens)

    # Subgroups are the full combinations; the final element names the
    # supergroup each belongs to.
    subgroups = [tuple(cat_comb) for cat_comb in cat_combs]
    supergroups = [cat_comb[-1] for cat_comb in cat_combs]

    group_map = GroupMap(
        sub_to_super={
            subgrp: supergrp
            for subgrp, supergrp in zip(subgroups, supergroups)
        },
        groups=None,
    )

    if "variable_names" in kwargs:
        variable_names = kwargs.get("variable_names")
        assert isinstance(variable_names, Sequence)
        assert len(variable_names) == nvar
        assert all(
            isinstance(var_name, str) for var_name in variable_names
        )
    else:
        # Default to positional names when none are supplied.
        variable_names = [f"variable_{i}" for i in range(nvar)]

    group_map.add_attribute(
        group_type="supergroup",
        attribute_name=variable_names[-1],
        attribute_values={supergrp: supergrp for supergrp in supergroups},
        impute_action="ignore",
        attribute_class=Attribute,
        measurement_type=None,
    )

    # zip(*cat_combs) transposes: one tuple of categories per variable.
    for varname, cats in zip(variable_names, zip(*cat_combs)):
        group_map.add_attribute(
            group_type="subgroup",
            attribute_name=varname,
            attribute_values={
                subgrp: cat for subgrp, cat in zip(subgroups, cats)
            },
            impute_action="ignore",
            attribute_class=Attribute,
            measurement_type=None,
        )

    group_map.add_filters("supergroup", [variable_names[-1]])
    group_map.add_filters("subgroup", variable_names)

    return group_map

src.cfa_subgroup_imputer.utils

get_json_keys(x)

Get keys from list of dicts and make sure they are sync'd and all strs.

Source code in src/cfa_subgroup_imputer/utils.py
def get_json_keys(x: Iterable[dict]) -> list[str]:
    """
    Get keys from list of dicts and make sure they are sync'd and all `str`s.
    """
    keys = get_keys(x)
    for key in keys:
        assert isinstance(key, str)
    return keys  # pyright: ignore[reportReturnType]

get_keys(x)

Get keys from list of dicts and make sure they're sync'd.

Source code in src/cfa_subgroup_imputer/utils.py
def get_keys(x: Iterable[dict]) -> list[Hashable]:
    """
    Get keys from list of dicts and make sure they're sync'd.

    Raises
    ------
    AssertionError
        If `x` is empty or the dicts do not all share the same keys.
    """
    xl = list(x)
    assert xl, "No data provided"
    all_keys = set(xl[0].keys())
    # Iterate the materialized list, not `x`: if `x` is a one-shot
    # generator it was consumed by list(x) above, and iterating it again
    # would make this check pass vacuously.
    assert all(set(datum.keys()) == all_keys for datum in xl), (
        "Provided data do not all have same keys."
    )
    return list(xl[0].keys())

select(x, keys)

Get a list of dicts with only the specified keys.

Source code in src/cfa_subgroup_imputer/utils.py
def select(x: Iterable[dict], keys: Iterable[Hashable]) -> list[dict]:
    """
    Get a list of dicts with only the specified keys.
    """
    # One projected dict per input row, in input order.
    return [{key: row[key] for key in keys} for row in x]

unique(x)

Get only the rows out of an iterable of dicts that are unique.

Source code in src/cfa_subgroup_imputer/utils.py
def unique(x: Iterable[dict]) -> list[dict]:
    """
    Get only the rows out of an iterable of dicts that are unique.

    Note: row order in the output is not guaranteed (deduplication goes
    through a set).
    """
    # Materialize once: `x` may be a one-shot generator, and the original
    # iterated it twice (once inside get_keys, once below), yielding an
    # empty result for generators.
    xl = list(x)
    keys = get_keys(xl)
    tuples = [_dict_to_tuple(row, keys) for row in xl]
    unique_tuples = list(set(tuples))
    return [_tuple_to_dict(row, keys) for row in unique_tuples]

src.cfa_subgroup_imputer.variables

Submodule for handling variables, whether measurements or quantities used to define subgroups.

ImputeAction = Literal['impute', 'copy', 'ignore'] module-attribute

What should be done with this value when disaggregating? - "impute" means the value will be imputed (must be an `ImputableAttribute`) - "copy" means the value from the supergroup will be copied to all subgroups - "ignore" means this value is not propagated from supergroups to subgroups

MeasurementType = Literal['count', 'rate', 'count_from_rate', 'rate_from_count'] module-attribute

How a measurement behaves for disaggregation.

Mass-like measurements are things like counts, while density-like measurements are things like rates or proportions.

Attribute

A class for data we can associate with a subgroup.

Source code in src/cfa_subgroup_imputer/variables.py
class Attribute:
    """
    A class for data we can associate with a subgroup.
    """

    def __init__(
        self,
        value: Any,
        name: Hashable,
        impute_action: ImputeAction,
        json_value: Any | None = None,
    ):
        """
        Attribute constructor.

        Parameters
        ----------
        value
            The value of the variable.
        name
            What is this variable? E.g., "size" or "vaccination rate"
        impute_action
            What should we do with this measurement when disaggregating?
            Note that just because we can impute it doesn't mean we will.
        json_value
            If the `value` is not something recorded directly in a dataframe,
            this specifies how to compare to values in json and
            how to output this value to a json. None means to use the value.
        """
        self.value = value
        # Fall back to the raw value when no JSON-facing form is supplied.
        self.json_value = json_value if json_value is not None else value
        self.name: Hashable = name
        self.impute_action: ImputeAction = impute_action
        self._validate()

    def __eq__(self, x):
        # Return NotImplemented (rather than raising AttributeError on
        # x.name) so comparison against non-Attributes falls back to
        # Python's default handling.
        if not isinstance(x, Attribute):
            return NotImplemented
        return (
            self.name == x.name
            and self.value == x.value
            and self.json_value == x.json_value
            and self.impute_action == x.impute_action
        )

    def __hash__(self):
        # Consistent with __eq__: equal attributes hash equally.
        return hash(
            (self.value, self.json_value, self.name, self.impute_action)
        )

    def __repr__(self):
        return f"Attribute(name={self.name}, impute_action={self.impute_action}, value={self.value}, json_value={self.json_value})"

    def _assert_jsonable(self) -> None:
        """Raise if this attribute cannot be written to JSON output."""
        assert isinstance(self.name, str), f"{self} has non-str name."

        try:
            json.dumps(self.json_value)
        except (TypeError, OverflowError) as e:
            raise TypeError(
                f"{self} has non-JSON serializable value {self.json_value}"
            ) from e

    def _validate(self):
        assert isinstance(self.name, Hashable)
        # Can't impute the base class; ImputableAttribute widens this.
        assert self.impute_action in ["copy", "ignore"]

__init__(value, name, impute_action, json_value=None)

Attribute constructor.

Parameters:

  • value (Any) –

    The value of the variable.

  • name (Hashable) –

    What is this variable? E.g., "size" or "vaccination rate"

  • impute_action (ImputeAction) –

    What should we do with this measurement when disaggregating? Note that just because we can impute it doesn't mean we will.

  • json_value (Any | None, default: None ) –

    If the value is not something recorded directly in a dataframe, this specifies how to compare to values in json and how to output this value to a json. None means to use the value.

Source code in src/cfa_subgroup_imputer/variables.py
def __init__(
    self,
    value: Any,
    name: Hashable,
    impute_action: ImputeAction,
    json_value: Any | None = None,
):
    """
    Attribute constructor.

    Parameters
    ----------
    value
        The value of the variable.
    name
        What is this variable? E.g., "size" or "vaccination rate"
    impute_action
        What should we do with this measurement when disaggregating?
        Note that just because we can impute it doesn't mean we will.
    json_value
        If the `value` is not something recorded directly in a dataframe,
        this specifies how to compare to values in json and
        how to output this value to a json. None means to use the value.
    """
    self.value = value
    # Fall back to the raw value when no JSON-facing form is supplied.
    self.json_value = json_value if json_value is not None else value
    self.name: Hashable = name
    self.impute_action: ImputeAction = impute_action
    # Fail fast on malformed attributes.
    self._validate()

ImputableAttribute

Bases: Attribute

A class for data we can associate with a subgroup and which can be imputed to subgroups.

Source code in src/cfa_subgroup_imputer/variables.py
class ImputableAttribute(Attribute):
    """
    A class for data we can associate with a subgroup and which can be imputed to subgroups.
    """

    def __init__(
        self,
        value: float,
        name: Hashable,
        impute_action: ImputeAction,
        measurement_type: MeasurementType,
        json_value: Any | None = None,
    ):
        """
        ImputableAttribute constructor.

        Parameters
        ----------
        value
            The value, e.g. a number of cases.
        name
            What is this variable? E.g., "size" or "vaccination rate"
        impute_action
            What should we do with this measurement when disaggregating?
            Note that just because we can impute it doesn't mean we will.
        measurement_type
            What kind of imputable attribute is this?
        json_value
            If the `value` is not something recorded directly in a dataframe,
            this specifies how to compare to values in json and
            how to output this value to a json. None means to use the value.
        """
        # Imputable values are magnitudes (counts or rates): non-negative.
        assert value >= 0.0
        super().__init__(
            value=value,
            name=name,
            impute_action=impute_action,
            json_value=json_value,
        )
        self.measurement_type: MeasurementType = measurement_type
        assert self.measurement_type in get_args(MeasurementType)

    def _validate(self):
        # Widens the base class: any ImputeAction (including "impute") is
        # allowed for imputable attributes.
        assert self.impute_action in get_args(ImputeAction)

    def __eq__(self, x):
        # @TODO: should we check strict equality? allow RateType == RateType? make a toggle? add .equivalent()?
        # Guard first so comparing against a non-ImputableAttribute returns
        # NotImplemented instead of raising on x.measurement_type.
        if not isinstance(x, ImputableAttribute):
            return NotImplemented
        return (
            super().__eq__(x) and self.measurement_type == x.measurement_type
        )

    def __hash__(self):
        # Consistent with __eq__: includes measurement_type.
        return hash(
            (
                self.value,
                self.json_value,
                self.name,
                self.impute_action,
                self.measurement_type,
            )
        )

    def __mul__(self, k: float) -> Self:
        # NOTE(review): json_value is not propagated here or in the
        # conversions below — presumably intentional since the value
        # changes, but confirm.
        return type(self)(
            value=self.value * k,
            name=self.name,
            impute_action=self.impute_action,
            measurement_type=self.measurement_type,
        )

    def to_count(self, size: float) -> Self:
        """Convert a rate-like measurement to a count by scaling by size."""
        assert self.measurement_type in get_args(RateMeasurementType)
        return type(self)(
            value=self.value * size,
            name=self.name,
            impute_action=self.impute_action,
            measurement_type="count_from_rate",
        )

    def to_rate(self, volume: float) -> Self:
        """Convert a count-like measurement to a rate by dividing by volume."""
        assert self.measurement_type in get_args(CountMeasurementType)
        return type(self)(
            value=self.value / volume,
            name=self.name,
            impute_action=self.impute_action,
            measurement_type="rate_from_count",
        )

__init__(value, name, impute_action, measurement_type, json_value=None)

ImputableAttribute constructor.

Parameters:

  • value (float) –

    The value, e.g. a number of cases.

  • name (Hashable) –

    What is this variable? E.g., "size" or "vaccination rate"

  • impute_action (ImputeAction) –

    What should we do with this measurement when disaggregating? Note that just because we can impute it doesn't mean we will.

  • measurement_type (MeasurementType) –

    What kind of imputable attribute is this?

  • json_value (Any | None, default: None ) –

    If the value is not something recorded directly in a dataframe, this specifies how to compare to values in json and how to output this value to a json. None means to use the value.

Source code in src/cfa_subgroup_imputer/variables.py
def __init__(
    self,
    value: float,
    name: Hashable,
    impute_action: ImputeAction,
    measurement_type: MeasurementType,
    json_value: Any | None = None,
):
    """
    ImputableAttribute constructor.

    Parameters
    ----------
    value
        The value, e.g. a number of cases.
    name
        What is this variable? E.g., "size" or "vaccination rate"
    impute_action
        What should we do with this measurement when disaggregating?
        Note that just because we can impute it doesn't mean we will.
    measurement_type
        What kind of imputable attribute is this?
    json_value
        If the `value` is not something recorded directly in a dataframe,
        this specifies how to compare to values in json and
        how to output this value to a json. None means to use the value.
    """
    # Imputable values are magnitudes (counts or rates): non-negative.
    assert value >= 0.0
    super().__init__(
        value=value,
        name=name,
        impute_action=impute_action,
        json_value=json_value,
    )
    self.measurement_type: MeasurementType = measurement_type
    # Validate against the allowed MeasurementType literals.
    assert self.measurement_type in get_args(MeasurementType)

Range

A slice of a one-dimensional variable. e.g. [0, 3.14159).

Parameters:

  • lower (float) –

    Value at the lower end of the range.

  • upper (float) –

    Value at the upper end of the range.

Source code in src/cfa_subgroup_imputer/variables.py
class Range:
    """
    A slice of a one-dimensional variable. e.g. [0, 3.14159).

    Note that the ordering defined by __lt__/__gt__ is partial: overlapping
    ranges are neither less than nor greater than each other.

    Parameters
    ----------
    lower
        Value at the lower end of the range.
    upper
        Value at the upper end of the range.
    """

    def __init__(
        self,
        lower: float,
        upper: float,
    ):
        self.lower: float = lower
        self.upper: float = upper

    def __add__(self, x: Self) -> Self:
        # @TODO: should this be less exact?
        # Only adjacent ranges may be merged: self must end where x starts.
        assert self.upper == x.lower
        return type(self)(lower=self.lower, upper=x.upper)

    def __contains__(self, x: Self):
        # A range contains another if it fully encloses it.
        return x.lower >= self.lower and x.upper <= self.upper

    def __gt__(self, x: Self):
        # Strictly to the right of x.
        return self.lower >= x.upper

    def __hash__(self):
        return self.to_tuple().__hash__()

    def __lt__(self, x: Self):
        # Strictly to the left of x.
        return self.upper <= x.lower

    def __eq__(self, x):
        # Return NotImplemented (rather than raising AttributeError on
        # x.lower) so comparison against non-Ranges falls back to Python's
        # default handling.
        if not isinstance(x, Range):
            return NotImplemented
        return self.lower == x.lower and self.upper == x.upper

    def __repr__(self):
        return f"Range({self.lower},{self.upper})"

    def duration(self) -> float:
        """Width of the range (upper - lower)."""
        return self.upper - self.lower

    @classmethod
    def from_tuple(cls, low_high: tuple[float, float]):
        """Alternate constructor from a (lower, upper) tuple."""
        return cls(low_high[0], low_high[1])

    def to_tuple(self) -> tuple[float, float]:
        """Return (lower, upper) as a tuple."""
        return (self.lower, self.upper)

assert_range_spanned_exactly(range, ranges)

Checks that the provided ranges, in aggregate, span exactly range.

[Range(0., 1.), Range(1., 10.)] span Range(0., 10.) [Range(0., 1.), Range(1., 10.1)] does not span Range(0., 10.) [Range(0., 1.), Range(2., 10.)] does not span Range(0., 10.)

Source code in src/cfa_subgroup_imputer/variables.py
def assert_range_spanned_exactly(
    range: Range, ranges: Iterable[Range]
) -> None:
    """
    Checks that the provided `ranges`, in aggregate, span exactly `range`.

    [Range(0., 1.), Range(1., 10.)] span Range(0., 10.)
    [Range(0., 1.), Range(1., 10.1)] does not span Range(0., 10.)
    [Range(0., 1.), Range(2., 10.)] does not span Range(0., 10.)

    Raises
    ------
    AssertionError
        If `ranges` is empty, does not start at `range.lower`, has gaps or
        overlaps, or does not end at `range.upper`.
    """
    # NOTE: the parameter name `range` shadows the builtin; it is kept for
    # backward compatibility with keyword callers.
    sorted_ranges = sorted(ranges)
    assert sorted_ranges, "No ranges provided"
    assert sorted_ranges[0].lower == range.lower, (
        "Ranges do not start at the lower bound of the spanned range"
    )
    cumulative = sorted_ranges[0]
    for r in sorted_ranges[1:]:
        # Range.__add__ asserts adjacency, so gaps/overlaps raise here.
        cumulative += r
    assert cumulative.upper == range.upper, (
        "Ranges do not end at the upper bound of the spanned range"
    )