"""Kalman filter forecaster implementation."""
import logging
from typing import TYPE_CHECKING, Any, Optional, Union
import numpy as np
import pandas as pd
from timesmith.core.base import BaseForecaster
from timesmith.core.tags import set_tags
from timesmith.results.forecast import Forecast
from timesmith.utils.ts_utils import ensure_datetime_index
if TYPE_CHECKING:
from timesmith.typing import SeriesLike, TableLike
logger = logging.getLogger(__name__)
try:
from filterpy.common import Q_discrete_white_noise
from filterpy.kalman import KalmanFilter
HAS_FILTERPY = True
except ImportError:
KalmanFilter = None
Q_discrete_white_noise = None
HAS_FILTERPY = False
logger.warning(
"filterpy not installed. KalmanFilterForecaster will not work. "
"Install with: pip install filterpy"
)
[docs]
class KalmanFilterForecaster(BaseForecaster):
"""Kalman filter forecaster for state space models.
Uses Kalman filtering for time series forecasting with state space models.
"""
[docs]
def __init__(
self,
state_dimension: int = 2,
measurement_dimension: int = 1,
initial_state: Optional[np.ndarray] = None,
state_transition: Optional[np.ndarray] = None,
measurement_matrix: Optional[np.ndarray] = None,
initial_covariance: float = 1000.0,
measurement_noise: float = 5.0,
process_noise: float = 0.1,
dt: float = 1.0,
):
"""Initialize Kalman filter forecaster.
Args:
state_dimension: Dimension of state vector (default: 2).
measurement_dimension: Dimension of measurement vector (default: 1).
initial_state: Initial state vector (default: zeros).
state_transition: State transition matrix F (default: constant velocity model).
measurement_matrix: Measurement matrix H (default: [1, 0]).
initial_covariance: Initial covariance P (default: 1000.0).
measurement_noise: Measurement noise R (default: 5.0).
process_noise: Process noise variance (default: 0.1).
dt: Time step (default: 1.0).
"""
if not HAS_FILTERPY:
raise ImportError(
"filterpy is required for KalmanFilterForecaster. "
"Install with: pip install filterpy"
)
super().__init__()
self.state_dimension = state_dimension
self.measurement_dimension = measurement_dimension
self.initial_state = initial_state
self.state_transition = state_transition
self.measurement_matrix = measurement_matrix
self.initial_covariance = initial_covariance
self.measurement_noise = measurement_noise
self.process_noise = process_noise
self.dt = dt
set_tags(
self,
scitype_input="SeriesLike",
scitype_output="ForecastLike",
handles_missing=False,
requires_sorted_index=True,
supports_panel=False,
requires_fh=True,
)
def _create_kalman_filter(self) -> KalmanFilter:
"""Create and configure Kalman filter."""
kf = KalmanFilter(dim_x=self.state_dimension, dim_z=self.measurement_dimension)
# Set initial state
if self.initial_state is not None:
kf.x = np.array(self.initial_state)
else:
kf.x = np.zeros(self.state_dimension)
# Set state transition matrix F
if self.state_transition is not None:
kf.F = np.array(self.state_transition)
else:
# Default: constant velocity model
kf.F = np.array([[1.0, self.dt], [0.0, 1.0]])
# Set measurement matrix H
if self.measurement_matrix is not None:
kf.H = np.array(self.measurement_matrix)
else:
# Default: measure position only
kf.H = np.array([[1.0, 0.0]])
# Set initial covariance P
kf.P = np.eye(self.state_dimension) * self.initial_covariance
# Set measurement noise R
kf.R = self.measurement_noise
# Set process noise Q
kf.Q = Q_discrete_white_noise(
dim=self.state_dimension,
dt=self.dt,
var=self.process_noise,
)
return kf
[docs]
def fit(
self,
y: Union["SeriesLike", Any],
X: Optional[Union["TableLike", Any]] = None,
**fit_params: Any,
) -> "KalmanFilterForecaster":
"""Fit Kalman filter to data.
Args:
y: Target time series.
X: Optional exogenous data (ignored).
**fit_params: Additional fit parameters.
Returns:
Self for method chaining.
"""
if X is not None:
logger.warning(
"Exogenous variables (X) not yet supported for KalmanFilterForecaster"
)
if isinstance(y, pd.Series):
series = y
elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
series = y.iloc[:, 0]
else:
raise ValueError("y must be SeriesLike (Series or single-column DataFrame)")
series = ensure_datetime_index(series)
self.train_index_ = series.index
self.train_values_ = series.values
# Create and fit Kalman filter
self.kf_ = self._create_kalman_filter()
# Apply filter to all measurements
estimates = []
for measurement in self.train_values_:
self.kf_.predict()
self.kf_.update(measurement)
estimates.append(self.kf_.x.copy())
self.estimates_ = np.array(estimates)
# Store last state for forecasting
self.last_state_ = self.kf_.x.copy()
self.last_covariance_ = self.kf_.P.copy()
self._is_fitted = True
return self
[docs]
def predict(
self,
fh: Union[int, list, Any],
X: Optional[Union["TableLike", Any]] = None,
**predict_params: Any,
) -> Forecast:
"""Generate forecast.
Args:
fh: Forecast horizon (integer or array).
X: Optional exogenous data (ignored).
**predict_params: Additional prediction parameters.
Returns:
Forecast object with predictions.
"""
self._check_is_fitted()
if X is not None:
logger.warning(
"Exogenous variables (X) not yet supported for KalmanFilterForecaster"
)
# Convert fh to integer
if isinstance(fh, (list, np.ndarray)):
n_periods = len(fh)
elif isinstance(fh, int):
n_periods = fh
else:
n_periods = int(fh)
# Generate forecast by propagating state forward
predictions = []
current_state = self.last_state_.copy()
current_cov = self.last_covariance_.copy()
for _ in range(n_periods):
# Predict next state
current_state = self.kf_.F @ current_state
current_cov = self.kf_.F @ current_cov @ self.kf_.F.T + self.kf_.Q
# Extract measurement (position)
prediction = (self.kf_.H @ current_state)[0]
predictions.append(prediction)
# Create forecast Series
freq = pd.infer_freq(self.train_index_) or "D"
last_date = self.train_index_[-1]
future_index = pd.date_range(
start=last_date + pd.Timedelta(days=1),
periods=n_periods,
freq=freq,
)
y_pred = pd.Series(predictions, index=future_index)
return Forecast(y_pred=y_pred, fh=fh)
[docs]
def predict_interval(
self,
fh: Any,
X: Optional[Any] = None,
coverage: float = 0.9,
**predict_params: Any,
) -> Forecast:
"""Generate forecast with prediction intervals.
Args:
fh: Forecast horizon.
X: Optional exogenous data.
coverage: Coverage level (e.g., 0.9 for 90%).
**predict_params: Additional prediction parameters.
Returns:
Forecast with intervals.
"""
self._check_is_fitted()
# Get point forecast
forecast = self.predict(fh, X, **predict_params)
# Calculate intervals from covariance
predictions = []
lower_bounds = []
upper_bounds = []
current_state = self.last_state_.copy()
current_cov = self.last_covariance_.copy()
from scipy import stats
z_score = stats.norm.ppf((1 + coverage) / 2)
for _ in range(len(forecast.y_pred)):
# Predict next state
current_state = self.kf_.F @ current_state
current_cov = self.kf_.F @ current_cov @ self.kf_.F.T + self.kf_.Q
# Extract measurement and uncertainty
prediction = (self.kf_.H @ current_state)[0]
measurement_cov = self.kf_.H @ current_cov @ self.kf_.H.T
std_error = np.sqrt(measurement_cov[0, 0])
predictions.append(prediction)
lower_bounds.append(prediction - z_score * std_error)
upper_bounds.append(prediction + z_score * std_error)
y_int = pd.DataFrame(
{
"lower": lower_bounds,
"upper": upper_bounds,
},
index=forecast.y_pred.index,
)
return Forecast(y_pred=forecast.y_pred, fh=fh, y_int=y_int)