Source code for timesmith.utils.autocorrelation

"""Autocorrelation and partial autocorrelation functions for time series."""

import logging
from typing import List, Optional

import numpy as np
import pandas as pd

from timesmith.typing import SeriesLike

logger = logging.getLogger(__name__)

try:
    from numba import njit, prange

    HAS_NUMBA = True
except ImportError:
    HAS_NUMBA = False

    def njit(*args, **kwargs):
        def decorator(func):
            return func

        return decorator

    prange = range


[docs] def autocorrelation(data: SeriesLike, max_lag: Optional[int] = None) -> List[float]: """Calculate autocorrelation function (ACF). Computes the autocorrelation coefficients for different lags, measuring the correlation between a time series and a lagged version of itself. Args: data: Time series data (Series, DataFrame, or array-like). max_lag: Maximum lag to calculate (default: len(data) - 1). Returns: List of autocorrelation coefficients for each lag (lag 0 to max_lag). Raises: ValueError: If data is too short or max_lag is invalid. Example: >>> data = [1, 2, 3, 4, 5, 4, 3, 2, 1] >>> acf = autocorrelation(data, max_lag=3) >>> len(acf) 4 >>> acf[0] # Lag 0 is always 1.0 1.0 """ # Convert to numpy array if isinstance(data, pd.Series): data_array = data.values elif isinstance(data, pd.DataFrame): if data.shape[1] == 1: data_array = data.iloc[:, 0].values else: raise ValueError("DataFrame must have exactly one column") else: data_array = np.asarray(data, dtype=float) if len(data_array) < 2: raise ValueError("Data must contain at least 2 values") if max_lag is None: max_lag = len(data_array) - 1 elif max_lag < 0 or max_lag >= len(data_array): raise ValueError( f"max_lag must be between 0 and {len(data_array) - 1}, got {max_lag}" ) mean = np.mean(data_array) var = np.var(data_array, ddof=0) # Population variance if var == 0: # Constant data: ACF is 1 at lag 0, 0 elsewhere return [1.0] + [0.0] * max_lag # Use optimized autocorrelation if available if HAS_NUMBA and len(data_array) > 100: acf_array = _autocorrelation_numba(data_array, mean, var, max_lag) acf = acf_array.tolist() else: acf = [] for lag in range(max_lag + 1): if lag == 0: acf.append(1.0) else: numerator = np.sum( (data_array[:-lag] - mean) * (data_array[lag:] - mean) ) denominator = len(data_array) * var acf.append(float(numerator / denominator)) return acf
@njit(cache=True, fastmath=True) def _autocorrelation_numba( data_array: np.ndarray, mean: float, var: float, max_lag: int ) -> np.ndarray: """Numba-optimized autocorrelation computation. Args: data_array: Input data array. mean: Mean of the data. var: Variance of the data. max_lag: Maximum lag. Returns: Array of autocorrelation coefficients. """ n = len(data_array) acf = np.zeros(max_lag + 1, dtype=np.float64) acf[0] = 1.0 # Lag 0 is always 1.0 for lag in range(1, max_lag + 1): numerator = 0.0 for i in range(lag, n): numerator += (data_array[i] - mean) * (data_array[i - lag] - mean) denominator = var * n acf[lag] = numerator / denominator if denominator != 0 else 0.0 return acf
[docs] def partial_autocorrelation( data: SeriesLike, max_lag: Optional[int] = None ) -> List[float]: """Calculate partial autocorrelation function (PACF). Computes the partial autocorrelation coefficients, which measure the correlation between observations at different lags while controlling for intermediate lags. Uses the Durbin-Levinson algorithm for computation. Args: data: Time series data (Series, DataFrame, or array-like). max_lag: Maximum lag to calculate (default: min(len(data)//2, 10)). Returns: List of partial autocorrelation coefficients (lag 0 to max_lag). Raises: ValueError: If data is too short. Example: >>> data = [1, 2, 3, 4, 5, 4, 3, 2, 1] >>> pacf = partial_autocorrelation(data, max_lag=3) >>> len(pacf) 4 >>> pacf[0] # Lag 0 is always 1.0 1.0 """ # Convert to numpy array if isinstance(data, pd.Series): data_array = data.values elif isinstance(data, pd.DataFrame): if data.shape[1] == 1: data_array = data.iloc[:, 0].values else: raise ValueError("DataFrame must have exactly one column") else: data_array = np.asarray(data, dtype=float) if len(data_array) < 2: raise ValueError("Data must contain at least 2 values") if max_lag is None: max_lag = min(len(data_array) // 2, 10) elif max_lag < 0 or max_lag >= len(data_array): raise ValueError( f"max_lag must be between 0 and {len(data_array) - 1}, got {max_lag}" ) # Get ACF values acf_values = autocorrelation(data_array, max_lag=max_lag) pacf = [1.0] # PACF at lag 0 is always 1 # Durbin-Levinson algorithm for k in range(1, max_lag + 1): if k == 1: pacf.append(acf_values[1]) else: # Calculate numerator numerator = acf_values[k] for j in range(1, k): numerator -= pacf[j] * acf_values[k - j] # Calculate denominator denominator = 1.0 for j in range(1, k): denominator -= pacf[j] * acf_values[j] if abs(denominator) < 1e-10: pacf.append(0.0) else: pacf.append(float(numerator / denominator)) return pacf
[docs] def autocorrelation_plot_data(data: SeriesLike, max_lag: Optional[int] = None) -> dict: """Calculate ACF and PACF for plotting. Convenience function that returns both ACF and PACF along with lag indices for easy plotting. Args: data: Time series data. max_lag: Maximum lag to calculate. Returns: Dictionary with keys: - 'lags': Array of lag values - 'acf': Autocorrelation values - 'pacf': Partial autocorrelation values """ if max_lag is None: if isinstance(data, (pd.Series, pd.DataFrame)): max_lag = min(len(data) // 2, 40) else: max_lag = min(len(data) // 2, 40) acf = autocorrelation(data, max_lag=max_lag) pacf = partial_autocorrelation(data, max_lag=max_lag) return { "lags": np.arange(len(acf)), "acf": np.array(acf), "pacf": np.array(pacf), }