Skip to content

API reference

iup

iup.CoverageData

Bases: Data

Source code in iup/__init__.py
class CoverageData(Data):
    def validate(self):
        """Require `time_end` (Date) and `estimate` (Float64) columns; extras are allowed."""
        required = {"time_end": pl.Date, "estimate": pl.Float64}
        self.assert_in_schema(required)

iup.CoverageData.validate()

Must have time_end and estimate columns; can have more.

Source code in iup/__init__.py
def validate(self):
    """Require `time_end` (Date) and `estimate` (Float64) columns; extras are allowed."""
    expected = {"time_end": pl.Date, "estimate": pl.Float64}
    self.assert_in_schema(expected)

iup.CumulativeCoverageData

Bases: CoverageData

Source code in iup/__init__.py
class CumulativeCoverageData(CoverageData):
    def validate(self):
        super().validate()
        # cumulative coverage is a proportion, so it must lie in [0, 1]
        all_proportions = self["estimate"].is_between(0.0, 1.0).all()
        assert all_proportions, "Cumulative coverage `estimate` must be a proportion"

    def to_incident(self, groups: List[str,] | None) -> IncidentCoverageData:
        """Convert cumulative to incident coverage data.

        Because the first report date for each group is often rollout,
        incident coverage on the first report date is 0.

        Args:
            groups: Names of the columns of grouping factors, or None. If `None`,
                then data will be grouped by `"season"`.

        Returns:
            Incident coverage on each date in the input cumulative coverage data.
        """
        group_cols = ["season"] if groups is None else groups

        # first difference within each group; the leading null (first report
        # date) becomes 0 incident coverage
        incident = pl.col("estimate").diff().over(group_cols).fill_null(0)

        return IncidentCoverageData(self.with_columns(estimate=incident))

iup.CumulativeCoverageData.to_incident(groups)

Convert cumulative to incident coverage data.

Because the first report date for each group is often rollout, incident coverage on the first report date is 0.

Parameters:

Name Type Description Default
groups List[str,] | None

Names of the columns of grouping factors, or None. If None, then data will be grouped by "season".

required

Returns:

Type Description
IncidentCoverageData

Incident coverage on each date in the input cumulative coverage data.

Source code in iup/__init__.py
def to_incident(self, groups: List[str,] | None) -> IncidentCoverageData:
    """Convert cumulative to incident coverage data.

    Because the first report date for each group is often rollout,
    incident coverage on the first report date is 0.

    Args:
        groups: Names of the columns of grouping factors, or None. If `None`,
            then data will be grouped by `"season"`.

    Returns:
        Incident coverage on each date in the input cumulative coverage data.
    """
    group_cols = ["season"] if groups is None else groups

    # first difference within each group; the leading null (first report
    # date) becomes 0 incident coverage
    incident = pl.col("estimate").diff().over(group_cols).fill_null(0)

    return IncidentCoverageData(self.with_columns(estimate=incident))

iup.Data

Bases: DataFrame

Abstract class for observed data and forecast data.

Source code in iup/__init__.py
class Data(pl.DataFrame):
    """
    Abstract class for observed data and forecast data.

    Subclasses must implement `validate()`, which runs automatically at
    construction time.
    """

    def __init__(self, *args, **kwargs):
        """Construct the data frame and immediately validate its schema/contents."""
        super().__init__(*args, **kwargs)
        self.validate()

    def validate(self):
        """Check the data. Abstract: subclasses must override."""
        raise NotImplementedError("Subclasses must implement this method.")

    def assert_in_schema(self, names_types: dict[str, DataTypeClass]):
        """Verify that columns of the expected types are present in the data frame.

        Args:
            names_types: Column names and types mapping.

        Raises:
            RuntimeError: If a column is missing or has an unexpected type.
        """
        for name, type_ in names_types.items():
            if name not in self.schema.names():
                raise RuntimeError(f"Column '{name}' not found")
            elif (name, type_) not in self.schema.items():
                # report the actual polars dtype so the message is directly
                # comparable to the expected polars type (the previous
                # implementation showed the Python type instead)
                actual_type = self.schema[name]
                raise RuntimeError(
                    f"Column '{name}' has type {actual_type}, not {type_}"
                )

iup.Data.assert_in_schema(names_types)

Verify that columns of the expected types are present in the data frame.

Parameters:

Name Type Description Default
names_types dict[str, DataTypeClass]

Column names and types mapping.

required
Source code in iup/__init__.py
def assert_in_schema(self, names_types: dict[str, DataTypeClass]):
    """Verify that columns of the expected types are present in the data frame.

    Args:
        names_types: Column names and types mapping.

    Raises:
        RuntimeError: If a column is missing or has an unexpected type.
    """
    for name, type_ in names_types.items():
        if name not in self.schema.names():
            raise RuntimeError(f"Column '{name}' not found")
        elif (name, type_) not in self.schema.items():
            # report the actual polars dtype so the message is directly
            # comparable to the expected polars type (the previous
            # implementation showed the Python type instead)
            actual_type = self.schema[name]
            raise RuntimeError(
                f"Column '{name}' has type {actual_type}, not {type_}"
            )

iup.IncidentCoverageData

Bases: CoverageData

Source code in iup/__init__.py
class IncidentCoverageData(CoverageData):
    def validate(self):
        """Validate that incident estimates are in [-1, 1] (changes in a proportion)."""
        super().validate()
        if not self["estimate"].is_between(-1.0, 1.0).all():
            # collect the offending values for a useful error message
            bad_values = (
                self.filter(pl.col("estimate").is_between(-1.0, 1.0).not_())["estimate"]
                .unique()
                .to_list()
            )
            # fixed error-message typo: "must be have values" -> "must have values"
            raise ValueError(
                f"Incident coverage `estimate` must have values between -1 and +1. "
                f"Values included {bad_values}"
            )

    def to_cumulative(
        self, groups: List[str,] | None, prev_cumulative: pl.DataFrame | None = None
    ) -> "CumulativeCoverageData":
        """Convert incident to cumulative coverage data.

        Cumulative sum of incident coverage gives the cumulative coverage.
        Optionally, additional cumulative coverage from before the start of
        the incident data may be provided.
        Even if no groups are specified, the data must at least be grouped by season.

        Args:
            groups: Names of the columns of grouping factors, or None. If `None`, then
                data will be grouped by `"season"`.
            prev_cumulative: Cumulative coverage from before the start of the incident
                data, for each group, or None. If `None`, group by `"season"`.

        Returns:
            Cumulative coverage on each date in the input incident coverage data.
        """
        if groups is None:
            groups = ["season"]

        out = self.with_columns(estimate=pl.col("estimate").cum_sum().over(groups))

        if prev_cumulative is not None:
            # prev_cumulative must carry a `last_cumulative` column keyed by the groups
            out = out.join(prev_cumulative, on=groups)

            out = out.with_columns(
                estimate=pl.col("estimate") + pl.col("last_cumulative")
            ).drop("last_cumulative")

        return CumulativeCoverageData(out)

iup.IncidentCoverageData.to_cumulative(groups, prev_cumulative=None)

Convert incident to cumulative coverage data.

Cumulative sum of incident coverage gives the cumulative coverage. Optionally, additional cumulative coverage from before the start of the incident data may be provided. Even if no groups are specified, the data must at least be grouped by season.

Parameters:

Name Type Description Default
groups List[str,] | None

Names of the columns of grouping factors, or None. If None, then data will be grouped by "season".

required
prev_cumulative DataFrame | None

Cumulative coverage from before the start of the incident data, for each group, or None. If None, group by "season".

None

Returns:

Type Description
CumulativeCoverageData

Cumulative coverage on each date in the input incident coverage data.

Source code in iup/__init__.py
def to_cumulative(
    self, groups: List[str,] | None, prev_cumulative: pl.DataFrame | None = None
) -> "CumulativeCoverageData":
    """Convert incident to cumulative coverage data.

    Cumulative sum of incident coverage gives the cumulative coverage.
    Optionally, additional cumulative coverage from before the start of
    the incident data may be provided.
    Even if no groups are specified, the data must at least be grouped by season.

    Args:
        groups: Names of the columns of grouping factors, or None. If `None`, then
            data will be grouped by `"season"`.
        prev_cumulative: Cumulative coverage from before the start of the incident
            data, for each group, or None. If `None`, group by `"season"`.

    Returns:
        Cumulative coverage on each date in the input incident coverage data.
    """
    group_cols = ["season"] if groups is None else groups

    result = self.with_columns(
        estimate=pl.col("estimate").cum_sum().over(group_cols)
    )

    if prev_cumulative is not None:
        # shift each group's running total by its pre-existing coverage
        result = (
            result.join(prev_cumulative, on=group_cols)
            .with_columns(estimate=pl.col("estimate") + pl.col("last_cumulative"))
            .drop("last_cumulative")
        )

    return CumulativeCoverageData(result)

iup.PointForecast

Bases: QuantileForecast

Class for forecast with a point estimate. A subclass for the case where the quantile is 50%. For now, the "quantile50" value is enforced to serve as the "estimate".

Source code in iup/__init__.py
class PointForecast(QuantileForecast):
    """
    Forecast with a point estimate: a quantile forecast restricted to the
    50% quantile. For now, the "quantile50" value is enforced to serve as
    the "estimate".
    """

    def validate(self):
        super().validate()
        quantiles = self["quantile"]
        assert (quantiles == 0.50).all()

iup.QuantileForecast

Bases: Data

Class for forecast with quantiles. Save for future.

Source code in iup/__init__.py
class QuantileForecast(Data):
    """
    Forecast expressed as quantiles.
    Save for future.
    """

    def validate(self):
        expected = {
            "time_end": pl.Date,
            "quantile": pl.Float64,
            "estimate": pl.Float64,
        }
        self.assert_in_schema(expected)

        in_range = self["quantile"].is_between(0.0, 1.0).all()
        assert in_range, "quantiles must be between 0 and 1"

iup.SampleForecast

Bases: Data

Class for forecast with posterior distribution. Save for future.

Source code in iup/__init__.py
class SampleForecast(Data):
    """
    Forecast expressed as posterior distribution samples.
    Save for future.
    """

    def validate(self):
        required = {
            "time_end": pl.Date,
            "sample_id": pl.UInt64,
            "estimate": pl.Float64,
        }
        self.assert_in_schema(required)

iup.models

iup.models.LPLModel

Bases: CoverageModel

Subclass of CoverageModel for a mixed Logistic Plus Linear model. For details, see the online docs.

Source code in iup/models.py
class LPLModel(CoverageModel):
    """
    Subclass of CoverageModel for a mixed Logistic Plus Linear model.
    For details, see the online docs.
    """

    def __init__(
        self,
        data: iup.CumulativeCoverageData,
        forecast_date: datetime.date,
        params: dict[str, Any],
        quantiles: list[float],
        date_column: str = "time_end",
    ):
        """Initialize with a seed and the model structure.

        Args:
            data: Cumulative coverage data for fitting and prediction.
            forecast_date: Date to split fit and prediction data.
            params: All parameters, including names and values to specify prior
                distributions, control parameters for MCMC fitting, and season
                start month and day. Must include a `"seed"` entry.
            quantiles: Posterior sample quantiles
            date_column: Name of the date column in the data. Defaults to "time_end".
        """
        self.raw_data = data
        self.date_column = date_column
        self.quantiles = quantiles
        self.forecast_date = forecast_date

        # use parameters, separating MCMC and model fitting parameters
        self.params = params

        mcmc_keys = {"num_warmup", "num_samples", "num_chains", "progress_bar"}
        self.mcmc_params = {k: v for k, v in params.items() if k in mcmc_keys}
        # model params are exactly those matching _logistic_plus_linear's signature
        self.model_params = {
            k: v
            for k, v in params.items()
            if k in inspect.signature(self._logistic_plus_linear).parameters
        }
        # independent PRNG keys so fitting and prediction use different randomness
        self.fit_key, self.pred_key = random.split(random.key(self.params["seed"]), 2)

        # input data validation
        assert {self.date_column, "estimate", "season", "geography"}.issubset(
            self.raw_data.columns
        )

        # preprocess data
        self.data = self._preprocess(data=self.raw_data)

        # do the indexing: number of unique levels per grouping factor
        self.n_group_levels = [
            self.data.select(pl.col(group).unique()).height
            for group in ["season", "geography", "season_geo"]
        ]

        # initialize MCMC. `None` is a placeholder indicating fitting has not occurred
        self.mcmc = None

    @classmethod
    def _preprocess(cls, data: pl.DataFrame) -> pl.DataFrame:
        """Prepare raw data for modeling: derive counts, elapsed time, the
        season-geography interaction, and integer index columns.

        Assumes input has `sample_size` and `t` columns — TODO confirm
        upstream schema.
        """
        out = (
            data
            # prepare observation data
            .rename({"sample_size": "N_tot"})
            .with_columns(N_vax=(pl.col("N_tot") * pl.col("estimate")).round(0))
            # set up temporal data: fraction of a (365-day) year elapsed
            .with_columns(elapsed=pl.col("t") / 365)
            # add interaction term
            .with_columns(
                season_geo=pl.concat_str(["season", "geography"], separator="_")
            )
        )

        # add the indices
        out = cls._index(out, groups=["season", "geography", "season_geo"])

        return out

    @staticmethod
    def _index(data: pl.DataFrame, groups: list[str]) -> pl.DataFrame:
        """
        For each column in `groups` (e.g., `"season"`), add a new column `"{group}_idx"`
        (e.g., `"season_idx"`) that has the values in the original column replaced by
        integer indices.

        Args:
            data: dataframe
            groups: names of columns

        Returns: dataframe with additional columns like `"{group}_idx"`
        """
        for group in groups:
            # sort so that index assignment is deterministic across calls
            unique_values = (
                data.select(pl.col(group).unique().sort()).get_column(group).to_list()
            )
            indices = list(range(len(unique_values)))
            replace_map = {value: index for value, index in zip(unique_values, indices)}
            data = data.with_columns(
                pl.col(group).replace_strict(replace_map).alias(f"{group}_idx")
            )

        return data

    def model(self, data: pl.DataFrame):
        """Adapter from a preprocessed data frame to the numpyro model arguments."""
        # missing `N_vax` column signals that we are drawing predictions, not fitting
        if "N_vax" in data.columns:
            N_vax = jnp.array(data["N_vax"])
        else:
            N_vax = None

        return self._logistic_plus_linear(
            N_vax=N_vax,
            elapsed=jnp.array(data["elapsed"]),
            # jax runs into a problem if you don't specify this type
            N_tot=jnp.array(data["N_tot"], dtype=jnp.int32),
            groups=jnp.array(
                data.select(
                    [f"{group}_idx" for group in ["season", "geography", "season_geo"]]
                )
            ),
            n_groups=3,
            n_group_levels=self.n_group_levels,
            **self.model_params,
        )

    @staticmethod
    def _logistic_plus_linear(
        N_vax: jnp.ndarray | None,
        elapsed: jnp.ndarray,
        N_tot: jnp.ndarray,
        groups: jnp.ndarray,
        n_groups: int,
        n_group_levels: list[int],
        muA_shape1: float,
        muA_shape2: float,
        sigmaA_rate: float,
        tau_shape1: float,
        tau_shape2: float,
        K_shape: float,
        K_rate: float,
        muM_shape: float,
        muM_rate: float,
        sigmaM_rate: float,
        D_shape: float,
        D_rate: float,
    ):
        """Numpyro model for the mixed Logistic Plus Linear coverage process.

        Args:
            elapsed: Fraction of a year elapsed since the start of season at each data point.
            N_vax: Number of people vaccinated at each data point, or `None`.
            N_tot: Total number of people in the population at each data point.
            groups: Numeric codes for groups: row = data point, col = grouping factor.
            n_groups: Number of grouping factors.
            n_group_levels: Number of unique levels of each grouping factor.
            muA_shape1: Beta distribution shape1 parameter for muA prior.
            muA_shape2: Beta distribution shape2 parameter for muA prior.
            sigmaA_rate: Exponential distribution rate parameter for sigmaA prior.
            tau_shape1: Beta distribution shape1 parameter for tau prior.
            tau_shape2: Beta distribution shape2 parameter for tau prior.
            K_shape: Gamma distribution shape parameter for K prior.
            K_rate: Gamma distribution rate parameter for K prior.
            muM_shape: Gamma distribution shape parameter for muM prior.
            muM_rate: Gamma distribution rate parameter for muM prior.
            sigmaM_rate: Exponential distribution rate parameter for sigmaM prior.
            D_shape: Gamma distribution shape parameter for D prior.
            D_rate: Gamma distribution rate parameter for D prior.
        """
        # Sample the overall average value for each parameter
        muA = numpyro.sample("muA", dist.Beta(muA_shape1, muA_shape2))
        muM = numpyro.sample("muM", dist.Gamma(muM_shape, muM_rate))
        tau = numpyro.sample("tau", dist.Beta(tau_shape1, tau_shape2))
        K = numpyro.sample("K", dist.Gamma(K_shape, K_rate))
        # NOTE(review): site name "d" differs in case from the variable `D` and
        # every other site — confirm downstream consumers of get_samples()
        # expect the lowercase key before renaming
        D = numpyro.sample("d", dist.Gamma(D_shape, D_rate))

        # group-level scales for the random effects, one per grouping factor
        sigmaA = numpyro.sample(
            "sigmaA", dist.Exponential(sigmaA_rate), sample_shape=(n_groups,)
        )
        sigmaM = numpyro.sample(
            "sigmaM", dist.Exponential(sigmaM_rate), sample_shape=(n_groups,)
        )
        # non-centered parameterization: standard-normal draws scaled below
        zA = numpyro.sample(
            "zA", dist.Normal(0, 1), sample_shape=(sum(n_group_levels),)
        )
        zM = numpyro.sample(
            "zM", dist.Normal(0, 1), sample_shape=(sum(n_group_levels),)
        )
        deltaA = zA * np.repeat(sigmaA, np.array(n_group_levels))
        deltaM = zM * np.repeat(sigmaM, np.array(n_group_levels))

        # per-datum asymptote (A) and slope (M): grand mean plus the sum of
        # random effects across grouping factors
        A = muA + np.sum(deltaA[groups], axis=1)
        M = muM + np.sum(deltaM[groups], axis=1)

        # Calculate latent true coverage at each datum
        v = A / (1 + jnp.exp(-K * (elapsed - tau))) + (M * elapsed)  # type: ignore

        numpyro.sample("obs", dist.BetaBinomial(v * D, (1 - v) * D, N_tot), obs=N_vax)  # type: ignore

    def fit(self) -> Self:
        """Fit a mixed Logistic Plus Linear model on training data.

        If grouping factors are specified, a hierarchical model will be built with
        group-specific parameters for the logistic maximum and linear slope,
        drawn from a shared distribution. Other parameters are non-hierarchical.

        Uses the data, groups, model_params, and mcmc_params specified during
        initialization.

        Returns:
            Self with the fitted model stored in the mcmc attribute.
        """
        self.kernel = NUTS(self.model, init_strategy=init_to_sample)
        self.mcmc = MCMC(self.kernel, **self.mcmc_params)
        # fit only on data at or before the forecast date
        self.mcmc.run(
            self.fit_key,
            self.data.filter(pl.col(self.date_column) <= self.forecast_date),
        )

        if "progress_bar" in self.mcmc_params and self.mcmc_params["progress_bar"]:
            self.mcmc.print_summary()

        return self

    def predict(self) -> pl.DataFrame:
        """
        Make projections from a fit Logistic Plus Linear model.

        Returns:
            Sample forecast data frame with predictions for dates after forecast_date.
        """

        assert self.mcmc is not None, f"Need to fit() first; mcmc is {self.mcmc}"

        predictive = Predictive(self.model, self.mcmc.get_samples())
        # run the predictions, not using the observations
        # (dropping N_vax signals prediction mode to self.model)
        pred = predictive(self.pred_key, self.data.drop("N_vax"))

        # observations are rows; posterior samples are columns
        pred = np.array(pred["obs"]).transpose()

        # put predictions into a dataframe
        sample_cols = [f"_sample_{i}" for i in range(pred.shape[1])]
        pred = pl.DataFrame(pred, schema=sample_cols)

        index_cols = [self.date_column, "N_tot", "season", "geography", "season_geo"]

        data_pred = (
            pl.concat([self.data, pred], how="horizontal")
            .unpivot(
                on=sample_cols,
                index=index_cols,
                variable_name="sample_id",
                value_name="estimate",
            )
            .with_columns(
                forecast_date=self.forecast_date,
                # convert from sample_id strings to integers
                sample_id=pl.col("sample_id")
                .replace_strict({name: i for i, name in enumerate(sample_cols)})
                .cast(pl.UInt64),
                # convert predicted vaccination counts to proportions
                estimate=pl.col("estimate") / pl.col("N_tot"),
            )
            .group_by(
                ["season", "geography", "season_geo", "time_end", "forecast_date"]
            )
            .agg(
                # one array of quantile levels and one of values per group/date;
                # exploded below into long format
                quantile=pl.concat_arr(self.quantiles),
                estimate=pl.concat_arr(
                    [pl.quantile("estimate", q).alias(str(q)) for q in self.quantiles]
                ),
            )
        )

        return iup.QuantileForecast(data_pred.explode(["quantile", "estimate"]))

iup.models.LPLModel.__init__(data, forecast_date, params, quantiles, date_column='time_end')

Initialize with a seed and the model structure.

Parameters:

Name Type Description Default
data CumulativeCoverageData

Cumulative coverage data for fitting and prediction.

required
forecast_date date

Date to split fit and prediction data.

required
params dict[str, Any]

All parameters, including parameter names and values to specify prior distributions, control parameters for MCMC fitting, and the season start month and day

required
quantiles list[float]

Posterior sample quantiles

required
date_column str

Name of the date column in the data. Defaults to "time_end".

'time_end'
Source code in iup/models.py
def __init__(
    self,
    data: iup.CumulativeCoverageData,
    forecast_date: datetime.date,
    params: dict[str, Any],
    quantiles: list[float],
    date_column: str = "time_end",
):
    """Initialize with a seed and the model structure.

    Args:
        data: Cumulative coverage data for fitting and prediction.
        forecast_date: Date to split fit and prediction data.
        params: All parameters, including names and values to specify prior
            distributions, control parameters for MCMC fitting, and season
            start month and day. Must include a `"seed"` entry.
        quantiles: Posterior sample quantiles
        date_column: Name of the date column in the data. Defaults to "time_end".
    """
    self.raw_data = data
    self.date_column = date_column
    self.quantiles = quantiles
    self.forecast_date = forecast_date

    # use parameters, separating MCMC and model fitting parameters
    self.params = params

    mcmc_keys = {"num_warmup", "num_samples", "num_chains", "progress_bar"}
    self.mcmc_params = {k: v for k, v in params.items() if k in mcmc_keys}
    # model params are exactly those matching _logistic_plus_linear's signature
    self.model_params = {
        k: v
        for k, v in params.items()
        if k in inspect.signature(self._logistic_plus_linear).parameters
    }
    # independent PRNG keys so fitting and prediction use different randomness
    self.fit_key, self.pred_key = random.split(random.key(self.params["seed"]), 2)

    # input data validation
    assert {self.date_column, "estimate", "season", "geography"}.issubset(
        self.raw_data.columns
    )

    # preprocess data
    self.data = self._preprocess(data=self.raw_data)

    # do the indexing: number of unique levels per grouping factor
    self.n_group_levels = [
        self.data.select(pl.col(group).unique()).height
        for group in ["season", "geography", "season_geo"]
    ]

    # initialize MCMC. `None` is a placeholder indicating fitting has not occurred
    self.mcmc = None

iup.models.LPLModel.fit()

Fit a mixed Logistic Plus Linear model on training data.

If grouping factors are specified, a hierarchical model will be built with group-specific parameters for the logistic maximum and linear slope, drawn from a shared distribution. Other parameters are non-hierarchical.

Uses the data, groups, model_params, and mcmc_params specified during initialization.

Returns:

Type Description
Self

Self with the fitted model stored in the mcmc attribute.

Source code in iup/models.py
def fit(self) -> Self:
    """Fit a mixed Logistic Plus Linear model on training data.

    If grouping factors are specified, a hierarchical model will be built with
    group-specific parameters for the logistic maximum and linear slope,
    drawn from a shared distribution. Other parameters are non-hierarchical.

    Uses the data, groups, model_params, and mcmc_params specified during
    initialization.

    Returns:
        Self with the fitted model stored in the mcmc attribute.
    """
    self.kernel = NUTS(self.model, init_strategy=init_to_sample)
    self.mcmc = MCMC(self.kernel, **self.mcmc_params)

    # fit only on data at or before the forecast date
    training_data = self.data.filter(pl.col(self.date_column) <= self.forecast_date)
    self.mcmc.run(self.fit_key, training_data)

    if self.mcmc_params.get("progress_bar"):
        self.mcmc.print_summary()

    return self

iup.models.LPLModel.predict()

Make projections from a fit Logistic Plus Linear model.

Returns:

Type Description
DataFrame

Sample forecast data frame with predictions for dates after forecast_date.

Source code in iup/models.py
def predict(self) -> pl.DataFrame:
    """
    Make projections from a fit Logistic Plus Linear model.

    Returns:
        Sample forecast data frame with predictions for dates after forecast_date.
    """

    assert self.mcmc is not None, f"Need to fit() first; mcmc is {self.mcmc}"

    predictive = Predictive(self.model, self.mcmc.get_samples())
    # run the predictions, not using the observations
    # (dropping N_vax signals prediction mode to self.model)
    pred = predictive(self.pred_key, self.data.drop("N_vax"))

    # observations are rows; posterior samples are columns
    pred = np.array(pred["obs"]).transpose()

    # put predictions into a dataframe
    sample_cols = [f"_sample_{i}" for i in range(pred.shape[1])]
    pred = pl.DataFrame(pred, schema=sample_cols)

    index_cols = [self.date_column, "N_tot", "season", "geography", "season_geo"]

    data_pred = (
        pl.concat([self.data, pred], how="horizontal")
        .unpivot(
            on=sample_cols,
            index=index_cols,
            variable_name="sample_id",
            value_name="estimate",
        )
        .with_columns(
            forecast_date=self.forecast_date,
            # convert from sample_id strings to integers
            sample_id=pl.col("sample_id")
            .replace_strict({name: i for i, name in enumerate(sample_cols)})
            .cast(pl.UInt64),
            # convert predicted vaccination counts to proportions
            estimate=pl.col("estimate") / pl.col("N_tot"),
        )
        .group_by(
            ["season", "geography", "season_geo", "time_end", "forecast_date"]
        )
        .agg(
            # one array of quantile levels and one of values per group/date;
            # exploded below into long format
            quantile=pl.concat_arr(self.quantiles),
            estimate=pl.concat_arr(
                [pl.quantile("estimate", q).alias(str(q)) for q in self.quantiles]
            ),
        )
    )

    return iup.QuantileForecast(data_pred.explode(["quantile", "estimate"]))

iup.eval

iup.eval.eos_abs_diff(obs, pred, grouping_factors)

Calculate the absolute difference between observed data and prediction for the last date in a season.

Parameters:

Name Type Description Default
obs DataFrame | LazyFrame

Observed data.

required
pred DataFrame | LazyFrame

Predicted data.

required
grouping_factors List[str]

Grouping factor column names (must include 'season').

required

Returns:

Type Description
DataFrame

Data frame with absolute difference scores for end-of-season dates.

Source code in iup/eval.py
def eos_abs_diff(
    obs: pl.DataFrame | pl.LazyFrame,
    pred: pl.DataFrame | pl.LazyFrame,
    grouping_factors: List[str],
) -> pl.DataFrame:
    """Calculate the absolute difference between observed data and prediction for the last date in a season.

    Args:
        obs: Observed data.
        pred: Predicted data.
        grouping_factors: Grouping factor column names (must include 'season').

    Returns:
        Data frame with absolute difference scores for end-of-season dates.
    """
    assert "season" in grouping_factors
    obs_lazy = _ensure_lazy(obs)
    pred_lazy = _ensure_lazy(pred)

    # keep only the last observed date within each group (end of season)
    is_last_date = (pl.col("time_end") == pl.col("time_end").max()).over(
        grouping_factors
    )
    eos_obs = obs_lazy.filter(is_last_date)

    # score the median forecast against those end-of-season observations
    medians = pred_lazy.filter(pl.col("quantile") == 0.5)

    scored = (
        medians.join(eos_obs, on=["time_end"] + grouping_factors, how="right")
        .rename({"estimate_right": "obs", "estimate": "pred"})
        .with_columns(
            score_value=(pl.col("pred") - pl.col("obs")).abs(),
            score_fun=pl.lit("eos_abs_diff"),
        )
    )

    return scored.select(grouping_factors + SCORE_COLS).collect()

iup.eval.mspe(obs, pred, grouping_factors)

Mean square prediction error

Parameters:

Name Type Description Default
obs DataFrame | LazyFrame

Data frame with columns time_end, estimate, and the grouping factors

required
pred DataFrame | LazyFrame

Data frame with columns model, time_end, forecast_date, estimate, and the grouping factors.

required
grouping_factors List[str]

Grouping factor column names.

required

Returns:

Type Description
DataFrame

Data frame with scores.

Source code in iup/eval.py
def mspe(
    obs: pl.DataFrame | pl.LazyFrame,
    pred: pl.DataFrame | pl.LazyFrame,
    grouping_factors: List[str],
) -> pl.DataFrame:
    """Mean square prediction error

    Args:
        obs: Data frame with columns `time_end`, `estimate`, and the grouping factors
        pred: Data frame columns `model`, `time_end`, `forecast_date`, `estimate`, and
            the grouping factors.
        grouping_factors: Grouping factor column names.

    Returns:
        Data frame with scores.
    """
    obs_lazy = _ensure_lazy(obs)
    # score the median forecast only
    medians = _ensure_lazy(pred).filter(pl.col("quantile") == 0.5)

    squared_errors = (
        medians.join(obs_lazy, on=["time_end"] + grouping_factors, how="right")
        .rename({"estimate_right": "obs", "estimate": "pred"})
        .with_columns(score_value=(pl.col("pred") - pl.col("obs")) ** 2)
    )

    return (
        squared_errors.group_by(["model", "forecast_date"] + grouping_factors)
        .agg(pl.col("score_value").mean())
        .with_columns(score_fun=pl.lit("mspe"))
        .select(grouping_factors + SCORE_COLS)
        .collect()
    )

iup.utils

iup.utils.date_to_season(date, season_start_month, season_start_day=1)

Extract the overwinter disease season from a date.

Dates in year Y before the season start (e.g., Sep 1) are in the second part of the season (i.e., in season Y-1/Y). Dates in year Y after the season start are in season Y/Y+1. E.g., 2023-10-07 and 2024-04-18 are both in "2023/2024".

Parameters:

Name Type Description Default
date Expr

Dates in a coverage data frame.

required
season_start_month int

First month of the overwinter disease season.

required
season_start_day int

First day of the first month of the overwinter disease season.

1

Returns:

Type Description
Expr

Seasons for each date.

Source code in iup/utils.py
def date_to_season(
    date: pl.Expr, season_start_month: int, season_start_day: int = 1
) -> pl.Expr:
    """Extract the overwinter disease season from a date.

    Dates in year Y before the season start (e.g., Sep 1) are in the second part of
    the season (i.e., in season Y-1/Y). Dates in year Y after the season start are in
    season Y/Y+1. E.g., 2023-10-07 and 2024-04-18 are both in "2023/2024".

    Args:
        date: Dates in a coverage data frame.
        season_start_month: First month of the overwinter disease season.
        season_start_day: First day of the first month of the overwinter disease season.

    Returns:
        Seasons for each date.
    """
    year = date.dt.year()

    # the season breakpoint that falls within each date's own calendar year
    season_break = pl.date(year, season_start_month, season_start_day)

    # first year of the two-year season label: dates before the breakpoint
    # belong to the season that started the previous year
    first_year = pl.when(date < season_break).then(year - 1).otherwise(year)

    return pl.format("{}/{}", first_year, first_year + 1)