Source code for timesmith.network.graph

"""Lightweight graph representation for time series networks.

Primary storage is edges + optional adjacency matrix.
NetworkX conversion is lazy and optional.
"""

from typing import List, Optional, Tuple

import numpy as np
from numpy.typing import NDArray

# Import for type hints only
try:
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        import networkx as nx  # noqa: F401
except ImportError:
    pass


[docs] class Graph: """Lightweight graph representation. Primary storage is edges + optional adjacency matrix. NetworkX conversion is lazy and optional. Attributes: edges: List of (int, int) or (int, int, float) tuples (edge list). n_nodes: Number of nodes. directed: Whether graph is directed. weighted: Whether edges have weights. """
[docs] def __init__( self, edges: List[Tuple], n_nodes: int, directed: bool = False, weighted: bool = False, ): """Initialize graph. Args: edges: List of edge tuples (unweighted or weighted). n_nodes: Number of nodes. directed: Whether graph is directed. weighted: Whether edges have weights. """ self.edges = edges self.n_nodes = n_nodes self.directed = directed self.weighted = weighted self._adjacency: Optional = None # scipy sparse matrix, not dense self._degrees: Optional[NDArray] = None self._in_degrees: Optional[NDArray] = None self._out_degrees: Optional[NDArray] = None
@property def n_edges(self) -> int: """Number of edges.""" if hasattr(self, "_n_edges_cached"): return self._n_edges_cached return len(self.edges)
[docs] def degree_sequence(self) -> NDArray[np.int64]: """Degree sequence (cached). For undirected graphs, returns total degree. For directed graphs, returns out-degree. Returns: Array of degrees for each node. """ if self.directed: return self.out_degree_sequence() else: if self._degrees is None: degrees = np.zeros(self.n_nodes, dtype=np.int64) for edge in self.edges: i, j = edge[0], edge[1] degrees[i] += 1 if i != j: degrees[j] += 1 self._degrees = degrees return self._degrees
[docs] def in_degree_sequence(self) -> NDArray[np.int64]: """In-degree sequence for directed graphs (cached). Returns: Array of in-degrees for each node. """ if not self.directed: raise ValueError("in_degree_sequence() only valid for directed graphs") if self._in_degrees is None: in_degrees = np.zeros(self.n_nodes, dtype=np.int64) for edge in self.edges: j = edge[1] # Destination node in_degrees[j] += 1 self._in_degrees = in_degrees return self._in_degrees
[docs] def out_degree_sequence(self) -> NDArray[np.int64]: """Out-degree sequence for directed graphs (cached). Returns: Array of out-degrees for each node. """ if not self.directed: raise ValueError("out_degree_sequence() only valid for directed graphs") if self._out_degrees is None: out_degrees = np.zeros(self.n_nodes, dtype=np.int64) for edge in self.edges: i = edge[0] # Source node out_degrees[i] += 1 self._out_degrees = out_degrees return self._out_degrees
[docs] def adjacency_matrix(self, format: str = "sparse"): """Adjacency matrix (lazy, sparse by default). Args: format: Output format: "sparse" (CSR), "dense", or "coo". Returns: Adjacency matrix. Sparse by default to avoid memory blowup. Raises: ValueError: If format="dense" and n_nodes > 50_000. """ from scipy import sparse as sp # Safety guardrail: refuse dense for large graphs if format == "dense" and self.n_nodes > 50_000: raise ValueError( f"Refusing to build dense adjacency matrix for n={self.n_nodes} nodes. " f"This would require ~{self.n_nodes**2 * 8 / 1e9:.1f} GB of memory. " f"Use format='sparse' or format='coo' instead." ) # Build sparse COO from edges (memory efficient) if self._adjacency is None: if len(self.edges) == 0: # Empty graph self._adjacency = sp.coo_matrix((self.n_nodes, self.n_nodes)) else: # Extract edge data if self.weighted: rows = [e[0] for e in self.edges] cols = [e[1] for e in self.edges] data = [e[2] for e in self.edges] else: rows = [e[0] for e in self.edges] cols = [e[1] for e in self.edges] data = [1.0] * len(self.edges) # Add reverse edges for undirected graphs if not self.directed: reverse_rows = cols.copy() reverse_cols = rows.copy() rows = rows + reverse_rows cols = cols + reverse_cols data = data + data self._adjacency = sp.coo_matrix( (data, (rows, cols)), shape=(self.n_nodes, self.n_nodes) ) # Convert to requested format if format == "dense": return self._adjacency.toarray() elif format == "coo": return self._adjacency.tocoo() else: # sparse (default) return self._adjacency.tocsr()
[docs] def as_networkx(self, force: bool = False): """Convert to NetworkX graph (optional dependency). Args: force: If False, refuse conversion for n > 200_000 nodes. Returns: NetworkX graph object. Raises: ImportError: If NetworkX is not installed. ValueError: If n_nodes > 200_000 and force=False. """ # Safety guardrail for large graphs if not force and self.n_nodes > 200_000: raise ValueError( f"Refusing NetworkX conversion for n={self.n_nodes} nodes. " f"NetworkX is not designed for graphs this large. " f"Use force=True to override, or work with edges_coo() / " f"adjacency_matrix(format='sparse') instead." ) try: import networkx as nx except ImportError: raise ImportError( "NetworkX is required for as_networkx(). " "Install with: pip install networkx" ) if self.directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(self.n_nodes)) if self.weighted: G.add_weighted_edges_from(self.edges) else: G.add_edges_from(self.edges) return G
[docs] def summary(self, include_triangles: bool = False) -> dict: """Graph summary statistics (computed from edges/degrees, no dense matrix). Args: include_triangles: If True, compute triangle count (requires edge list, slower). Returns: Dictionary with graph statistics. """ degrees = self.degree_sequence() max_edges = self.n_nodes * (self.n_nodes - 1) if not self.directed: max_edges //= 2 stats = { "n_nodes": self.n_nodes, "n_edges": self.n_edges, "avg_degree": float(np.mean(degrees)), "std_degree": float(np.std(degrees)) if len(degrees) > 1 else 0.0, "min_degree": int(np.min(degrees)) if len(degrees) > 0 else 0, "max_degree": int(np.max(degrees)) if len(degrees) > 0 else 0, "density": self.n_edges / max_edges if max_edges > 0 else 0.0, } # For directed graphs, add in/out degree statistics if self.directed: in_degrees = self.in_degree_sequence() out_degrees = self.out_degree_sequence() total_degrees = in_degrees + out_degrees stats["avg_in_degree"] = float(np.mean(in_degrees)) stats["std_in_degree"] = ( float(np.std(in_degrees)) if len(in_degrees) > 1 else 0.0 ) stats["avg_out_degree"] = float(np.mean(out_degrees)) stats["std_out_degree"] = ( float(np.std(out_degrees)) if len(out_degrees) > 1 else 0.0 ) # Irreversibility score irreversibility = np.zeros(self.n_nodes, dtype=np.float64) mask = total_degrees > 0 irreversibility[mask] = ( np.abs(in_degrees[mask] - out_degrees[mask]) / total_degrees[mask] ) stats["irreversibility_score"] = float(np.mean(irreversibility)) if include_triangles and len(self.edges) > 0: triangles = self._count_triangles() stats["triangles"] = triangles return stats
def _count_triangles(self) -> int: """Count triangles from edge list (memory efficient).""" if len(self.edges) == 0: return 0 # Build neighbor sets (sparse representation) neighbors = {i: set() for i in range(self.n_nodes)} for edge in self.edges: i, j = edge[0], edge[1] neighbors[i].add(j) if not self.directed: neighbors[j].add(i) # Count triangles triangles = 0 for i in range(self.n_nodes): for j in neighbors[i]: if j > i: # Avoid double counting # Count common neighbors common = neighbors[i] & neighbors[j] triangles += len(common) # Each triangle counted 3 times (once per edge) return triangles // 3 if not self.directed else triangles def __repr__(self) -> str: return f"Graph(n_nodes={self.n_nodes}, n_edges={self.n_edges}, directed={self.directed})"