Source code for timesmith.forecasters.var

"""Vector Autoregression (VAR) forecaster implementation."""

import logging
from typing import TYPE_CHECKING, Any, Optional, Union

import numpy as np
import pandas as pd

from timesmith.core.base import BaseForecaster
from timesmith.core.tags import set_tags
from timesmith.results.forecast import Forecast

if TYPE_CHECKING:
    from timesmith.typing import SeriesLike, TableLike

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# statsmodels is an optional dependency. Assume it is available, then flip the
# flag and null out the imported names if the import fails, so the module can
# still be imported; VARForecaster.__init__ checks HAS_STATSMODELS before use.
HAS_STATSMODELS = True
try:
    from statsmodels.tsa.api import VAR
    from statsmodels.tsa.stattools import adfuller
except ImportError:
    HAS_STATSMODELS = False
    VAR = adfuller = None
    logger.warning(
        "statsmodels not installed. VARForecaster will not work. "
        "Install with: pip install statsmodels"
    )


class VARForecaster(BaseForecaster):
    """Vector Autoregression (VAR) forecaster for multivariate time series.

    Fits a VAR model to multiple interdependent time series and generates
    forecasts for all series simultaneously.

    Before fitting, the data is first-differenced when ``force_differencing``
    is set or when an ADF test flags at least one column as non-stationary.
    Forecasts of a differenced model are integrated back to the original
    level scale in :meth:`predict`.

    Attributes set by :meth:`fit`:
        train_index_: DatetimeIndex of the training data.
        column_names_: List of the training column labels.
        last_values_: Last observed (undifferenced) row of the training data.
        differenced_: Whether first-order differencing was applied.
        model_: The unfitted statsmodels ``VAR`` model.
        optimal_lag_: Lag order chosen by the information criterion.
        fitted_model_: The fitted statsmodels results object.
    """

    # Information criteria exposed by the lag-order selection result.
    _VALID_IC = ("aic", "bic", "fpe", "hqic")

    # ADF p-values above this threshold mark a series as non-stationary.
    _ADF_ALPHA = 0.05

    def __init__(
        self,
        maxlags: int = 10,
        ic: str = "aic",
        force_differencing: bool = False,
        verbose: bool = False,
    ):
        """Initialize VAR forecaster.

        Args:
            maxlags: Maximum number of lags to consider for model selection.
            ic: Information criterion for lag selection
                ('aic', 'bic', 'fpe', 'hqic'). Any other value falls back to
                'aic' (with a warning) at fit time.
            force_differencing: Whether to force first-order differencing
                (default: False).
            verbose: Whether to log diagnostic information.

        Raises:
            ImportError: If statsmodels is not installed.
        """
        if not HAS_STATSMODELS:
            raise ImportError(
                "statsmodels is required for VARForecaster. "
                "Install with: pip install statsmodels"
            )

        super().__init__()
        self.maxlags = maxlags
        self.ic = ic
        self.force_differencing = force_differencing
        self.verbose = verbose

        set_tags(
            self,
            scitype_input="PanelLike",
            scitype_output="PanelLike",
            handles_missing=False,
            requires_sorted_index=True,
            supports_panel=True,
            requires_fh=True,
        )

    def _any_non_stationary(self, df: pd.DataFrame) -> bool:
        """Return True if the ADF test flags any column as non-stationary."""
        non_stationary = False
        # Every column is tested (no early exit) so verbose mode reports all.
        for col in df.columns:
            p_value = adfuller(df[col].dropna())[1]
            if p_value > self._ADF_ALPHA:
                non_stationary = True
                if self.verbose:
                    logger.info("%s: Non-stationary (p-value=%.4f)", col, p_value)
            elif self.verbose:
                logger.info("%s: Stationary (p-value=%.4f)", col, p_value)
        return non_stationary

    def fit(
        self,
        y: Union["SeriesLike", Any],
        X: Optional[Union["TableLike", Any]] = None,
        **fit_params: Any,
    ) -> "VARForecaster":
        """Fit VAR model.

        Args:
            y: Multivariate time series (DataFrame with multiple columns or
                PanelLike). Anything that is not a DataFrame is passed to
                ``pd.DataFrame``.
            X: Optional exogenous data (not yet supported; ignored with a
                warning).
            **fit_params: Additional fit parameters (currently unused).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If ``y`` is a Series or has fewer than two columns.
        """
        if X is not None:
            logger.warning(
                "Exogenous variables (X) not yet supported for VARForecaster"
            )

        if isinstance(y, pd.Series):
            raise ValueError(
                "VARForecaster requires multivariate data (DataFrame with multiple columns)"
            )
        df = y.copy() if isinstance(y, pd.DataFrame) else pd.DataFrame(y)

        if df.shape[1] < 2:
            raise ValueError("VARForecaster requires at least 2 time series (columns)")

        # Ensure datetime index
        if not isinstance(df.index, pd.DatetimeIndex):
            if isinstance(df.index, pd.RangeIndex):
                # No real timestamps available: fabricate a daily index.
                df.index = pd.date_range(
                    start="2000-01-01", periods=len(df), freq="D"
                )
            else:
                df.index = pd.to_datetime(df.index)

        self.train_index_ = df.index
        self.column_names_ = df.columns.tolist()
        # Last observed levels; predict() needs them to integrate forecasts of
        # a differenced model back to the original scale.
        self.last_values_ = df.iloc[-1].to_numpy()

        # Difference when forced, otherwise only if some series fails the ADF
        # test (the test is skipped entirely when differencing is forced).
        if self.force_differencing:
            self.differenced_ = True
            if self.verbose:
                logger.info("Applied first-order differencing")
        else:
            self.differenced_ = self._any_non_stationary(df)
            if self.differenced_ and self.verbose:
                logger.info(
                    "Applied first-order differencing due to non-stationarity"
                )
        df_stationary = df.diff().dropna() if self.differenced_ else df

        # Fit VAR model
        self.model_ = VAR(df_stationary)
        lag_order = self.model_.select_order(maxlags=self.maxlags)

        # Select optimal lag based on information criterion
        criterion = self.ic
        if criterion not in self._VALID_IC:
            logger.warning(
                "Unknown information criterion %r; falling back to 'aic'",
                criterion,
            )
            criterion = "aic"
        optimal_lag = getattr(lag_order, criterion)

        self.optimal_lag_ = optimal_lag
        if self.verbose:
            logger.info("Optimal lag order: %s", optimal_lag)

        # Fit model with optimal lag
        self.fitted_model_ = self.model_.fit(optimal_lag)

        if self.verbose:
            logger.info("VAR(%s) model fitted", optimal_lag)

        self._is_fitted = True
        return self

    def predict(
        self,
        fh: Union[int, list, Any],
        X: Optional[Union["TableLike", Any]] = None,
        **predict_params: Any,
    ) -> Forecast:
        """Generate forecast.

        Args:
            fh: Forecast horizon. An integer is the number of steps ahead; a
                list/tuple/array/Index is interpreted by its length, i.e. as
                that many consecutive steps.
                NOTE(review): non-contiguous horizons such as ``[3]`` are
                therefore treated as "1 step" - confirm the intended
                convention.
            X: Optional exogenous data (ignored).
            **predict_params: Additional prediction parameters (unused).

        Returns:
            Forecast object whose ``y_pred`` is a DataFrame with one column
            per training series, on the original (undifferenced) scale.
        """
        self._check_is_fitted()

        if X is not None:
            logger.warning(
                "Exogenous variables (X) not yet supported for VARForecaster"
            )

        # Convert fh to a number of steps
        if isinstance(fh, (list, tuple, np.ndarray, pd.Index)):
            n_periods = len(fh)
        else:
            n_periods = int(fh)

        # Seed the recursion with the last optimal_lag_ rows the model was
        # trained on (already differenced when differencing was applied).
        history = self.fitted_model_.endog
        forecast = self.fitted_model_.forecast(
            history[-self.optimal_lag_ :], steps=n_periods
        )

        forecast_df = pd.DataFrame(forecast, columns=self.column_names_)

        # Build the future index by stepping forward from the last training
        # timestamp in units of the inferred frequency. Generating one extra
        # period and dropping the first keeps this correct for any frequency
        # (hourly, 2-daily, monthly, ...) instead of assuming a 1-day step.
        freq = pd.infer_freq(self.train_index_) or "D"
        future_index = pd.date_range(
            start=self.train_index_[-1],
            periods=n_periods + 1,
            freq=freq,
        )[1:]
        forecast_df.index = future_index

        # Undo differencing: accumulate predicted changes on top of the last
        # observed level (not the last differenced value).
        if self.differenced_:
            forecast_df = forecast_df.cumsum() + self.last_values_

        # For multivariate forecasts, y_pred is a DataFrame
        return Forecast(y_pred=forecast_df, fh=fh)

    def predict_interval(
        self,
        fh: Any,
        X: Optional[Any] = None,
        coverage: float = 0.9,
        **predict_params: Any,
    ) -> Forecast:
        """Generate forecast with prediction intervals.

        The band is ``y_pred +/- z * std(residuals)`` per series, where the
        residuals are those of the fitted model. Its width is therefore the
        same at every forecast step, and when differencing was applied the
        residual scale is that of the differenced series.
        NOTE(review): statsmodels also exposes horizon-dependent intervals on
        the fitted results (``forecast_interval``) - consider switching.

        Args:
            fh: Forecast horizon.
            X: Optional exogenous data.
            coverage: Coverage level strictly between 0 and 1
                (e.g., 0.9 for 90%).
            **predict_params: Additional prediction parameters.

        Returns:
            Forecast with intervals; ``y_int`` has ``<col>_lower`` and
            ``<col>_upper`` columns for every series.

        Raises:
            ValueError: If ``coverage`` is not strictly between 0 and 1.
        """
        self._check_is_fitted()

        if not 0 < coverage < 1:
            raise ValueError(
                f"coverage must be strictly between 0 and 1, got {coverage!r}"
            )

        # Get point forecast
        forecast = self.predict(fh, X, **predict_params)

        # Per-series residual standard deviation (population std, ddof=0).
        # np.asarray handles both DataFrame and ndarray residuals.
        resid_std = np.asarray(self.fitted_model_.resid).std(axis=0)

        # Two-sided normal quantile for the requested coverage
        from scipy import stats

        z_score = stats.norm.ppf((1 + coverage) / 2)

        # Create intervals DataFrame
        intervals = {}
        for col, std_error in zip(self.column_names_, resid_std):
            margin = z_score * std_error
            intervals[f"{col}_lower"] = forecast.y_pred[col] - margin
            intervals[f"{col}_upper"] = forecast.y_pred[col] + margin

        y_int = pd.DataFrame(intervals, index=forecast.y_pred.index)

        return Forecast(y_pred=forecast.y_pred, fh=fh, y_int=y_int)