Source code for timesmith.forecasters.kalman

"""Kalman filter forecaster implementation."""

import logging
from typing import TYPE_CHECKING, Any, Optional, Union

import numpy as np
import pandas as pd

from timesmith.core.base import BaseForecaster
from timesmith.core.tags import set_tags
from timesmith.results.forecast import Forecast
from timesmith.utils.ts_utils import ensure_datetime_index

if TYPE_CHECKING:
    from timesmith.typing import SeriesLike, TableLike

logger = logging.getLogger(__name__)

try:
    from filterpy.common import Q_discrete_white_noise
    from filterpy.kalman import KalmanFilter

    HAS_FILTERPY = True
except ImportError:
    KalmanFilter = None
    Q_discrete_white_noise = None
    HAS_FILTERPY = False
    logger.warning(
        "filterpy not installed. KalmanFilterForecaster will not work. "
        "Install with: pip install filterpy"
    )


[docs] class KalmanFilterForecaster(BaseForecaster): """Kalman filter forecaster for state space models. Uses Kalman filtering for time series forecasting with state space models. """
[docs] def __init__( self, state_dimension: int = 2, measurement_dimension: int = 1, initial_state: Optional[np.ndarray] = None, state_transition: Optional[np.ndarray] = None, measurement_matrix: Optional[np.ndarray] = None, initial_covariance: float = 1000.0, measurement_noise: float = 5.0, process_noise: float = 0.1, dt: float = 1.0, ): """Initialize Kalman filter forecaster. Args: state_dimension: Dimension of state vector (default: 2). measurement_dimension: Dimension of measurement vector (default: 1). initial_state: Initial state vector (default: zeros). state_transition: State transition matrix F (default: constant velocity model). measurement_matrix: Measurement matrix H (default: [1, 0]). initial_covariance: Initial covariance P (default: 1000.0). measurement_noise: Measurement noise R (default: 5.0). process_noise: Process noise variance (default: 0.1). dt: Time step (default: 1.0). """ if not HAS_FILTERPY: raise ImportError( "filterpy is required for KalmanFilterForecaster. " "Install with: pip install filterpy" ) super().__init__() self.state_dimension = state_dimension self.measurement_dimension = measurement_dimension self.initial_state = initial_state self.state_transition = state_transition self.measurement_matrix = measurement_matrix self.initial_covariance = initial_covariance self.measurement_noise = measurement_noise self.process_noise = process_noise self.dt = dt set_tags( self, scitype_input="SeriesLike", scitype_output="ForecastLike", handles_missing=False, requires_sorted_index=True, supports_panel=False, requires_fh=True, )
def _create_kalman_filter(self) -> KalmanFilter: """Create and configure Kalman filter.""" kf = KalmanFilter(dim_x=self.state_dimension, dim_z=self.measurement_dimension) # Set initial state if self.initial_state is not None: kf.x = np.array(self.initial_state) else: kf.x = np.zeros(self.state_dimension) # Set state transition matrix F if self.state_transition is not None: kf.F = np.array(self.state_transition) else: # Default: constant velocity model kf.F = np.array([[1.0, self.dt], [0.0, 1.0]]) # Set measurement matrix H if self.measurement_matrix is not None: kf.H = np.array(self.measurement_matrix) else: # Default: measure position only kf.H = np.array([[1.0, 0.0]]) # Set initial covariance P kf.P = np.eye(self.state_dimension) * self.initial_covariance # Set measurement noise R kf.R = self.measurement_noise # Set process noise Q kf.Q = Q_discrete_white_noise( dim=self.state_dimension, dt=self.dt, var=self.process_noise, ) return kf
[docs] def fit( self, y: Union["SeriesLike", Any], X: Optional[Union["TableLike", Any]] = None, **fit_params: Any, ) -> "KalmanFilterForecaster": """Fit Kalman filter to data. Args: y: Target time series. X: Optional exogenous data (ignored). **fit_params: Additional fit parameters. Returns: Self for method chaining. """ if X is not None: logger.warning( "Exogenous variables (X) not yet supported for KalmanFilterForecaster" ) if isinstance(y, pd.Series): series = y elif isinstance(y, pd.DataFrame) and y.shape[1] == 1: series = y.iloc[:, 0] else: raise ValueError("y must be SeriesLike (Series or single-column DataFrame)") series = ensure_datetime_index(series) self.train_index_ = series.index self.train_values_ = series.values # Create and fit Kalman filter self.kf_ = self._create_kalman_filter() # Apply filter to all measurements estimates = [] for measurement in self.train_values_: self.kf_.predict() self.kf_.update(measurement) estimates.append(self.kf_.x.copy()) self.estimates_ = np.array(estimates) # Store last state for forecasting self.last_state_ = self.kf_.x.copy() self.last_covariance_ = self.kf_.P.copy() self._is_fitted = True return self
[docs] def predict( self, fh: Union[int, list, Any], X: Optional[Union["TableLike", Any]] = None, **predict_params: Any, ) -> Forecast: """Generate forecast. Args: fh: Forecast horizon (integer or array). X: Optional exogenous data (ignored). **predict_params: Additional prediction parameters. Returns: Forecast object with predictions. """ self._check_is_fitted() if X is not None: logger.warning( "Exogenous variables (X) not yet supported for KalmanFilterForecaster" ) # Convert fh to integer if isinstance(fh, (list, np.ndarray)): n_periods = len(fh) elif isinstance(fh, int): n_periods = fh else: n_periods = int(fh) # Generate forecast by propagating state forward predictions = [] current_state = self.last_state_.copy() current_cov = self.last_covariance_.copy() for _ in range(n_periods): # Predict next state current_state = self.kf_.F @ current_state current_cov = self.kf_.F @ current_cov @ self.kf_.F.T + self.kf_.Q # Extract measurement (position) prediction = (self.kf_.H @ current_state)[0] predictions.append(prediction) # Create forecast Series freq = pd.infer_freq(self.train_index_) or "D" last_date = self.train_index_[-1] future_index = pd.date_range( start=last_date + pd.Timedelta(days=1), periods=n_periods, freq=freq, ) y_pred = pd.Series(predictions, index=future_index) return Forecast(y_pred=y_pred, fh=fh)
[docs] def predict_interval( self, fh: Any, X: Optional[Any] = None, coverage: float = 0.9, **predict_params: Any, ) -> Forecast: """Generate forecast with prediction intervals. Args: fh: Forecast horizon. X: Optional exogenous data. coverage: Coverage level (e.g., 0.9 for 90%). **predict_params: Additional prediction parameters. Returns: Forecast with intervals. """ self._check_is_fitted() # Get point forecast forecast = self.predict(fh, X, **predict_params) # Calculate intervals from covariance predictions = [] lower_bounds = [] upper_bounds = [] current_state = self.last_state_.copy() current_cov = self.last_covariance_.copy() from scipy import stats z_score = stats.norm.ppf((1 + coverage) / 2) for _ in range(len(forecast.y_pred)): # Predict next state current_state = self.kf_.F @ current_state current_cov = self.kf_.F @ current_cov @ self.kf_.F.T + self.kf_.Q # Extract measurement and uncertainty prediction = (self.kf_.H @ current_state)[0] measurement_cov = self.kf_.H @ current_cov @ self.kf_.H.T std_error = np.sqrt(measurement_cov[0, 0]) predictions.append(prediction) lower_bounds.append(prediction - z_score * std_error) upper_bounds.append(prediction + z_score * std_error) y_int = pd.DataFrame( { "lower": lower_bounds, "upper": upper_bounds, }, index=forecast.y_pred.index, ) return Forecast(y_pred=forecast.y_pred, fh=fh, y_int=y_int)