Source code for timesmith.forecasters.var

"""Vector Autoregression (VAR) forecaster implementation."""

import logging
from typing import Any, Optional

import numpy as np
import pandas as pd

from timesmith.core.base import BaseForecaster
from timesmith.core.tags import set_tags
from timesmith.results.forecast import Forecast

logger = logging.getLogger(__name__)

try:
    from statsmodels.tsa.api import VAR
    from statsmodels.tsa.stattools import adfuller

    HAS_STATSMODELS = True
except ImportError:
    VAR = None
    adfuller = None
    HAS_STATSMODELS = False
    logger.warning(
        "statsmodels not installed. VARForecaster will not work. "
        "Install with: pip install statsmodels"
    )


[docs] class VARForecaster(BaseForecaster): """Vector Autoregression (VAR) forecaster for multivariate time series. Fits a VAR model to multiple interdependent time series and generates forecasts for all series simultaneously. """
[docs] def __init__( self, maxlags: int = 10, ic: str = "aic", force_differencing: bool = False, verbose: bool = False, ): """Initialize VAR forecaster. Args: maxlags: Maximum number of lags to consider for model selection. ic: Information criterion for lag selection ('aic', 'bic', 'fpe', 'hqic'). force_differencing: Whether to force first-order differencing (default: False). verbose: Whether to print diagnostic information. """ if not HAS_STATSMODELS: raise ImportError( "statsmodels is required for VARForecaster. " "Install with: pip install statsmodels" ) super().__init__() self.maxlags = maxlags self.ic = ic self.force_differencing = force_differencing self.verbose = verbose set_tags( self, scitype_input="PanelLike", scitype_output="PanelLike", handles_missing=False, requires_sorted_index=True, supports_panel=True, requires_fh=True, )
[docs] def fit( self, y: Any, X: Optional[Any] = None, **fit_params: Any ) -> "VARForecaster": """Fit VAR model. Args: y: Multivariate time series (DataFrame with multiple columns or PanelLike). X: Optional exogenous data (not yet supported). **fit_params: Additional fit parameters. Returns: Self for method chaining. """ if X is not None: logger.warning( "Exogenous variables (X) not yet supported for VARForecaster" ) # Convert to DataFrame if needed if isinstance(y, pd.Series): raise ValueError( "VARForecaster requires multivariate data (DataFrame with multiple columns)" ) elif isinstance(y, pd.DataFrame): df = y.copy() else: df = pd.DataFrame(y) if df.shape[1] < 2: raise ValueError("VARForecaster requires at least 2 time series (columns)") # Ensure datetime index if not isinstance(df.index, pd.DatetimeIndex): if isinstance(df.index, pd.RangeIndex): # Create a default datetime index df.index = pd.date_range(start="2000-01-01", periods=len(df), freq="D") else: df.index = pd.to_datetime(df.index) self.train_index_ = df.index self.column_names_ = df.columns.tolist() # Check stationarity and apply differencing if needed if self.force_differencing: df_stationary = df.diff().dropna() self.differenced_ = True if self.verbose: logger.info("Applied first-order differencing") else: # Check stationarity for each series df_stationary = df.copy() self.differenced_ = False needs_differencing = False for col in df.columns: result = adfuller(df[col].dropna()) if result[1] > 0.05: if self.verbose: logger.info(f"{col}: Non-stationary (p-value={result[1]:.4f})") needs_differencing = True elif self.verbose: logger.info(f"{col}: Stationary (p-value={result[1]:.4f})") if needs_differencing: df_stationary = df.diff().dropna() self.differenced_ = True if self.verbose: logger.info( "Applied first-order differencing due to non-stationarity" ) # Fit VAR model self.model_ = VAR(df_stationary) lag_order = self.model_.select_order(maxlags=self.maxlags) # Select optimal lag based on information criterion if self.ic == "aic": optimal_lag = lag_order.aic elif self.ic == "bic": optimal_lag = lag_order.bic elif self.ic == "fpe": optimal_lag = lag_order.fpe elif self.ic == "hqic": optimal_lag = lag_order.hqic else: optimal_lag = lag_order.aic self.optimal_lag_ = optimal_lag if self.verbose: logger.info(f"Optimal lag order: {optimal_lag}") # Fit model with optimal lag self.fitted_model_ = self.model_.fit(optimal_lag) if self.verbose: logger.info(f"VAR({optimal_lag}) model fitted") self._is_fitted = True return self
[docs] def predict( self, fh: Any, X: Optional[Any] = None, **predict_params: Any ) -> Forecast: """Generate forecast. Args: fh: Forecast horizon (integer or array). X: Optional exogenous data (ignored). **predict_params: Additional prediction parameters. Returns: Forecast object with predictions for all series. """ self._check_is_fitted() if X is not None: logger.warning( "Exogenous variables (X) not yet supported for VARForecaster" ) # Convert fh to integer if isinstance(fh, (list, np.ndarray)): n_periods = len(fh) elif isinstance(fh, int): n_periods = fh else: n_periods = int(fh) # Generate forecast # Use the last optimal_lag_ observations as starting point forecast = self.fitted_model_.forecast( self.fitted_model_.y[-self.optimal_lag_ :], steps=n_periods ) # Convert to DataFrame forecast_df = pd.DataFrame( forecast, columns=self.column_names_, ) # Create future index freq = pd.infer_freq(self.train_index_) or "D" last_date = self.train_index_[-1] future_index = pd.date_range( start=last_date + pd.Timedelta(days=1), periods=n_periods, freq=freq, ) forecast_df.index = future_index # Undo differencing if needed if self.differenced_: # Get last values before differencing last_values = self.fitted_model_.y[-1] # Cumulative sum to undo differencing forecast_df = forecast_df.cumsum() + last_values # Return as Forecast object # For multivariate, y_pred is a DataFrame return Forecast(y_pred=forecast_df, fh=fh)
[docs] def predict_interval( self, fh: Any, X: Optional[Any] = None, coverage: float = 0.9, **predict_params: Any, ) -> Forecast: """Generate forecast with prediction intervals. Args: fh: Forecast horizon. X: Optional exogenous data. coverage: Coverage level (e.g., 0.9 for 90%). **predict_params: Additional prediction parameters. Returns: Forecast with intervals. """ self._check_is_fitted() # Get point forecast forecast = self.predict(fh, X, **predict_params) # VAR models don't provide built-in prediction intervals # We'll estimate them from residuals residuals = self.fitted_model_.resid # Calculate standard errors for each variable std_errors = {} for i, col in enumerate(self.column_names_): std_errors[col] = ( np.std(residuals.iloc[:, i]) if hasattr(residuals, "iloc") else np.std(residuals[:, i]) ) # Use z-score for intervals from scipy import stats z_score = stats.norm.ppf((1 + coverage) / 2) # Create intervals DataFrame intervals = {} for col in self.column_names_: margin = z_score * std_errors[col] intervals[f"{col}_lower"] = forecast.y_pred[col] - margin intervals[f"{col}_upper"] = forecast.y_pred[col] + margin y_int = pd.DataFrame(intervals, index=forecast.y_pred.index) return Forecast(y_pred=forecast.y_pred, fh=fh, y_int=y_int)