"""Backtest functionality for forecasters."""
import logging
from typing import Any, Optional
import numpy as np
import pandas as pd
from timesmith.core.base import BaseForecaster
from timesmith.eval.metrics import mae, mape, rmse
from timesmith.eval.splitters import ExpandingWindowSplit
from timesmith.results.backtest import BacktestResult
from timesmith.tasks.forecast import ForecastTask
logger = logging.getLogger(__name__)
[docs]
def backtest_forecaster(
forecaster: BaseForecaster,
task: ForecastTask,
splitter: Optional[Any] = None,
metrics: Optional[list] = None,
) -> BacktestResult:
"""Run backtest on a forecaster with a task.
Args:
forecaster: Forecaster or forecaster pipeline to test.
task: ForecastTask with y, fh, and optional X.
splitter: Optional splitter (defaults to ExpandingWindowSplit).
metrics: Optional list of metric functions (defaults to [mae, rmse, mape]).
Returns:
BacktestResult with results table and summary.
"""
if metrics is None:
metrics = [mae, rmse, mape]
if splitter is None:
# Default: use expanding window with initial window = 80% of data
n = len(task.y)
initial_window = max(1, int(0.8 * n))
splitter = ExpandingWindowSplit(initial_window=initial_window, fh=task.fh)
results_rows = []
fold_id = 0
for train_idx, test_idx, cutoff in splitter.split(task.y):
# Get train/test splits
y_train = (
task.y.iloc[train_idx] if hasattr(task.y, "iloc") else task.y[train_idx]
)
y_test = task.y.iloc[test_idx] if hasattr(task.y, "iloc") else task.y[test_idx]
X_train = None
X_test = None
if task.X is not None:
X_train = (
task.X.iloc[train_idx] if hasattr(task.X, "iloc") else task.X[train_idx]
)
X_test = (
task.X.iloc[test_idx] if hasattr(task.X, "iloc") else task.X[test_idx]
)
# Fit forecaster
logger.debug(f"Fitting forecaster for fold {fold_id}")
forecaster.fit(y_train, X_train)
# Predict
logger.debug(f"Predicting for fold {fold_id}")
forecast = forecaster.predict(task.fh, X_test)
# Extract predictions
if hasattr(forecast, "y_pred"):
y_pred = forecast.y_pred
elif isinstance(forecast, pd.Series):
y_pred = forecast
elif isinstance(forecast, pd.DataFrame):
y_pred = forecast.iloc[:, 0] if forecast.shape[1] > 0 else forecast
else:
y_pred = forecast
# Ensure y_pred and y_test are aligned
y_pred = _align_predictions(y_pred, y_test)
# Compute metrics
metric_values = {}
for metric_func in metrics:
try:
metric_name = metric_func.__name__
metric_value = metric_func(y_test, y_pred)
metric_values[metric_name] = metric_value
except Exception as e:
logger.warning(f"Error computing {metric_func.__name__}: {e}")
metric_values[metric_func.__name__] = None
# Store results
results_rows.append(
{
"fold_id": fold_id,
"cutoff": cutoff,
"fh": task.fh,
"y_true": y_test,
"y_pred": y_pred,
**metric_values,
}
)
fold_id += 1
# Create results DataFrame
results_df = pd.DataFrame(results_rows)
# Compute summary metrics
summary = {}
for metric_func in metrics:
metric_name = metric_func.__name__
if metric_name in results_df.columns:
values = results_df[metric_name].dropna()
if len(values) > 0:
summary[f"mean_{metric_name}"] = float(values.mean())
summary[f"std_{metric_name}"] = float(values.std())
# Create per-fold metrics DataFrame
metric_cols = [m.__name__ for m in metrics if m.__name__ in results_df.columns]
per_fold_metrics = results_df[["fold_id", "cutoff"] + metric_cols].copy()
return BacktestResult(
results=results_df,
summary=summary,
per_fold_metrics=per_fold_metrics,
)
def _align_predictions(y_pred: Any, y_test: Any) -> Any:
"""Align predictions with test data.
Args:
y_pred: Predictions.
y_test: Test data.
Returns:
Aligned predictions.
"""
import pandas as pd
if isinstance(y_pred, pd.Series) and isinstance(y_test, pd.Series):
# Try to align by index
if len(y_pred) == len(y_test):
return y_pred.values
elif len(y_pred) > len(y_test):
return y_pred.iloc[: len(y_test)].values
else:
# Pad with last value if needed
last_val = y_pred.iloc[-1]
padded = pd.Series([last_val] * (len(y_test) - len(y_pred)))
return pd.concat([y_pred, padded]).values
# Convert to array and take first len(y_test) values
if hasattr(y_pred, "values"):
y_pred = y_pred.values
elif hasattr(y_pred, "__array__"):
y_pred = y_pred.__array__()
if isinstance(y_test, pd.Series):
n = len(y_test)
else:
n = len(y_test)
if len(y_pred) >= n:
return y_pred[:n]
else:
# Pad with last value
last_val = y_pred[-1]
return np.concatenate([y_pred, [last_val] * (n - len(y_pred))])