# Source code for timesmith.core.decomposition
"""Time series decomposition transformers.
Provides trend and seasonality detection and removal for time series analysis.
"""
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
import numpy as np
import pandas as pd
from timesmith.core.base import BaseTransformer
from timesmith.core.tags import set_tags
from timesmith.typing import SeriesLike
if TYPE_CHECKING:
from timesmith.typing import TableLike
logger = logging.getLogger(__name__)
# Optional scipy imports
try:
from scipy import signal, stats
from scipy.ndimage import uniform_filter1d
HAS_SCIPY = True
except ImportError:
HAS_SCIPY = False
signal = None
stats = None
uniform_filter1d = None
logger.warning(
"scipy not installed. Decomposition functionality will be limited. "
"Install with: pip install scipy or pip install timesmith[scipy]"
)
def _detect_seasonal_period(data: np.ndarray, max_period: int = 50) -> Optional[int]:
    """Detect the seasonal period from the autocorrelation function.

    The series is correlated with itself, normalised by the lag-0 value, and
    the smallest lag whose autocorrelation is a local maximum above 0.3 is
    reported as the period.

    Args:
        data: Time series data.
        max_period: Exclusive upper bound on the lags that are inspected.

    Returns:
        Detected seasonal period (a lag, in samples), or None when the series
        is shorter than ``2 * max_period``, when the lag-0 autocorrelation is
        zero or non-finite, or when no lag qualifies as a peak.
    """
    n = len(data)
    if n == 0 or n < max_period * 2:
        return None

    # Index n - 1 of the "full" correlation is lag 0; keep lags 0..n-1.
    autocorr = np.correlate(data, data, mode="full")[n - 1 :]
    lag_zero = autocorr[0]
    if not np.isfinite(lag_zero) or lag_zero == 0:
        # Normalising would divide by zero (e.g. an all-zero series).
        return None
    autocorr = autocorr / lag_zero

    if HAS_SCIPY:
        # find_peaks is handed a slice that starts at lag 1, so its indices
        # must be shifted by one to become lags again.
        peaks, _ = signal.find_peaks(autocorr[1:max_period], height=0.3)
        lags = peaks + 1
    else:
        # Manual fallback: the loop variable already IS the lag, so no shift
        # is applied (shifting here would report period + 1).
        upper = min(max_period, len(autocorr) - 1)
        lags = [
            lag
            for lag in range(1, upper)
            if autocorr[lag] > 0.3
            and autocorr[lag] > autocorr[lag - 1]
            and autocorr[lag] > autocorr[lag + 1]
        ]

    # The first (smallest-lag) significant peak is taken as the period.
    if len(lags) > 0:
        return int(lags[0])
    return None
[docs]
def _abs_time_correlation(time_arr: np.ndarray, y_arr: np.ndarray) -> float:
    """Return |Pearson r| between time and values (0.0 for a constant series)."""
    if np.all(y_arr == y_arr[0]):
        # Zero variance makes the correlation undefined (NaN); report no trend.
        return 0.0
    return float(abs(np.corrcoef(time_arr, y_arr)[0, 1]))


def _line_trend_result(
    time_arr: np.ndarray, y_arr: np.ndarray, slope: float, intercept: float
) -> Dict[str, Any]:
    """Package a straight-line trend in the dictionary layout of detect_trend."""
    return {
        "trend": slope * time_arr + intercept,
        "slope": float(slope),
        "intercept": float(intercept),
        "strength": _abs_time_correlation(time_arr, y_arr),
    }


def _least_squares_trend(time_arr: np.ndarray, y_arr: np.ndarray) -> Dict[str, Any]:
    """Fit an ordinary least-squares line and return it as a trend result."""
    # A degree-1 polyfit returns exactly two coefficients: slope, intercept.
    slope, intercept = np.polyfit(time_arr, y_arr, 1)
    return _line_trend_result(time_arr, y_arr, slope, intercept)


def _moving_average_trend(y_arr: np.ndarray, window: int) -> np.ndarray:
    """Smooth ``y_arr`` with a centred moving average, repeating edge values."""
    if HAS_SCIPY:
        return uniform_filter1d(y_arr, size=window, mode="nearest")
    # numpy-only fallback: pad with the edge values (as mode="nearest" does)
    # instead of the implicit zeros of np.convolve(mode="same"), which would
    # drag the trend toward zero at both ends.
    left = window // 2
    right = window - 1 - left
    padded = np.pad(y_arr, (left, right), mode="edge")
    return np.convolve(padded, np.ones(window) / window, mode="valid")


def detect_trend(y: "SeriesLike", method: str = "linear") -> Dict[str, Any]:
    """Detect trend in time series data.

    Non-finite values are dropped before fitting, and the trend is evaluated
    on the integer positions ``0..n-1`` of the remaining samples.

    Args:
        y: Time series values.
        method: Trend detection method: 'linear', 'theil_sen', 'polynomial',
            or 'moving_average'.

    Returns:
        Dictionary with trend information. Always contains 'trend' (array of
        fitted values) and 'strength' (float). 'linear' and 'theil_sen' add
        'slope' and 'intercept'; 'polynomial' adds 'coefficients'.

    Raises:
        ValueError: If fewer than 3 finite data points remain, or if
            ``method`` is not one of the supported names.
    """
    if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
        y = y.iloc[:, 0]
    # Coerce every input kind to float so np.isfinite and the fits below
    # behave the same for Series, DataFrame columns, lists and arrays.
    y_arr = np.asarray(y, dtype=float)
    y_arr = y_arr[np.isfinite(y_arr)]
    if len(y_arr) < 3:
        raise ValueError("Need at least 3 data points")
    time_arr = np.arange(len(y_arr))

    if method == "linear":
        return _least_squares_trend(time_arr, y_arr)

    if method == "theil_sen":
        # Theil-Sen estimator: more robust to outliers
        if not HAS_SCIPY:
            logger.warning("scipy not available, falling back to linear trend")
            return _least_squares_trend(time_arr, y_arr)
        try:
            slope, intercept = stats.theilslopes(y_arr, time_arr)[:2]
        except Exception as e:  # best-effort: any failure degrades to OLS
            logger.warning("Theil-Sen failed: %s, falling back to linear", e)
            return _least_squares_trend(time_arr, y_arr)
        # Strength is approximated by the ordinary Pearson correlation.
        return _line_trend_result(time_arr, y_arr, slope, intercept)

    if method == "polynomial":
        coeffs = np.polyfit(time_arr, y_arr, deg=2)
        trend = np.polyval(coeffs, time_arr)
        # Calculate R-squared as strength
        ss_res = np.sum((y_arr - trend) ** 2)
        ss_tot = np.sum((y_arr - np.mean(y_arr)) ** 2)
        strength = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
        return {
            "trend": trend,
            "coefficients": coeffs.tolist(),
            "strength": float(strength),
        }

    if method == "moving_average":
        window = max(3, len(y_arr) // 10)
        trend = _moving_average_trend(y_arr, window)
        # Calculate trend strength as variance reduction
        var_original = np.var(y_arr)
        var_residual = np.var(y_arr - trend)
        strength = 1 - (var_residual / var_original) if var_original > 0 else 0
        return {
            "trend": trend,
            "strength": float(strength),
        }

    raise ValueError(
        f"Unknown method: {method}. "
        "Use 'linear', 'theil_sen', 'polynomial', or 'moving_average'"
    )
[docs]
def detect_seasonality(y: "SeriesLike", max_period: int = 50) -> Dict[str, Any]:
    """Detect seasonality in time series data.

    Non-finite values are dropped, a least-squares line is removed, the
    period is looked up with ``_detect_seasonal_period`` and the seasonal
    pattern is the per-phase mean of the detrended values.

    Args:
        y: Time series values.
        max_period: Maximum period to check.

    Returns:
        Dictionary with seasonality information: 'period' (int or None),
        'strength' (share of detrended variance explained by the repeated
        pattern) and 'pattern' (list of ``period`` centred values, or None).
        When the series has fewer than ``2 * max_period`` finite points, or
        no period is found, period and pattern are None and strength is 0.0.

    Raises:
        ValueError: If the length check passes but fewer than 3 finite data
            points are available for the linear detrend.
    """
    if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
        y = y.iloc[:, 0]
    y_arr = np.asarray(y, dtype=float)
    y_arr = y_arr[np.isfinite(y_arr)]
    n = len(y_arr)

    if n < max_period * 2:
        return {"period": None, "strength": 0.0, "pattern": None}
    if n < 3:
        raise ValueError("Need at least 3 data points")

    # Remove a linear trend first. The fit is done inline: a degree-1 polyfit
    # yields exactly (slope, intercept).
    time_arr = np.arange(n)
    slope, intercept = np.polyfit(time_arr, y_arr, 1)
    detrended = y_arr - (slope * time_arr + intercept)

    period = _detect_seasonal_period(detrended, max_period)
    if period is None:
        return {"period": None, "strength": 0.0, "pattern": None}

    # Seasonal pattern: mean of the detrended values at each phase, centred
    # so that the pattern itself has zero mean.
    seasonal_pattern = np.array(
        [detrended[phase::period].mean() for phase in range(period)]
    )
    seasonal_pattern = seasonal_pattern - seasonal_pattern.mean()

    # Strength: variance of the repeated pattern relative to the detrended data.
    seasonal_component = seasonal_pattern[np.arange(n) % period]
    var_total = np.var(detrended)
    strength = np.var(seasonal_component) / var_total if var_total > 0 else 0.0
    return {
        "period": int(period),
        "strength": float(strength),
        "pattern": seasonal_pattern.tolist(),
    }
[docs]
class DecomposeTransformer(BaseTransformer):
    """Decompose time series into trend, seasonal, and residual components.

    The decomposition is additive: the trend is a centred moving average, the
    seasonal component is the centred per-phase mean of the detrended data,
    and the residual is what remains after subtracting both.
    """

    def __init__(
        self,
        method: str = "moving_average",
        seasonal_period: Optional[int] = None,
        trend_window: Optional[int] = None,
    ):
        """Initialize decomposition transformer.

        Args:
            method: Decomposition method: 'moving_average' or 'stl'. Only the
                moving-average decomposition is implemented in this class;
                other values are accepted but trigger a warning at fit time.
            seasonal_period: Seasonal period
                (auto-detected if not specified).
            trend_window: Window size for trend extraction
                (auto-determined if not specified).
        """
        super().__init__()
        self.method = method
        self.seasonal_period = seasonal_period
        self.trend_window = trend_window
        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self,
        y: Union[SeriesLike, Any],
        X: Optional[Union["TableLike", Any]] = None,
        **fit_params: Any,
    ) -> "DecomposeTransformer":
        """Fit the decomposition transformer.

        Args:
            y: Target time series.
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters (ignored).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If fewer than 10 finite data points are available.
        """
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            y = y.iloc[:, 0]
        # Always work in float: with integer input the scipy filter and
        # np.zeros_like would otherwise produce integer (truncated) trend and
        # seasonal arrays.
        values = np.asarray(y, dtype=float)
        # Remove invalid values
        self.y_ = values[np.isfinite(values)]
        if len(self.y_) < 10:
            raise ValueError("Need at least 10 data points for decomposition")
        # Store decomposition components
        self.components_ = self._decompose()
        self._is_fitted = True
        return self

    def transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Return residual component (original - trend - seasonal).

        The residual computed during ``fit`` is returned; ``y`` itself is not
        read, so it should be the data the transformer was fitted on.

        Args:
            y: Target time series (should match fit data).
            X: Optional exogenous data (ignored).

        Returns:
            Residual component as Series.
        """
        self._check_is_fitted()
        return pd.Series(self.components_["residual"])

    def inverse_transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Reconstruct original from residual by adding trend and seasonal.

        Args:
            y: Residual component.
            X: Optional exogenous data (ignored).

        Returns:
            Reconstructed original series.
        """
        self._check_is_fitted()
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            y = y.iloc[:, 0]
        residual = np.asarray(y, dtype=float)
        # Reconstruct: residual + trend + seasonal
        reconstructed = (
            residual + self.components_["trend"] + self.components_["seasonal"]
        )
        return pd.Series(reconstructed)

    def _decompose(self) -> Dict[str, np.ndarray]:
        """Perform the additive moving-average decomposition of ``self.y_``.

        Returns:
            Dictionary with 'trend', 'seasonal', 'residual' and 'original'
            arrays, all of the same length as ``self.y_``.
        """
        if self.method != "moving_average":
            logger.warning(
                "Decomposition method %r is not implemented; "
                "using moving-average decomposition instead",
                self.method,
            )
        n = len(self.y_)

        # Auto-determine trend window
        if self.trend_window is None:
            trend_window = max(3, n // 10)
        else:
            trend_window = self.trend_window

        # Extract trend using moving average
        if HAS_SCIPY:
            trend = uniform_filter1d(self.y_, size=trend_window, mode="nearest")
        else:
            # numpy-only fallback: pad with the edge values and convolve in
            # "valid" mode. This keeps the output length equal to n even when
            # the window is longer than the series, and avoids the zero
            # padding of mode="same" that biases both ends toward zero.
            left = trend_window // 2
            right = trend_window - 1 - left
            padded = np.pad(self.y_, (left, right), mode="edge")
            trend = np.convolve(
                padded, np.ones(trend_window) / trend_window, mode="valid"
            )

        # Detrend
        detrended = self.y_ - trend

        # Extract seasonal component
        if self.seasonal_period is None:
            seasonal_period = _detect_seasonal_period(detrended)
        else:
            seasonal_period = self.seasonal_period

        seasonal = np.zeros_like(self.y_)
        # A period is only usable if it is > 1 and fits into the series at
        # least once; otherwise the seasonal component stays all zeros.
        if seasonal_period and seasonal_period > 1 and n // seasonal_period > 0:
            # Average the detrended values at each phase of the period.
            seasonal_pattern = np.array(
                [
                    detrended[phase::seasonal_period].mean()
                    for phase in range(seasonal_period)
                ]
            )
            # Center seasonal pattern
            seasonal_pattern = seasonal_pattern - seasonal_pattern.mean()
            # Replicate pattern across the whole series
            seasonal = seasonal_pattern[np.arange(n) % seasonal_period]

        # Residual
        residual = detrended - seasonal
        return {
            "trend": trend,
            "seasonal": seasonal,
            "residual": residual,
            "original": self.y_,
        }

    def get_components(self) -> Dict[str, np.ndarray]:
        """Get decomposition components.

        Returns:
            Dictionary with 'trend', 'seasonal', 'residual', and 'original' components.
        """
        self._check_is_fitted()
        return self.components_
[docs]
class DetrendTransformer(BaseTransformer):
    """Remove trend from time series.

    The trend is estimated once in ``fit`` via ``detect_trend`` and stored;
    ``transform`` subtracts it and ``inverse_transform`` adds it back.
    """

    def __init__(self, method: str = "linear"):
        """Initialize detrend transformer.

        Args:
            method: Trend removal method passed to ``detect_trend``:
                'linear', 'theil_sen', 'polynomial', or 'moving_average'.
        """
        super().__init__()
        self.method = method
        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self,
        y: Union[SeriesLike, Any],
        X: Optional[Union["TableLike", Any]] = None,
        **fit_params: Any,
    ) -> "DetrendTransformer":
        """Fit the detrend transformer.

        Args:
            y: Target time series.
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters (ignored).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If fewer than 3 finite data points are available.
        """
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            y = y.iloc[:, 0]
        # Coerce every input kind to float so the finiteness filter also
        # works for Series/DataFrame inputs with non-float dtypes.
        values = np.asarray(y, dtype=float)
        self.y_ = values[np.isfinite(values)]
        if len(self.y_) < 3:
            raise ValueError("Need at least 3 data points")
        # Detect and store trend
        trend_info = detect_trend(self.y_, method=self.method)
        self.trend_ = trend_info["trend"]
        self._is_fitted = True
        return self

    def transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Remove trend from time series.

        The detrended version of the data seen in ``fit`` is returned; ``y``
        itself is not read, so it should be the data the transformer was
        fitted on.

        Args:
            y: Target time series (should match fit data).
            X: Optional exogenous data (ignored).

        Returns:
            Detrended series.
        """
        self._check_is_fitted()
        return pd.Series(self.y_ - self.trend_)

    def inverse_transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Add trend back to detrended series.

        Args:
            y: Detrended series.
            X: Optional exogenous data (ignored).

        Returns:
            Series with trend restored.
        """
        self._check_is_fitted()
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            y = y.iloc[:, 0]
        detrended = np.asarray(y, dtype=float)
        return pd.Series(detrended + self.trend_)
[docs]
class DeseasonalizeTransformer(BaseTransformer):
    """Remove seasonality from time series.

    The seasonal component is estimated once in ``fit`` as the centred
    per-phase mean of the linearly detrended data; ``transform`` subtracts it
    and ``inverse_transform`` adds it back.
    """

    def __init__(self, seasonal_period: Optional[int] = None, max_period: int = 50):
        """Initialize deseasonalize transformer.

        Args:
            seasonal_period: Seasonal period (auto-detected if not specified).
            max_period: Maximum period to check for auto-detection.
        """
        super().__init__()
        self.seasonal_period = seasonal_period
        self.max_period = max_period
        set_tags(
            self,
            scitype_input="SeriesLike",
            scitype_output="SeriesLike",
            handles_missing=False,
            requires_sorted_index=True,
        )

    def fit(
        self,
        y: Union[SeriesLike, Any],
        X: Optional[Union["TableLike", Any]] = None,
        **fit_params: Any,
    ) -> "DeseasonalizeTransformer":
        """Fit the deseasonalize transformer.

        Args:
            y: Target time series.
            X: Optional exogenous data (ignored).
            **fit_params: Additional fit parameters (ignored).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If fewer than 3 finite data points are available.
        """
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            y = y.iloc[:, 0]
        # Always work in float: with integer input np.zeros_like would create
        # an integer seasonal array and truncate the pattern values.
        values = np.asarray(y, dtype=float)
        self.y_ = values[np.isfinite(values)]
        n = len(self.y_)
        if n < 3:
            raise ValueError("Need at least 3 data points")

        # Remove a linear trend first for better seasonality detection. The
        # fit is done inline: a degree-1 polyfit yields (slope, intercept).
        time_arr = np.arange(n)
        slope, intercept = np.polyfit(time_arr, self.y_, 1)
        detrended = self.y_ - (slope * time_arr + intercept)

        # Detect seasonal period if not provided
        period = self.seasonal_period
        if period is None:
            period = _detect_seasonal_period(detrended, self.max_period)
            # NOTE: the detected period (or None) is written back to the
            # constructor attribute, so a later fit() reuses it instead of
            # detecting again.
            self.seasonal_period = period

        seasonal = np.zeros_like(self.y_)
        # A period is only usable if it is > 1 and fits into the series at
        # least once; otherwise the seasonal component stays all zeros.
        if period and period > 1 and n // period > 0:
            # Average the detrended values at each phase of the period.
            seasonal_pattern = np.array(
                [detrended[phase::period].mean() for phase in range(period)]
            )
            # Center pattern
            seasonal_pattern = seasonal_pattern - seasonal_pattern.mean()
            # Replicate pattern across the whole series
            seasonal = seasonal_pattern[np.arange(n) % period]

        self.seasonal_ = seasonal
        self._is_fitted = True
        return self

    def transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Remove seasonality from time series.

        The deseasonalized version of the data seen in ``fit`` is returned;
        ``y`` itself is not read, so it should be the data the transformer
        was fitted on.

        Args:
            y: Target time series (should match fit data).
            X: Optional exogenous data (ignored).

        Returns:
            Deseasonalized series.
        """
        self._check_is_fitted()
        return pd.Series(self.y_ - self.seasonal_)

    def inverse_transform(self, y: Any, X: Optional[Any] = None) -> pd.Series:
        """Add seasonality back to deseasonalized series.

        Args:
            y: Deseasonalized series.
            X: Optional exogenous data (ignored).

        Returns:
            Series with seasonality restored.
        """
        self._check_is_fitted()
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            y = y.iloc[:, 0]
        deseasonalized = np.asarray(y, dtype=float)
        return pd.Series(deseasonalized + self.seasonal_)