# Source code for timesmith.forecasters.ensemble

"""Ensemble forecasting using classification + regression approach.

This forecaster uses a hybrid approach where:
- A classification model predicts the direction of the next step
  (increase vs. no increase)
- A regression model predicts the next value, using the classifier's
  direction prediction as an additional feature

This approach can outperform standard ARIMA models for complex patterns and noisy data.
"""

import logging
from typing import Any, Optional, Tuple

import numpy as np
import pandas as pd

from timesmith.core.base import BaseForecaster
from timesmith.core.tags import set_tags
from timesmith.results.forecast import Forecast

logger = logging.getLogger(__name__)

# Optional sklearn for ensemble methods
try:
    from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
    from sklearn.preprocessing import StandardScaler

    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logger.warning(
        "scikit-learn not available. EnsembleForecaster requires sklearn. "
        "Install with: pip install scikit-learn"
    )


class EnsembleForecaster(BaseForecaster):
    """Ensemble forecaster combining classification and regression models.

    A Random Forest classifier predicts the direction of the next step
    (1 = the next value is higher, 0 = equal or lower), and a Random Forest
    regressor predicts the next value from the same engineered features plus
    the classifier's direction prediction. Multi-step forecasts are built
    recursively: each prediction is appended to the history and feeds the
    features of the following step.
    """

    # Minimum number of complete (features, targets) rows needed to train.
    _MIN_TRAINING_ROWS = 20
    # Columns of the feature frame that are not model inputs.
    _NON_FEATURE_COLUMNS = ("value", "direction", "next_value")

    def __init__(
        self,
        n_lags: int = 2,
        random_state: Optional[int] = None,
        n_estimators_classifier: int = 100,
        n_estimators_regressor: int = 100,
        max_depth: Optional[int] = None,
        n_jobs: Optional[int] = -1,
    ):
        """Initialize ensemble forecaster.

        Args:
            n_lags: Number of lagged values used as features. Must be >= 0.
            random_state: Random seed for reproducibility.
            n_estimators_classifier: Number of trees for the classifier.
            n_estimators_regressor: Number of trees for the regressor.
            max_depth: Maximum depth of trees (None = unlimited).
            n_jobs: Number of parallel jobs passed to both forests
                (-1, the default, uses all processors).

        Raises:
            ImportError: If scikit-learn is not installed.
            ValueError: If ``n_lags`` is negative.
        """
        if not SKLEARN_AVAILABLE:
            raise ImportError(
                "scikit-learn is required for EnsembleForecaster. "
                "Install with: pip install scikit-learn"
            )
        if n_lags < 0:
            raise ValueError(f"n_lags must be non-negative, got {n_lags}")

        super().__init__()
        self.n_lags = n_lags
        self.random_state = random_state
        self.n_estimators_classifier = n_estimators_classifier
        self.n_estimators_regressor = n_estimators_regressor
        self.max_depth = max_depth
        self.n_jobs = n_jobs

        self.classifier = RandomForestClassifier(
            n_estimators=n_estimators_classifier,
            max_depth=max_depth,
            random_state=random_state,
            n_jobs=n_jobs,
        )
        self.regressor = RandomForestRegressor(
            n_estimators=n_estimators_regressor,
            max_depth=max_depth,
            random_state=random_state,
            n_jobs=n_jobs,
        )
        self.scaler = StandardScaler()

        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="ForecastLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self, y: Any, X: Optional[Any] = None, **fit_params: Any
    ) -> "EnsembleForecaster":
        """Fit ensemble models on training data.

        Non-finite observations (NaN/inf) are dropped before features are
        built, so lags are computed over the remaining points.

        Args:
            y: Target time series (Series, single-column DataFrame, or 1-D
                array-like).
            X: Optional exogenous data (not yet supported).
            **fit_params: Additional fit parameters (unused).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If ``y`` is not univariate or has too few finite
                points to produce ``_MIN_TRAINING_ROWS`` training rows.
        """
        if X is not None:
            logger.warning("Exogenous data X not yet supported in EnsembleForecaster")

        values, index = self._to_univariate(y)

        # Remove invalid values
        valid_mask = np.isfinite(values)
        self.y_ = values[valid_mask]
        self.index_ = index[valid_mask]

        # Feature creation drops the first max(n_lags, 1) rows (lags / diff
        # undefined) and the last row (its next value is unknown).
        min_points = self._MIN_TRAINING_ROWS + max(self.n_lags, 1) + 1
        if len(self.y_) < min_points:
            raise ValueError(
                f"Need at least {min_points} finite data points for training "
                f"(got {len(self.y_)})"
            )

        X_features, y_class, y_reg = self._create_features(self.y_)

        # Scale features
        X_scaled = self.scaler.fit_transform(X_features)

        # Train classification model
        self.classifier.fit(X_scaled, y_class)

        # NOTE(review): the regressor is trained on *in-sample* classifier
        # predictions, which are likely far more accurate than the direction
        # predictions available at forecast time, so the regressor may
        # over-trust this feature. Out-of-fold predictions would avoid that;
        # left unchanged to preserve the model's behaviour.
        y_class_pred = self.classifier.predict(X_scaled)

        # Add classification predictions as feature for regression
        X_reg = np.column_stack([X_scaled, y_class_pred])

        # Train regression model
        self.regressor.fit(X_reg, y_reg)

        self._is_fitted = True
        return self

    def predict(
        self, fh: Any, X: Optional[Any] = None, **predict_params: Any
    ) -> Forecast:
        """Generate recursive forecasts.

        Args:
            fh: Forecast horizon. An integer gives the number of steps ahead;
                a sequence (list, tuple, range, ndarray, pd.Index) gives one
                step per element, and its values label the output.
            X: Optional exogenous data (not yet supported).
            **predict_params: Additional prediction parameters (unused).

        Returns:
            Forecast results.

        Raises:
            ValueError: If ``fh`` is a negative integer or of an unsupported
                type.
        """
        self._check_is_fitted()

        if X is not None:
            logger.warning("Exogenous data X not yet supported in EnsembleForecaster")

        if isinstance(fh, (int, np.integer)):
            n_steps = int(fh)
            if n_steps < 0:
                raise ValueError(f"fh must be non-negative, got {n_steps}")
            output_index = None
        elif isinstance(fh, (list, tuple, range, np.ndarray, pd.Index)):
            # NOTE(review): a sequence fh is treated as len(fh) consecutive
            # steps and its values are used only as output labels (fh=[3]
            # yields the 1-step-ahead forecast labelled 3). Confirm this
            # matches the BaseForecaster fh convention.
            n_steps = len(fh)
            output_index = np.asarray(fh)
        else:
            raise ValueError(f"Unsupported fh type: {type(fh)}")

        # Features of the newest point depend only on the last n_lags + 1
        # values (lags, diff) and the last 5 (moving averages); the frame
        # must also have > 5 rows for the moving-average columns to exist.
        # Working on this tail avoids rebuilding features over the whole
        # history at every step.
        window = max(self.n_lags + 1, 6)
        history = list(self.y_[-window:])

        predictions = []
        for _ in range(n_steps):
            features = self._latest_features(np.asarray(history[-window:]))
            features_scaled = self.scaler.transform(features)

            # Predict direction, then feed it to the regressor as an extra
            # feature, exactly as during training.
            direction_pred = self.classifier.predict(features_scaled)
            features_reg = np.column_stack([features_scaled, direction_pred])
            pred_value = float(self.regressor.predict(features_reg)[0])

            predictions.append(pred_value)
            # The prediction becomes the newest observation for the next step.
            history.append(pred_value)

        y_pred_series = pd.Series(
            np.asarray(predictions, dtype=float), index=output_index
        )

        return Forecast(
            y_pred=y_pred_series,
            fh=fh,
            metadata={
                "n_lags": self.n_lags,
                "method": "ensemble_classification_regression",
            },
        )

    @staticmethod
    def _to_univariate(y: Any) -> Tuple[np.ndarray, Any]:
        """Return ``y`` as a 1-D float array together with its index.

        Raises:
            ValueError: If ``y`` has more than one column / dimension.
        """
        if isinstance(y, pd.DataFrame):
            if y.shape[1] != 1:
                raise ValueError(
                    "y must be univariate; got a DataFrame with "
                    f"{y.shape[1]} columns"
                )
            y = y.iloc[:, 0]
        if isinstance(y, pd.Series):
            # Cast to float so nullable / object dtypes work with np.isfinite.
            return y.to_numpy(dtype=float, na_value=np.nan), y.index

        values = np.asarray(y, dtype=float)
        if values.ndim == 2 and values.shape[1] == 1:
            values = values.ravel()
        if values.ndim != 1:
            raise ValueError(f"y must be one-dimensional; got shape {values.shape}")
        return values, np.arange(len(values))

    def _feature_frame(self, data: np.ndarray) -> pd.DataFrame:
        """Build features and targets for every point of ``data`` (no rows dropped).

        Row i holds lags of ``value`` (y[i-1] ... y[i-n_lags]), the first
        difference y[i] - y[i-1], and (when there are more than 5 points)
        trailing 3- and 5-point means, plus the targets ``direction``
        (1 if y[i+1] > y[i], else 0) and ``next_value`` (y[i+1], NaN on the
        last row).
        """
        df = pd.DataFrame({"value": data})

        for lag in range(1, self.n_lags + 1):
            df[f"lag_{lag}"] = df["value"].shift(lag)

        df["rate_of_change"] = df["value"].diff()

        if len(df) > 5:
            df["ma_3"] = df["value"].rolling(window=3, min_periods=1).mean()
            df["ma_5"] = df["value"].rolling(window=5, min_periods=1).mean()

        # 0 covers both "decreases" and "stays the same".
        df["direction"] = (df["value"].shift(-1) > df["value"]).astype(int)
        df["next_value"] = df["value"].shift(-1)
        return df

    @classmethod
    def _feature_columns(cls, df: pd.DataFrame) -> list:
        """Return the model-input columns of a feature frame, in order."""
        return [col for col in df.columns if col not in cls._NON_FEATURE_COLUMNS]

    def _latest_features(self, data: np.ndarray) -> pd.DataFrame:
        """Return the one-row feature frame describing the last point of ``data``.

        Unlike ``_create_features`` this keeps the final row, whose target is
        unknown -- that row is exactly what a next-step forecast needs.
        """
        df = self._feature_frame(data)
        return df[self._feature_columns(df)].iloc[[-1]]

    def _create_features(
        self, data: np.ndarray
    ) -> Tuple[pd.DataFrame, pd.Series, pd.Series]:
        """Create training features for classification and regression.

        Rows with any undefined feature or target are dropped, including the
        last row (its next value is unknown).

        Args:
            data: Time series data.

        Returns:
            Tuple of (features, direction_target, regression_target).

        Raises:
            ValueError: If no complete rows remain.
        """
        df = self._feature_frame(data).dropna()

        if len(df) == 0:
            raise ValueError("Not enough data to create features")

        X = df[self._feature_columns(df)]
        y_class = df["direction"]
        y_reg = df["next_value"]
        return X, y_class, y_reg