Source code for timesmith.core.seasonal

"""Seasonal baseline anomaly detection for time series."""

import logging
from typing import Any, Optional

import numpy as np
import pandas as pd

from timesmith.core.base import BaseDetector
from timesmith.core.tags import set_tags

logger = logging.getLogger(__name__)


class SeasonalBaselineDetector(BaseDetector):
    """Seasonal baseline anomaly detector for time series.

    For every seasonal bucket (see ``seasonality``) the detector learns the
    mean and sample standard deviation of the training values that fall into
    that bucket. Points are then scored by their absolute distance from the
    bucket mean, measured in bucket standard deviations, and flagged when that
    distance exceeds ``threshold_sigma``.

    Attributes:
        seasonality: Name of the seasonal bucket type.
        threshold_sigma: Default anomaly threshold in standard deviations.
        seasonal_stats_: After ``fit``, a DataFrame with the columns
            ``seasonal_key``, ``mean`` and ``std`` (one row per bucket, in
            order of first appearance in the training data); ``None`` before.
    """

    def __init__(
        self,
        seasonality: str = "week",
        threshold_sigma: float = 2.5,
    ):
        """Initialize seasonal baseline detector.

        Args:
            seasonality: Seasonality to use. Options:
                'week' (ISO week of the year), 'month' (month of the year),
                'day' (day of the year), 'hour' (hour of the day).
                The value is validated lazily, when keys are first extracted.
            threshold_sigma: Number of standard deviations for threshold.
        """
        super().__init__()
        self.seasonality = seasonality
        self.threshold_sigma = threshold_sigma
        self.seasonal_stats_ = None

        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def _get_seasonal_key(self, dates: Any) -> pd.Series:
        """Extract the seasonal bucket key for each timestamp.

        Args:
            dates: Datetime-valued ``pd.Series`` or ``pd.DatetimeIndex``.

        Returns:
            Series of bucket keys, one per timestamp. When an index is passed
            the result is positionally indexed (0..n-1).

        Raises:
            ValueError: If ``self.seasonality`` is not a supported option.
        """
        seasonality_map = {
            "week": lambda d: d.dt.isocalendar().week,
            "month": lambda d: d.dt.month,
            "day": lambda d: d.dt.dayofyear,
            "hour": lambda d: d.dt.hour,
        }
        if self.seasonality not in seasonality_map:
            raise ValueError(f"Unknown seasonality: {self.seasonality}")

        # The ``.dt`` accessor exists only on Series; an Index (which is what
        # ``fit`` passes in) would raise AttributeError, so wrap it first.
        if isinstance(dates, pd.Index):
            dates = pd.Series(dates)

        return seasonality_map[self.seasonality](dates)

    def fit(
        self, y: Any, X: Optional[Any] = None, **fit_params: Any
    ) -> "SeasonalBaselineDetector":
        """Fit the detector by computing seasonal baselines.

        Args:
            y: Target time series with datetime index (a Series or a
                single-column DataFrame).
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters (ignored).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If ``y`` is not a Series / single-column DataFrame,
                if its index is not a ``DatetimeIndex``, or if
                ``self.seasonality`` is unknown.
        """
        if isinstance(y, pd.Series):
            series = y
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            series = y.iloc[:, 0]
        else:
            raise ValueError(
                "y must be Series or single-column DataFrame with datetime index"
            )

        if not isinstance(series.index, pd.DatetimeIndex):
            raise ValueError(
                "Data must have datetime index for seasonal baseline detection"
            )

        values = series.values
        seasonal_keys = self._get_seasonal_key(series.index)

        # Map every bucket key to a dense integer code, preserving the order
        # in which buckets first appear in the training data.
        unique_keys = seasonal_keys.unique()
        key_to_idx = {key: idx for idx, key in enumerate(unique_keys)}
        key_indices = np.array(
            [key_to_idx[key] for key in seasonal_keys], dtype=np.int32
        )

        n_keys = len(unique_keys)
        means = np.zeros(n_keys, dtype=np.float64)
        stds = np.zeros(n_keys, dtype=np.float64)

        # One pass per bucket; every code is guaranteed to select at least one
        # value because the codes were derived from the data itself.
        for i in range(n_keys):
            group_values = values[key_indices == i]
            means[i] = np.mean(group_values)
            # A sample std needs at least two observations.
            std_val = np.std(group_values, ddof=1) if len(group_values) > 1 else 0.0
            # Fall back to 1.0 for degenerate buckets (single point, constant
            # values, or NaN std) so scoring never divides by zero.
            stds[i] = std_val if std_val > 0 else 1.0

        self.seasonal_stats_ = pd.DataFrame(
            {
                "seasonal_key": unique_keys,
                "mean": means,
                "std": stds,
            }
        )
        self.index_ = series.index
        self.y_ = series.values

        self._is_fitted = True
        return self

    def score(self, y: Any, X: Optional[Any] = None) -> np.ndarray:
        """Compute absolute Z-scores relative to the seasonal baseline.

        Args:
            y: Target time series with datetime index (a Series or a
                single-column DataFrame).
            X: Optional exogenous data (ignored).

        Returns:
            Array of absolute Z-scores, one per input point. Points whose
            score is undefined (bucket not seen during ``fit``, or a NaN
            value) are scored 0.

        Raises:
            ValueError: If ``y`` is not a Series / single-column DataFrame or
                its index is not a ``DatetimeIndex``.
        """
        self._check_is_fitted()

        if isinstance(y, pd.Series):
            series = y
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            series = y.iloc[:, 0]
        else:
            raise ValueError("y must be Series or single-column DataFrame")

        if not isinstance(series.index, pd.DatetimeIndex):
            raise ValueError("Data must have datetime index")

        df = pd.DataFrame({"value": series.values, "date": series.index})
        df["seasonal_key"] = self._get_seasonal_key(df["date"])

        # A left merge keeps the input row order and attaches the fitted
        # mean/std of each row's bucket (NaN for buckets unseen in ``fit``).
        df = df.merge(self.seasonal_stats_, on="seasonal_key", how="left")

        z_scores = np.abs((df["value"] - df["mean"]) / df["std"])
        return z_scores.fillna(0).values

    def predict(
        self, y: Any, X: Optional[Any] = None, threshold: Optional[float] = None
    ) -> np.ndarray:
        """Predict anomaly flags.

        Args:
            y: Target time series with datetime index.
            X: Optional exogenous data (ignored).
            threshold: Optional threshold in standard deviations. Uses
                ``self.threshold_sigma`` when ``None``; an explicit ``0`` is
                honoured.

        Returns:
            Boolean array with True at anomalies (score strictly above the
            threshold).
        """
        # Compare against None explicitly: ``threshold or default`` would
        # silently discard a caller-supplied threshold of 0.
        if threshold is None:
            threshold = self.threshold_sigma

        scores = self.score(y, X)
        flags = scores > threshold

        logger.info(
            "Seasonal baseline detector found %d anomalies "
            "(seasonality=%s, threshold=%s)",
            flags.sum(),
            self.seasonality,
            threshold,
        )

        return flags