Source code for pyorps.graph.api.igraph_api

"""
PYORPS: An Open-Source Tool for Automated Power Line Routing

Reference:
[1] Hofmann, M., Stetz, T., Kammer, F., Repo, S.: 'PYORPS: An Open-Source Tool for
    Automated Power Line Routing', CIRED 2025 - 28th Conference and Exhibition on
    Electricity Distribution, 16 - 19 June 2025, Geneva, Switzerland
"""
from typing import Optional

# Third party
import igraph as ig
from numpy import float64, ndarray, max as np_max

# Project files
from pyorps.core.exceptions import NoPathFoundError, AlgorithmNotImplementedError
from pyorps.core.types import Node, NodeList, NodePathList
from pyorps.graph.api.graph_library_api import GraphLibraryAPI


[docs] class IGraphAPI(GraphLibraryAPI):
[docs] def create_graph( self, from_nodes: NodeList, to_nodes: NodeList, cost: Optional[ndarray[int]] = None, **kwargs ) -> ig.Graph: """ Creates a graph object with the igraph library. Parameters: from_nodes: The starting node indices from the edge data to_nodes: The ending node indices from the edge data cost: The weight of the edge data kwargs: Additional parameters for the underlying graph library Returns: The graph object """ # Determine number of nodes if (n := kwargs.get('n', None)) is not None: n_vertices = n else: n_vertices = np_max([np_max(from_nodes), np_max(to_nodes)]) + 1 # Create graph self.graph = ig.Graph(n=int(n_vertices), directed=False) # Create edges edges = list(zip(from_nodes, to_nodes)) # Add edges with weights if provided, otherwise without weights if cost is not None: weights = cost.astype(float64, copy=False) self.graph.add_edges(edges) self.graph.es['weight'] = weights else: self.graph.add_edges(edges) # Remove isolated nodes if requested if kwargs.get('remove_isolated_nodes', False): self.remove_isolates() return self.graph
[docs] def get_number_of_nodes(self): """ Returns the number of nodes in the graph. Returns: The number of nodes """ return self.graph.vcount()
[docs] def get_number_of_edges(self): """ Returns the number of edges in the graph. Returns: The number of edges """ return self.graph.ecount()
[docs] def remove_isolates(self): """ If the graph object was initialized with the maximum number of nodes, this function helps to reduce the occupied memory by removing nodes without any edge (degree == 0). Returns: None """ # Get vertices with degree 0 isolated_vertices = [v.index for v in self.graph.vs if v.degree() == 0] # Delete them from highest index to lowest to avoid reindexing issues isolated_vertices.sort(reverse=True) for v in isolated_vertices: self.graph.delete_vertices(v)
[docs] def get_nodes(self) -> NodeList: """ This method returns the nodes in the graph as a list or numpy array of node indices. Returns: List or array of node indices of the nodes in the graph """ return [v.index for v in self.graph.vs()]
def _compute_single_path( self, source: Node, target: Node, algorithm: str, **kwargs ) -> NodeList: """ Computes shortest path between a single source and target. Parameters: source: Source node identifier target: Target node identifier algorithm: Algorithm to use for computation kwargs: Additional algorithm-specific parameters Returns: List of node identifiers representing the shortest path """ if 'weight' in self.graph.es.attributes(): weights = self.graph.es.get_attribute_values('weight') else: weights = None if algorithm == "dijkstra": path = self.graph.get_shortest_paths(source, target, weights=weights, output="vpath")[0] elif algorithm == "bellman_ford": path = self.graph.get_shortest_paths(source, target, weights=weights, output="vpath", algorithm="bellman_ford")[0] elif algorithm == "astar": heuristic_function = kwargs.get('heu', None) if heuristic_function is None: _, heuristic = self.get_a_star_heuristic(target, **kwargs) def heuristic_function(_graph, node, _target): return heuristic[node] path = self.graph.get_shortest_path_astar(source, target, heuristics=heuristic_function, weights=weights, output="vpath", mode="all") else: raise AlgorithmNotImplementedError(algorithm, self.__class__.__name__) if len(path) == 0: raise NoPathFoundError(source=source, target=target) path = self._ensure_path_endpoints(path, source, target) return path def _compute_single_source_multiple_targets( self, source: Node, targets: NodeList, algorithm: str, **kwargs ) -> NodePathList: """ Computes shortest paths from a single source to multiple targets. Parameters: source: Source node identifier targets: List of target node identifiers algorithm: Algorithm to use for computation kwargs: Additional algorithm-specific parameters Returns: List of paths from the source to each target """ if 'weight' in self.graph.es.attributes(): weights = self.graph.es.get_attribute_values('weight') else: weights = None if algorithm in ["dijkstra", "bellman_ford"]: # igraph can compute paths to multiple targets at once algo = "dijkstra" if algorithm == "dijkstra" else "bellman_ford" all_paths = self.graph.get_shortest_paths(source, targets, weights=weights, output="vpath", algorithm=algo) # Process each path paths = [] for i, path in enumerate(all_paths): if len(path) == 0: paths.append([]) # No path found else: paths.append(self._ensure_path_endpoints(path, source, targets[i])) return paths elif algorithm == "astar": # For A*, handle each target separately paths = [] for target in targets: try: path = self._compute_single_path(source, target, algorithm, **kwargs) paths.append(path) except NoPathFoundError: paths.append([]) return paths else: raise AlgorithmNotImplementedError(algorithm, self.__class__.__name__) def _all_pairs_shortest_path( self, sources: NodeList, targets: NodeList, algorithm: str, **kwargs ) -> NodePathList: """ Computes shortest paths between all pairs of sources and targets. Parameters: sources: List of source node identifiers targets: List of target node identifiers algorithm: Algorithm to use for computation kwargs: Additional algorithm-specific parameters Returns: List of paths for all source-target combinations """ if algorithm in ["dijkstra", "bellman_ford"]: paths = [] if 'weight' in self.graph.es.attributes(): weights = self.graph.es.get_attribute_values('weight') else: weights = None algo = "dijkstra" if algorithm == "dijkstra" else "bellman_ford" for source in sources: # For each source, compute paths to all targets at once target_paths = self.graph.get_shortest_paths(source, targets, weights=weights, output="vpath", algorithm=algo) # Process each path for i, path in enumerate(target_paths): if len(path) == 0: paths.append([]) # No path found else: paths.append(self._ensure_path_endpoints(path, source, targets[i])) return paths else: # For other algorithms, compute each path individually return self._compute_all_pairs_shortest_paths(sources, targets, algorithm, **kwargs)