"""Bayesian forecasting with uncertainty quantification using MCMC."""
import logging
from typing import Any, Dict, Optional
import numpy as np
import pandas as pd
from timesmith.core.base import BaseForecaster
from timesmith.core.tags import set_tags
from timesmith.results.forecast import Forecast
logger = logging.getLogger(__name__)
# Optional PyMC for Bayesian inference
try:
import arviz as az
import pymc as pm
PYMC_AVAILABLE = True
except ImportError:
PYMC_AVAILABLE = False
logger.warning(
"PyMC not available. BayesianForecaster requires PyMC. "
"Install with: pip install pymc arviz"
)
[docs]
class BayesianForecaster(BaseForecaster):
"""Bayesian forecaster with uncertainty quantification using MCMC.
Uses Bayesian inference to estimate forecaster parameters and provides
uncertainty quantification through posterior sampling. Works by fitting
a probabilistic model to the time series and sampling from the posterior.
This is a general-purpose Bayesian forecaster that can be used with
any time series model. For specific models (like ARIMA), consider using
model-specific Bayesian implementations.
"""
[docs]
def __init__(
self,
model_type: str = "linear_trend",
n_samples: int = 2000,
n_tune: int = 1000,
random_state: Optional[int] = None,
prior_params: Optional[Dict[str, Any]] = None,
):
"""Initialize Bayesian forecaster.
Args:
model_type: Type of probabilistic model:
- 'linear_trend': Linear trend with noise
- 'exponential_trend': Exponential trend
- 'ar1': AR(1) model
n_samples: Number of MCMC samples.
n_tune: Number of tuning samples.
random_state: Random state for reproducibility.
prior_params: Optional prior parameter distributions.
"""
if not PYMC_AVAILABLE:
raise ImportError(
"PyMC is required for BayesianForecaster. "
"Install with: pip install pymc arviz"
)
super().__init__()
self.model_type = model_type
self.n_samples = n_samples
self.n_tune = n_tune
self.random_state = random_state
self.prior_params = prior_params or {}
self.trace_ = None
self.model_ = None
set_tags(
self,
scitype_input="SeriesLike",
scitype_output="ForecastLike",
handles_missing=False,
requires_sorted_index=True,
)
[docs]
def fit(
self, y: Any, X: Optional[Any] = None, **fit_params: Any
) -> "BayesianForecaster":
"""Fit Bayesian model using MCMC.
Args:
y: Target time series.
X: Optional exogenous data (not yet supported).
**fit_params: Additional fit parameters.
Returns:
Self for method chaining.
"""
if X is not None:
logger.warning("Exogenous data X not yet supported in BayesianForecaster")
if isinstance(y, pd.Series):
self.y_ = y.values
self.index_ = y.index
elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
self.y_ = y.iloc[:, 0].values
self.index_ = y.index
else:
self.y_ = np.asarray(y, dtype=float)
self.index_ = np.arange(len(self.y_))
# Remove invalid values
valid_mask = np.isfinite(self.y_)
self.y_ = self.y_[valid_mask]
self.index_ = self.index_[valid_mask]
if len(self.y_) < 3:
raise ValueError("Need at least 3 valid data points")
# Normalize time to start at 0
if isinstance(self.index_, pd.DatetimeIndex):
time_arr = (self.index_ - self.index_[0]).days
else:
time_arr = np.arange(len(self.y_))
# Build and fit PyMC model
self.trace_, self.model_ = self._fit_bayesian_model(time_arr, self.y_)
self._is_fitted = True
return self
def _fit_bayesian_model(self, time: np.ndarray, y: np.ndarray) -> tuple:
"""Fit Bayesian model using PyMC.
Args:
time: Time array (normalized).
y: Target values.
Returns:
Tuple of (trace, model).
"""
with pm.Model() as model:
if self.model_type == "linear_trend":
# Linear trend model: y = a + b*t + noise
a = pm.Normal(
"a",
mu=np.mean(y),
sigma=np.std(y),
**self.prior_params.get("a", {}),
)
b = pm.Normal(
"b",
mu=0.0,
sigma=np.std(y) / len(y),
**self.prior_params.get("b", {}),
)
sigma = pm.HalfNormal(
"sigma",
sigma=np.std(y),
**self.prior_params.get("sigma", {}),
)
# Likelihood
mu = a + b * time
pm.Normal("obs", mu=mu, sigma=sigma, observed=y)
elif self.model_type == "exponential_trend":
# Exponential trend: y = a * exp(b*t) + noise
a = pm.Lognormal(
"a",
mu=np.log(np.maximum(y[0], 0.1)),
sigma=0.5,
**self.prior_params.get("a", {}),
)
b = pm.Normal(
"b",
mu=0.0,
sigma=0.1,
**self.prior_params.get("b", {}),
)
sigma = pm.HalfNormal(
"sigma",
sigma=np.std(y),
**self.prior_params.get("sigma", {}),
)
# Likelihood
mu = a * pm.math.exp(b * time)
pm.Normal("obs", mu=mu, sigma=sigma, observed=y)
elif self.model_type == "ar1":
# AR(1) model: y_t = phi * y_{t-1} + noise
phi = pm.Normal(
"phi",
mu=0.0,
sigma=1.0,
**self.prior_params.get("phi", {}),
)
sigma = pm.HalfNormal(
"sigma",
sigma=np.std(y),
**self.prior_params.get("sigma", {}),
)
# Likelihood (AR(1))
mu = phi * y[:-1]
pm.Normal("obs", mu=mu, sigma=sigma, observed=y[1:])
else:
raise ValueError(f"Unknown model_type: {self.model_type}")
# Sample
trace = pm.sample(
draws=self.n_samples,
tune=self.n_tune,
random_seed=self.random_state,
return_inferencedata=True,
progressbar=False,
)
logger.info(
f"Completed Bayesian sampling: {len(trace.posterior.draw)} samples "
f"(model={self.model_type})"
)
return trace, model
[docs]
def predict(
self, fh: Any, X: Optional[Any] = None, **predict_params: Any
) -> Forecast:
"""Generate Bayesian forecast with uncertainty.
Args:
fh: Forecast horizon (integer or array-like).
X: Optional exogenous data (not yet supported).
**predict_params: Additional prediction parameters.
Returns:
Forecast with mean prediction and uncertainty intervals.
"""
self._check_is_fitted()
if X is not None:
logger.warning("Exogenous data X not yet supported in BayesianForecaster")
# Convert fh to array
if isinstance(fh, (int, np.integer)):
fh_arr = np.arange(1, fh + 1)
elif isinstance(fh, (list, np.ndarray, pd.Index)):
fh_arr = np.asarray(fh)
else:
raise ValueError(f"Unsupported fh type: {type(fh)}")
# Get forecast times (relative to last observed time)
last_time = len(self.y_) - 1
forecast_times = last_time + fh_arr
# Sample from posterior and generate forecasts
posterior = self.trace_.posterior
forecasts = []
# Limit samples for computational efficiency
n_posterior_samples = min(100, len(posterior.draw))
for i in range(n_posterior_samples):
# Sample parameters from posterior
if self.model_type == "linear_trend":
a = float(posterior.a.values.flatten()[i])
b = float(posterior.b.values.flatten()[i])
y_pred = a + b * forecast_times
elif self.model_type == "exponential_trend":
a = float(posterior.a.values.flatten()[i])
b = float(posterior.b.values.flatten()[i])
y_pred = a * np.exp(b * forecast_times)
elif self.model_type == "ar1":
phi = float(posterior.phi.values.flatten()[i])
# AR(1) forecast: y_t = phi * y_{t-1}
y_pred = np.zeros(len(fh_arr))
last_value = self.y_[-1]
for j, t in enumerate(fh_arr):
last_value = phi * last_value
y_pred[j] = last_value
else:
raise ValueError(f"Unknown model_type: {self.model_type}")
forecasts.append(y_pred)
forecasts = np.array(forecasts)
# Calculate statistics
mean_forecast = np.mean(forecasts, axis=0)
std_forecast = np.std(forecasts, axis=0)
# Calculate quantiles
quantiles = {
"p05": np.quantile(forecasts, 0.05, axis=0),
"p25": np.quantile(forecasts, 0.25, axis=0),
"p50": np.quantile(forecasts, 0.50, axis=0),
"p75": np.quantile(forecasts, 0.75, axis=0),
"p95": np.quantile(forecasts, 0.95, axis=0),
}
# Create prediction intervals DataFrame
y_int = pd.DataFrame(
{
"lower": quantiles["p05"],
"upper": quantiles["p95"],
}
)
# Convert mean forecast to Series
if isinstance(fh, (list, np.ndarray, pd.Index)):
y_pred_series = pd.Series(mean_forecast, index=fh)
else:
y_pred_series = pd.Series(mean_forecast)
return Forecast(
y_pred=y_pred_series,
fh=fh,
y_int=y_int,
metadata={
"n_samples": n_posterior_samples,
"std": std_forecast.tolist()
if isinstance(std_forecast, np.ndarray)
else std_forecast,
"quantiles": {
k: v.tolist() if isinstance(v, np.ndarray) else v
for k, v in quantiles.items()
},
"model_type": self.model_type,
},
)
[docs]
def predict_interval(
self,
fh: Any,
X: Optional[Any] = None,
coverage: float = 0.9,
**predict_params: Any,
) -> Forecast:
"""Generate forecast with prediction intervals.
Args:
fh: Forecast horizon.
X: Optional exogenous data (not yet supported).
coverage: Coverage level (e.g., 0.9 for 90%).
**predict_params: Additional prediction parameters.
Returns:
Forecast with prediction intervals.
"""
# predict() already includes intervals, so just adjust coverage
forecast = self.predict(fh, X, **predict_params)
# Adjust intervals to requested coverage
alpha = 1 - coverage
lower_quantile = alpha / 2
upper_quantile = 1 - alpha / 2
# Get forecasts from metadata
if "forecasts" in forecast.metadata:
forecasts = np.array(forecast.metadata["forecasts"])
else:
# Re-generate if needed (shouldn't happen)
return self.predict(fh, X, **predict_params)
lower = np.quantile(forecasts, lower_quantile, axis=0)
upper = np.quantile(forecasts, upper_quantile, axis=0)
forecast.y_int = pd.DataFrame({"lower": lower, "upper": upper})
return forecast
[docs]
def get_posterior_summary(self) -> pd.DataFrame:
"""Get summary statistics of posterior distributions.
Returns:
DataFrame with posterior summary (mean, std, credible intervals).
"""
self._check_is_fitted()
return az.summary(self.trace_)