# Source code for timesmith.core.decomposition

"""Time series decomposition transformers.

Provides trend and seasonality detection and removal for time series analysis.
"""

import logging
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from timesmith.core.base import BaseTransformer
from timesmith.core.tags import set_tags
from timesmith.typing import SeriesLike

logger = logging.getLogger(__name__)

# Optional scipy imports
try:
    from scipy import signal, stats
    from scipy.ndimage import uniform_filter1d

    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False
    logger.warning(
        "scipy not installed. Decomposition functionality will be limited. "
        "Install with: pip install scipy"
    )


def _detect_seasonal_period(data: np.ndarray, max_period: int = 50) -> Optional[int]:
    """Detect seasonal period using autocorrelation.

    The autocorrelation at non-negative lags is normalised by its lag-0 value
    and the lag of the first local maximum above 0.3 is reported as the period.

    Args:
        data: Time series data.
        max_period: Maximum period to check.

    Returns:
        Detected seasonal period, or None when the series is shorter than
        ``2 * max_period``, its lag-0 autocorrelation is zero or non-finite,
        or no autocorrelation peak exceeds the threshold.
    """
    n = len(data)
    if n < max_period * 2:
        return None

    # Autocorrelation at lags 0..n-1; index n - 1 of the "full" output is lag 0.
    autocorr = np.correlate(data, data, mode="full")
    zero_lag = autocorr[n - 1]
    if not np.isfinite(zero_lag) or zero_lag <= 0:
        # All-zero (or NaN-contaminated) input: normalising would divide by
        # zero and produce NaNs, so there is nothing to detect.
        return None
    autocorr = autocorr[n - 1 :] / zero_lag

    # Find peaks in autocorrelation (potential seasonal periods), expressed as
    # true lags in both branches.
    if not HAS_SCIPY:
        # Fallback: manual local-maximum search. ``i`` already is the lag, so
        # no index correction is needed here.
        lags = np.array(
            [
                i
                for i in range(1, min(max_period, len(autocorr) - 1))
                if autocorr[i] > 0.3
                and autocorr[i] > autocorr[i - 1]
                and autocorr[i] > autocorr[i + 1]
            ],
            dtype=int,
        )
    else:
        peaks, _ = signal.find_peaks(autocorr[1:max_period], height=0.3)
        # find_peaks indexes into the slice that starts at lag 1.
        lags = peaks + 1

    if len(lags) > 0:
        # Return first significant peak
        return int(lags[0])

    return None


def _abs_pearson_r(x: np.ndarray, y: np.ndarray) -> float:
    """Return |Pearson r| between x and y, or 0.0 when it is undefined."""
    if np.std(x) == 0 or np.std(y) == 0:
        # A constant input has no defined correlation; report "no trend".
        return 0.0
    r_value = np.corrcoef(x, y)[0, 1]
    return float(abs(r_value)) if np.isfinite(r_value) else 0.0


def _linear_trend_info(time_arr: np.ndarray, y_arr: np.ndarray) -> Dict[str, Any]:
    """Fit a least-squares line and package it as a trend-info dictionary."""
    # np.polyfit with deg=1 returns exactly two coefficients (slope, intercept);
    # the correlation has to be computed separately.
    slope, intercept = np.polyfit(time_arr, y_arr, 1)
    return {
        "trend": slope * time_arr + intercept,
        "slope": float(slope),
        "intercept": float(intercept),
        "strength": _abs_pearson_r(time_arr, y_arr),
    }


def detect_trend(y: SeriesLike, method: str = "linear") -> Dict[str, Any]:
    """Detect trend in time series data.

    Non-finite values are dropped before fitting and the trend is computed
    against the positional index ``0..n-1`` of the remaining values.

    Args:
        y: Time series values.
        method: Trend detection method: 'linear', 'theil_sen', 'polynomial',
            or 'moving_average'.

    Returns:
        Dictionary with trend information. Always contains 'trend' (array of
        fitted values) and 'strength'. 'linear' and 'theil_sen' add 'slope'
        and 'intercept'; 'polynomial' adds 'coefficients'.

    Raises:
        ValueError: If fewer than 3 finite data points remain or ``method``
            is not recognised.
    """
    if isinstance(y, pd.Series):
        raw = y.values
    elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
        raw = y.iloc[:, 0].values
    else:
        raw = y
    # Always work in float: integer input would otherwise make the
    # moving-average filter return an integer-truncated trend.
    y_arr = np.asarray(raw, dtype=float)
    y_arr = y_arr[np.isfinite(y_arr)]

    if len(y_arr) < 3:
        raise ValueError("Need at least 3 data points")

    time_arr = np.arange(len(y_arr))

    if method == "linear":
        return _linear_trend_info(time_arr, y_arr)

    if method == "theil_sen":
        # Theil-Sen estimator: more robust to outliers
        if not HAS_SCIPY:
            logger.warning("scipy not available, falling back to linear trend")
            return _linear_trend_info(time_arr, y_arr)
        try:
            slope, intercept = stats.theilslopes(y_arr, time_arr)[:2]
        except Exception as e:  # deliberate best-effort fallback
            logger.warning("Theil-Sen failed: %s, falling back to linear", e)
            return _linear_trend_info(time_arr, y_arr)
        return {
            "trend": slope * time_arr + intercept,
            "slope": float(slope),
            "intercept": float(intercept),
            # Approximate correlation for Theil-Sen
            "strength": _abs_pearson_r(time_arr, y_arr),
        }

    if method == "polynomial":
        coeffs = np.polyfit(time_arr, y_arr, deg=2)
        trend = np.polyval(coeffs, time_arr)
        # Calculate R-squared as strength
        ss_res = np.sum((y_arr - trend) ** 2)
        ss_tot = np.sum((y_arr - np.mean(y_arr)) ** 2)
        strength = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
        return {
            "trend": trend,
            "coefficients": coeffs.tolist(),
            "strength": float(strength),
        }

    if method == "moving_average":
        window = max(3, len(y_arr) // 10)
        if not HAS_SCIPY:
            # Fallback: simple moving average
            trend = np.convolve(y_arr, np.ones(window) / window, mode="same")
        else:
            trend = uniform_filter1d(y_arr, size=window, mode="nearest")
        # Calculate trend strength as variance reduction
        var_original = np.var(y_arr)
        var_residual = np.var(y_arr - trend)
        strength = 1 - (var_residual / var_original) if var_original > 0 else 0
        return {
            "trend": trend,
            "strength": float(strength),
        }

    raise ValueError(
        f"Unknown method: {method}. "
        "Use 'linear', 'theil_sen', 'polynomial', or 'moving_average'"
    )
def detect_seasonality(y: SeriesLike, max_period: int = 50) -> Dict[str, Any]:
    """Detect seasonality in time series data.

    The series is linearly detrended, a period is looked up via
    autocorrelation, and the per-phase mean of the detrended values is used
    as the seasonal pattern.

    Args:
        y: Time series values.
        max_period: Maximum period to check.

    Returns:
        Dictionary with keys 'period', 'strength' and 'pattern'. When the
        series has fewer than ``2 * max_period`` finite values or no period
        is found, 'period' and 'pattern' are None and 'strength' is 0.0.
    """
    if isinstance(y, pd.Series):
        values = y.values
    elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
        values = y.iloc[:, 0].values
    else:
        values = np.asarray(y, dtype=float)

    # Keep only finite observations.
    values = values[np.isfinite(values)]

    nothing_found = {"period": None, "strength": 0.0, "pattern": None}
    if len(values) < max_period * 2:
        return nothing_found

    # Take out the linear trend before looking for a repeating pattern.
    detrended = values - detect_trend(values, method="linear")["trend"]

    period = _detect_seasonal_period(detrended, max_period)
    if period is None:
        return nothing_found

    # Mean of the detrended series at each phase of the cycle.
    n = len(detrended)
    pattern = np.array(
        [np.mean(detrended[phase::period]) for phase in range(period)]
    )
    pattern = pattern - np.mean(pattern)

    # Strength = share of detrended variance explained by the repeated pattern.
    seasonal_component = pattern[np.arange(n) % period]
    var_total = np.var(detrended)
    if var_total > 0:
        strength = np.var(seasonal_component) / var_total
    else:
        strength = 0.0

    return {
        "period": int(period),
        "strength": float(strength),
        "pattern": pattern.tolist(),
    }
class DecomposeTransformer(BaseTransformer):
    """Decompose time series into trend, seasonal, and residual components."""

    def __init__(
        self,
        method: str = "moving_average",
        seasonal_period: Optional[int] = None,
        trend_window: Optional[int] = None,
    ):
        """Initialize decomposition transformer.

        Args:
            method: Decomposition method: 'moving_average' or 'stl'. The value
                is stored; the decomposition in this class always extracts the
                trend with a moving average.
            seasonal_period: Seasonal period (auto-detected if not specified).
            trend_window: Window size for trend extraction (auto-determined if
                not specified).
        """
        super().__init__()
        self.method = method
        self.seasonal_period = seasonal_period
        self.trend_window = trend_window

        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self, y: Any, X: Optional[Any] = None, **fit_params: Any
    ) -> "DecomposeTransformer":
        """Fit the decomposition transformer.

        Args:
            y: Target time series.
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters.

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If fewer than 10 finite data points are available.
        """
        if isinstance(y, pd.Series):
            raw = y.values
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            raw = y.iloc[:, 0].values
        else:
            raw = y
        # Cast to float so integer input does not yield integer-truncated
        # trend and seasonal components further down.
        values = np.asarray(raw, dtype=float)

        # Remove invalid values
        self.y_ = values[np.isfinite(values)]

        if len(self.y_) < 10:
            raise ValueError("Need at least 10 data points for decomposition")

        # Store decomposition components
        self.components_ = self._decompose()

        self._is_fitted = True
        return self

    def transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Return residual component (original - trend - seasonal).

        The residual is the one computed during ``fit``; ``y`` is not read.

        Args:
            y: Target time series (should match fit data).
            X: Optional exogenous data (ignored).

        Returns:
            Residual component as Series.
        """
        self._check_is_fitted()
        return pd.Series(self.components_["residual"])

    def inverse_transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Reconstruct original from residual by adding trend and seasonal.

        Args:
            y: Residual component.
            X: Optional exogenous data (ignored).

        Returns:
            Reconstructed original series.
        """
        self._check_is_fitted()

        if isinstance(y, pd.Series):
            residual = y.values
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            residual = y.iloc[:, 0].values
        else:
            residual = np.asarray(y)

        # Reconstruct: residual + trend + seasonal
        reconstructed = (
            residual + self.components_["trend"] + self.components_["seasonal"]
        )
        return pd.Series(reconstructed)

    def _decompose(self) -> Dict[str, np.ndarray]:
        """Perform decomposition of ``self.y_`` into trend, seasonal, residual."""
        n = len(self.y_)

        # Auto-determine trend window
        if self.trend_window is None:
            trend_window = max(3, n // 10)
        else:
            trend_window = self.trend_window

        # Extract trend using moving average
        if not HAS_SCIPY:
            # Fallback: simple moving average. np.convolve(mode="same") returns
            # max(len(a), len(v)) samples, so cap the kernel at n to keep the
            # trend aligned with the data.
            kernel_size = min(trend_window, n)
            trend = np.convolve(
                self.y_, np.ones(kernel_size) / kernel_size, mode="same"
            )
        else:
            trend = uniform_filter1d(self.y_, size=trend_window, mode="nearest")

        # Detrend
        detrended = self.y_ - trend

        # Extract seasonal component
        if self.seasonal_period is None:
            seasonal_period = _detect_seasonal_period(detrended)
        else:
            seasonal_period = self.seasonal_period

        seasonal = np.zeros(n, dtype=float)

        # Only usable when at least one full cycle fits into the data.
        if seasonal_period and seasonal_period > 1 and n // seasonal_period > 0:
            # Average the detrended values at each phase of the cycle.
            seasonal_pattern = np.array(
                [
                    np.mean(detrended[phase::seasonal_period])
                    for phase in range(seasonal_period)
                ]
            )

            # Center seasonal pattern
            seasonal_pattern = seasonal_pattern - np.mean(seasonal_pattern)

            # Replicate pattern across the whole series
            seasonal = seasonal_pattern[np.arange(n) % seasonal_period]

        # Residual
        residual = detrended - seasonal

        return {
            "trend": trend,
            "seasonal": seasonal,
            "residual": residual,
            "original": self.y_,
        }

    def get_components(self) -> Dict[str, np.ndarray]:
        """Get decomposition components.

        Returns:
            Dictionary with 'trend', 'seasonal', 'residual', and 'original'
            components.
        """
        self._check_is_fitted()
        return self.components_
class DetrendTransformer(BaseTransformer):
    """Remove trend from time series."""

    def __init__(self, method: str = "linear"):
        """Initialize detrend transformer.

        Args:
            method: Trend removal method, passed to ``detect_trend``: 'linear',
                'theil_sen', 'polynomial', or 'moving_average'.
        """
        super().__init__()
        self.method = method

        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self, y: Any, X: Optional[Any] = None, **fit_params: Any
    ) -> "DetrendTransformer":
        """Fit the detrend transformer.

        Args:
            y: Target time series.
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters.

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If fewer than 3 finite data points are available.
        """
        if isinstance(y, pd.Series):
            raw = y.values
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            raw = y.iloc[:, 0].values
        else:
            raw = y
        # Cast to float so integer input cannot produce an integer-truncated
        # trend (e.g. from the moving-average filter).
        values = np.asarray(raw, dtype=float)
        self.y_ = values[np.isfinite(values)]

        if len(self.y_) < 3:
            raise ValueError("Need at least 3 data points")

        # Detect and store trend
        trend_info = detect_trend(self.y_, method=self.method)
        self.trend_ = trend_info["trend"]

        self._is_fitted = True
        return self

    def transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Remove trend from time series.

        The result is computed from the data seen in ``fit``; ``y`` is not read.

        Args:
            y: Target time series (should match fit data).
            X: Optional exogenous data (ignored).

        Returns:
            Detrended series.
        """
        self._check_is_fitted()
        detrended = self.y_ - self.trend_
        return pd.Series(detrended)

    def inverse_transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Add trend back to detrended series.

        Args:
            y: Detrended series.
            X: Optional exogenous data (ignored).

        Returns:
            Series with trend restored.
        """
        self._check_is_fitted()

        if isinstance(y, pd.Series):
            detrended = y.values
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            detrended = y.iloc[:, 0].values
        else:
            detrended = np.asarray(y)

        reconstructed = detrended + self.trend_
        return pd.Series(reconstructed)
class DeseasonalizeTransformer(BaseTransformer):
    """Remove seasonality from time series."""

    def __init__(self, seasonal_period: Optional[int] = None, max_period: int = 50):
        """Initialize deseasonalize transformer.

        Args:
            seasonal_period: Seasonal period (auto-detected if not specified).
            max_period: Maximum period to check for auto-detection.
        """
        super().__init__()
        self.seasonal_period = seasonal_period
        self.max_period = max_period

        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self, y: Any, X: Optional[Any] = None, **fit_params: Any
    ) -> "DeseasonalizeTransformer":
        """Fit the deseasonalize transformer.

        Args:
            y: Target time series.
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters.

        Returns:
            Self for method chaining.
        """
        if isinstance(y, pd.Series):
            raw = y.values
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            raw = y.iloc[:, 0].values
        else:
            raw = y
        # Cast to float so the seasonal component is not truncated to integers
        # when the input has an integer dtype.
        values = np.asarray(raw, dtype=float)
        self.y_ = values[np.isfinite(values)]

        # Remove trend first for better seasonality detection
        trend_info = detect_trend(self.y_, method="linear")
        detrended = self.y_ - trend_info["trend"]

        # Detect seasonal period if not provided
        if self.seasonal_period is None:
            period = _detect_seasonal_period(detrended, self.max_period)
            # NOTE: a detected period is written back to ``seasonal_period``,
            # so later fits on this instance reuse it instead of re-detecting.
            self.seasonal_period = period
        else:
            period = self.seasonal_period

        # Extract seasonal pattern
        n = len(detrended)
        seasonal = np.zeros(n, dtype=float)

        # Only usable when at least one full cycle fits into the data;
        # otherwise the seasonal component stays all-zero.
        if period and period > 1 and n // period > 0:
            # Average the detrended values at each phase of the cycle.
            seasonal_pattern = np.array(
                [np.mean(detrended[phase::period]) for phase in range(period)]
            )

            # Center pattern
            seasonal_pattern = seasonal_pattern - np.mean(seasonal_pattern)

            # Replicate pattern across the whole series
            seasonal = seasonal_pattern[np.arange(n) % period]

        self.seasonal_ = seasonal
        self._is_fitted = True
        return self

    def transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Remove seasonality from time series.

        The result is computed from the data seen in ``fit``; ``y`` is not read.

        Args:
            y: Target time series (should match fit data).
            X: Optional exogenous data (ignored).

        Returns:
            Deseasonalized series.
        """
        self._check_is_fitted()
        deseasonalized = self.y_ - self.seasonal_
        return pd.Series(deseasonalized)

    def inverse_transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Add seasonality back to deseasonalized series.

        Args:
            y: Deseasonalized series.
            X: Optional exogenous data (ignored).

        Returns:
            Series with seasonality restored.
        """
        self._check_is_fitted()

        if isinstance(y, pd.Series):
            deseasonalized = y.values
        elif isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            deseasonalized = y.iloc[:, 0].values
        else:
            deseasonalized = np.asarray(y)

        reconstructed = deseasonalized + self.seasonal_
        return pd.Series(reconstructed)