Skip to content

API reference

cladecombiner.taxon

Taxon

Representation of taxonomic units.

Source code in cladecombiner/taxon.py
class Taxon:
    """
    Representation of taxonomic units.

    Two taxa compare equal (and hash alike) when both their `name` and their
    `tip` status match. The `data` payload is not used for comparison.
    """

    def __init__(self, name: str, is_tip: bool, data: Any = None):
        """
        Taxon constructor.

        Parameters
        ----------
        name : str
            Name of the taxon.
        is_tip : bool
            Whether this taxon is a tip. Must be an actual bool.
        data : Any
            Arbitrary payload stored on the taxon.

        Raises
        ------
        TypeError
            If `is_tip` is not a bool.
        """
        if not isinstance(is_tip, bool):
            raise TypeError(
                f"is_tip must be a bool, got {type(is_tip).__name__}"
            )
        self.name: str = name
        self.tip: bool = is_tip
        self.data: Any = data

    def __eq__(self, other: object) -> bool:
        # Defer to the other operand for foreign types instead of failing
        # with AttributeError on objects that lack `name` / `tip`.
        if not isinstance(other, Taxon):
            return NotImplemented
        return self.name == other.name and self.tip == other.tip

    def __hash__(self) -> int:
        # Built from the same two fields as __eq__, so equal taxa hash alike.
        return hash(str(self.name) + str(self.tip))

    def __repr__(self) -> str:
        return f"Taxon({self.name}, tip={str(self.tip)})"

cladecombiner.taxon_utils

printable_taxon_list(taxa, sep='\n')

Prettier printing of lists of taxa.

Parameters:

  • taxa (sequence[Taxon]) –

    The Taxon objects to be printed.

  • sep (str, default: '\n' ) –

    The separator for printing the list

Returns:

  • str

    A string which may be fed to print().

Source code in cladecombiner/taxon_utils.py
def printable_taxon_list(taxa: Sequence[Taxon], sep: str = "\n") -> str:
    """
    Render a collection of taxa as one printable string.

    Each taxon is converted with str() and immediately followed by `sep`,
    so a non-empty result ends with a trailing separator.

    Parameters
    ----------
    taxa : sequence[Taxon]
        The Taxon objects to be printed.
    sep : str
        The separator written after every taxon.

    Returns
    -------
    str
        A string which may be fed to print().
    """
    pieces = [str(entry) + sep for entry in taxa]
    return "".join(pieces)

read_taxa(fp, is_tip=True, nomenclature=None, taxonomy_scheme=None)

Reads in taxa as a list of Taxon objects.

Parameters:

  • fp (str) –

    The file path to be read from

  • is_tip (bool | Sequence[bool], default: True ) –

    Either one bool specifying whether all these are tip taxa or not, or one bool per taxon in the file specifying for each.

  • nomenclature (Optional[Nomenclature], default: None ) –

    If specified, taxon names are checked for validity according to this nomenclature scheme, and an error is raised if an invalid taxon is found.

  • taxonomy_scheme (Optional[TaxonomyScheme], default: None ) –

    If specified, taxon names are checked for validity according to this taxonomy scheme, and an error is raised if an invalid taxon is found.

Returns:

  • Sequence[Taxon]

    Container of the taxa as Taxon objects.

Source code in cladecombiner/taxon_utils.py
def read_taxa(
    fp: str,
    is_tip: bool | Sequence[bool] = True,
    nomenclature: Optional[Nomenclature] = None,
    taxonomy_scheme: Optional[TaxonomyScheme] = None,
) -> Sequence[Taxon]:
    """
    Reads in taxa as a list of Taxon objects.

    Only files with a ".txt" extension are read, one taxon name per line.
    For any other extension no taxa are read and an empty list is returned
    (after the optional validation steps).

    Parameters
    ----------
    fp : str
        The file path to be read from
    is_tip : bool | Sequence[bool]
        Either one bool specifying whether all these are tip taxa or not,
        or one bool per taxon in the file specifying for each.
    nomenclature : Optional[Nomenclature]
        If specified, taxon names are checked for validity according to this
        nomenclature scheme, and an error is raised if an invalid taxon is
        found.
    taxonomy_scheme : Optional[TaxonomyScheme]
        If specified, taxon names are checked for validity according to this
        taxonomy scheme, and an error is raised if an invalid taxon is found.

    Returns
    -------
    Sequence[Taxon]
        Container of the taxa as Taxon objects.

    Raises
    ------
    IndexError
        If `is_tip` is a sequence with fewer entries than the file has lines.
    """
    assert nomenclature is None or isinstance(nomenclature, Nomenclature)

    assert taxonomy_scheme is None or isinstance(
        taxonomy_scheme, TaxonomyScheme
    )

    ext = path.splitext(fp)[1]
    taxa = []
    if ext == ".txt":
        # The context manager closes the file even if reading fails.
        with open(fp) as f:
            # Strip only the line terminator: a final line without a trailing
            # newline must keep its last character.
            names = [line.rstrip("\n") for line in f]

        if not isinstance(is_tip, Sequence):
            is_tip = [is_tip] * len(names)

        # Index into is_tip (rather than zip) so that a too-short sequence
        # is reported instead of silently dropping taxa.
        taxa = [Taxon(name, is_tip[i]) for i, name in enumerate(names)]

    if nomenclature:
        nomenclature.validate([taxon.name for taxon in taxa])

    if taxonomy_scheme:
        taxonomy_scheme.validate([taxon for taxon in taxa])
    return taxa

sort_taxa(taxa, taxonomy_scheme)

Sorts taxa into a phylogenetic preorder according to a taxonomy scheme, such that if taxon X contains Y, Y comes before X.

For example, the Pango lineages [KP.1, JN.1, BA.2, BA.3] will be sorted such that (1) KP.1 comes before both JN.1 and BA.2 and (2) JN.1 appears before BA.2. The ordering of these with respect to BA.3 is arbitrary, as BA.3 is sister to BA.2 and its descendants.

Parameters:

  • taxa (Iterable[Taxon]) –

    The Taxon objects to be sorted.

  • taxonomy_scheme (TreelikeTaxonomyScheme) –

    The taxonomy scheme by which to sort the taxa.

Returns:

  • list[Taxon]

    The sorted taxa.

Source code in cladecombiner/taxon_utils.py
def sort_taxa(
    taxa: Iterable[Taxon], taxonomy_scheme: TreelikeTaxonomyScheme
) -> list[Taxon]:
    """
    Sorts taxa according to a taxonomy scheme such that if taxon X contains
    Y, Y comes before X.

    For example, the Pango lineages [KP.1, JN.1, BA.2, BA.3] will be sorted such that (1) KP.1
    comes before both JN.1 and BA.2 and (2) JN.1 appears before BA.2. The ordering of these
    with respect to BA.3 is arbitrary, as BA.3 is sister to BA.2 and its descendants.

    Each taxon is ranked by how many of the supplied taxa it contains. In a
    tree-like scheme containment is transitive, so a taxon always contains
    strictly more of the supplied taxa than any taxon nested inside it, and
    sorting by that count puts contained taxa first. This needs a quadratic
    number of `contains` calls, but unlike a pairwise comparator it stays
    well-defined when taxa are unrelated to each other.

    Parameters
    ----------
    taxa : Iterable[Taxon]
        The Taxon objects to be sorted.
    taxonomy_scheme : TreelikeTaxonomyScheme
        The taxonomy scheme by which to sort the taxa.

    Returns
    -------
    list[Taxon]
        The sorted taxa.
    """
    # Materialize once: `taxa` may be a one-shot iterator and is needed both
    # for validation and for sorting.
    taxa_list = list(taxa)
    taxonomy_scheme.validate(taxa_list)

    n_contained = [
        sum(1 for other in taxa_list if taxonomy_scheme.contains(taxon, other))
        for taxon in taxa_list
    ]
    # sorted() is stable, so unrelated taxa keep their input order.
    order = sorted(range(len(taxa_list)), key=n_contained.__getitem__)
    return [taxa_list[i] for i in order]

cladecombiner.aggregator

Aggregation

Bases: dict[Taxon, Taxon]

An object for aggregations, basically just a dictionary.

Source code in cladecombiner/aggregator.py
class Aggregation(dict[Taxon, Taxon]):
    """
    Dictionary of input taxon -> aggregated taxon.

    Construction checks that the keys of the mapping are exactly the input
    taxa; beyond that the object behaves like a plain dict.
    """

    def _validate(
        self, input_taxa: Iterable[Taxon], taxon_map: dict[Taxon, Taxon]
    ):
        """
        Raise RuntimeError unless the mapped taxa are exactly the input taxa.
        """
        mapped = set(taxon_map.keys())
        expected = set(input_taxa)
        if mapped == expected:
            return
        raise RuntimeError(
            "Mismatch between aggregated taxa and input taxa. Input taxa are: "
            f"{input_taxa!s} but aggregated taxa are {taxon_map.keys()!s}"
        )

    def __init__(
        self, input_taxa: Iterable[Taxon], taxon_map: dict[Taxon, Taxon]
    ):
        # Validate before populating, so a bad mapping never yields an object.
        self._validate(input_taxa, taxon_map)
        super().__init__(taxon_map)

    def to_str(self):
        """
        Return the aggregation as a {name: name} dictionary of strings.
        """
        names = {}
        for source, target in self.items():
            names[source.name] = target.name
        return names

to_str()

Get str : str map of taxa names

Source code in cladecombiner/aggregator.py
def to_str(self):
    """
    Return the aggregation as a {name: name} dictionary of strings.
    """
    names = {}
    for source, target in self.items():
        names[source.name] = target.name
    return names

Aggregator

Bases: ABC

Aggregators return Aggregations, maps of input_taxon : aggregated_taxon

Source code in cladecombiner/aggregator.py
class Aggregator(ABC):
    """
    Base class for aggregators.

    An aggregator turns input taxa into an Aggregation, i.e. a map of
    input_taxon : aggregated_taxon.
    """

    @abstractmethod
    def aggregate(self, input_taxa: Iterable[Taxon]) -> Aggregation:
        """
        Map every input taxon to its aggregated taxon.

        Subclasses must override this; the base implementation always raises
        NotImplementedError.
        """
        raise NotImplementedError

ArbitraryAggregator

Bases: Aggregator

Aggregation via a user-provided dictionary.

Source code in cladecombiner/aggregator.py
class ArbitraryAggregator(Aggregator):
    """
    Aggregation via a user-provided dictionary.
    """

    def __init__(
        self,
        map: dict[Taxon, Taxon],
    ):
        """
        ArbitraryAggregator constructor.

        Parameters
        ----------
        map : dict[Taxon, Taxon]
            Dictionary mapping the input taxa to their aggregated taxa.
        """
        self.map = map

    def aggregate(self, input_taxa: Iterable[Taxon]) -> Aggregation:
        """
        Look up every input taxon in the user-provided dictionary.

        Raises
        ------
        KeyError
            If an input taxon is not a key of the dictionary.
        """
        # Materialize once: the taxa are needed both to build the mapping and
        # for the Aggregation's own validation, and may be a one-shot iterator.
        taxa = list(input_taxa)
        return Aggregation(taxa, {taxon: self.map[taxon] for taxon in taxa})

__init__(map)

ArbitraryAggregator constructor.

Parameters:

  • map (dict[Taxon, Taxon]) –

    Dictionary mapping the input taxa to their aggregated taxa.

Source code in cladecombiner/aggregator.py
def __init__(
    self,
    map: dict[Taxon, Taxon],
):
    """
    ArbitraryAggregator constructor.

    Stores the dictionary as-is on the instance; no copy is made.

    Parameters
    ----------
    map : dict[Taxon, Taxon]
        Dictionary mapping the input taxa to their aggregated taxa.
    """
    self.map = map

BasicPhylogeneticAggregator

Bases: Aggregator

An aggregator which maps a set of input taxa to a fixed set of aggregation targets using a tree.

Source code in cladecombiner/aggregator.py
class BasicPhylogeneticAggregator(Aggregator):
    """
    An aggregator which maps a set of input taxa to a fixed set of aggregation targets using a tree.
    """

    def __init__(
        self,
        targets: Iterable[Taxon],
        taxonomy_scheme: PhylogeneticTaxonomyScheme,
        sort_clades: bool = True,
        off_target: str = "other",
        warn: bool = True,
    ):
        """
        BasicPhylogeneticAggregator constructor.

        Parameters
        ----------
        targets : Iterable[Taxon]
            The taxa into which we wish to aggregate the input taxa.

        taxonomy_scheme : PhylogeneticTaxonomyScheme
            The tree which we use to do the mapping.

        sort_clades : bool
            If False, mapping is done using the taxa as ordered in `targets`.
            If True, `targets` are taxonomically sorted so that larger
            `targets` do not override smaller ones. For example, if BA.2 and
            BA.2.86 are both aggregation targets, sort_clades = True would handle
            BA.2.86 first, such that JN.1 would map to BA.2.86, while BG.1 would
            map to BA.2. If BA.2 is processed first, both will map to it.

        off_target : str
            Specifies what to do with taxa which do not belong to any target.
            Options are "other" for aggregating all such taxa into Taxon("other"),
            and "self" for aggregating all such taxa into themselves.

        warn : bool
            If True, aggregate() issues a warning when some targets end up
            with no input taxa mapped to them.

        Raises
        ------
        RuntimeError
            If `off_target` is not one of the recognized options.
        """
        self.taxonomy_scheme = taxonomy_scheme
        # Materialize first: `targets` may be a one-shot iterator and is
        # needed both for validation and for storage.
        self.targets = list(targets)
        self.taxonomy_scheme.validate(self.targets)
        off_target_options = ["self", "other"]
        if off_target not in off_target_options:
            raise RuntimeError(
                f"Unrecognized value for `off_target` ({off_target!r}), "
                f"options are: {off_target_options}"
            )
        self.off_target = off_target
        self.warn = warn
        if sort_clades:
            self.targets = sort_taxa(self.targets, self.taxonomy_scheme)

    def _check_missing(self, agg_map: dict[Taxon, Taxon]):
        """
        Warn (if enabled) about targets that no input taxon was mapped to.
        """
        if self.warn:
            used_targets = set(agg_map.values())
            unused_targets = [
                target for target in self.targets if target not in used_targets
            ]
            if len(unused_targets) > 0:
                warn(
                    f"The aggregation does not make use of the following input targets: {unused_targets}."
                )

    def aggregate(self, input_taxa: Iterable[Taxon]) -> Aggregation:
        """
        Map each input taxon to the first target (in processing order) whose
        descendants include it; leftovers are handled per `off_target`.
        """
        # Materialize once: the taxa are validated, mapped, and validated
        # again by the Aggregation, and may be a one-shot iterator.
        taxa = list(input_taxa)
        self.taxonomy_scheme.validate(taxa)
        agg_map: dict[Taxon, Taxon] = {}
        stack = set(taxa)
        for target in self.targets:
            children = self.taxonomy_scheme.descendants(target, True)
            sub_map = {taxon: target for taxon in stack if taxon in children}
            agg_map.update(sub_map)
            # Taxa claimed by this target are no longer up for grabs.
            stack.difference_update(sub_map)

        if len(stack) > 0:
            if self.off_target == "other":
                cleanup = HomogenousAggregator(
                    Taxon("other", False)
                ).aggregate(stack)
            else:
                cleanup = SelfAggregator().aggregate(stack)
            agg_map.update(cleanup)

        self._check_missing(agg_map)

        return Aggregation(taxa, agg_map)

__init__(targets, taxonomy_scheme, sort_clades=True, off_target='other', warn=True)

BasicPhylogeneticAggregator constructor.

Parameters:

  • targets (Iterable[Taxon]) –

    The taxa into which we wish to aggregate the input taxa.

  • taxonomy_scheme (PhylogeneticTaxonomyScheme) –

    The tree which we use to do the mapping.

  • sort_clades (bool, default: True ) –

    If False, mapping is done using the taxa as ordered in targets. If True, targets are taxonomically sorted so that larger targets do not override smaller ones. For example, if BA.2 and BA.2.86 are both aggregation targets, sort_clades = True would handle BA.2.86 first, such that JN.1 would map to BA.2.86, while BG.1 would map to BA.2. If BA.2 is processed first, both will map to it.

  • off_target (str, default: 'other' ) –

    Specifies what to do with taxa which do not belong to any target. Options are "other" for aggregating all such taxa into Taxon("other"), and "self" for aggregating all such taxa into themselves.

Source code in cladecombiner/aggregator.py
def __init__(
    self,
    targets: Iterable[Taxon],
    taxonomy_scheme: PhylogeneticTaxonomyScheme,
    sort_clades: bool = True,
    off_target: str = "other",
    warn: bool = True,
):
    """
    BasicPhylogeneticAggregator constructor.

    Parameters
    ----------
    targets : Iterable[Taxon]
        The taxa into which we wish to aggregate the input taxa.

    taxonomy_scheme : PhylogeneticTaxonomyScheme
        The tree which we use to do the mapping.

    sort_clades : bool
        If False, mapping is done using the taxa as ordered in `targets`.
        If True, `targets` are taxonomically sorted so that larger
        `targets` do not override smaller ones. For example, if BA.2 and
        BA.2.86 are both aggregation targets, sort_clades = True would handle
        BA.2.86 first, such that JN.1 would map to BA.2.86, while BG.1 would
        map to BA.2. If BA.2 is processed first, both will map to it.

    off_target : str
        Specifies what to do with taxa which do not belong to any target.
        Options are "other" for aggregating all such taxa into Taxon("other"),
        and "self" for aggregating all such taxa into themselves.

    warn : bool
        Stored on the instance as `self.warn`.

    Raises
    ------
    RuntimeError
        If `off_target` is not one of the recognized options.
    """
    self.taxonomy_scheme = taxonomy_scheme
    # Materialize first: `targets` may be a one-shot iterator and is needed
    # both for validation and for storage.
    self.targets = list(targets)
    self.taxonomy_scheme.validate(self.targets)
    off_target_options = ["self", "other"]
    if off_target not in off_target_options:
        raise RuntimeError(
            f"Unrecognized value for `off_target` ({off_target!r}), "
            f"options are: {off_target_options}"
        )
    self.off_target = off_target
    self.warn = warn
    if sort_clades:
        self.targets = sort_taxa(self.targets, self.taxonomy_scheme)

HomogenousAggregator

Bases: Aggregator

Aggregation of every taxon to some catch-all taxon.

Source code in cladecombiner/aggregator.py
class HomogenousAggregator(Aggregator):
    """
    Aggregation of every taxon to some catch-all taxon.
    """

    def __init__(self, taxon: Taxon):
        """
        HomogenousAggregator constructor.

        Parameters
        ----------
        taxon : Taxon
            The catch-all taxon every input taxon is mapped to.
        """
        self.agg_taxon = taxon

    def aggregate(self, input_taxa: Iterable[Taxon]) -> Aggregation:
        """
        Map every input taxon to the catch-all taxon.
        """
        # Materialize once: the taxa are needed both to build the mapping and
        # for the Aggregation's own validation, and may be a one-shot iterator.
        taxa = list(input_taxa)
        return Aggregation(taxa, {taxon: self.agg_taxon for taxon in taxa})

SelfAggregator

Bases: Aggregator

Aggregation of every taxon to itself

Source code in cladecombiner/aggregator.py
class SelfAggregator(Aggregator):
    """
    Aggregation of every taxon to itself
    """

    def __init__(self):
        pass

    def aggregate(self, input_taxa: Iterable[Taxon]) -> Aggregation:
        """
        Map every input taxon to itself.
        """
        # Materialize once: the taxa are needed both to build the mapping and
        # for the Aggregation's own validation, and may be a one-shot iterator.
        taxa = list(input_taxa)
        return Aggregation(taxa, {taxon: taxon for taxon in taxa})

SerialAggregator

Bases: Aggregator

A number of aggregators chained in serial.

Source code in cladecombiner/aggregator.py
class SerialAggregator(Aggregator):
    """
    A number of aggregators chained in serial.
    """

    def __init__(self, aggregators: Iterable[Aggregator]):
        """
        SerialAggregator constructor.

        Parameters
        ----------
        aggregators : Iterable[Aggregator]
            The aggregators to apply, in order. Each one aggregates the
            output taxa of the previous one.
        """
        self.aggregators = aggregators

    def aggregate(self, input_taxa: Iterable[Taxon]) -> Aggregation:
        """
        Apply each aggregator in turn and compose the resulting maps, so that
        every original input taxon maps to its final aggregated taxon.
        """
        # Materialize once: the original taxa are re-read on every pass and
        # `input_taxa` may be a one-shot iterator.
        original = list(input_taxa)
        taxa: Iterable[Taxon] = original
        comp_agg = SelfAggregator().aggregate(original)

        for aggregator in self.aggregators:
            agg = aggregator.aggregate(taxa)
            # The next aggregator works on the distinct outputs of this one.
            taxa = set(agg.values())
            comp_agg = {taxon: agg[comp_agg[taxon]] for taxon in original}

        return Aggregation(original, comp_agg)

cladecombiner.nomenclature

pango_sc2_nomenclature = PangoNomenclature(alias_map_hybrid=[list], max_sublevels=3, special=['A', 'B'], system='SARS-CoV-2', url_alias_json='https://raw.githubusercontent.com/cov-lineages/pango-designation/master/pango_designation/alias_key.json') module-attribute

Pango nomenclature for SARS-CoV-2.

A PangoNomenclature with a specific .name() method, a known url for the alias map, maximally 3 sublevels, and the special root descendants A and B.

See: https://doi.org/10.1038/s41564-020-0770-5

AlgorithmicNomenclature

Bases: Nomenclature

Abstract class for nomenclature schemes which encode a taxon's history in its name in some form.

The primary exemplar is the Pango nomenclature, which descends from this class via the more-general PangoLikeNomenclature.

This class assumes that the history of a set of taxa can be decoded (in some way), the result being for each taxon a Sequence of taxa linking the root to it. A method is provided for constructing from these histories a tree suitable for use in PhylogeneticTaxonomyScheme.

Source code in cladecombiner/nomenclature.py
class AlgorithmicNomenclature(Nomenclature):
    """
    Abstract class for nomenclature schemes which encode a taxon's history in
    its name in some form.

    The primary exemplar is the Pango nomenclature, which descends from this
    class via the more-general PangoLikeNomenclature.

    This class assumes that the history of a set of taxa can be decoded (in
    some way), the result being for each taxon a Sequence of taxa linking
    the root to it. A method is provided for constructing from these histories
    a tree suitable for use in PhylogeneticTaxonomyScheme.
    """

    @abstractmethod
    def full_histories(
        self, taxa: Sequence[str], stop_at_hybrid: bool = False
    ) -> Sequence[Sequence[str]]:
        """
        For each taxon, get the sequence of names of ancestors from the root
        to it.

        Parameters
        ----------
        taxa : Sequence[str]
            Each string is the name of one taxon for which we want the full
            history.
        stop_at_hybrid : boolean
            If True, the history for a taxon starts at the most recent
            hybridization event in its ancestry. If False, we extract a linear
            history by taking the ancestry through the first indicated parent
            every time.

        Returns
        -------
        Sequence[Sequence[str]]
            For each input taxon, the history from the root to the taxon as a
            Sequence of names of taxa.
        """
        raise NotImplementedError()

    def subtree_from_histories(
        self, node: dendropy.Node, lvl: int, histories: Sequence[Sequence[str]]
    ) -> None:
        """
        Recursive building of taxonomic tree from taxon-specific histories.

        One child of `node` is created for every distinct name found at
        position `lvl` of the histories, and the histories passing through
        that name are handed down to the child. Histories too short to have
        an entry at `lvl` end at `node` and are ignored.

        Parameters
        ----------
        node : dendropy.Node
            Node defining the subtree to operate on.
        lvl : int
            How many levels deep from the root are we?
        histories: Sequence[Sequence[str]]
            The histories of all taxa in this subtree for which we are
            attempting to construct the subtree.

        Returns
        -------
        None
            Modifies tree in-place recursively.
        """
        # Guard against histories that stop before this level (e.g. the root
        # itself supplied as a taxon), which would otherwise fail on indexing.
        reaching = [history for history in histories if len(history) > lvl]
        next_step = {history[lvl] for history in reaching}
        for step in next_step:
            child = dendropy.Node(label=step)
            node.add_child(child)
            # Only histories that pass through `step` and continue beyond it
            # have anything left to contribute below this child.
            next_histories = [
                history
                for history in reaching
                if len(history) > lvl + 1 and history[lvl] == step
            ]
            if next_histories:
                self.subtree_from_histories(child, lvl + 1, next_histories)

    def taxonomy_tree(
        self,
        taxa: Sequence[Taxon],
        insert_tips: bool,
        name_cleanup_fun: Optional[Callable[[str], str]] = None,
        warn: bool = True,
    ) -> dendropy.Tree:
        """
        Makes a taxonomy tree for a set of taxa.

        A taxonomy tree is the core object of a PhylogeneticTaxonomyScheme,
        being a phylogenetic representation of the relationships between
        all taxa. It takes the form of a dendropy.Tree object where every
        node has a label.

        Parameters
        ----------
        taxa : Sequence[Taxon]
            We will build the tree of these taxa.
        insert_tips : boolean
            If True, where a Taxon in the provided taxa is an internal node,
            a tip is added to represent any paraphyletic observations of this
            taxon using add_paraphyletic_tips().
        name_cleanup_fun : Optional[Callable]
            A function applied to all node labels after the tree is
            constructed, to ensure validity of all names.
        warn : bool
            Should we warn the user if any taxa are dropped in the process
            of making the tree?

        Returns
        -------
        dendropy.Tree object with all nodes labeled
            The taxonomy tree is given by the phylogeny and all nodes are
            labeled with the taxon they represent. This tree may have nodes
            with only one descendant.

        Raises
        ------
        RuntimeError
            If the histories do not share exactly one root, or if the
            finished tree has repeated tip labels or repeated internal labels.
        """
        unique_names = list({taxon.name for taxon in taxa if taxon.tip})
        if warn and (len(unique_names) < len(taxa)):
            warnings.warn(
                "Removed non-unique and/or non-tip taxa to build tree."
            )

        histories = self.full_histories(unique_names)

        all_names: set[str] = set()
        for history in histories:
            all_names.update(history)

        namespace = dendropy.TaxonNamespace(list(all_names))
        phy = dendropy.Tree(taxon_namespace=namespace)
        node = phy.seed_node
        if not isinstance(node, dendropy.Node):
            # Should never hit, required for type checking
            raise RuntimeError(
                "Cannot start tree because seed_node is not a dendropy.Node"
            )

        # Support for forests, where we break trees at recombination, could be added
        first_step = {history[0] for history in histories}
        if len(first_step) != 1:
            raise RuntimeError(
                "Cannot start tree, not all histories have same root"
            )
        node.label = first_step.pop()

        # A history of length one is the root itself and adds nothing below it.
        self.subtree_from_histories(
            node, 1, [history for history in histories if len(history) > 1]
        )

        if name_cleanup_fun is not None:
            for labeled_node in phy.preorder_node_iter():
                labeled_node.label = name_cleanup_fun(labeled_node.label)

        if insert_tips:
            phy = add_paraphyletic_tips(phy, unique_names)

        tip_names = [node.label for node in phy.leaf_node_iter()]
        int_names = [node.label for node in phy.preorder_internal_node_iter()]

        # Labels must be unique among tips and, separately, among internal nodes.
        for kind, labels in (("tip", tip_names), ("internal", int_names)):
            mults = ", ".join(
                f"{k!s} (x{v!s})"
                for k, v in Counter(labels).items()
                if v > 1
            )
            if mults:
                raise RuntimeError(
                    f"Malformed tree has multiples of {kind} taxa: " + mults
                )

        return phy

full_histories(taxa, stop_at_hybrid=False) abstractmethod

For each taxon, get the sequence of names of ancestors from the root to it.

Parameters:

  • taxa (Sequence[str]) –

    Each string is the name of one taxon for which we want the full history.

  • stop_at_hybrid (boolean, default: False ) –

    If True, the history for a taxon starts at the most recent hybridization event in its ancestry. If False, we extract a linear history by taking the ancestry through the first indicated parent every time.

Returns:

  • Sequence[Sequence[str]]

    For each input taxon, the history from the root to the taxon as a Sequence of names of taxa.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def full_histories(
    self, taxa: Sequence[str], stop_at_hybrid: bool = False
) -> Sequence[Sequence[str]]:
    """
    Decode, for every taxon, the chain of ancestor names leading from the
    root down to it.

    Abstract: concrete nomenclatures must override this. The base
    implementation always raises NotImplementedError.

    Parameters
    ----------
    taxa : Sequence[str]
        Names of the taxa whose full histories are wanted.
    stop_at_hybrid : boolean
        If True, the history for a taxon starts at the most recent
        hybridization event in its ancestry. If False, we extract a linear
        history by taking the ancestry through the first indicated parent
        every time.

    Returns
    -------
    Sequence[Sequence[str]]
        One history per input taxon, each a Sequence of taxon names running
        from the root to that taxon.
    """
    raise NotImplementedError

subtree_from_histories(node, lvl, histories)

Recursive building of taxonomic tree from taxon-specific histories.

Parameters:

  • node (Node) –

    Node defining the subtree to operate on.

  • lvl (int) –

    How many levels deep from the root are we?

  • histories (Sequence[Sequence[str]]) –

    The histories of all taxa in this subtree for which we are attempting to construct the subtree.

Returns:

  • None

    Modifies tree in-place recursively.

Source code in cladecombiner/nomenclature.py
def subtree_from_histories(
    self, node: dendropy.Node, lvl: int, histories: Sequence[Sequence[str]]
) -> None:
    """
    Recursive building of taxonomic tree from taxon-specific histories.

    One child of `node` is created for every distinct name found at position
    `lvl` of the histories, and the histories passing through that name are
    handed down to the child. Histories too short to have an entry at `lvl`
    end at `node` and are ignored.

    Parameters
    ----------
    node : dendropy.Node
        Node defining the subtree to operate on.
    lvl : int
        How many levels deep from the root are we?
    histories: Sequence[Sequence[str]]
        The histories of all taxa in this subtree for which we are
        attempting to construct the subtree.

    Returns
    -------
    None
        Modifies tree in-place recursively.
    """
    # Guard against histories that stop before this level (e.g. the root
    # itself supplied as a taxon), which would otherwise fail on indexing.
    reaching = [history for history in histories if len(history) > lvl]
    next_step = {history[lvl] for history in reaching}
    for step in next_step:
        child = dendropy.Node(label=step)
        node.add_child(child)
        # Only histories that pass through `step` and continue beyond it
        # have anything left to contribute below this child.
        next_histories = [
            history
            for history in reaching
            if len(history) > lvl + 1 and history[lvl] == step
        ]
        if next_histories:
            self.subtree_from_histories(child, lvl + 1, next_histories)

taxonomy_tree(taxa, insert_tips, name_cleanup_fun=None, warn=True)

Makes a taxonomy tree for a set of taxa.

A taxonomy tree is the core object of a PhylogeneticTaxonomyScheme, being a phylogenetic representation of the relationships between all taxa. It takes the form of a dendropy.Tree object where every node has a label.

Parameters:

  • taxa (Sequence[Taxon]) –

    We will build the tree of these taxa.

  • insert_tips (boolean) –

    If True, where a Taxon in the provided taxa is an internal node, a tip is added to represent any paraphyletic observations of this taxon using add_paraphyletic_tips().

  • name_cleanup_fun (Optional[Callable], default: None ) –

    A function applied to all node labels after the tree is constructed, to ensure validity of all names.

  • warn (bool, default: True ) –

    Should we warn the user if any taxa are dropped in the process of making the tree?

Returns:

  • dendropy.Tree object with all nodes labeled

    The taxonomy tree is given by the phylogeny and all nodes are labeled with the taxon they represent. This tree may have nodes with only one descendant.

Source code in cladecombiner/nomenclature.py
def taxonomy_tree(
    self,
    taxa: Sequence[Taxon],
    insert_tips: bool,
    name_cleanup_fun: Optional[Callable[[str], str]] = None,
    warn: bool = True,
) -> dendropy.Tree:
    """
    Makes a taxonomy tree for a set of taxa.

    A taxonomy tree is the core object of a PhylogeneticTaxonomyScheme,
    being a phylogenetic representation of the relationships between
    all taxa. It takes the form of a dendropy.Tree object where every
    node has a label.

    Parameters
    ----------
    taxa : Sequence[Taxon]
        We will build the tree of these taxa.
    insert_tips : boolean
        If True, where a Taxon in the provided taxa is an internal node,
        a tip is added to represent any paraphyletic observations of this
        taxon using add_paraphyletic_tips().
    name_cleanup_fun : Optional[Callable]
        A function applied to all node labels after the tree is
        constructed, to ensure validity of all names.
    warn : bool
        Should we warn the user if any taxa are dropped in the process
        of making the tree?

    Returns
    -------
    dendropy.Tree object with all nodes labeled
        The taxonomy tree is given by the phylogeny and all nodes are
        labeled with the taxon they represent. This tree may have nodes
        with only one descendant.

    Raises
    ------
    RuntimeError
        If the histories do not share exactly one root, or if the finished
        tree has repeated tip labels or repeated internal labels.
    """
    unique_names = list({taxon.name for taxon in taxa if taxon.tip})
    if warn and (len(unique_names) < len(taxa)):
        warnings.warn(
            "Removed non-unique and/or non-tip taxa to build tree."
        )

    histories = self.full_histories(unique_names)

    all_names: set[str] = set()
    for history in histories:
        all_names.update(history)

    namespace = dendropy.TaxonNamespace(list(all_names))
    phy = dendropy.Tree(taxon_namespace=namespace)
    node = phy.seed_node
    if not isinstance(node, dendropy.Node):
        # Should never hit, required for type checking
        raise RuntimeError(
            "Cannot start tree because seed_node is not a dendropy.Node"
        )

    # Support for forests, where we break trees at recombination, could be added
    first_step = {history[0] for history in histories}
    if len(first_step) != 1:
        raise RuntimeError(
            "Cannot start tree, not all histories have same root"
        )
    node.label = first_step.pop()

    # A history of length one is the root itself: it adds nothing below the
    # root and would fail when indexed at level 1, so leave it out here.
    self.subtree_from_histories(
        node, 1, [history for history in histories if len(history) > 1]
    )

    if name_cleanup_fun is not None:
        for labeled_node in phy.preorder_node_iter():
            labeled_node.label = name_cleanup_fun(labeled_node.label)

    if insert_tips:
        phy = add_paraphyletic_tips(phy, unique_names)

    tip_names = [node.label for node in phy.leaf_node_iter()]
    int_names = [node.label for node in phy.preorder_internal_node_iter()]

    # Labels must be unique among tips and, separately, among internal nodes.
    for kind, labels in (("tip", tip_names), ("internal", int_names)):
        mults = ", ".join(
            f"{k!s} (x{v!s})" for k, v in Counter(labels).items() if v > 1
        )
        if mults:
            raise RuntimeError(
                f"Malformed tree has multiples of {kind} taxa: " + mults
            )

    return phy

Nomenclature

Bases: ABC

Abstract class for most general casting of Nomenclature

Nomenclature concerns rules for naming taxa, and what names may imply about those taxa.

Source code in cladecombiner/nomenclature.py
class Nomenclature(ABC):
    """
    Abstract class for most general casting of Nomenclature

    Nomenclature concerns rules for naming taxa, and what names may imply about
    those taxa.
    """

    @abstractmethod
    def is_ambiguous(self, name: str) -> bool:
        """
        Does this name indicate an ambiguous taxon?

        Ambiguity means a taxon specified only to a higher level than to which
        resolution is possible.

        Parameters
        ----------
        name : str
            The name of the taxon.

        Returns
        -------
        bool
            True if this name indicates an ambiguous taxon.
        """
        raise NotImplementedError()

    @abstractmethod
    def is_hybrid(self, name: str) -> bool:
        """
        Does this name indicate a hybrid?

        Hybrid taxa have more than one parent taxon.

        Parameters
        ----------
        name : str
            The name of the taxon.

        Returns
        -------
        bool
            True if this name indicates a hybrid taxon.
        """
        raise NotImplementedError()

    @abstractmethod
    def is_root(self, name: str) -> bool:
        """
        Does this string specify the root taxon?

        The root taxon includes all taxa in the nomenclature scheme.

        Parameters
        ----------
        name : str
            The name of the taxon.

        Returns
        -------
        bool
            True if this name indicates the root taxon.
        """
        raise NotImplementedError()

    @abstractmethod
    def is_valid_name(self, name: str) -> bool:
        """
        Is this name valid in the nomenclature scheme?

        Parameters
        ----------
        name : str
            The name of the taxon.

        Returns
        -------
        bool
            True if this is a valid name under the nomenclature.
        """
        raise NotImplementedError()

    @abstractmethod
    def name(self) -> str:
        """
        Name of this nomenclature scheme.

        Returns
        -------
        str
            The name of this nomenclature scheme.
        """
        raise NotImplementedError()

    def validate(self, names: Iterable[str]) -> None:
        """
        Check that all names are strings and valid under this nomenclature.

        Parameters
        ----------
        names : Iterable[str]
            The names to check. May be any iterable, including a one-shot
            iterator or generator.

        Returns
        -------
        None
            Returns silently if every name passes both checks.

        Raises
        ------
        TypeError
            If any name is not a string.
        ValueError
            If any name fails self.is_valid_name().
        """
        # Materialize once: the names are scanned twice below, and a one-shot
        # iterator would be exhausted by the first scan, letting invalid names
        # slip through the second one unnoticed.
        names = list(names)
        nonstr = [name for name in names if not isinstance(name, str)]
        if nonstr:
            raise TypeError(f"Found non-string names: {nonstr}")
        invalid = [name for name in names if not self.is_valid_name(name)]
        if invalid:
            raise ValueError(
                f"The following names are invalid under the provided Nomenclature ({self.name()}): {invalid}"
            )

    def __str__(self) -> str:
        """Return the name of this nomenclature scheme."""
        return self.name()

is_ambiguous(name) abstractmethod

Does this name indicate an ambiguous taxon?

Ambiguity means a taxon specified only to a higher level than to which resolution is possible.

Returns:

  • bool

    True if this name indicates an ambiguous taxon.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def is_ambiguous(self, name: str) -> bool:
    """
    Does this name indicate an ambiguous taxon?

    Ambiguity means a taxon specified only to a higher level than to which
    resolution is possible.

    Parameters
    ----------
    name : str
        The name of the taxon.

    Returns
    -------
    bool
        True if this name indicates an ambiguous taxon.
    """
    raise NotImplementedError()

is_hybrid(name) abstractmethod

Does this name indicate a hybrid?

Hybrid taxa have more than one parent taxon.

Parameters:

  • name (str) –

    The name of the taxon.

Returns:

  • bool

    True if this name indicates a hybrid taxon.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def is_hybrid(self, name: str) -> bool:
    """
    Does this name indicate a hybrid?

    Hybrid taxa have more than one parent taxon.

    Parameters
    ----------
    name : str
        The name of the taxon.

    Returns
    -------
    bool
        True if this name indicates a hybrid taxon.
    """
    raise NotImplementedError()

is_root(name) abstractmethod

Does this string specify the root taxon?

The root taxon includes all taxa in the nomenclature scheme.

Parameters:

  • name (str) –

    The name of the taxon.

Returns:

  • bool

    True if this name indicates the root taxon.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def is_root(self, name: str) -> bool:
    """
    Does this string specify the root taxon?

    The root taxon includes all taxa in the nomenclature scheme.

    Parameters
    ----------
    name : str
        The name of the taxon.

    Returns
    -------
    bool
        True if this name indicates the root taxon.
    """
    raise NotImplementedError()

is_valid_name(name) abstractmethod

Is this name valid in the nomenclature scheme?

Parameters:

  • name (str) –

    The name of the taxon.

Returns:

  • bool

    True if this is a valid name under the nomenclature.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def is_valid_name(self, name: str) -> bool:
    """
    Is this name valid in the nomenclature scheme?

    Parameters
    ----------
    name : str
        The name of the taxon.

    Returns
    -------
    bool
        True if this is a valid name under the nomenclature.
    """
    raise NotImplementedError()

name() abstractmethod

Name of this nomenclature scheme.

Returns:

  • string

    The name of this nomenclature scheme.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def name(self) -> str:
    """
    Name of this nomenclature scheme.

    Returns
    -------
    str
        The name of this nomenclature scheme.
    """
    raise NotImplementedError()

PangoLikeNomenclature

Bases: AlgorithmicNomenclature

A Pango-like nomenclature is an AlgorithmicNomenclature with more specific assumptions about the encoding of history.

Specifically, we assume that the name encodes the history in a string such that the name is a series of (sub)levels denoted by a consistent set of characters (say, digits) separated by a consistent separator (say, r"."). The first portion of the name is assumed to be an alias, which is a set of different characters (say, upper case letters) which serve as shorthand for a longer series of levels. The alias is allowed to be cumulative (such as in RSV nomenclature) or not (such as in Pango nomenclature).

An external file storing the alias shortcuts is required.

This class is partially abstract and should not directly be used to initialize Nomenclature objects.

Source code in cladecombiner/nomenclature.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
class PangoLikeNomenclature(AlgorithmicNomenclature):
    """
    A Pango-like nomenclature is an AlgorithmicNomenclature with more specific
    assumptions about the encoding of history.

    Specifically, we assume that the name encodes the history in a string such
    that the name is a series of (sub)levels denoted by a consistent set of
    characters (say, digits) separated by a consistent separator (say, r".").
    The first portion of the name is assumed to be an alias, which is a set of
    different characters (say, upper case letters) which serve as shorthand
    for a longer series of levels. The alias is allowed to be cumulative (such
    as in RSV nomenclature) or not (such as in Pango nomenclature).

    An external file storing the alias shortcuts is required.

    This class is partially abstract and should not directly be used to initialize
    Nomenclature objects.
    """

    def __init__(
        self,
        alias_map_hybrid: Collection[type],
        charsets: Sequence[set],
        cumulative_alias: bool,
        max_sublevels: int,
        root: str,
        sep: str,
        special: Container,
        name: str,
    ):
        """
        Initialization of PangoLikeNomenclature objects.

        Only stores the configuration; the alias maps start out empty, and
        methods that need them (such as get_history, longer_name, and
        shorter_name) raise RuntimeError while self.alias_map is empty.

        Parameters
        ----------
        alias_map_hybrid : Collection[type]
            Container type(s) used in alias map when hybrid ancestry is
            indicated.
        charsets : Sequence[set]
            Characters allowed in alias components (index 0) and in sublevel
            components (index 1).
        cumulative_alias : bool
            Does the alias accumulate (like RSV system) or not (like Pango)
        max_sublevels : int
            Defines maximum number of sublevels before aliasing must be done,
            3 for Pango SARS-CoV-2.
        root : str
            Name for the root taxon. If not explicitly specified by naming
            system, anything that will not conflict with other taxon names
            could be used.
        sep : str
            Defines what separates the levels of the name, "." in Pango and
            RSV.
        special : Container
            Defines what aliases are allowed to appear alone, such as "A" in
            Pango SARS-CoV-2.
        name : str
            The name of this nomenclature system, e.g.
            PangoNomenclature(SARS-CoV-2).
        """
        self.alias_map_hybrid: Collection[type] = alias_map_hybrid
        self.charsets: Sequence[set] = charsets
        self.cumulative_alias: bool = cumulative_alias
        self.max_sublevels: int = max_sublevels
        self.root: str = root
        self.sep: str = sep
        self.special: Container = special
        # Stored privately because name() is a method on this class.
        self._name: str = name

        self.alias_map: dict = {}
        "Defines mapping to make longer names from shorter ones"
        self.alias_map_inv: dict = {}
        "Defines mapping to make shorter names from longer ones"

    ##############################
    # Superclass implementations #
    ##############################

    def full_histories(
        self, taxa: Sequence[str], stop_at_hybrid: bool = False
    ) -> Sequence[Sequence[str]]:
        """
        Get the root-to-taxon history of every taxon in a list.

        Parameters
        ----------
        taxa : Sequence[str]
            Names of the taxa whose histories are wanted.
        stop_at_hybrid : bool
            Must be False; stopping at hybrids is not implemented.

        Returns
        -------
        Sequence[Sequence[str]]
            One history per taxon, each as returned by self.get_history().

        Raises
        ------
        NotImplementedError
            If stop_at_hybrid is True.
        """
        if stop_at_hybrid:
            raise NotImplementedError(
                "Forests of histories are not currently implemented or supported."
            )
        histories = []
        for taxon_name in taxa:
            histories.append(self.get_history(taxon_name, stop_at_hybrid))
        return histories

    def is_root(self, name: str) -> bool:
        """
        Does this string specify the root taxon?

        Parameters
        ----------
        name : str
            The name of the taxon.

        Returns
        -------
        bool
            True if the name equals self.root.
        """
        return name == self.root

    def is_valid_name(
        self,
        name: str,
        min_sublevels: int = 1,
        max_sublevels: Optional[int] = None,
    ) -> bool:
        """
        Is this name valid in the nomenclature scheme?

        A name is valid when it has at least one alias component (exactly one
        unless the alias is cumulative), every alias component uses only
        characters from self.charsets[0], the number of sublevels is within
        bounds, and every sublevel uses only characters from self.charsets[1].

        Parameters
        ----------
        name : str
            The name of the taxon.
        min_sublevels : int
            Fewest sublevels the name may have.
        max_sublevels : Optional[int]
            Most sublevels the name may have. If None, self.max_sublevels is
            used.

        Returns
        -------
        bool
            True if this is a valid name under the nomenclature.
        """
        parts = self.partition_name(name)
        alias_parts, sublevels = parts[0], parts[1]

        # Check aliasing portion of name
        n_alias = len(alias_parts)
        if n_alias < 1:
            return False
        if n_alias > 1 and not self.cumulative_alias:
            return False
        # Subset (<=), not proper subset (<): a component that happens to use
        # every allowed character is still made of allowed characters only.
        # NOTE(review): an empty component (e.g. from "B..1") passes this
        # check because the empty set is a subset of any set; confirm whether
        # that is intended.
        if not all(set(a) <= self.charsets[0] for a in alias_parts):
            return False

        # Check sublevels
        limit = self.max_sublevels if max_sublevels is None else max_sublevels
        if not min_sublevels <= len(sublevels) <= limit:
            return False
        return all(set(lvl) <= self.charsets[1] for lvl in sublevels)

    def name(self) -> str:
        """
        Name of this nomenclature scheme.

        Returns
        -------
        str
            The name that was supplied at initialization.
        """
        return self._name

    ########################
    # Superclass overrides #
    ########################

    def taxonomy_tree(
        self,
        taxa: Sequence[Taxon],
        insert_tips: bool = True,
        warn: bool = True,
    ) -> dendropy.Tree:
        """
        Build the taxonomy tree via the superclass, cleaning names with coax_name.

        All arguments are forwarded unchanged to the superclass
        taxonomy_tree(); this override only supplies self.coax_name as the
        name_cleanup_fun.

        Parameters
        ----------
        taxa : Sequence[Taxon]
            The taxa to build the tree from.
        insert_tips : bool
            Forwarded to the superclass implementation.
        warn : bool
            Forwarded to the superclass implementation.

        Returns
        -------
        dendropy.Tree
            The tree returned by the superclass implementation.
        """
        return super().taxonomy_tree(
            taxa=taxa,
            insert_tips=insert_tips,
            name_cleanup_fun=self.coax_name,
            warn=warn,
        )

    #################
    # Class methods #
    #################

    def coax_name(self, name: str) -> str:
        """
        Coax a potentially too-short or too-long name to proper format.

        For example, we might coax the SARS-CoV-2 Pango name
        B.1.1.529.2.86.1.1.11.1.3 (which encodes the entire history but is too
        long under the scheme to be proper) to KP.3. Alternately, we might
        coax the too-short KP to JN.1.11.1

        The root name is returned unchanged; any other name is fully expanded
        and then shortened again.

        Parameters
        ----------
        name : str
            The name of the taxon.

        Returns
        -------
        str
            The name, without too many or too few sublevels.
        """
        return (
            name
            if self.is_root(name)
            else self.shorter_name(self.longer_name(name))
        )

    def equals_ignore_alias(self, x: str, y: str) -> bool:
        """
        Are two names the same, accounting for aliasing?

        For example, the Pango SARS-CoV-2 names JN.1.11.1.3 and KP.3 both
        encode the history of the same taxon, KP.3.

        The comparison is made on the long (de-aliased) form of each name.

        Parameters
        ----------
        x : str
            A taxon's name.
        y : str
            A putatively equivalent name for the taxon

        Returns
        -------
        bool
            Are the names the same ignoring aliasing?
        """
        return self.longer_name(x) == self.longer_name(y)

    def get_history(self, name: str, stop_at_hybrid: bool) -> Sequence[str]:
        """
        Get a path of ancestry from the root to this taxon.

        Unlike a long-form name, a history can pass through hybridization
        (recombination) events. When stop_at_hybrid is False, the ancestry of
        the 5'-most portion of the genome is followed through recombination.

        Parameters
        ----------
        name : str
            A taxon's name.
        stop_at_hybrid : bool
            If True, we get the history up to the most recent hybrid ancestor.
            If False, we follow the ancestry of the 5'-most portion of the
            genome through all hybrid ancestors.

        Returns
        -------
        Sequence[str]
            This taxon's ancestors, starting from root-most.

        Raises
        ------
        RuntimeError
            If the alias map is empty.
        ValueError
            If the name is not valid under this nomenclature.
        """
        if not self.alias_map:
            raise RuntimeError(
                "Cannot obtain histories until setup_alias_map() has been called."
            )
        if not self.is_valid_name(name):
            raise ValueError(name + " is not a valid name in " + self.name())
        # extend_history collects ancestors tip-first, so flip at the end.
        tip_to_root = []
        self.extend_history(name, tip_to_root, stop_at_hybrid)
        return list(reversed(tip_to_root))

    def extend_history(
        self, name: str, history: MutableSequence[str], stop_at_hybrid: bool
    ) -> None:
        """
        Recursively extend a path of ancestry from this taxon to the root.

        The name is expanded to its long form, one entry is appended per
        sublevel (most specific first), and the alias portion is then resolved
        by recursing into the alias map.

        Parameters
        ----------
        name : str
            A taxon's name.
        history : MutableSequence[str]
            The history we are in the process of building; appended to in
            place, tip-most entries first.
        stop_at_hybrid : bool
            Should we consider hybridization to start a new tree or not? If
            not, we break hybridization by following the first listed parent.

        Returns
        -------
        None
            Adds history to the history argument and then returns or calls
            itself if not done.

        Raises
        ------
        ValueError
            If the expanded name has no alias portion.
        """
        full_name = self.longer_name(name)
        pieces = self.partition_name(full_name)
        alias_parts, sublevels = pieces[0], pieces[1]
        if not alias_parts:
            raise ValueError("Invalid name: " + full_name)

        # Digest sublevels, dropping one trailing sublevel per step.
        for depth in range(len(sublevels), 0, -1):
            history.append(
                self.unpartition_name([alias_parts, sublevels[:depth]])
            )

        # Handle alias
        alias = self.join(alias_parts)
        if self.is_root(alias):
            history.append(self.root)
            return

        if self.is_special(alias):
            history.append(alias)
        if self.is_hybrid(alias):
            # A hybrid either ends this history or continues through its
            # first listed parent.
            if not stop_at_hybrid:
                self.extend_history(
                    self.alias_map[alias][0], history, stop_at_hybrid
                )
        else:
            self.extend_history(self.alias_map[alias], history, stop_at_hybrid)

    def invert_map(self) -> None:
        """
        Build the longer->shorter map from the shorter->longer self.alias_map.

        Aliases of the root and aliases that are hybrids are left out, so the
        inverted alias map is incapable of handling hybridization.

        Returns
        -------
        None
            The inverted map is stored as self.alias_map_inv

        Raises
        ------
        RuntimeError
            If two aliases expand to the same longer name.
        """
        inverse = {}
        for short, long_form in self.alias_map.items():
            targets = long_form if isinstance(long_form, list) else [long_form]
            for target in targets:
                # Don't add empty root alias, and skip hybrids entirely.
                if self.is_root(target) or self.is_hybrid(short):
                    continue
                if target in inverse:
                    raise RuntimeError(
                        "Alias list cannot be inverted. "
                        + "Trying to add inverse alias for "
                        + target
                        + ", which is an alias of "
                        + short
                        + ", but reversed map already has it"
                    )
                inverse[target] = short
        self.alias_map_inv = inverse

    def is_alias_map_hybrid(self, alias_value: Any) -> bool:
        """
        Is this lineage a hybrid according to the alias map?

        Looks at a value (rather than a key) from an alias map and reports
        whether it is an instance of any of the container types listed in
        self.alias_map_hybrid.

        Parameters
        ----------
        alias_value : Any
            The value (as opposed to the key) for some taxon in
            self.alias_map, i.e., self.alias_map[<some key>]

        Returns
        -------
        bool
            True if the alias map indicates this is a hybrid.
        """
        return any(
            isinstance(alias_value, hybrid_type)
            for hybrid_type in self.alias_map_hybrid
        )

    @abstractmethod
    def is_special(self, name: str) -> bool:
        """
        Is this a recognized special-purpose ancestor?

        Special-purpose ancestors are allowed to be used with 0 sublevels.

        Under the Pango nomenclature, direct root descendants and recombinants
        are special-purpose ancestors. Thus for Pango SARS-CoV-2, a special
        lineage is A, B, or any recombinant such as XBB (but not a descendant,
        such as XBB.1).

        This method is abstract here and must be implemented by subclasses.

        Parameters
        ----------
        name : str
            A taxon's name.

        Returns
        -------
        bool
            True if this taxon is a special taxon.
        """
        raise NotImplementedError()

    def is_valid_alias(self, alias: str) -> bool:
        """
        Does this string specify a valid shortcut/alias for a taxon's history?

        A valid alias should contain only characters allowed in the aliasing
        portion of the name, possibly with separators if the alias is
        cumulative.

        Parameters
        ----------
        alias : str
            String to be checked for validity as alias.

        Returns
        -------
        bool
            True if this is a valid alias.
        """
        allowed = self.charsets[0]
        # Subset (<=), not proper subset (<): an alias that happens to use
        # every allowed character is still made of allowed characters only.
        if self.cumulative_alias:
            return all(set(part) <= allowed for part in self.split(alias))
        return set(alias) <= allowed

    def join(self, comp: Sequence[str]) -> str:
        """
        Join list of component levels into name.

        The inverse of self.split(name), such that
        self.join(self.split(name)) == name.

        Parameters
        ----------
        comp : Sequence[str]
            Components of a taxon's name.

        Returns
        -------
        str
            The name as a single string, with components separated by
            self.sep.
        """
        return self.sep.join(comp)

    def longer_name(self, name: str) -> str:
        """
        Get non-aliased form of an aliased name.

        A long-form name stops at the most recent hybridization event in a
        taxon's ancestry if there is such an event, otherwise at the special
        root descendent taxa.

        For example, the Pango SARS-CoV-2 taxon JN.1.11 would become
        B.1.1.529.2.86.1.1.11.

        Parameters
        ----------
        name : str
            A taxon's name.

        Returns
        -------
        str
            The taxon's name in the longest form of history.

        Raises
        ------
        RuntimeError
            If the alias map is empty.
        """
        if not self.alias_map:
            raise RuntimeError(
                "Cannot construct long form of name without an alias list."
            )
        # The root has no alias to expand.
        if self.is_root(name):
            return name
        alias_levels = list(self.partition_name(name))
        # Only the last alias component is looked up in the alias map.
        next_alias = self.alias_map[alias_levels[0][-1]]
        # Keep expanding until the alias resolves to the root, or the current
        # alias is a hybrid (the long form stops at hybridization events).
        while not self.is_root(next_alias) and (
            not self.is_hybrid(alias_levels[0][-1])
        ):
            parts = self.partition_name(next_alias)
            alias_levels[0] = parts[0]
            # Sublevels from the expanded alias go in front of those already
            # collected.
            alias_levels[1] = [*parts[1], *alias_levels[1]]
            next_alias = self.alias_map[parts[0][-1]]
        return self.unpartition_name(alias_levels)

    def next_shorter_alias(self, name: str, depth: int) -> str:
        """
        Get the alias for the next shortest name available to a taxon.

        This identifies the alias that absorbs one "layer" of
        self.max_sublevels from a name. For example, for the Pango SARS-CoV-2
        lineage B.1.1.529.2.86.1.1.11 the alias is BA (so the name would
        become BA.2.86.1.1.11) because BA is an alias for B.1.1.529.

        Parameters
        ----------
        name : str
            A expanded taxon name to be contracted
        depth : int
            How many levels of aliasing deep is this name? Starting at 1 for
            longest (fully de-aliased) name and increasing as the name gets
            shorter.

        Returns
        -------
        str
            The alias (a value of self.alias_map_inv) that replaces the
            leading portion of the name; the caller assembles the new name.

        Raises
        ------
        RuntimeError
            If no entry of self.alias_map_inv matches the name.
        """
        parts = self.partition_name(name)
        n = self.max_sublevels * depth
        # The expanded prefix of `name` is the same for every map entry, so it
        # is computed at most once, and only if an entry actually needs it.
        name_prefix = None
        alias = None
        for long_form, short_alias in self.alias_map_inv.items():
            if self.partition_name(long_form)[1] == parts[1]:
                alias = short_alias
                break
            key_prefix = self.partition_name(self.longer_name(long_form))[1][
                :n
            ]
            if name_prefix is None:
                name_prefix = self.partition_name(self.longer_name(name))[1][
                    :n
                ]
            if key_prefix == name_prefix:
                alias = short_alias
                break
        if not alias:
            raise RuntimeError("Cannot find shorter alias for " + name)
        return alias

    def num_sublevels(self, name: str) -> int:
        """
        Count the sublevels in a name.

        For a Pango SARS-CoV-2 example, the names XBB, XBB.1, XBB.1.5, and
        XBB.1.5.39 contain 0, 1, 2, and 3 sublevels respectively.

        Parameters
        ----------
        name : str
            The taxon's name.

        Returns
        -------
        int
            The number of sublevels the name contains.
        """
        sublevels = self.partition_name(name)[1]
        return len(sublevels)

    def partition_name(self, name: str) -> Sequence[Sequence[str]]:
        """
        Splits name into alias and sublevels, each as a sequence of components

        This function assumes that the name is ordered alias, sublevels, and
        does not check correctness.

        For a non-cumulative alias, the first component is the alias and all
        remaining components are sublevels. For a cumulative alias, the
        sublevels are the longest trailing run of components made only of
        sublevel characters (self.charsets[1]).

        Parameters
        ----------
        name : str
            The taxon's name to be partitioned.

        Returns
        -------
        Sequence[Sequence[str]]
            First element is Sequence of components in the aliasing portion of
            the taxon's name, second element is Sequence of sublevels.
        """
        comp = self.split(name)
        if not self.cumulative_alias:
            return [list(comp[:1]), list(comp[1:])]

        sublevel_chars = self.charsets[1]
        # Subset (<=), not proper subset (<): a component using every allowed
        # character still belongs to that character set.
        if not set(comp[-1]) <= sublevel_chars:
            # No trailing sublevels: the whole name is alias.
            return [comp, []]
        if not set(comp[0]) <= self.charsets[0]:
            # Does not start with an alias: treat everything as sublevels.
            return [[], comp]

        # Count trailing sublevel components. The scan is bounded by the
        # number of components so malformed names (e.g. ones with an empty
        # leading component) cannot index past the start of the list.
        n_sublevels = 0
        while n_sublevels < len(comp) and (
            set(comp[-(n_sublevels + 1)]) <= sublevel_chars
        ):
            n_sublevels += 1

        cut = len(comp) - n_sublevels
        return [comp[:cut], comp[cut:]]

    def sanitize_map(self) -> None:
        """
        Drop ambiguity markers and check all names are valid.

        For the purposes of determining ancestry, an unknown sublineage is
        effectively just its ancestor, and we treat it as such.

        Keys must be valid, unambiguous aliases. A value equal to the root is
        only allowed for special taxa. Every other value (or every element of
        a hybrid value) has its ambiguity marker stripped and must then be a
        valid name with any number of sublevels.

        Returns
        -------
        None
            Modifies self.alias_map in-place

        Raises
        ------
        RuntimeError
            If the alias map is empty, or if any key or value is invalid.
        """
        if not self.alias_map:
            raise RuntimeError(
                "Missing self.alias_map when trying to sanitize."
            )

        # Only values of existing keys are reassigned below, which is safe
        # while iterating over the dict.
        for k, v in self.alias_map.items():
            if not self.is_valid_alias(k):
                raise RuntimeError(
                    "Found invalid taxon as key in alias list: " + k
                )
            if self.is_ambiguous(k):
                raise RuntimeError(
                    "Found ambiguous taxon as key in alias list: " + k
                )

            hybrid = self.is_alias_map_hybrid(v)
            if not hybrid and self.is_root(v):
                if not self.is_special(k):
                    raise RuntimeError(
                        'Found alias for root in taxon not listed as special: "'
                        + k
                        + '"'
                    )
                # The root needs no further validation; without this, a
                # non-empty root name would be checked character by character.
                continue

            parents = v if hybrid else [v]
            for i, parent in enumerate(parents):
                if self.is_ambiguous(parent):
                    # Assumes the ambiguity marker is one trailing character.
                    parent = parent[:-1]
                    parents[i] = parent
                if not self.is_valid_name(parent, max_sublevels=integer_inf):
                    raise RuntimeError(
                        'Found invalid taxon as value in alias list: "'
                        + parent
                        + '" (for key "'
                        + k
                        + '")'
                    )
            if not hybrid:
                # Write the cleaned name back; `parents` is a temporary list
                # here, so the map would otherwise keep the ambiguity marker.
                self.alias_map[k] = parents[0]

    def shorter_name(self, name: str) -> str:
        """
        Get shortest form of a maximally-long name using aliases

        For example, the SARS-CoV-2 Pango name B.1.1.529.2.86.1.1.11.1.3 will
        be made into KP.3, and B.1.1.529.2.86.1.1.11.1 will be made into
        JN.1.11.1. Both of these are the shortest-possible valid forms of the
        names, having neither too many nor too few sublevels.

        Parameters
        ----------
        name : str
            The taxon's name to be shortened.

        Returns
        -------
        str
            Shortest valid form of the name for this taxon.

        Raises
        ------
        RuntimeError
            If the alias map is empty.
        """
        if not self.alias_map:
            raise RuntimeError(
                "Cannot get shorter name without an alias list."
            )
        comp = list(self.partition_name(name))
        lvl = 1
        while len(comp[1]) > self.max_sublevels:
            alias = self.next_shorter_alias(self.unpartition_name(comp), lvl)
            comp[0] = [alias]
            # Each alias absorbs one layer of max_sublevels sublevels; use the
            # configured value rather than Pango's 3 so other schemes work.
            comp[1] = comp[1][self.max_sublevels :]
            lvl += 1
        return self.unpartition_name(comp)

    def split(self, name: str) -> Sequence[str]:
        """
        Split name into component levels

        The inverse of self.join(name), such that
        self.split(self.join(components)) == components.

        Parameters
        ----------
        name : str
            The name as a single string.

        Returns
        -------
        Sequence[str]
            Components of a taxon's name, obtained by splitting on self.sep.
        """
        return name.split(self.sep)

    def unpartition_name(self, components: Sequence[Sequence[str]]) -> str:
        """
        Reassemble a name from its alias and sublevel components.

        This undoes partition_name.

        Parameters
        ----------
        components : Sequence[Sequence[str]]
            First element is Sequence of components in the aliasing portion of
            the taxon's name, second element is Sequence of sublevels.

        Returns
        -------
        str
            The taxon's name as a single string.
        """
        alias_part = components[0]
        sublevel_part = components[1]
        return self.join([*alias_part, *sublevel_part])

alias_map: dict = {} instance-attribute

Defines mapping to make longer names from shorter ones

alias_map_inv: dict = {} instance-attribute

Defines mapping to make shorter names from longer ones

__init__(alias_map_hybrid, charsets, cumulative_alias, max_sublevels, root, sep, special, name)

Initialization of PangoLikeNomenclature objects.

Parameters:

  • alias_map_hybrid (Collection[type]) –

    Container type(s) used in alias map when hybrid ancestry is indicated.

  • charsets (Sequence[set]) –

    Defines what's allowed in alias names [0] and sublevel names [1]

  • cumulative_alias (bool) –

    Does the alias accumulate (like RSV system) or not (like Pango)

  • max_sublevels (int) –

    Defines maximum number of sublevels before aliasing must be done, 3 for Pango SARS-CoV-2.

  • root (str) –

    Name for the root taxon. If not explicitly specified by naming system, anything that will not conflict with other taxon names could be used.

  • sep (str) –

    Defines what separates the levels of the name, "." in Pango and RSV.

  • special (Container) –

    Defines what aliases are allowed to appear alone, such as "A" in Pango SARS-CoV-2.

  • name (str) –

    The name of this nomenclature system, e.g. PangoNomenclature(SARS-CoV-2).

Source code in cladecombiner/nomenclature.py
def __init__(
    self,
    alias_map_hybrid: Collection[type],
    charsets: Sequence[set],
    cumulative_alias: bool,
    max_sublevels: int,
    root: str,
    sep: str,
    special: Container,
    name: str,
):
    """
    Set up a PangoLikeNomenclature.

    Every argument is stored on the instance unchanged; the alias maps are
    created empty.

    Parameters
    ----------
    alias_map_hybrid : Collection[type]
        Container type(s) used in alias map when hybrid ancestry is indicated.
    charsets : Sequence[set]
        Characters allowed in alias names ([0]) and in sublevel names ([1]).
    cumulative_alias : bool
        Whether the alias accumulates (like the RSV system) or not (like
        Pango).
    max_sublevels : int
        Maximum number of sublevels before aliasing must be done, 3 for
        Pango SARS-CoV-2.
    root : str
        Name for the root taxon. If the naming system does not specify one,
        anything that will not conflict with other taxon names can be used.
    sep : str
        What separates the levels of the name, "." in Pango and RSV.
    special : Container
        Aliases that are allowed to appear alone, such as "A" in Pango
        SARS-CoV-2.
    name : str
        The name of this nomenclature system, e.g.
        PangoNomenclature(SARS-CoV-2).
    """
    # Identity of the system and the syntax of its names.
    self._name: str = name
    self.root: str = root
    self.sep: str = sep
    self.charsets: Sequence[set] = charsets

    # Rules governing aliasing.
    self.cumulative_alias: bool = cumulative_alias
    self.max_sublevels: int = max_sublevels
    self.special: Container = special
    self.alias_map_hybrid: Collection[type] = alias_map_hybrid

    # Both maps start out empty.
    self.alias_map: dict = {}
    "Defines mapping to make longer names from shorter ones"
    self.alias_map_inv: dict = {}
    "Defines mapping to make shorter names from longer ones"

coax_name(name)

Coax a potentially too-short or too-long name to proper format.

For example, we might coax the SARS-CoV-2 Pango name from B.1.1.529.2.86.1.1.11.1.3 (which encodes the entire history but is too long under the scheme to be proper) to KP.3. Alternately, we might coax the too-short KP to JN.1.11.1.

Parameters:

  • name (str) –

    The name of the taxon.

Returns:

  • str

    The name, without too many or too few sublevels.

Source code in cladecombiner/nomenclature.py
def coax_name(self, name: str) -> str:
    """
    Bring a name with too many or too few sublevels into proper format.

    For example, the SARS-CoV-2 Pango name B.1.1.529.2.86.1.1.11.1.3 (which
    encodes the entire history but is too long under the scheme to be
    proper) is coaxed to KP.3, while the too-short KP is coaxed to
    JN.1.11.1.

    Parameters
    ----------
    name : str
        The name of the taxon.

    Returns
    -------
    str
        The name, without too many or too few sublevels.
    """
    # The root is returned untouched; anything else is fully expanded and
    # then contracted again.
    return (
        name
        if self.is_root(name)
        else self.shorter_name(self.longer_name(name))
    )

equals_ignore_alias(x, y)

Are two names the same, accounting for aliasing?

For example, the Pango SARS-CoV-2 names JN.1.11.1.3 and KP.3 both encode the history of the same taxon, KP.3.

Parameters:

  • x (str) –

    A taxon's name.

  • y (str) –

    A putatively equivalent name for the taxon

Returns:

  • bool

    Are the names the same ignoring aliasing?

Source code in cladecombiner/nomenclature.py
def equals_ignore_alias(self, x: str, y: str) -> bool:
    """
    Do two names denote the same taxon once aliasing is undone?

    For example, the Pango SARS-CoV-2 names JN.1.11.1.3 and KP.3 both
    encode the history of the same taxon, KP.3.

    Parameters
    ----------
    x : str
        A taxon's name.
    y : str
        A putatively equivalent name for the taxon.

    Returns
    -------
    bool
        True if the long forms of the two names are equal.
    """
    expanded_x = self.longer_name(x)
    expanded_y = self.longer_name(y)
    return expanded_x == expanded_y

extend_history(name, history, stop_at_hybrid)

Recursively extend a path of ancestry from this taxon to the root.

Parameters:

  • name (str) –

    A taxon's name.

  • history (MutableSequence[str]) –

    The history we are in the process of building

  • stop_at_hybrid (bool) –

    Should we consider hybridization to start a new tree or not? If not, we break hybridization by following the first listed parent.

Returns:

  • None

    Adds history to the history argument and then returns or calls itself if not done.

Source code in cladecombiner/nomenclature.py
def extend_history(
    self, name: str, history: MutableSequence[str], stop_at_hybrid: bool
) -> None:
    """
    Recursively append this taxon's ancestry, taxon-first, to a history.

    Parameters
    ----------
    name : str
        A taxon's name.
    history : MutableSequence[str]
        The history being built; entries are appended to it in place.
    stop_at_hybrid : bool
        Should hybridization be treated as starting a new tree? If not,
        hybridization is broken by following the first listed parent.

    Returns
    -------
    None
        The path is appended to `history`; the method calls itself until
        the root or (when stopping at hybrids) a hybrid alias is reached.

    Raises
    ------
    ValueError
        If the expanded name has no aliasing portion.
    """
    full_name = self.longer_name(name)
    partitioned = self.partition_name(full_name)
    alias_levels = partitioned[0]
    sublevels = partitioned[1]
    if not alias_levels:
        raise ValueError("Invalid name: " + full_name)

    # Digest sublevels, deepest first: X.1.2.3, then X.1.2, then X.1
    for depth in range(len(sublevels), 0, -1):
        history.append(
            self.unpartition_name([alias_levels, sublevels[:depth]])
        )

    # Handle the alias itself
    alias = self.join(alias_levels)
    if self.is_root(alias):
        history.append(self.root)
        return

    if self.is_special(alias):
        history.append(alias)

    if self.is_hybrid(alias):
        if stop_at_hybrid:
            return
        # Follow the first listed parent through the hybridization.
        parent = self.alias_map[alias][0]
    else:
        parent = self.alias_map[alias]
    self.extend_history(parent, history, stop_at_hybrid)

get_history(name, stop_at_hybrid)

Get a path of ancestry from the root to this taxon.

This is different than a long-form name because it allows us to pass through hybridization (recombination) events. In the face of recombination, when stop_at_hybrid == False, we follow the ancestry of the 5'-most portion of the genome.

Parameters:

  • name (str) –

    A taxon's name.

  • stop_at_hybrid (bool) –

    If True, we get the history up to the most recent hybrid ancestor. If False, we follow the ancestry of the 5'-most portion of the genome through all hybrid ancestors.

Returns:

  • Sequence[str]

    This taxon's ancestors, starting from root-most.

Source code in cladecombiner/nomenclature.py
def get_history(self, name: str, stop_at_hybrid: bool) -> Sequence[str]:
    """
    Return the path of ancestry leading from the root to this taxon.

    Unlike a long-form name, a history can pass through hybridization
    (recombination) events. In the face of recombination, when
    stop_at_hybrid == False, the ancestry of the 5'-most portion of the
    genome is followed.

    Parameters
    ----------
    name : str
        A taxon's name.
    stop_at_hybrid : bool
        If True, the history reaches back only to the most recent hybrid
        ancestor. If False, the ancestry of the 5'-most portion of the
        genome is followed through all hybrid ancestors.

    Returns
    -------
    Sequence[str]
        This taxon's ancestors, starting from root-most.

    Raises
    ------
    RuntimeError
        If the alias map is empty.
    ValueError
        If `name` is not valid in this nomenclature.
    """
    if not self.alias_map:
        raise RuntimeError(
            "Cannot obtain histories until setup_alias_map() has been called."
        )
    if not self.is_valid_name(name):
        raise ValueError(name + " is not a valid name in " + self.name())

    # extend_history builds the path taxon-first; callers want root-first.
    tip_to_root = []
    self.extend_history(name, tip_to_root, stop_at_hybrid)
    return tip_to_root[::-1]

invert_map()

Inverts the shorter->longer self.alias_map

The inverted alias map is incapable of handling hybridization.

Returns:

  • None

    The inverted map is stored as self.alias_map_inv

Source code in cladecombiner/nomenclature.py
def invert_map(self) -> None:
    """
    Build the longer->shorter map from the shorter->longer self.alias_map.

    The inverted alias map is incapable of handling hybridization: entries
    whose key is a hybrid are left out, as are entries pointing at the root.

    Returns
    -------
    None
        The inverted map is stored as self.alias_map_inv

    Raises
    ------
    RuntimeError
        If two aliases would map from the same longer name.
    """
    inverse = {}
    for alias, expansion in self.alias_map.items():
        targets = expansion if isinstance(expansion, list) else [expansion]
        for target in targets:
            # Don't add empty root alias, nor anything keyed by a hybrid
            if self.is_root(target) or self.is_hybrid(alias):
                continue
            if target in inverse:
                raise RuntimeError(
                    "Alias list cannot be inverted. "
                    + "Trying to add inverse alias for "
                    + target
                    + ", which is an alias of "
                    + alias
                    + ", but reversed map already has it"
                )
            inverse[target] = alias
    self.alias_map_inv = inverse

is_alias_map_hybrid(alias_value)

Is this lineage a hybrid according to the alias map?

Checks whether a value (rather than a key) from an alias map indicates a taxon has hybrid ancestry by checking if it is a container.

Parameters:

  • alias_value (the value (as opposed to the key) for some taxon in) –

    self.alias_map, i.e., self.alias_map[<some key>]

Returns:

  • bool

    True if the alias map indicates this is a hybrid.

Source code in cladecombiner/nomenclature.py
def is_alias_map_hybrid(self, alias_value: Any) -> bool:
    """
    Does this alias-map value mark its taxon as a hybrid?

    The check is made on a value (rather than a key) of the alias map: the
    value indicates hybrid ancestry when it is an instance of one of the
    types listed in self.alias_map_hybrid.

    Parameters
    ----------
    alias_value : the value (as opposed to the key) for some taxon in
        self.alias_map, i.e., self.alias_map[<some key>]

    Returns
    -------
    bool
        True if the alias map indicates this is a hybrid.
    """
    return any(
        isinstance(alias_value, hybrid_type)
        for hybrid_type in self.alias_map_hybrid
    )

is_special(name) abstractmethod

Is this a recognized special-purpose ancestor?

Special-purpose ancestors are allowed to be used with 0 sublevels.

Under the Pango nomenclature, direct root descendants and recombinants are special-purpose ancestors. Thus for Pango SARS-CoV-2, a special lineage is A, B, or any recombinant such as XBB (but not a descendant, such as XBB.1).

Parameters:

  • name (str) –

    A taxon's name.

Returns:

  • bool

    True if this taxon is a special taxon.

Source code in cladecombiner/nomenclature.py
@abstractmethod
def is_special(self, name: str) -> bool:
    """
    Is this name a recognized special-purpose ancestor?

    Special-purpose ancestors are the names allowed to be used with 0
    sublevels.

    Under the Pango nomenclature these are the direct root descendants and
    the recombinants. For Pango SARS-CoV-2 that means A, B, or any
    recombinant such as XBB (but not a descendant of one, such as XBB.1).

    Parameters
    ----------
    name : str
        A taxon's name.

    Returns
    -------
    bool
        True if this taxon is a special taxon.
    """
    # Abstract: there is no implementation at this level.
    raise NotImplementedError

is_valid_alias(alias)

Does this string specify a valid shortcut/alias for a taxon's history?

A valid alias should contain only characters allowed in the aliasing portion of the name, possibly with separators if the alias is cumulative.

Parameters:

  • alias (str) –

    String to be checked for validity as alias.

Returns:

  • bool

    True if this is a valid alias.

Source code in cladecombiner/nomenclature.py
def is_valid_alias(self, alias: str) -> bool:
    """
    Does this string specify a valid shortcut/alias for a taxon's history?

    A valid alias contains only characters allowed in the aliasing portion
    of the name (self.charsets[0]). When the alias is cumulative it is
    first split on the separator and every component is checked.

    Parameters
    ----------
    alias : str
        String to be checked for validity as alias.

    Returns
    -------
    bool
        True if this is a valid alias.
    """
    allowed = self.charsets[0]
    components = self.split(alias) if self.cumulative_alias else [alias]
    # Subset (<=), not proper subset (<): an alias that happens to use every
    # allowed character still contains only allowed characters.
    return all(set(component) <= allowed for component in components)

join(comp)

Join list of component levels into name.

The inverse of self.split(name), such that self.join(self.split(name)) == name.

Parameters:

  • comp (Sequence[str]) –

    Components of a taxon's name.

Returns:

  • str

    The name as a single string.

Source code in cladecombiner/nomenclature.py
def join(self, comp: Sequence[str]) -> str:
    """
    Assemble a name from its component levels.

    This is the inverse of self.split(name), so that
    self.join(self.split(name)) == name.

    Parameters
    ----------
    comp : Sequence[str]
        Components of a taxon's name.

    Returns
    -------
    str
        The name as a single string.
    """
    separator = self.sep
    return separator.join(comp)

longer_name(name)

Get non-aliased form of an aliased name.

A long-form name stops at the most recent hybridization event in a taxon's ancestry if there is such an event, otherwise at the special root descendent taxa.

For example, the Pango SARS-CoV-2 taxon JN.1.11 would become B.1.1.529.2.86.1.1.11.

Parameters:

  • name (str) –

    A taxon's name.

Returns:

  • str

    The taxon's name in the longest form of history.

Source code in cladecombiner/nomenclature.py
def longer_name(self, name: str) -> str:
    """
    Expand an aliased name into its non-aliased form.

    A long-form name stops at the most recent hybridization event in a
    taxon's ancestry if there is such an event, otherwise at the special
    root descendent taxa.

    For example, the Pango SARS-CoV-2 taxon JN.1.11 would become
    B.1.1.529.2.86.1.1.11.

    Parameters
    ----------
    name : str
        A taxon's name.

    Returns
    -------
    str
        The taxon's name in the longest form of history.

    Raises
    ------
    RuntimeError
        If the alias map is empty.
    """
    if not self.alias_map:
        raise RuntimeError(
            "Cannot construct long form of name without an alias list."
        )
    if self.is_root(name):
        return name

    partitioned = self.partition_name(name)
    alias_part = partitioned[0]
    sublevels = partitioned[1]
    expansion = self.alias_map[alias_part[-1]]
    # Replace the alias by what it stands for, pushing the sublevels it
    # carried in front of those already collected, until the alias maps to
    # the root or is itself a hybrid.
    while not (self.is_root(expansion) or self.is_hybrid(alias_part[-1])):
        expanded = self.partition_name(expansion)
        alias_part = expanded[0]
        sublevels = [*expanded[1], *sublevels]
        expansion = self.alias_map[alias_part[-1]]
    return self.unpartition_name([alias_part, sublevels])

next_shorter_alias(name, depth)

Get the next shortest name available to a taxon.

This removes one "layer" of self.max_sublevels from a name. For example, the Pango SARS-CoV-2 lineage B.1.1.529.2.86.1.1.11 would become BA.2.86.1.1.11 because BA is an alias for B.1.1.529.

Parameters:

  • name (str) –

    An expanded taxon name to be contracted

  • depth (int) –

    How many levels of aliasing deep is this name? Starting at 1 for longest (fully de-aliased) name and increasing as the name gets shorter.

Returns:

  • str

    The taxon's name with one fewer levels of aliasing.

Source code in cladecombiner/nomenclature.py
def next_shorter_alias(self, name: str, depth: int) -> str:
    """
    Find the alias that shortens a name by one step.

    This removes one "layer" of self.max_sublevels from a name. For
    example, the Pango SARS-CoV-2 lineage B.1.1.529.2.86.1.1.11 would
    become BA.2.86.1.1.11 because BA is an alias for B.1.1.529.

    Parameters
    ----------
    name : str
        An expanded taxon name to be contracted.
    depth : int
        How many levels of aliasing deep is this name? Starting at 1 for
        longest (fully de-aliased) name and increasing as the name gets
        shorter.

    Returns
    -------
    str
        The alias taken from the first matching entry of
        self.alias_map_inv.

    Raises
    ------
    RuntimeError
        If no entry of the inverted alias map matches.
    """
    name_sublevels = self.partition_name(name)[1]
    window = self.max_sublevels * depth
    found = None
    for long_form, short_alias in self.alias_map_inv.items():
        # An entry matches when its sublevels equal the name's outright...
        if self.partition_name(long_form)[1] != name_sublevels:
            # ...or when both fully expanded forms share their first
            # `window` sublevels.
            entry_prefix = self.partition_name(self.longer_name(long_form))[
                1
            ][:window]
            name_prefix = self.partition_name(self.longer_name(name))[1][
                :window
            ]
            if entry_prefix != name_prefix:
                continue
        found = short_alias
        break
    if not found:
        raise RuntimeError("Cannot find shorter alias for " + name)
    return found

num_sublevels(name)

How many sublevels does this name contain?

For a Pango SARS-CoV-2 example, the names XBB, XBB.1, XBB.1.5, and XBB.1.5.39 contain 0, 1, 2, and 3 sublevels respectively.

Parameters:

  • name (str) –

    The taxon's name.

Returns:

  • int

    The number of sublevels the name contains.

Source code in cladecombiner/nomenclature.py
def num_sublevels(self, name: str) -> int:
    """
    Count the sublevels in a name.

    For a Pango SARS-CoV-2 example, the names XBB, XBB.1, XBB.1.5, and
    XBB.1.5.39 contain 0, 1, 2, and 3 sublevels respectively.

    Parameters
    ----------
    name : str
        The taxon's name.

    Returns
    -------
    int
        The number of sublevels the name contains.
    """
    # partition_name returns [alias components, sublevels].
    sublevels = self.partition_name(name)[1]
    return len(sublevels)

partition_name(name)

Splits name into alias and sublevels, each as a sequence of components

This function assumes that the name is ordered alias, sublevels, and does not check correctness.

Parameters:

  • name (str) –

    The taxon's name to be partitioned.

Returns:

  • Sequence[Sequence[str]]

    First element is Sequence of components in the aliasing portion of the taxon's name, second element is Sequence of sublevels.

Source code in cladecombiner/nomenclature.py
def partition_name(self, name: str) -> Sequence[Sequence[str]]:
    """
    Splits name into alias and sublevels, each as a sequence of components

    This function assumes that the name is ordered alias, sublevels, and
    does not check correctness.

    When the alias is not cumulative, the first component is the alias and
    everything after it is a sublevel. When the alias is cumulative, the
    trailing run of components made up solely of sublevel characters
    (self.charsets[1]) forms the sublevels and everything before it is the
    alias; the first component is always kept in the alias once it has
    passed the alias-character check, so a name whose components satisfy
    both character sets (such as the empty name) still partitions instead
    of raising IndexError.

    Parameters
    ----------
    name : str
        The taxon's name to be partitioned.

    Returns
    -------
    Sequence[Sequence[str]]
        First element is Sequence of components in the aliasing portion of
        the taxon's name, second element is Sequence of sublevels.
    """
    comp = self.split(name)
    if not self.cumulative_alias:
        return [list(comp[:1]), list(comp[1:])]

    sublevel_chars = self.charsets[1]
    # Subset tests use <= rather than <, so a component using every allowed
    # character is still recognized.
    if not set(comp[-1]) <= sublevel_chars:
        # Last component is not a sublevel: the whole name is alias.
        return [comp, []]
    if not set(comp[0]) <= self.charsets[0]:
        # First component is not an alias: the whole name is sublevels.
        return [[], comp]

    # Walk backwards over the trailing run of sublevels, never consuming
    # the first component.
    first_sublevel = len(comp) - 1
    while first_sublevel > 1 and set(comp[first_sublevel - 1]) <= sublevel_chars:
        first_sublevel -= 1
    first_sublevel = max(first_sublevel, 1)
    return [comp[:first_sublevel], comp[first_sublevel:]]

sanitize_map()

Drop ambiguity markers and check all names are valid.

For the purposes of determining ancestry, an unknown sublineage is effectively just its ancestor, and we treat it as such.

Returns:

  • None

    Modifies self.alias_map in-place

Source code in cladecombiner/nomenclature.py
def sanitize_map(self) -> None:
    """
    Drop ambiguity markers and check all names are valid.

    For the purposes of determining ancestry, an unknown sublineage is
    effectively just its ancestor, and we treat it as such.

    Keys must be valid, unambiguous aliases. A key mapping to the root must
    be a special taxon. Every other value (or, for hybrids, every listed
    parent) has a trailing ambiguity marker removed and must then be a
    valid name with any number of sublevels. Stripped names are written
    back to self.alias_map for plain string values as well as for hybrid
    parent lists.

    Returns
    -------
    None
        Modifies self.alias_map in-place

    Raises
    ------
    RuntimeError
        If the alias map is empty, or if any key or value fails the checks
        above.
    """
    if not self.alias_map:
        raise RuntimeError(
            "Missing self.alias_map when trying to sanitize."
        )

    for k, v in self.alias_map.items():
        if not self.is_valid_alias(k):
            raise RuntimeError(
                "Found invalid taxon as key in alias list: " + k
            )
        if self.is_ambiguous(k):
            raise RuntimeError(
                "Found ambiguous taxon as key in alias list: " + k
            )

        is_hybrid_entry = self.is_alias_map_hybrid(v)
        if is_hybrid_entry:
            parents = v
        elif self.is_root(v):
            if not self.is_special(k):
                raise RuntimeError(
                    'Found alias for root in taxon not listed as special: "'
                    + k
                    + '"'
                )
            # The root itself needs no further checks (and must not be
            # iterated character by character).
            continue
        else:
            parents = [v]

        stripped_any = False
        for i, parent in enumerate(parents):
            if self.is_ambiguous(parent):
                parent = parent[:-1]
                parents[i] = parent
                stripped_any = True
            if not self.is_valid_name(parent, max_sublevels=integer_inf):
                raise RuntimeError(
                    'Found invalid taxon as value in alias list: "'
                    + parent
                    + '" (for key "'
                    + k
                    + '")'
                )

        # For hybrids `parents` is the list stored in the map, so it is
        # already updated; a plain string value must be written back.
        # Re-assigning an existing key is safe while iterating items().
        if stripped_any and not is_hybrid_entry:
            self.alias_map[k] = parents[0]

shorter_name(name)

Get shortest form of a maximally-long name using aliases

For example, the SARS-CoV-2 Pango name B.1.1.529.2.86.1.1.11.1.3 will be made into KP.3, and B.1.1.529.2.86.1.1.11.1 will be made into JN.1.11.1. Both of these are the shortest-possible valid forms of the names, having neither too many nor too few sublevels.

Parameters:

  • name (str) –

    The taxon's name to be shortened.

Returns:

  • str

    Shortest valid form of the name for this taxon.

Source code in cladecombiner/nomenclature.py
def shorter_name(self, name: str) -> str:
    """
    Get shortest form of a maximally-long name using aliases

    For example, the SARS-CoV-2 Pango name B.1.1.529.2.86.1.1.11.1.3 will
    be made into KP.3, and B.1.1.529.2.86.1.1.11.1 will be made into
    JN.1.11.1. Both of these are the shortest-possible valid forms of the
    names, having neither too many nor too few sublevels.

    Parameters
    ----------
    name : str
        The taxon's name to be shortened.

    Returns
    -------
    str
        Shortest valid form of the name for this taxon.

    Raises
    ------
    RuntimeError
        If the alias map is empty.
    """
    if not self.alias_map:
        raise RuntimeError(
            "Cannot get shorter name without an alias list."
        )
    comp = list(self.partition_name(name))
    lvl = 1
    while len(comp[1]) > self.max_sublevels:
        alias = self.next_shorter_alias(self.unpartition_name(comp), lvl)
        comp[0] = [alias]
        # Each alias absorbs exactly self.max_sublevels sublevels. This was
        # previously a hard-coded 3, correct only for nomenclatures whose
        # limit happens to be 3.
        comp[1] = comp[1][self.max_sublevels :]
        lvl += 1
    return self.unpartition_name(comp)

split(name)

Split name into component levels

The inverse of self.join(name), such that self.split(self.join(components)) == components.

Parameters:

  • name (str) –

    The name as a single string.

Returns:

  • Sequence[str]

    Components of a taxon's name.

Source code in cladecombiner/nomenclature.py
def split(self, name: str) -> Sequence[str]:
    """
    Break a name into its component levels.

    This is the inverse of self.join(components), so that
    self.split(self.join(components)) == components.

    Parameters
    ----------
    name : str
        The name as a single string.

    Returns
    -------
    Sequence[str]
        Components of a taxon's name.
    """
    separator = self.sep
    components = name.split(separator)
    return components

unpartition_name(components)

Undoes partition_name

Parameters:

  • components (Sequence[Sequence[str]]) –

    First element is Sequence of components in the aliasing portion of the taxon's name, second element is Sequence of sublevels.

Returns:

  • str

    The taxon's name as a single string.

Source code in cladecombiner/nomenclature.py
def unpartition_name(self, components: Sequence[Sequence[str]]) -> str:
    """
    Reassemble a partitioned name; the inverse of partition_name.

    Parameters
    ----------
    components : Sequence[Sequence[str]]
        Two-element sequence: first the components of the aliasing portion
        of the taxon's name, then the sublevels.

    Returns
    -------
    str
        The taxon's name as a single string.
    """
    alias_part = components[0]
    sublevel_part = components[1]
    # Alias components come first, sublevels after, all joined by self.join.
    return self.join(list(alias_part) + list(sublevel_part))

PangoNomenclature

Bases: PangoLikeNomenclature

Pango nomenclature in the general sense, absent SARS-CoV-2- or mpox-specific features.

Nomenclatures for specific systems to which Pango is applied are initialized from this class by filling in the system-specific details and providing a location for the alias map. See __init__ for details.

See: https://doi.org/10.1038/s41564-020-0770-5

Source code in cladecombiner/nomenclature.py
class PangoNomenclature(PangoLikeNomenclature):
    """
    Pango nomenclature in the general sense, absent SARS-CoV-2- or mpox-specific features.

    Nomenclatures for specific systems to which Pango is applied are initialized from
    this class by filling in the system-specific details and providing a location for
    the alias map. See __init__ for details.

    See: https://doi.org/10.1038/s41564-020-0770-5
    """

    def __init__(
        self,
        alias_map_hybrid: Collection[type],
        max_sublevels: int,
        special: Container,
        system: str,
        fp_alias_json: Optional[str] = None,
        url_alias_json: Optional[str] = None,
    ):
        """
        Initialization of PangoNomenclature objects.

        Parameters
        ----------
        alias_map_hybrid : Collection[type]
            Container type(s) used in alias map when hybrid ancestry is indicated.
        max_sublevels : int
            Defines maximum number of sublevels before aliasing must be done, 3 for
            Pango SARS-CoV-2.
        special : Container
            Defines what aliases are allowed to appear alone, such as "A" in Pango
            SARS-CoV-2.
        system : str
            The nomenclature's name is taken to be f"PangoNomenclature({system})", e.g.
            "PangoNomenclature(SARS-CoV-2)".
        fp_alias_json: Optional[str]
            A filepath to a local json providing the alias map. Must provide either
            this or url_alias_json
        url_alias_json: Optional[str]
            A url to a remote json providing the alias map. Must provide either
            this or fp_alias_json

        Raises
        ------
        ValueError
            If both fp_alias_json and url_alias_json are None.
        """
        # Pango fixes these: uppercase-letter aliases, digit sublevels,
        # non-cumulative aliasing, an empty root name and "." as separator.
        super().__init__(
            alias_map_hybrid=alias_map_hybrid,
            charsets=[set(string.ascii_uppercase), set(string.digits)],
            cumulative_alias=False,
            max_sublevels=max_sublevels,
            root="",
            sep=r".",
            special=special,
            name=f"PangoNomenclature({system})",
        )
        # Trailing marker denoting an unknown/unspecified sublineage.
        self.ambiguity = r"*"
        self.fp_alias_json = fp_alias_json
        self.url_alias_json = url_alias_json

        if (self.fp_alias_json is None) and (self.url_alias_json is None):
            raise ValueError(
                "Must provide either a local or remote filepath to the alias json."
            )

    ##############################
    # Superclass implementations #
    ##############################

    def is_ambiguous(self, name: str) -> bool:
        """
        Does this name specify an ambiguous taxon?

        Pango taxa are ambiguous if the name ends in *, such that JN.1* means
        some unknown or unspecified sublineage of JN.1.

        Parameters
        ----------
        name : str
            The name of the taxon

        Returns
        -------
        bool
            True if the name is ambiguous.
        """
        if self.is_root(name):
            return False
        return str(name).endswith(self.ambiguity)

    def is_hybrid(self, name: str) -> bool:
        """
        Does this name specify a hybrid taxon?

        Hybrids are recombinants, and recombinant names start with X:
        https://virological.org/t/pango-lineage-nomenclature-provisional-rules-for-naming-recombinant-lineages/657

        Parameters
        ----------
        name : str
            The name of the taxon

        Returns
        -------
        bool
            True if the name is a hybrid.
        """
        if self.is_root(name):
            return False
        # startswith (rather than name[0]) cannot raise on an empty string.
        return name.startswith("X")

    def is_special(self, name: str) -> bool:
        """
        Is this a recognized special-purpose ancestor?

        A name is special if it is contained in self.special, or if it is a
        hybrid (recombinant) name with 0 sublevels.

        Parameters
        ----------
        name : str
            The name of the taxon

        Returns
        -------
        bool
            True if this taxon is a special taxon.
        """
        return name in self.special or (
            self.is_hybrid(name) and self.num_sublevels(name) == 0
        )

    def setup_alias_map(self) -> None:
        """
        Sets up the alias and reverse alias maps.

        The alias map will be retrieved preferentially from local using self.fp_alias_json
        if it exists, otherwise it will be retrieved remotely using self.url_alias_json.
        If neither are specified, a RuntimeError is raised.

        Raw alias maps for Pango nomenclatures are (remote or local) json files
        which provide either:
            1. The long-form names to replace an alias
            2. The parents of a recombinant

        Neither of these need to be in the absolute longest form to work, so that,
        for example, either "JN": "B.1.1.529.2.86.1" or "JN": "BA.2.86.1" would be
        valid.

        Returns
        -------
        None
            Reads the alias map and stores it in self.alias_map, then calls
            self.sanitize_map() and self.invert_map().

        Raises
        ------
        RuntimeError
            If neither self.fp_alias_json nor self.url_alias_json is set.
        """
        if self.fp_alias_json:
            # The context manager closes the file even if json.load raises;
            # the encoding is given explicitly to match the remote branch.
            with open(self.fp_alias_json, encoding="utf-8") as alias_file:
                self.alias_map = dict(json.load(alias_file))
        elif self.url_alias_json:
            with urllib.request.urlopen(self.url_alias_json) as response:
                self.alias_map = json.loads(response.read().decode("utf8"))
        else:
            raise RuntimeError(
                "Must provide either a local or remote filepath to the alias json."
            )

        self.sanitize_map()
        self.invert_map()

    ########################
    # Superclass overrides #
    ########################

    def is_valid_name(
        self,
        name: str,
        min_sublevels: int = 1,
        max_sublevels: int | None = None,
    ) -> bool:
        """
        Is this name valid in the Pango nomenclature?

        A valid name must have >= 1 and <= self.max_sublevels sublevels unless
        it is a special-purpose ancestor such as a recombinant or a directly-
        named root descendant, in which case they may have 0 sublevels.

        Special and hybrid names are accepted immediately; every other name
        is checked by the superclass implementation.

        Parameters
        ----------
        name : str
            The name of the taxon.
        min_sublevels : int
            Passed through to the superclass check.
        max_sublevels : int | None
            Passed through to the superclass check.

        Returns
        -------
        bool
            True if this is a valid name under the Pango nomenclature.
        """
        if self.is_special(name) or self.is_hybrid(name):
            return True
        return super().is_valid_name(name, min_sublevels, max_sublevels)

__init__(alias_map_hybrid, max_sublevels, special, system, fp_alias_json=None, url_alias_json=None)

Initialization of PangoNomenclature objects.

Parameters:

  • alias_map_hybrid (Collection[type]) –

    Container type(s) used in alias map when hybrid ancestry is indicated.

  • max_sublevels (int) –

    Defines maximum number of sublevels before aliasing must be done, 3 for Pango SARS-CoV-2.

  • system (str) –

    The nomenclature's name is taken to be f"PangoNomenclature({system})", e.g. "PangoNomenclature(SARS-CoV-2)".

  • fp_alias_json (Optional[str], default: None ) –

    A filepath to a local json providing the alias map. Must provide either this or url_alias_json

  • url_alias_json (Optional[str], default: None ) –

    A url to a remote json providing the alias map. Must provide either this or fp_alias_json

Source code in cladecombiner/nomenclature.py
def __init__(
    self,
    alias_map_hybrid: Collection[type],
    max_sublevels: int,
    special: Container,
    system: str,
    fp_alias_json: Optional[str] = None,
    url_alias_json: Optional[str] = None,
):
    """
    Build a PangoNomenclature.

    Parameters
    ----------
    alias_map_hybrid : Collection[type]
        Container type(s) used in alias map when hybrid ancestry is indicated.
    max_sublevels : int
        Maximum number of sublevels before aliasing must be done, 3 for
        Pango SARS-CoV-2.
    special : Container
        Forwarded unchanged to the superclass constructor.
        NOTE(review): presumably the names treated as special-purpose taxa;
        confirm against the superclass.
    system : str
        The nomenclature's name is taken to be f"PangoNomenclature({system})",
        e.g. "PangoNomenclature(SARS-CoV-2)".
    fp_alias_json : Optional[str]
        A filepath to a local json providing the alias map. Must provide
        either this or url_alias_json.
    url_alias_json : Optional[str]
        A url to a remote json providing the alias map. Must provide either
        this or fp_alias_json.

    Raises
    ------
    ValueError
        If both fp_alias_json and url_alias_json are None.
    """
    # Pango name components are made of uppercase letters and of digits
    uppercase_letters = set(string.ascii_uppercase)
    decimal_digits = set(string.digits)

    super().__init__(
        alias_map_hybrid=alias_map_hybrid,
        charsets=[uppercase_letters, decimal_digits],
        cumulative_alias=False,
        max_sublevels=max_sublevels,
        root="",
        sep=r".",
        special=special,
        name=f"PangoNomenclature({system})",
    )

    self.ambiguity = r"*"
    self.fp_alias_json = fp_alias_json
    self.url_alias_json = url_alias_json

    # At least one source for the alias map is required
    sources = (self.fp_alias_json, self.url_alias_json)
    if all(source is None for source in sources):
        raise ValueError(
            "Must provide either a local or remote filepath to the alias json."
        )

is_ambiguous(name)

Does this name specify an ambiguous taxon?

Pango taxa are ambiguous if the name ends in *, such that JN.1* means some unknown or unspecified sublineage of JN.1.

Parameters:

  • name (str) –

    The name of the taxon

Returns:

  • bool

    True if the name is ambiguous.

Source code in cladecombiner/nomenclature.py
def is_ambiguous(self, name: str) -> bool:
    """
    Does this name specify an ambiguous taxon?

    Pango taxa are ambiguous if the name ends in *, such that JN.1* means
    some unknown or unspecified sublineage of JN.1.

    Parameters
    ----------
    name : str
        The name of the taxon

    Returns
    -------
    bool
        True if the name is ambiguous. The root is never ambiguous, and
        neither is an empty name.
    """
    if self.is_root(name):
        return False
    # endswith() instead of indexing [-1]: an empty name that is not the root
    # is reported as unambiguous rather than raising IndexError.
    return str(name).endswith(self.ambiguity)

is_hybrid(name)

Does this name specify a hybrid taxon?

Hybrids are recombinants, and recombinant names start with X: https://virological.org/t/pango-lineage-nomenclature-provisional-rules-for-naming-recombinant-lineages/657

Parameters:

  • name (str) –

    The name of the taxon

Returns:

  • bool

    True if the name is a hybrid.

Source code in cladecombiner/nomenclature.py
def is_hybrid(self, name: str) -> bool:
    """
    Does this name specify a hybrid taxon?

    Hybrids are recombinants, and recombinant names start with X:
    https://virological.org/t/pango-lineage-nomenclature-provisional-rules-for-naming-recombinant-lineages/657

    Parameters
    ----------
    name : str
        The name of the taxon

    Returns
    -------
    bool
        True if the name is a hybrid. The root is never a hybrid, and
        neither is an empty name.
    """
    if self.is_root(name):
        return False
    # startswith() instead of indexing [0]: an empty name that is not the root
    # is reported as non-hybrid rather than raising IndexError.
    return str(name).startswith("X")

is_valid_name(name, min_sublevels=1, max_sublevels=None)

Is this name valid in the Pango nomenclature?

A valid name must have >1 and <= self.max_sublevels sublevels unless it is a special-purpose ancestor such as a recombinant or a directly-named root descendant, in which case it may have 0 sublevels.

Parameters:

  • name (string specifying name of the taxon) –

Returns:

  • bool

    True if this is a valid name under the Pango nomenclature.

Source code in cladecombiner/nomenclature.py
def is_valid_name(
    self,
    name: str,
    min_sublevels: int = 1,
    max_sublevels: int | None = None,
) -> bool:
    """
    Is this name valid in the Pango nomenclature?

    A valid name must have >1 and <= self.max_sublevels sublevels unless
    it is a special-purpose ancestor such as a recombinant or a directly-
    named root descendant, in which case they may have 0 sublevels.

    Parameters
    ----------
    name : string specifying name of the taxon

    Returns
    -------
    bool
        True if this is a valid name under the Pango nomenclature.
    """
    if self.is_special(name) or self.is_hybrid(name):
        return True
    return super().is_valid_name(name, min_sublevels, max_sublevels)

setup_alias_map()

Sets up the alias and reverse alias maps.

The alias map will be retrieved preferentially from local using self.fp_alias_json if it exists, otherwise it will be retrieved remotely using self.url_alias_json. If neither is specified, a RuntimeError is raised.

Raw alias maps for Pango nomenclatures are (remote or local) json files which provide either:

  1. The long-form names to replace an alias
  2. The parents of a recombinant

Neither of these need to be in the absolute longest form to work, so that, for example, either "JN": "B.1.1.529.2.86.1" or "JN": "BA.2.86.1" would be valid.

Returns:

  • None

    Reads the alias map and stores it in self.alias_map, then calls self.sanitize_map() and self.invert_map().

Source code in cladecombiner/nomenclature.py
def setup_alias_map(self) -> None:
    """
    Sets up the alias and reverse alias maps.

    The alias map will be retrieved preferentially from local using self.fp_alias_json
    if it exists, otherwise it will be retrieved remotely using self.url_alias_json.
    If neither is specified, a RuntimeError is raised.

    Raw alias maps for Pango nomenclatures are (remote or local) json files
    which provide either:
        1. The long-form names to replace an alias
        2. The parents of a recombinant

    Neither of these need to be in the absolute longest form to work, so that,
    for example, either "JN": "B.1.1.529.2.86.1" or "JN": "BA.2.86.1" would be
    valid.

    Returns
    -------
    None
        Reads the alias map and stores it in self.alias_map, then calls
        self.sanitize_map() and self.invert_map().

    Raises
    ------
    RuntimeError
        If neither self.fp_alias_json nor self.url_alias_json is set.
    """
    if self.fp_alias_json:
        # The context manager closes the file even if the json is malformed.
        # Read as UTF-8, matching how the remote payload is decoded below.
        with open(self.fp_alias_json, encoding="utf-8") as alias_file:
            self.alias_map = dict(json.load(alias_file))
    elif self.url_alias_json:
        with urllib.request.urlopen(self.url_alias_json) as response:
            self.alias_map = json.loads(response.read().decode("utf8"))
    else:
        raise RuntimeError(
            "Must provide either a local or remote filepath to the alias json."
        )

    self.sanitize_map()
    self.invert_map()

cladecombiner.taxonomy_scheme

PhylogeneticTaxonomyScheme

Bases: TreelikeTaxonomyScheme

A TaxonomyScheme powered by a phylogeny.

Errors are provoked when a PhylogeneticTaxonomyScheme is queried about taxa that are not in the phylogeny.

Internally, a dendropy.Tree object is used to represent the taxonomic relationships.

Source code in cladecombiner/taxonomy_scheme.py
class PhylogeneticTaxonomyScheme(TreelikeTaxonomyScheme):
    """
    A TaxonomyScheme powered by a phylogeny.

    Errors are provoked when a PhylogeneticTaxonomyScheme is queried about taxa
    that are not in the phylogeny.

    Internally, a dendropy.Tree object is used to represent the taxonomic
    relationships.
    """

    def __init__(self, tree: dendropy.Tree):
        """
        PhylogeneticTaxonomyScheme constructor

        Parameters
        ----------
        tree : dendropy.Tree
            The phylogeny to be used, internal nodes must be labeled.

        Raises
        ------
        ValueError
            If any node of the tree has no label.
        """

        for node in tree.preorder_node_iter():
            if node.label is None:
                raise ValueError(
                    "TaxonomyTree constructor requires all nodes have labels."
                )
        self.tree = tree
        "The tree describing the relationships between taxa"
        self.node_to_taxon: dict[dendropy.Node, Taxon] = {}
        "The taxon represented by each node, for ease of access"
        self.taxon_to_node: dict[Taxon, dendropy.Node] = {}
        "The node representing each taxon, for ease of access"

        self.map_from_tree()

    ########################
    # Superclass overrides #
    #                      #
    # These change only    #
    # how methods work,    #
    # not what they return #
    ########################

    def ancestors(self, taxon: Taxon) -> Sequence[Taxon]:
        # The path starts at the taxon's own node, which is not an ancestor
        nodes = self.node_path_to_root(taxon)
        return [self.node_to_taxon[node] for node in nodes[1:]]

    def children(self, taxon: Taxon) -> Collection[Taxon]:
        child_nodes = self.taxon_to_node[taxon].child_nodes()
        # `or ()` keeps the original tolerance for a falsy return value
        return [self.node_to_taxon[node] for node in child_nodes or ()]

    def contains(self, focal: Taxon, target: Taxon) -> bool:
        # Taxa unknown to the tree neither contain nor are contained
        if focal not in self.taxon_to_node:
            return False
        if target not in self.taxon_to_node:
            return False
        node_x = self.taxon_to_node[focal]
        node = self.taxon_to_node[target]

        # Walk from the target towards the root looking for the focal node
        while node is not self.tree.seed_node:
            if node is node_x:
                return True
            node = node.parent_node

        # The walk stopped at the root, which may itself be the focal node
        return node is node_x

    def descendants(self, taxon: Taxon, tip_only: bool) -> Collection[Taxon]:
        desc = []
        # Breadth-first walk; the list is scanned by index so that no O(n)
        # pop(0) is needed while the visiting order stays the same.
        queue = [self.taxon_to_node[taxon]]
        position = 0
        while position < len(queue):
            node = queue[position]
            position += 1
            kids = list(node.child_node_iter())
            queue.extend(kids)
            if (not tip_only) or (not kids):
                desc.append(self.node_to_taxon[node])
        if not tip_only:
            # The first entry is the queried taxon itself
            desc = desc[1:]
        return desc

    def is_root(self, taxon: Taxon) -> bool:
        return taxon == self.node_to_taxon[self.root()]

    def is_valid_taxon(self, taxon: Taxon) -> bool:
        return taxon in self.taxon_to_node

    def mrca(self, taxa: Sequence[Taxon]) -> Taxon:
        paths = [self.node_path_to_root(taxon) for taxon in taxa]
        if not paths:
            raise ValueError("Cannot find the MRCA of an empty set of taxa.")

        # Every path ends at the root, so compare them level by level from
        # the root downwards. Only the levels present in *every* path can be
        # shared, hence the shortest path bounds the comparison.
        shallowest = min(len(path) for path in paths)
        shared = 0
        while shared < shallowest:
            nodes_at_level = {path[-(1 + shared)] for path in paths}
            if len(nodes_at_level) != 1:
                break
            shared += 1

        if shared == 0:
            raise RuntimeError("Provided taxa do not have MRCA in the tree.")
        # `shared` levels agree, so the deepest common node is `shared` steps
        # from the end of any path. This also covers a single taxon, repeated
        # taxa, and one taxon being an ancestor of another.
        return self.node_to_taxon[paths[0][-shared]]

    def parents(self, taxon: Taxon) -> Taxon | None:
        node_x = self.taxon_to_node[taxon]
        if node_x is self.tree.seed_node:
            return None
        return self.node_to_taxon[node_x.parent_node]

    #################
    # Class methods #
    #################

    def map_from_tree(self) -> None:
        """
        Make Node<->Taxon maps

        By using these maps, we can avoid searching the tree repeatedly.

        Returns
        -------
        None
            Modifies self.node_to_taxon and self.taxon_to_node in-place.
        """

        self.node_to_taxon = {}
        self.taxon_to_node = {}
        for node in self.tree.preorder_node_iter():
            assert isinstance(node.is_leaf(), bool)  # Pylance paranoia
            taxon = Taxon(node.label, is_tip=node.is_leaf())
            self.node_to_taxon[node] = taxon
            self.taxon_to_node[taxon] = node

    def node_path_to_root(self, taxon: Taxon) -> Sequence[dendropy.Node]:
        """
        Get all nodes between given taxon and the root (inclusive of the root and this node)

        Parameters
        ----------
        taxon : Taxon
            The taxon for which we want the path to the root.

        Returns
        -------
        Sequence[dendropy.Node]
            Path of nodes in self.tree from this taxon (inclusive) to the root
            (inclusive).
        """
        path = []
        node = self.taxon_to_node[taxon]
        while node is not self.tree.seed_node:
            path.append(node)
            node = node.parent_node
        path.append(self.root())
        return path

    def prune_subtree(self, taxon: Taxon) -> None:
        """
        Remove subtree corresponding to this taxon and clean up maps

        Parameters
        ----------
        taxon : Taxon
            The taxon which is the base of the subtree to be removed.

        Returns
        -------
        None
            Edits self.tree in-place.
        """
        node = self.taxon_to_node[taxon]
        node.parent_node.remove_child(node)
        # The maps still reference the removed nodes, so rebuild them
        self.map_from_tree()

    def root(self) -> dendropy.Node:
        """
        Typing-safe function to access root, always returns a dendropy.Node.

        Returns
        -------
        dendropy.Node
            The root node of the phylogeny underlying this taxonomy scheme.

        Raises
        ------
        RuntimeError
            If self.tree.seed_node is not a dendropy.Node.
        """
        if not isinstance(self.tree.seed_node, dendropy.Node):
            raise RuntimeError("Malformed tree has no seed_node")
        else:
            return self.tree.seed_node

node_to_taxon: dict[dendropy.Node, Taxon] = {} instance-attribute

The taxon represented by each node, for ease of access

taxon_to_node: dict[Taxon, dendropy.Node] = {} instance-attribute

The node representing each taxon, for ease of access

tree = tree instance-attribute

The tree describing the relationships between taxa

__init__(tree)

PhylogeneticTaxonomyScheme constructor

Parameters:

  • tree (Tree) –

    The phylogeny to be used, internal nodes must be labeled.

Source code in cladecombiner/taxonomy_scheme.py
def __init__(self, tree: dendropy.Tree):
    """
    Build a taxonomy scheme on top of a fully labeled phylogeny.

    Parameters
    ----------
    tree : dendropy.Tree
        The phylogeny to be used. Every node, internal ones included, must
        carry a label.

    Raises
    ------
    ValueError
        If a node without a label is encountered.
    """

    # Stops at the first unlabeled node met in preorder
    label_missing = (node.label is None for node in tree.preorder_node_iter())
    if any(label_missing):
        raise ValueError(
            "TaxonomyTree constructor requires all nodes have labels."
        )

    self.tree = tree
    "The tree describing the relationships between taxa"
    self.node_to_taxon: dict[dendropy.Node, Taxon] = {}
    "The taxon represented by each node, for ease of access"
    self.taxon_to_node: dict[Taxon, dendropy.Node] = {}
    "The node representing each taxon, for ease of access"

    # Fill both lookup tables from the tree
    self.map_from_tree()

map_from_tree()

Make Node<->Taxon maps

By using these maps, we can avoid searching the tree repeatedly.

Returns:

  • None

    Modifies self.node_to_taxon and self.taxon_to_node in-place.

Source code in cladecombiner/taxonomy_scheme.py
def map_from_tree(self) -> None:
    """
    Rebuild the Node<->Taxon lookup tables from self.tree.

    Both tables are reset and then refilled in a single preorder pass, so
    later queries never have to search the tree.

    Returns
    -------
    None
        Replaces self.node_to_taxon and self.taxon_to_node.
    """

    by_node = {}
    by_taxon = {}
    self.node_to_taxon = by_node
    self.taxon_to_node = by_taxon

    for node in self.tree.preorder_node_iter():
        leaf = node.is_leaf()
        assert isinstance(leaf, bool)  # Pylance paranoia
        # A node's taxon is named by its label and is a tip iff it is a leaf
        taxon = Taxon(node.label, is_tip=leaf)
        by_node[node] = taxon
        by_taxon[taxon] = node

node_path_to_root(taxon)

Get all nodes between given taxon and the root (inclusive of the root and this node)

Parameters:

  • taxon (Taxon) –

    The taxon for which we want the path to the root.

Returns:

  • Sequence[Node]

    Path of nodes in self.tree from this taxon (inclusive) to the root (inclusive).

Source code in cladecombiner/taxonomy_scheme.py
def node_path_to_root(self, taxon: Taxon) -> Sequence[dendropy.Node]:
    """
    List the nodes on the way from a taxon up to the root, both ends included.

    Parameters
    ----------
    taxon : Taxon
        The taxon for which we want the path to the root.

    Returns
    -------
    Sequence[dendropy.Node]
        Nodes of self.tree, starting with this taxon's node and ending with
        the root. For the root taxon the path is just the root.
    """
    lineage = []
    current = self.taxon_to_node[taxon]
    while True:
        if current is self.tree.seed_node:
            break
        lineage.append(current)
        current = current.parent_node
    # The root is appended through the typing-safe accessor
    lineage.append(self.root())
    return lineage

prune_subtree(taxon)

Remove subtree corresponding to this taxon and clean up maps

Parameters:

  • taxon (Taxon) –

    The taxon which is the base of the subtree to be removed.

Returns:

  • None

    Edits self.tree in-place.

Source code in cladecombiner/taxonomy_scheme.py
def prune_subtree(self, taxon: Taxon) -> None:
    """
    Cut the subtree rooted at this taxon out of the tree, then rebuild maps.

    Parameters
    ----------
    taxon : Taxon
        The taxon which is the base of the subtree to be removed.

    Returns
    -------
    None
        Edits self.tree in-place and refreshes the Node<->Taxon maps.
    """
    doomed = self.taxon_to_node[taxon]
    parent = doomed.parent_node
    # Detaching the node from its parent drops everything below it as well
    parent.remove_child(doomed)
    self.map_from_tree()

root()

Typing-safe function to access root, always returns a dendropy.Node.

Returns:

  • Node

    The root node of the phylogeny underlying this taxonomy scheme.

Source code in cladecombiner/taxonomy_scheme.py
def root(self) -> dendropy.Node:
    """
    Access the root in a typing-safe way; the result is always a dendropy.Node.

    Returns
    -------
    dendropy.Node
        The root node of the phylogeny underlying this taxonomy scheme.

    Raises
    ------
    RuntimeError
        If self.tree.seed_node is not a dendropy.Node.
    """
    if isinstance(self.tree.seed_node, dendropy.Node):
        return self.tree.seed_node
    raise RuntimeError("Malformed tree has no seed_node")

TaxonomyScheme

Bases: ABC

Abstract class for most general casting of Taxonomy

Allows hybridization-induced multiple ancestry.

Source code in cladecombiner/taxonomy_scheme.py
class TaxonomyScheme(ABC):
    """
    Abstract class for most general casting of Taxonomy

    Allows hybridization-induced multiple ancestry.
    """

    def ancestors(self, taxon: Taxon) -> Collection[Taxon]:
        """
        All taxa which are between this taxon and the root (including the root).

        Parameters
        ----------
        taxon : Taxon
            The taxon whose ancestors we want.

        Returns
        -------
        Collection[Taxon]
            All unique taxa between this taxon and the root.
            Empty container if this taxon is the root.
        """
        if self.is_root(taxon):
            return set()

        anc = set()
        pending = list(self.parents(taxon))
        while pending:
            tax = pending.pop()
            if tax in anc:
                # Hybridization lets several paths reach the same ancestor
                continue
            anc.add(tax)
            # extend(), not append(): each parent is queued individually
            pending.extend(self.parents(tax))
        return anc

    @abstractmethod
    def children(self, taxon: Taxon) -> Collection[Taxon]:
        """
        All taxa which are direct children of this taxon.

        Parameters
        ----------
        taxon : Taxon
            The taxon whose children we want.

        Returns
        -------
        Collection[Taxon]
            A collection of the taxa that are direct descendants of this taxon.
            Returns empty container if this taxon has no children (i.e., if
            this taxon is a tip taxon).
        """
        raise NotImplementedError()

    @abstractmethod
    def descendants(self, taxon: Taxon, tip_only: bool) -> Collection[Taxon]:
        """
        All taxa which are contained by this taxon.

        Subclasses must override this method, but may delegate to this
        default implementation, which only relies on self.children().

        Parameters
        ----------
        taxon : Taxon
            The taxon whose descendants we want.
        tip_only : bool
            Do we want only tip descendants of this taxon?

        Returns
        -------
        Collection[Taxon]
            If tip_only == True, all tips that are descended from this taxon.
            Otherwise, a collection of the taxa that descend from this taxon.
            That is, its children, and its childrens' children, and so forth.
            Returns empty container if this taxon is a tip.
        """
        desc = set()
        visited = set()
        pending = list(self.children(taxon))
        while pending:
            tax = pending.pop()
            if tax in visited:
                # Hybridization lets several paths reach the same descendant
                continue
            visited.add(tax)
            kids = list(self.children(tax))
            # A taxon without children is a tip
            if (not tip_only) or (not kids):
                desc.add(tax)
            # extend(), not append(): each child is queued individually
            pending.extend(kids)
        return desc

    @abstractmethod
    def is_root(self, taxon: Taxon) -> bool:
        """
        Is this the largest taxon that contains all others?

        Parameters
        ----------
        taxon: Taxon
            The taxon to be checked.

        Returns
        -------
        bool
            True if this taxon is the root.
        """
        raise NotImplementedError()

    @abstractmethod
    def is_valid_taxon(self, taxon: Taxon) -> bool:
        """
        Does the scheme recognize this Taxon?

        Parameters
        ----------
        taxon: Taxon
            The taxon to be checked.

        Returns
        -------
        bool
            True if this taxon is valid.
        """
        raise NotImplementedError()

    @abstractmethod
    def parents(self, taxon: Taxon) -> Collection[Taxon]:
        """
        All parent taxa of taxon, i.e. ancestors exactly one level above this taxon.

        Hybridization allows a taxon to have multiple parent taxa.

        Parameters
        ----------
        taxon : Taxon
            The taxon whose parents we want.

        Returns
        -------
        Collection[Taxon]
            A collection of the taxa that are direct parents of this taxon.
            Returns empty container if this taxon is the root.
        """
        raise NotImplementedError()

    def validate(self, taxa: Iterable[Taxon]) -> None:
        """
        Check that all inputs are Taxon objects known to this scheme.

        Parameters
        ----------
        taxa : Iterable[Taxon]
            The objects to be checked.

        Returns
        -------
        None
            Returns silently if every input passes.

        Raises
        ------
        TypeError
            If any input is not a Taxon.
        ValueError
            If any input is a Taxon for which self.is_valid_taxon() is False.
        """
        # Two passes are made over the input, so a one-shot iterable (e.g. a
        # generator) must be materialized or the second check sees nothing.
        taxa = list(taxa)
        nontaxon = [taxon for taxon in taxa if not isinstance(taxon, Taxon)]
        if len(nontaxon) > 0:
            raise TypeError(f"Found invalid non-Taxon inputs: {nontaxon}")
        invalid = [taxon for taxon in taxa if not self.is_valid_taxon(taxon)]
        if len(invalid) > 0:
            raise ValueError(
                f"The following taxa are not recognized by the provided TaxonomyScheme ({self.__repr__()}): {invalid}"
            )

ancestors(taxon)

All taxa which are between this taxon and the root (including the root).

Parameters:

  • taxon (Taxon) –

    The taxon whose ancestors we want.

Returns:

  • Collection[Taxon]

    All unique taxa between this taxon and the root. Empty container if this taxon is the root.

Source code in cladecombiner/taxonomy_scheme.py
def ancestors(self, taxon: Taxon) -> Collection[Taxon]:
    """
    All taxa which are between this taxon and the root (including the root).

    Parameters
    ----------
    taxon : Taxon
        The taxon whose ancestors we want.

    Returns
    -------
    Collection[Taxon]
        All unique taxa between this taxon and the root.
        Empty container if this taxon is the root.
    """
    if self.is_root(taxon):
        return set()

    anc = set()
    pending = list(self.parents(taxon))
    while pending:
        tax = pending.pop()
        if tax in anc:
            # Hybridization lets several paths reach the same ancestor
            continue
        anc.add(tax)
        # extend(), not append(): each parent is queued individually
        pending.extend(self.parents(tax))
    return anc

children(taxon) abstractmethod

All taxa which are direct children of this taxon.

Parameters:

  • taxon (Taxon) –

    The taxon whose children we want.

Returns:

  • Collection[Taxon]

    A collection of the taxa that are direct descendants of this taxon. Returns empty container if this taxon has no children (i.e., if this taxon is a tip taxon).

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def children(self, taxon: Taxon) -> Collection[Taxon]:
    """
    The taxa sitting exactly one level below this taxon.

    Parameters
    ----------
    taxon : Taxon
        The taxon whose children we want.

    Returns
    -------
    Collection[Taxon]
        The direct descendants of this taxon. An empty container means the
        taxon has no children, i.e. it is a tip taxon.
    """
    # Abstract: concrete schemes must supply this
    raise NotImplementedError

descendants(taxon, tip_only) abstractmethod

All taxa which are contained by this taxon.

Parameters:

  • taxon (Taxon) –

    The taxon whose descendants we want.

  • tip_only (bool) –

    Do we want only tip descendants of this taxon?

Returns:

  • Collection[Taxon]

    If tip_only == True, all tips that are descended from this taxon. Otherwise, a collection of the taxa that descend from this taxon. That is, its children, and its childrens' children, and so forth. Returns empty container if this taxon is a tip.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def descendants(self, taxon: Taxon, tip_only: bool) -> Collection[Taxon]:
    """
    All taxa which are contained by this taxon.

    Subclasses must override this method, but may delegate to this default
    implementation, which only relies on self.children().

    Parameters
    ----------
    taxon : Taxon
        The taxon whose descendants we want.
    tip_only : bool
        Do we want only tip descendants of this taxon?

    Returns
    -------
    Collection[Taxon]
        If tip_only == True, all tips that are descended from this taxon.
        Otherwise, a collection of the taxa that descend from this taxon.
        That is, its children, and its childrens' children, and so forth.
        Returns empty container if this taxon is a tip.
    """
    desc = set()
    visited = set()
    pending = list(self.children(taxon))
    while pending:
        tax = pending.pop()
        if tax in visited:
            # Hybridization lets several paths reach the same descendant
            continue
        visited.add(tax)
        kids = list(self.children(tax))
        # A taxon without children is a tip
        if (not tip_only) or (not kids):
            desc.add(tax)
        # extend(), not append(): each child is queued individually
        pending.extend(kids)
    return desc

is_root(taxon) abstractmethod

Is this the largest taxon that contains all others?

Parameters:

  • taxon (Taxon) –

    The taxon to be checked.

Returns:

  • bool

    True if this taxon is the root.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def is_root(self, taxon: Taxon) -> bool:
    """
    Tell whether this taxon is the root, the one containing all others.

    Parameters
    ----------
    taxon : Taxon
        The taxon to be checked.

    Returns
    -------
    bool
        True if this taxon is the root.
    """
    # Abstract: concrete schemes must supply this
    raise NotImplementedError

is_valid_taxon(taxon) abstractmethod

Does the scheme recognize this Taxon?

Parameters:

  • taxon (Taxon) –

    The taxon to be checked.

Returns:

  • bool

    True if this taxon is valid.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def is_valid_taxon(self, taxon: Taxon) -> bool:
    """
    Tell whether this scheme recognizes the given Taxon.

    Parameters
    ----------
    taxon : Taxon
        The taxon to be checked.

    Returns
    -------
    bool
        True if this taxon is valid.
    """
    # Abstract: concrete schemes must supply this
    raise NotImplementedError

parents(taxon) abstractmethod

All parent taxa of taxon, i.e. ancestors exactly one level above this taxon.

Hybridization allows a taxon to have multiple parent taxa.

Parameters:

  • taxon (Taxon) –

    The taxon whose parents we want.

Returns:

  • Collection[Taxon]

    A collection of the taxa that are direct parents of this taxon. Returns empty container if this taxon is the root.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def parents(self, taxon: Taxon) -> Collection[Taxon]:
    """
    The parent taxa of a taxon, i.e. its ancestors exactly one level above.

    Because of hybridization a taxon may have more than one parent.

    Parameters
    ----------
    taxon : Taxon
        The taxon whose parents we want.

    Returns
    -------
    Collection[Taxon]
        The direct parents of this taxon. An empty container means the
        taxon is the root.
    """
    # Abstract: concrete schemes must supply this
    raise NotImplementedError

TreelikeTaxonomyScheme

Bases: TaxonomyScheme

Abstract class for hybrid-free Taxonomy.

Common taxonomic notions that are either ill-defined or require generalization in the face of hybridization are defined here, such as the MRCA of a set of taxa.

Source code in cladecombiner/taxonomy_scheme.py
class TreelikeTaxonomyScheme(TaxonomyScheme):
    """
    Abstract base for taxonomies without hybridization.

    Notions that are ill-defined, or would need generalizing, once hybrids
    are allowed live here; the MRCA of a set of taxa is one example.
    """

    ########################
    # Superclass overrides #
    ########################

    def ancestors(self, taxon: Taxon) -> Sequence[Taxon]:
        """
        The chain of taxa leading from this taxon up to the root (root
        included), nearest ancestor first.

        Parameters
        ----------
        taxon : Taxon
            The taxon whose ancestors we want.

        Returns
        -------
        Sequence[Taxon]
            Each ancestor once, ordered from the taxon's parent to the root.
            Empty if this taxon is the root.
        """
        lineage = []
        current = taxon
        # parents() yields None once the root has been passed
        while (current := self.parents(current)) is not None:
            lineage.append(current)
        return lineage

    @abstractmethod
    def parents(self, taxon: Taxon) -> Taxon | None:
        """
        The single parent of a taxon; treelike schemes have at most one.

        Parameters
        ----------
        taxon : Taxon
            The taxon whose parent we want.

        Returns
        -------
        Taxon | None
            The taxon's parent, or None if this is the root.
        """
        raise NotImplementedError

    #################
    # Class methods #
    #################

    @abstractmethod
    def contains(self, focal: Taxon, target: Taxon) -> bool:
        """
        Tell whether the target taxon is a descendant of the focal taxon.

        Parameters
        ----------
        focal : Taxon
            The taxon which may or may not contain the target.
        target : Taxon
            The taxon which may or may not be contained by the focal taxon.

        Returns
        -------
        bool
            True if focal contains target.
        """
        raise NotImplementedError

    @abstractmethod
    def mrca(self, taxa: Iterable[Taxon]) -> Taxon:
        """
        Find the most recent common ancestor (MRCA) of a set of taxa.

        A group of taxa may share many common ancestors; the MRCA is the
        one that contains the fewest other taxa possible.

        Parameters
        ----------
        taxa : Iterable[Taxon]
            The taxa for which we want the MRCA.

        Returns
        -------
        Taxon
            The MRCA.
        """
        raise NotImplementedError

ancestors(taxon)

Postorder sequence of taxa between this taxon and the root (including the root).

Parameters:

  • taxon (Taxon) –

    The taxon whose ancestors we want.

Returns:

  • Sequence[Taxon]

    All unique taxa between this taxon and the root, in that order, and including the root. Returns empty container if this taxon is the root.

Source code in cladecombiner/taxonomy_scheme.py
def ancestors(self, taxon: Taxon) -> Sequence[Taxon]:
    """
    The chain of taxa leading from this taxon up to the root (root included),
    nearest ancestor first.

    Parameters
    ----------
    taxon : Taxon
        The taxon whose ancestors we want.

    Returns
    -------
    Sequence[Taxon]
        Each ancestor once, ordered from the taxon's parent to the root.
        Empty if this taxon is the root.
    """
    lineage = []
    current = taxon
    # parents() yields None once the root has been passed
    while (current := self.parents(current)) is not None:
        lineage.append(current)
    return lineage

contains(focal, target) abstractmethod

Does the focal taxon contain the target taxon?

That is, is target a descendant of focal?

Parameters:

  • focal (Taxon) –

    This taxon may or may not contain the target taxon.

  • target (Taxon) –

    The taxon which may or may not be contained by the focal taxon.

Returns:

  • bool

    True if focal contains target.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def contains(self, focal: Taxon, target: Taxon) -> bool:
    """
    Tell whether the target taxon is a descendant of the focal taxon.

    Parameters
    ----------
    focal : Taxon
        The taxon which may or may not contain the target.
    target : Taxon
        The taxon which may or may not be contained by the focal taxon.

    Returns
    -------
    bool
        True if focal contains target.
    """
    # Abstract: concrete schemes must supply this
    raise NotImplementedError

mrca(taxa) abstractmethod

Find the MRCA of a set of taxa

The MRCA is the most recent common ancestor of a set of taxa. There are potentially many common ancestors of a particular group of taxa, but this is the one which contains the fewest other taxa possible.

Parameters:

  • taxa (Iterable[Taxon]) –

    The taxa for which we want the MRCA.

Returns:

  • Taxon

    The MRCA.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def mrca(self, taxa: Iterable[Taxon]) -> Taxon:
    """
    Find the MRCA (most recent common ancestor) of a set of taxa.

    Of the potentially many common ancestors of a group of taxa, the MRCA
    is the one which contains the fewest other taxa possible.

    This abstract declaration carries no logic of its own; calling it
    directly raises.

    Parameters
    ----------
    taxa : Iterable[Taxon]
        The taxa for which we want the MRCA.

    Returns
    -------
    Taxon
        The MRCA.

    Raises
    ------
    NotImplementedError
        Always, when this abstract declaration is called directly.
    """
    raise NotImplementedError

parents(taxon) abstractmethod

A taxon has only one parent if the scheme is treelike.

Parameters:

  • taxon (Taxon) –

    The taxon whose parents we want.

Returns:

  • Taxon | None

    The taxon's parent, or None if this is the root.

Source code in cladecombiner/taxonomy_scheme.py
@abstractmethod
def parents(self, taxon: Taxon) -> Taxon | None:
    """
    Look up the parent of a taxon.

    A taxon has only one parent if the scheme is treelike.

    This abstract declaration carries no logic of its own; calling it
    directly raises.

    Parameters
    ----------
    taxon : Taxon
        The taxon whose parents we want.

    Returns
    -------
    Taxon | None
        The taxon's parent, or None if this is the root.

    Raises
    ------
    NotImplementedError
        Always, when this abstract declaration is called directly.
    """
    raise NotImplementedError

cladecombiner.tree_utils

add_paraphyletic_tips(phy, tips)

Disambiguates ancestral versus tip taxa by adding tips explicitly.

Assumes all nodes have labels.

In nomenclatures for evolving pathogens, naming a new taxon will make a previously-named taxon paraphyletic. There can then be ambiguity with respect to whether that previous taxon name is being used to refer to the monophyletic group comprising this taxon and all its descendants, or the non-monophyletic group of the previous taxon except its newly named descendant.

This function adds a tip to the phylogeny to represent the non-monophyletic group which has been split by subsequently-named taxa.

For example, the SARS-CoV-2 Pango taxon JN.1 could mean the higher taxon JN.1 (which includes many more specifically-named taxa, such as JN.1.11.1 (KP) and JN.1.30.1 (KU)), or JN.1 as something we can observe as a label for sampled sequences. The latter of these means a non-more-specifically-named JN.1 lineage, some part of the tree of JN.1 which has not been named more specifically. This also occurs with NextStrain clades, for example the SARS-CoV-2 clade 23I was made paraphyletic with respect to 24A, which was in turn made paraphyletic by 24B. So 23I can mean an ancestral taxon, comprising all lineages in any of these clades, or a non-more-specifically named part of the 23I tree, which we could see in a sample at the same time as we see 24A.

Parameters:

  • phy (dendropy.Tree with a label for all nodes) –

    The tree to which we will add the tips.

  • tips (Sequence[str]) –

    The names of taxa that should exist as both ancestral and tip taxa.

Returns:

  • Tree

    The tree with all added tips.

Source code in cladecombiner/tree_utils.py
def add_paraphyletic_tips(
    phy: dendropy.Tree, tips: Sequence[str]
) -> dendropy.Tree:
    """
    Add explicit tips for taxa that are both ancestral and observable.

    Assumes all nodes have labels.

    When a nomenclature for an evolving pathogen names a new taxon, the
    taxon it was carved out of becomes paraphyletic. The older name is then
    ambiguous: it may denote the monophyletic group (the taxon plus all of
    its descendants), or only the remainder that has not been given a more
    specific name.

    To make the second meaning representable, this function gives each
    listed internal node an extra child tip carrying the same label.

    For example, the SARS-CoV-2 Pango taxon JN.1 may mean the higher taxon
    JN.1 (which includes more specifically-named taxa such as JN.1.11.1
    (KP) and JN.1.30.1 (KU)), or JN.1 as a label observed on sampled
    sequences, i.e. some part of the JN.1 tree which has not been named
    more specifically. NextStrain clades behave the same way: SARS-CoV-2
    clade 23I was made paraphyletic with respect to 24A, which was in turn
    made paraphyletic by 24B. So 23I can mean the ancestral taxon covering
    all of these clades, or the not-more-specifically-named part of the 23I
    tree, which could be sampled at the same time as 24A.

    Parameters
    ----------
    phy : dendropy.Tree with a label for all nodes
        The tree to which we will add the tips. It is deep-copied, so the
        caller's tree is left untouched.
    tips : Sequence[str]
        The names of taxa that should exist as both ancestral and tip taxa.

    Returns
    -------
    dendropy.Tree
        A copy of the tree with all added tips.
    """
    augmented = copy.deepcopy(phy)

    # Collect the targets before touching the tree so that the traversal
    # never runs over a structure that is being modified.
    ambiguous_nodes = [
        node
        for node in augmented.preorder_node_iter()
        if node.is_internal() and node.label in tips
    ]

    for internal in ambiguous_nodes:
        internal.add_child(dendropy.Node(label=internal.label))

    return augmented

fully_labeled_subtrees_same(node1, node2)

Are two subtrees with every node labeled topologically equivalent?

Used by fully_labeled_trees_same().

Recursive function, calls itself until either a difference is seen or all tips in the subtree in both tree 1 and tree 2 are seen.

Parameters:

  • node1 (Node) –

    Node defining the subtree in tree 1.

  • node2 (Node) –

    Node defining the subtree in tree 2.

Returns:

  • bool

    True if the subtrees are the same.

Source code in cladecombiner/tree_utils.py
def fully_labeled_subtrees_same(
    node1: dendropy.Node, node2: dendropy.Node
) -> bool:
    """
    Are two subtrees with every node labeled topologically equivalent?

    Used by fully_labeled_trees_same().

    Recursive function, calls itself until either a difference is seen or all
    tips in the subtree in both tree 1 and tree 2 are seen. The labels of
    ``node1`` and ``node2`` themselves are not compared here; only their
    descendants are.

    Parameters
    ----------
    node1 : dendropy.Node
        Node defining the subtree in tree 1.
    node2 : dendropy.Node
        Node defining the subtree in tree 2.

    Returns
    -------
    bool
        True if the subtrees are the same.
    """

    def _label(node):
        return node.label

    # sorted() returns a new list; list.sort() returns None, which would
    # make any two sets of children compare equal.
    children1 = sorted(node1.child_nodes(), key=_label)
    children2 = sorted(node2.child_nodes(), key=_label)

    if [_label(child) for child in children1] != [
        _label(child) for child in children2
    ]:
        return False

    # Both lists are label-sorted with identical labels, so zip pairs up the
    # matching children. The result of each recursive comparison must be
    # propagated, otherwise differences below this level go unnoticed.
    # NOTE(review): presumes sibling labels are unique -- confirm.
    return all(
        fully_labeled_subtrees_same(child1, child2)
        for child1, child2 in zip(children1, children2)
    )

fully_labeled_trees_same(tree1, tree2)

Are two trees with every node labeled topologically equivalent?

Standard topological identity means that two trees portray the same evolutionary relationships between the tips. This function assumes that every internal node is labeled and checks the relationships between all nodes.

Calls fully_labeled_subtrees_same() to recursively evaluate subtrees.

Parameters:

  • tree1 (Tree) –

    One tree to compare.

  • tree2 (Tree) –

    The other tree to compare.

Returns:

  • bool

    True if the trees are the same.

Source code in cladecombiner/tree_utils.py
def fully_labeled_trees_same(
    tree1: dendropy.Tree, tree2: dendropy.Tree
) -> bool:
    """
    Decide whether two fully labeled trees have the same topology.

    Ordinary topological identity only asks whether two trees portray the
    same evolutionary relationships between the tips. Here every internal
    node is assumed to carry a label as well, so the relationships between
    all nodes are checked.

    The root labels are compared first; everything below the roots is
    delegated to fully_labeled_subtrees_same().

    Parameters
    ----------
    tree1 : dendropy.Tree
        One tree to compare.
    tree2 : dendropy.Tree
        The other tree to compare.

    Returns
    -------
    bool
        True if the trees are the same.

    Raises
    ------
    RuntimeError
        If either tree's seed_node is not a dendropy.Node.
    """
    # Should never hit, required for type checking
    if not isinstance(tree1.seed_node, dendropy.Node) or not isinstance(
        tree2.seed_node, dendropy.Node
    ):
        raise RuntimeError("Malformed tree, seed_node must be a dendropy.Node")

    root1, root2 = tree1.seed_node, tree2.seed_node
    if root1.label != root2.label:
        return False
    return fully_labeled_subtrees_same(root1, root2)