"""Multivariate network construction from distance matrices.
Implements k-NN, ε-NN, and weighted network builders from distance matrices.
"""
from typing import Optional
import numpy as np
# Optional networkx
try:
import networkx as nx
HAS_NETWORKX = True
except ImportError:
HAS_NETWORKX = False
nx = None
# Numba is an optional accelerator. When it is missing, expose stand-ins with
# the same names so decorated functions still run as plain Python.
try:
    from numba import njit, prange

    HAS_NUMBA = True
except ImportError:
    HAS_NUMBA = False
    prange = range

    def njit(*args, **kwargs):
        """Stand-in for ``numba.njit`` that leaves functions uncompiled.

        Usable both as a bare decorator (``@njit``) and as a decorator
        factory (``@njit(cache=True)``); all options are ignored.
        """
        bare_usage = bool(args) and callable(args[0])
        if bare_usage:
            # ``@njit`` applied directly: hand the function back untouched.
            return args[0]

        def passthrough(target):
            return target

        return passthrough
@njit(cache=True)
def _knn_adjacency_numba(D: np.ndarray, k: int, weighted: bool) -> np.ndarray:
    """Build the directed k-NN adjacency matrix (JIT-compiled when Numba exists).

    For every row ``i`` the ``k`` smallest entries of ``D[i]`` are selected,
    with the diagonal entry excluded, and written into row ``i`` of the result.

    Args:
        D: Distance matrix (n, n).
        k: Number of nearest neighbors.
        weighted: If True, store ``D[i, j]`` for each selected neighbor;
            otherwise store 1.0.

    Returns:
        Adjacency matrix (n, n); entries that were not selected stay 0.
    """
    size = D.shape[0]
    adjacency = np.zeros((size, size))
    for src in range(size):
        # Work on a copy so the caller's matrix is not modified, and push the
        # node's own entry to infinity so it is never picked as a neighbor.
        dists = D[src].copy()
        dists[src] = np.inf
        nearest = np.argpartition(dists, k - 1)[:k]
        if weighted:
            for dst in nearest:
                adjacency[src, dst] = D[src, dst]
        else:
            for dst in nearest:
                adjacency[src, dst] = 1.0
    return adjacency
def net_knn(
    D: np.ndarray,
    k: int,
    mutual: bool = False,
    weighted: bool = False,
    directed: bool = False,
):
    """k-Nearest Neighbors network from distance matrix.

    Each node is connected to its k nearest neighbors.

    Args:
        D: Distance matrix (n, n) - smaller = more similar. Any input that
            ``np.asarray`` can convert to a float array is accepted.
        k: Number of nearest neighbors per node.
        mutual: If True, require mutual k-NN (i in kNN(j) AND j in kNN(i)).
        weighted: If True, edge weights = distances.
        directed: If True, create directed graph (i → j if j in kNN(i)).

    Returns:
        Tuple of (NetworkX graph, adjacency matrix).

    Raises:
        ImportError: If NetworkX is not installed.
        ValueError: If D is not a square 2-D matrix, or k is outside
            [1, n - 1].

    Note:
        An edge is represented by a strictly positive adjacency entry, so
        with ``weighted=True`` a neighbor at distance 0 produces no edge.
    """
    if not HAS_NETWORKX:
        raise ImportError(
            "NetworkX is required for net_knn. "
            "Install with: pip install networkx or pip install timesmith[network]"
        )

    # Float conversion lets the diagonal be masked with +inf even when the
    # caller passes an integer matrix or a nested list.
    D = np.asarray(D, dtype=float)
    if D.ndim != 2 or D.shape[0] != D.shape[1]:
        raise ValueError(f"D must be square, got shape {D.shape}")
    n = D.shape[0]
    if k <= 0 or k >= n:
        raise ValueError(f"k must be in range [1, {n - 1}], got {k}")

    # Use Numba JIT if available (only worthwhile for larger matrices).
    A = None
    if HAS_NUMBA and n > 50:
        try:
            A = _knn_adjacency_numba(D, k, weighted)
        except Exception as e:
            # Deliberate best-effort: any JIT failure falls back to NumPy.
            import logging

            logging.getLogger(__name__).warning(
                "Numba JIT failed for k-NN, using Python fallback: %s", e
            )
            A = None

    # NumPy fallback (also the path for small matrices).
    if A is None:
        # Mask the diagonal so a node is never its own neighbor, then take
        # the k smallest entries of every row in one call.
        masked = np.where(np.eye(n, dtype=bool), np.inf, D)
        nearest = np.argpartition(masked, k - 1, axis=1)[:, :k]
        rows = np.arange(n)[:, None]
        A = np.zeros((n, n))
        A[rows, nearest] = D[rows, nearest] if weighted else 1.0

    if mutual and directed:
        # Keep i→j only when j→i exists too. Selecting (rather than
        # multiplying by A.T) keeps D[i, j] as the weight.
        A = np.where(A.T > 0, A, 0.0)
    elif mutual:
        # One-sided pairs have a 0 on one side, so the minimum removes them;
        # the result is already symmetric.
        A = np.minimum(A, A.T)
    elif not directed:
        # Symmetrize. Average only where both directions exist; a one-sided
        # edge keeps its full value instead of being halved.
        both = (A > 0) & (A.T > 0)
        A = np.where(both, (A + A.T) / 2, A + A.T)

    # Build NetworkX graph
    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))
    edges = np.argwhere(A > 0)
    # .tolist() yields plain Python ints/floats for node ids and weights.
    sources = edges[:, 0].tolist()
    targets = edges[:, 1].tolist()
    if weighted:
        G.add_weighted_edges_from(zip(sources, targets, A[A > 0].tolist()))
    else:
        G.add_edges_from(zip(sources, targets))
    return G, A
def net_enn(
    D: np.ndarray,
    epsilon: Optional[float] = None,
    percentile: Optional[float] = None,
    weighted: bool = False,
    directed: bool = False,
):
    """ε-Nearest Neighbors network from distance matrix.

    Each node is connected to all nodes within distance epsilon.

    Args:
        D: Distance matrix (n, n) - smaller = more similar. Any input that
            ``np.asarray`` can convert to a float array is accepted.
        epsilon: Distance threshold. If None, use percentile.
        percentile: Percentile of distances to use as threshold (0-100),
            computed over the strict upper triangle of D. When both
            ``epsilon`` and ``percentile`` are given, the percentile-derived
            threshold is the one used.
        weighted: If True, edge weights = distances.
        directed: If True, create directed graph.

    Returns:
        Tuple of (NetworkX graph, adjacency matrix).

    Raises:
        ImportError: If NetworkX is not installed.
        ValueError: If D is not a square 2-D matrix, or neither ``epsilon``
            nor ``percentile`` is provided.

    Note:
        An edge is represented by a strictly positive adjacency entry, so
        with ``weighted=True`` a pair at distance 0 produces no edge.
    """
    if not HAS_NETWORKX:
        raise ImportError(
            "NetworkX is required for net_enn. "
            "Install with: pip install networkx or pip install timesmith[network]"
        )

    D = np.asarray(D, dtype=float)
    if D.ndim != 2 or D.shape[0] != D.shape[1]:
        raise ValueError(f"D must be square, got shape {D.shape}")
    n = D.shape[0]

    # Determine threshold
    if epsilon is None and percentile is None:
        raise ValueError("Either epsilon or percentile must be provided")
    if percentile is not None:
        # Use percentile of the pairwise distances (excluding diagonal).
        distances_flat = D[np.triu_indices(n, k=1)]
        if distances_flat.size:
            epsilon = np.percentile(distances_flat, percentile)
        else:
            # Fewer than two nodes: there are no pairs to take a percentile
            # of, and the diagonal is masked out below, so any value works.
            epsilon = 0.0

    # Adjacency: pairs within the threshold, never a node with itself.
    mask = (D <= epsilon) & ~np.eye(n, dtype=bool)
    A = np.where(mask, D if weighted else 1.0, 0.0)

    # Make symmetric for undirected. Average only where both directions
    # exist; a one-sided edge (possible when D is asymmetric) keeps its full
    # value instead of being halved.
    if not directed:
        both = (A > 0) & (A.T > 0)
        A = np.where(both, (A + A.T) / 2, A + A.T)

    # Build NetworkX graph
    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))
    edges = np.argwhere(A > 0)
    # .tolist() yields plain Python ints/floats for node ids and weights.
    sources = edges[:, 0].tolist()
    targets = edges[:, 1].tolist()
    if weighted:
        # A[A > 0] is in the same row-major order as np.argwhere(A > 0).
        G.add_weighted_edges_from(zip(sources, targets, A[A > 0].tolist()))
    else:
        G.add_edges_from(zip(sources, targets))
    return G, A