Source code for pyorps.graph.api.rustworkx_api

"""
PYORPS: An Open-Source Tool for Automated Power Line Routing

Reference:
[1] Hofmann, M., Stetz, T., Kammer, F., Repo, S.: 'PYORPS: An Open-Source Tool for
    Automated Power Line Routing', CIRED 2025 - 28th Conference and Exhibition on
    Electricity Distribution, 16 - 19 June 2025, Geneva, Switzerland
"""
# Third party
import rustworkx as rx
from numpy import where, ndarray, ravel_multi_index, max as np_max
from typing import Optional

# Project files
from pyorps.core.exceptions import NoPathFoundError, AlgorithmNotImplementedError
from pyorps.core.types import Node, NodeList, NodePathList
from pyorps.graph.api.graph_library_api import GraphLibraryAPI


[docs] class RustworkxAPI(GraphLibraryAPI):
[docs] def create_graph( self, from_nodes: NodeList, to_nodes: NodeList, cost: Optional[ndarray[int]] = None, **kwargs ) -> rx.PyGraph: """ Creates a graph object using rustworkx. Parameters: from_nodes: The starting node indices from the edge data to_nodes: The ending node indices from the edge data cost: The weight of the edge data kwargs: Additional parameters for the underlying graph library Returns: The graph object """ # Get total number of nodes needed for the graph if n := kwargs.get('n', None): max_node = n - 1 else: max_node = np_max([np_max(from_nodes), np_max(to_nodes)]) # Initialize graph self.graph = rx.PyGraph() self.graph.add_nodes_from(range(max_node + 1)) # Add edges with costs if cost is not None: self.graph.add_edges_from(list(zip(from_nodes, to_nodes, cost))) else: # Add edges with default weight of 1.0 # Rustworkx only takes a list of tuples instead of edges! edge_list = list(zip(from_nodes, to_nodes, [1.0] * len(from_nodes))) self.graph.add_edges_from(edge_list) if kwargs.get('remove_isolated_nodes', False): self.remove_isolates() return self.graph
[docs] def get_number_of_nodes(self) -> int: """ Returns the number of nodes in the graph. Returns: The number of nodes """ return self.graph.num_nodes()
[docs] def get_number_of_edges(self) -> int: """ Returns the number of edges in the graph. Returns: The number of edges """ return self.graph.num_edges()
[docs] def get_nodes(self) -> NodeList: """ This method returns the nodes in the graph as a list or numpy array of node indices. Returns: List or array of node indices of the nodes in the graph """ return self.graph.nodes()
[docs] def remove_isolates(self) -> None: """ If the graph object was initialized with the maximum number of nodes, this function helps to reduce the occupied memory by removing nodes without any edge (degree == 0). Returns: None """ indices_max_values = where(self.raster_data == 65535) nodes_max_values = ravel_multi_index(indices_max_values, self.raster_data.shape) self.graph.remove_nodes_from(nodes_max_values)
def _compute_single_path( self, source: Node, target: Node, algorithm: str, **kwargs ) -> NodeList: """ Computes shortest path between a single source and target. Parameters: source: Source node identifier target: Target node identifier algorithm: Algorithm to use for computation kwargs: Additional algorithm-specific parameters Returns: List of node identifiers representing the shortest path """ def weight_fn(edge_weight): return edge_weight try: if algorithm == "dijkstra": path = rx.dijkstra_shortest_paths(self.graph, source, target, weight_fn=weight_fn) path = list(path[target]) elif algorithm == "astar": # Get heuristic function or use default as heuristic heuristic_function = kwargs.get('heu', None) if heuristic_function is None: nodes, heuristic = self.get_a_star_heuristic(target, source, **kwargs) heuristic_dict = dict(zip(nodes, heuristic)) def heuristic_function(node): return heuristic_dict[node] def goal_reached(node): return node == target path = rx.astar_shortest_path(self.graph, source, goal_fn=goal_reached, edge_cost_fn=weight_fn, estimate_cost_fn=heuristic_function) path = list(path) elif algorithm == "bellman_ford": path = rx.bellman_ford_shortest_paths(self.graph, source, target=target, weight_fn=weight_fn) path = list(path[target]) else: raise AlgorithmNotImplementedError(algorithm, self.__class__.__name__) return self._ensure_path_endpoints(path, source, target) except rx.NoPathFound: raise NoPathFoundError(source=source, target=target) def _compute_single_source_multiple_targets( self, source: Node, targets: NodeList, algorithm: str, **kwargs ) -> NodePathList: """ Computes shortest paths from a single source to multiple targets. Parameters: source: Source node identifier targets: List of target node identifiers algorithm: Algorithm to use for computation kwargs: Additional algorithm-specific parameters Returns: List of paths from the source to each target """ paths = [] for target in targets: try: path = self._compute_single_path(source, target, algorithm, **kwargs) paths.append(path) except NoPathFoundError: paths.append([]) return paths def _all_pairs_shortest_path( self, sources: NodeList, targets: NodeList, algorithm: str, **kwargs ) -> NodePathList: """ Computes shortest paths between all pairs of sources and targets. Parameters: sources: List of source node identifiers targets: List of target node identifiers algorithm: Algorithm to use for computation kwargs: Additional algorithm-specific parameters Returns: List of paths for all source-target combinations """ paths = [] for source in sources: for target in targets: try: path = self._compute_single_path(source, target, algorithm, **kwargs) paths.append(path) except NoPathFoundError: paths.append([]) return paths