Source code for timesmith.core.changepoint

"""Change point detection for time series."""

import logging
from typing import Any, Optional

import numpy as np
import pandas as pd

from timesmith.core.base import BaseDetector
from timesmith.core.tags import set_tags
from timesmith.typing import SeriesLike

logger = logging.getLogger(__name__)

# Optional scipy for median filtering
try:
    from scipy.ndimage import median_filter

    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False

    def median_filter(arr, size):
        """Simple median filter using numpy (fallback when scipy not available)."""
        if size <= 1:
            return arr
        result = np.zeros_like(arr)
        half = size // 2
        for i in range(len(arr)):
            start = max(0, i - half)
            end = min(len(arr), i + half + 1)
            result[i] = np.median(arr[start:end])
        return result


# Optional ruptures library for PELT algorithm
try:
    import ruptures as rpt

    RUPTURES_AVAILABLE = True
except ImportError:
    RUPTURES_AVAILABLE = False
    logger.warning(
        "ruptures library not installed. PELT functionality will be limited. "
        "Install with: pip install ruptures"
    )

# Optional numba for acceleration
try:
    from numba import njit

    HAS_NUMBA = True
except ImportError:
    HAS_NUMBA = False

    def njit(*args, **kwargs):
        """Dummy decorator when numba is not available."""

        def decorator(func):
            return func

        if len(args) == 1 and callable(args[0]):
            return args[0]
        return decorator


[docs] def preprocess_for_changepoint( y: SeriesLike, median_window: int = 5, detrend_window: int = 100, ) -> np.ndarray: """Preprocess time series for change point detection. Applies median filtering to remove spikes and baseline removal to eliminate drift. Args: y: Time series values. median_window: Window size for spike removal. detrend_window: Window size for baseline removal (0 to skip). Returns: Preprocessed time series. """ if isinstance(y, pd.Series): y_arr = y.values elif isinstance(y, pd.DataFrame) and y.shape[1] == 1: y_arr = y.iloc[:, 0].values else: y_arr = np.asarray(y) if len(y_arr) == 0: raise ValueError("y cannot be empty") # Median filter to remove spikes while preserving sharp edges y_filtered = median_filter(y_arr, size=median_window) # Optional detrending (remove long-wavelength drift) if detrend_window > 0 and len(y_arr) > detrend_window: # Compute baseline with large median filter baseline = median_filter(y_filtered, size=detrend_window) # Remove baseline and restore median to preserve absolute scale y_processed = y_filtered - baseline + np.median(y_filtered) else: y_processed = y_filtered return y_processed
@njit(cache=True) def _bayesian_changepoint_kernel( y: np.ndarray, hazard: float, beta0: float, ) -> np.ndarray: """Numba-optimized kernel for Bayesian online change-point detection. Args: y: Time series values (1D array). hazard: Hazard rate (1 / expected_segment_length). beta0: Prior variance parameter. Returns: Change point probabilities at each time step. """ n = len(y) # Initialize run_length_probs = np.zeros(n + 1, dtype=np.float64) run_length_probs[0] = 1.0 change_point_probs = np.zeros(n, dtype=np.float64) # Track sufficient statistics sum_x = np.zeros(n + 1, dtype=np.float64) sum_x2 = np.zeros(n + 1, dtype=np.float64) count = np.zeros(n + 1, dtype=np.float64) for t in range(n): x = y[t] # Roll arrays manually (Numba doesn't support np.roll directly) for idx in range(n, 0, -1): sum_x[idx] = sum_x[idx - 1] sum_x2[idx] = sum_x2[idx - 1] count[idx] = count[idx - 1] sum_x[0] = 0.0 sum_x2[0] = 0.0 count[0] = 0.0 # Update statistics for idx in range(1, n + 1): sum_x[idx] += x sum_x2[idx] += x * x count[idx] += 1.0 # Compute predictive probabilities predictive_probs = np.ones(n + 1, dtype=np.float64) * 1e-10 max_r = min(t + 1, n) for r in range(max_r): if count[r] > 0.0: n_r = count[r] mean_r = sum_x[r] / n_r var_r = (sum_x2[r] / n_r - mean_r * mean_r) + beta0 # Gaussian log predictive diff = x - mean_r predictive_probs[r] = np.exp(-0.5 * (diff * diff) / (var_r + 1e-6)) # Growth probabilities (no change) run_length_probs = run_length_probs * predictive_probs # Change point probability cp_prob = np.sum(run_length_probs) * hazard change_point_probs[t] = cp_prob / (cp_prob + 1e-10) # Update run lengths (shift and apply hazard) for idx in range(n, 0, -1): run_length_probs[idx] = run_length_probs[idx - 1] * (1.0 - hazard) run_length_probs[0] = cp_prob # Normalize total = np.sum(run_length_probs) if total > 0.0: run_length_probs = run_length_probs / total return change_point_probs
[docs] class PELTDetector(BaseDetector): """Change point detector using PELT (Pruned Exact Linear Time) algorithm."""
[docs] def __init__( self, penalty: Optional[float] = None, model: str = "l2", min_size: int = 3, jump: int = 1, preprocess: bool = True, ): """Initialize PELT detector. Args: penalty: Penalty value (higher = fewer change points). Auto-tuned if None. model: Cost function model ('l2' for mean shift, 'rbf' for distributional change). min_size: Minimum segment length. jump: Subsample (1 = no subsampling). preprocess: Whether to preprocess data before detection. """ if not RUPTURES_AVAILABLE: raise ImportError( "ruptures library required for PELT detector. " "Install with: pip install ruptures" ) super().__init__() self.penalty = penalty self.model = model self.min_size = min_size self.jump = jump self.preprocess = preprocess set_tags( self, scitype_input="SeriesLike", scitype_output="SeriesLike", handles_missing=False, requires_sorted_index=True, )
[docs] def fit(self, y: Any, X: Optional[Any] = None, **fit_params: Any) -> "PELTDetector": """Fit the change point detector. Args: y: Target time series. X: Optional exogenous data (ignored). **fit_params: Additional fit parameters. Returns: Self for method chaining. """ if isinstance(y, pd.Series): self.y_ = y.values elif isinstance(y, pd.DataFrame) and y.shape[1] == 1: self.y_ = y.iloc[:, 0].values else: self.y_ = np.asarray(y) if len(self.y_) == 0: raise ValueError("y cannot be empty") # Preprocess if requested if self.preprocess: self.y_processed_ = preprocess_for_changepoint(self.y_) else: self.y_processed_ = self.y_ # Auto-tune penalty if not provided if self.penalty is None: n = len(self.y_processed_) self.penalty_ = np.log(n) * np.var(self.y_processed_) logger.info(f"Auto-tuned penalty: {self.penalty_:.2f}") else: self.penalty_ = self.penalty self._is_fitted = True return self
[docs] def score(self, y: Any, X: Optional[Any] = None) -> np.ndarray: """Compute change point scores (indices of detected change points). Args: y: Target time series (should match fit data). X: Optional exogenous data (ignored). Returns: Array of change point indices. """ self._check_is_fitted() # Use processed data y_data = self.y_processed_ # Create PELT model algo = rpt.Pelt(model=self.model, min_size=self.min_size, jump=self.jump) # Fit and predict algo.fit(y_data.reshape(-1, 1)) change_points = algo.predict(pen=self.penalty_) # Remove the final point (always equals length of signal) change_points = np.array(change_points[:-1]) logger.info( f"PELT detected {len(change_points)} change points (model={self.model})" ) return change_points
[docs] def predict( self, y: Any, X: Optional[Any] = None, threshold: Optional[float] = None ) -> np.ndarray: """Predict change point flags (binary array). Args: y: Target time series (should match fit data). X: Optional exogenous data (ignored). threshold: Optional threshold (ignored for PELT, uses penalty instead). Returns: Boolean array with True at change points. """ change_points = self.score(y, X) # Create boolean array flags = np.zeros(len(self.y_), dtype=bool) flags[change_points] = True return flags
[docs] class BayesianChangePointDetector(BaseDetector): """Change point detector using Bayesian online change-point detection."""
[docs] def __init__( self, expected_segment_length: float = 100.0, threshold: float = 0.5, preprocess: bool = True, ): """Initialize Bayesian change point detector. Args: expected_segment_length: Expected length between change points. threshold: Probability threshold for flagging change points (0-1). preprocess: Whether to preprocess data before detection. """ super().__init__() self.expected_segment_length = expected_segment_length self.threshold = threshold self.preprocess = preprocess set_tags( self, scitype_input="SeriesLike", scitype_output="SeriesLike", handles_missing=False, requires_sorted_index=True, )
[docs] def fit( self, y: Any, X: Optional[Any] = None, **fit_params: Any ) -> "BayesianChangePointDetector": """Fit the change point detector. Args: y: Target time series. X: Optional exogenous data (ignored). **fit_params: Additional fit parameters. Returns: Self for method chaining. """ if isinstance(y, pd.Series): self.y_ = y.values elif isinstance(y, pd.DataFrame) and y.shape[1] == 1: self.y_ = y.iloc[:, 0].values else: self.y_ = np.asarray(y, dtype=np.float64) if len(self.y_) == 0: raise ValueError("y cannot be empty") # Preprocess if requested if self.preprocess: self.y_processed_ = preprocess_for_changepoint(self.y_) else: self.y_processed_ = self.y_.astype(np.float64) self._is_fitted = True return self
[docs] def score(self, y: Any, X: Optional[Any] = None) -> np.ndarray: """Compute change point probabilities. Args: y: Target time series (should match fit data). X: Optional exogenous data (ignored). Returns: Array of change point probabilities at each time step. """ self._check_is_fitted() y_data = self.y_processed_ hazard = 1.0 / self.expected_segment_length beta0 = np.var(y_data) # Call optimized kernel change_point_probs = _bayesian_changepoint_kernel(y_data, hazard, beta0) return change_point_probs
[docs] def predict( self, y: Any, X: Optional[Any] = None, threshold: Optional[float] = None ) -> np.ndarray: """Predict change point flags. Args: y: Target time series (should match fit data). X: Optional exogenous data (ignored). threshold: Optional threshold (uses self.threshold if not provided). Returns: Boolean array with True at change points. """ threshold = threshold or self.threshold probs = self.score(y, X) # Extract change points above threshold change_points = np.where(probs > threshold)[0] logger.info( f"Bayesian detection found {len(change_points)} change points " f"(threshold={threshold})" ) # Create boolean array flags = np.zeros(len(self.y_), dtype=bool) flags[change_points] = True return flags
[docs] class CUSUMDetector(BaseDetector): """Change point detector using CUSUM (Cumulative Sum) method. CUSUM detects changes by tracking cumulative deviations from a baseline. Detects change points by looking for significant shifts in the rate values. """
[docs] def __init__( self, baseline_window: int = 10, threshold: float = 3.0, ): """Initialize CUSUM detector. Args: baseline_window: Window size for detecting changes. threshold: Z-score threshold for change detection (lower = more sensitive). """ super().__init__() self.baseline_window = baseline_window self.threshold = threshold set_tags( self, scitype_input="SeriesLike", scitype_output="SeriesLike", handles_missing=False, requires_sorted_index=True, )
[docs] def fit( self, y: Any, X: Optional[Any] = None, **fit_params: Any ) -> "CUSUMDetector": """Fit the change point detector. Args: y: Target time series. X: Optional exogenous data (ignored). **fit_params: Additional fit parameters. Returns: Self for method chaining. """ if isinstance(y, pd.Series): self.y_ = y.values elif isinstance(y, pd.DataFrame) and y.shape[1] == 1: self.y_ = y.iloc[:, 0].values else: self.y_ = np.asarray(y) if len(self.y_) == 0: raise ValueError("y cannot be empty") if len(self.y_) < self.baseline_window * 2: raise ValueError( f"Need at least {self.baseline_window * 2} data points for CUSUM" ) self._is_fitted = True return self
[docs] def score(self, y: Any, X: Optional[Any] = None) -> np.ndarray: """Compute change point scores (indices of detected change points). Args: y: Target time series (should match fit data). X: Optional exogenous data (ignored). Returns: Array of change point indices. """ self._check_is_fitted() rates = self.y_ change_points = [] skip_until = 0 # Slide a window and compare statistics before/after each point for i in range(self.baseline_window, len(rates) - self.baseline_window): # Skip if we're in a cooldown period after detecting a change if i < skip_until: continue # Get windows before and after this point before = rates[max(0, i - self.baseline_window) : i] after = rates[i : min(len(rates), i + self.baseline_window)] if len(before) > 0 and len(after) > 0: # Calculate means mean_before = np.mean(before) mean_after = np.mean(after) # Calculate pooled standard deviation std_before = np.std(before) std_after = np.std(after) pooled_std = np.sqrt( (std_before**2 + std_after**2) / 2 + 1e-10 ) # Add small constant to avoid div by zero # Calculate z-score of difference in means z_score = abs(mean_before - mean_after) / pooled_std if z_score > self.threshold: change_points.append(i) # Skip ahead to avoid detecting the same change multiple times skip_until = i + self.baseline_window change_points = np.array(change_points) logger.info(f"CUSUM detected {len(change_points)} change points") return change_points
[docs] def predict( self, y: Any, X: Optional[Any] = None, threshold: Optional[float] = None ) -> np.ndarray: """Predict change point flags (binary array). Args: y: Target time series (should match fit data). X: Optional exogenous data (ignored). threshold: Optional threshold (ignored for CUSUM, uses self.threshold). Returns: Boolean array with True at change points. """ change_points = self.score(y, X) # Create boolean array flags = np.zeros(len(self.y_), dtype=bool) flags[change_points] = True return flags