"""
This file contains the abstract base class for the interface to the graph libraries.
All specific graph library interfaces should inherit from this class. The workflow of
each specific interface is determined by the respective graph library, and the
workflows of the graph libraries can vary:
- For rustworkx and igraph the nodes need to be created before the edges can be added
- For networkit and networkx the edges can be added on the fly when adding the nodes
- For rustworkx and igraph the edges can only be added as a list of tuples. This means
that the edge information, which is retrieved as numpy arrays, needs to be converted
into a list, which leads to much higher (more than double) memory usage!
- For networkit and networkx edges can be added as a sparse matrix or as numpy arrays
Please see the specific interfaces to the specific graph libraries for more details!
"""
from typing import Optional, Any, Union, List
from abc import abstractmethod
import numpy as np
from time import time
from .graph_api import GraphAPI
from pyorps.core.exceptions import NoPathFoundError, PairwiseError, PairwiseError
from pyorps.core.types import SourceTargetType, Node, NodeList, NodePathList
from pyorps.utils.traversal import construct_edges
[docs]
class GraphLibraryAPI(GraphAPI):
    """
    Base class for all graph library-based APIs.

    This class extends GraphAPI with common functionality needed by standard graph
    libraries that require edge data to be explicitly provided and a graph to be
    constructed.

    On construction the edge data is built from the raster (unless the caller
    supplies it), the library-specific graph is created through ``create_graph``
    and the time spent in both steps is recorded. Concrete interfaces implement
    the abstract methods (graph creation, node/edge queries and the single-path
    solvers); ``shortest_path`` dispatches to them depending on whether single or
    multiple sources/targets are requested.
    """
def __init__(
    self,
    raster_data: np.ndarray[int],
    steps: np.ndarray[int],
    from_nodes: Optional[np.ndarray] = None,
    to_nodes: Optional[np.ndarray] = None,
    cost: Optional[np.ndarray] = None,
    ignore_max: Optional[bool] = True,
    **kwargs
):
    """
    Set up the API: obtain edge data, build the library graph and time both steps.

    Parameters:
        raster_data: 2D numpy array representing the raster
        steps: Array defining the neighborhood connections
        from_nodes: Source node indices for edges; if omitted, edges are
            constructed from the raster
        to_nodes: Target node indices for edges; if omitted, edges are
            constructed from the raster
        cost: Edge weights (replaced by the constructed weights whenever the
            edges are constructed here)
        ignore_max: Ignore edges whose weights are equal to the maximum value in
            the raster data (forwarded to the edge construction)
        kwargs: Forwarded unchanged to ``create_graph``

    Attributes set:
        edge_construction_time: Seconds spent constructing edges (0.0 if the
            caller supplied both node arrays)
        graph: The object returned by ``create_graph``
        graph_creation_time: Seconds spent inside ``create_graph``
    """
    super().__init__(raster_data, steps)

    # Stays at zero when the caller hands in ready-made edge data.
    self.edge_construction_time = 0.0

    edges_missing = from_nodes is None or to_nodes is None
    if edges_missing:
        # Either endpoint array missing -> rebuild the complete edge set
        # (including the cost array) from the raster.
        started = time()
        from_nodes, to_nodes, cost = construct_edges(
            self.raster_data, self.steps, ignore_max
        )
        self.edge_construction_time = time() - started

    started = time()
    self.graph = self.create_graph(from_nodes, to_nodes, cost, **kwargs)
    self.graph_creation_time = time() - started
[docs]
@staticmethod
def _ensure_path_endpoints(
    path: list[int],
    source: int,
    target: int
) -> list[int]:
    """
    Make sure a non-empty path begins at the source and finishes at the target.

    The list is modified in place and the same list object is returned. An empty
    path is returned untouched.

    Parameters:
        path: List of node IDs representing a path
        source: ID of the node expected at the start of the path
        target: ID of the node expected at the end of the path

    Returns:
        The given list, with source prepended and/or target appended if they were
        missing
    """
    # Nothing to repair on an empty path.
    if len(path) == 0:
        return path

    first_node = path[0]
    if first_node != source:
        path.insert(0, source)

    # Read the tail only after the possible insert, exactly like the head check
    # reads the head before it.
    last_node = path[-1]
    if last_node != target:
        path.append(target)

    return path
[docs]
@abstractmethod
def create_graph(
    self,
    from_nodes: np.ndarray[int],
    to_nodes: np.ndarray[int],
    cost: Optional[np.ndarray[int]] = None,
    **kwargs
) -> Any:
    """
    Create a graph object with the graph library of the concrete interface.

    Abstract: the base class only declares the contract. ``__init__`` calls this
    method with the edge data and stores the returned object in ``self.graph``.

    Parameters:
        from_nodes: The starting node indices from the edge data
        to_nodes: The ending node indices from the edge data
        cost: The weight of the edge data
        kwargs: Additional parameters for the underlying graph library

    Returns:
        The graph object
    """
[docs]
@abstractmethod
def get_number_of_nodes(self) -> int:
    """
    Return the number of nodes in the graph.

    Abstract: implemented by the concrete graph library interface.

    Returns:
        The number of nodes
    """
[docs]
@abstractmethod
def get_number_of_edges(self) -> int:
    """
    Return the number of edges in the graph.

    Abstract: implemented by the concrete graph library interface.

    Returns:
        The number of edges
    """
[docs]
@abstractmethod
def remove_isolates(self) -> None:
    """
    Remove nodes without any edge (degree == 0) from the graph.

    If the graph object was initialized with the maximum number of nodes, this
    helps to reduce the occupied memory.

    Abstract: implemented by the concrete graph library interface.

    Returns:
        None
    """
[docs]
@abstractmethod
def get_nodes(self) -> Union[List[int], np.ndarray]:
    """
    Return the nodes in the graph as a list or numpy array of node indices.

    Abstract: implemented by the concrete graph library interface. The A*
    heuristic helpers of this class pass the result to ``np.unravel_index``
    together with the raster shape, i.e. they treat the values as flat raster
    indices.

    Returns:
        List or array of node indices of the nodes in the graph
    """
[docs]
def shortest_path(
    self,
    source_indices: Optional[SourceTargetType],
    target_indices: Optional[SourceTargetType],
    algorithm: str = "dijkstra",
    **kwargs
) -> Union[NodeList, NodePathList]:
    """
    Find the shortest path(s) between source(s) and target(s) on the created graph.

    An argument counts as "multiple" when it exposes ``__len__``; anything else is
    handled as a single node. Depending on the combination the call is forwarded
    to the single-path, single-source/multi-target, pairwise or all-pairs solver.

    Parameters:
        source_indices: Index or indices of source node(s) (int or list[int])
        target_indices: Index or indices of target node(s) (int or list[int])
        algorithm: Algorithm to use for shortest path computation.
            Options depend on the specific library implementation.
        kwargs: Additional parameters for the specific algorithm/library. They are
            forwarded unchanged to the selected solver, including:
            "pairwise": If True, compute pairwise shortest paths between
            source_indices and target_indices.
            Only allowed if len(source_indices) == len(target_indices)

    Returns:
        List of node indices representing the shortest path(s)

    Raises:
        PairwiseError: If pairwise computation is requested for source and target
            collections of different length
    """
    many_sources = hasattr(source_indices, '__len__')
    many_targets = hasattr(target_indices, '__len__')

    if many_sources and many_targets:
        # Several sources and several targets: all combinations by default,
        # element-wise pairing only on request.
        if not kwargs.get('pairwise', False):
            return self._all_pairs_shortest_path(
                source_indices, target_indices, algorithm, **kwargs
            )
        if len(source_indices) != len(target_indices):
            raise PairwiseError()
        return self._pairwise_shortest_path(
            source_indices, target_indices, algorithm, **kwargs
        )

    if many_targets:
        # One source fanning out to several targets.
        return self._compute_single_source_multiple_targets(
            source_indices, target_indices, algorithm, **kwargs
        )

    if many_sources:
        # Several sources, one target: search outwards from the target and flip
        # every resulting path so it reads source -> target.
        from_target = self._compute_single_source_multiple_targets(
            target_indices, source_indices, algorithm, **kwargs
        )
        return [route[::-1] for route in from_target]

    # Plain one-to-one query.
    return self._compute_single_path(
        source_indices, target_indices, algorithm, **kwargs
    )
[docs]
@abstractmethod
def _compute_single_path(
    self,
    source: Node,
    target: Node,
    algorithm: str,
    **kwargs
) -> NodeList:
    """
    Compute the shortest path between a single source and a single target.

    Abstract: implemented by the concrete graph library interface. The default
    pairwise and per-pair loops of this class call it once per source-target
    pair and record an empty path when it raises ``NoPathFoundError``.

    Parameters:
        source: Source node identifier
        target: Target node identifier
        algorithm: Algorithm to use for computation
        kwargs: Additional algorithm-specific parameters

    Returns:
        List of node identifiers representing the shortest path
    """
[docs]
@abstractmethod
def _compute_single_source_multiple_targets(
    self,
    source: Node,
    targets: NodeList,
    algorithm: str,
    **kwargs
) -> NodePathList:
    """
    Compute shortest paths from a single source to multiple targets.

    Abstract: implemented by the concrete graph library interface.
    ``shortest_path`` also uses it for the "multiple sources, single target"
    case by passing the target as ``source`` and reversing every returned path.

    Parameters:
        source: Source node identifier
        targets: List of target node identifiers
        algorithm: Algorithm to use for computation
        kwargs: Additional algorithm-specific parameters

    Returns:
        List of paths from the source to each target
    """
[docs]
def _pairwise_shortest_path(
    self,
    sources: NodeList,
    targets: NodeList,
    algorithm: str,
    **kwargs
) -> NodePathList:
    """
    Default pairwise solver: one single-path query per (source, target) pair.

    Sources and targets are matched by position. Subclasses can override this
    for library-specific optimizations.

    Parameters:
        sources: List of source node identifiers
        targets: List of target node identifiers
        algorithm: Algorithm to use for computation
        kwargs: Additional algorithm-specific parameters

    Returns:
        List of paths, each connecting corresponding source-target pairs; a pair
        without a connecting path contributes an empty list
    """
    def route_or_empty(start, goal):
        # An unreachable pair must not abort the whole batch.
        try:
            return self._compute_single_path(start, goal, algorithm, **kwargs)
        except NoPathFoundError:
            return []

    return [route_or_empty(start, goal) for start, goal in zip(sources, targets)]
[docs]
def _compute_all_pairs_shortest_paths(
    self,
    sources: NodeList,
    targets: NodeList,
    algorithm: str,
    **kwargs
) -> NodePathList:
    """
    Solve every source-target combination with an individual single-path query.

    The result is ordered source-major: all targets for the first source, then
    all targets for the second source, and so on. Unreachable combinations yield
    an empty path.

    Parameters:
        sources: List of source node identifiers
        targets: List of target node identifiers
        algorithm: Algorithm to use for computation
        kwargs: Additional algorithm-specific parameters

    Returns:
        List of paths for all source-target combinations
    """
    def route_or_empty(start, goal):
        # Keep going when a single combination has no connecting path.
        try:
            return self._compute_single_path(start, goal, algorithm, **kwargs)
        except NoPathFoundError:
            return []

    return [
        route_or_empty(start, goal)
        for start in sources
        for goal in targets
    ]
[docs]
@abstractmethod
def _all_pairs_shortest_path(
    self,
    sources: NodeList,
    targets: NodeList,
    algorithm: str,
    **kwargs
) -> NodePathList:
    """
    Compute shortest paths between all pairs of sources and targets.

    Abstract: implemented by the concrete graph library interface.
    ``shortest_path`` calls it when both sources and targets are collections and
    pairwise computation was not requested.

    Parameters:
        sources: List of source node identifiers
        targets: List of target node identifiers
        algorithm: Algorithm to use for computation.
        kwargs: Additional algorithm-specific parameters

    Returns:
        List of paths for all source-target combinations
    """
[docs]
def get_a_star_heuristic(
    self,
    target: Node,
    source: Optional[Node] = None,
    **kwargs
) -> tuple[np.ndarray, np.ndarray]:
    """
    Build an A* heuristic: Euclidean cell distance to the target times a minimum
    raster value.

    Without a source the global raster minimum is used. With a source, the
    minimum is taken from the axis-aligned bounding box spanned by source and
    target, enlarged by ``buffer_radius`` cells on every side and clipped to the
    raster.

    Parameters:
        target: The index of the target node in the raster data
        source: Optional source node for calculating area-specific minimum values
        kwargs: Additional parameters:
            "buffer_radius": Margin (in cells) around the source-target bounding
            box, default 200; only used when a source is given
            "heu_weight": Optional factor the heuristic is multiplied with

    Returns:
        tuple containing:
        - The node indices in the graph (as returned by ``get_nodes``)
        - An array of heuristic values corresponding to each node
    """
    shape = self.raster_data.shape
    nodes = self.get_nodes()

    # Flat node indices -> (row, column) positions in the raster.
    node_rows, node_cols = np.unravel_index(nodes, shape)
    target_row, target_col = np.unravel_index(target, shape)

    # Straight-line distance (in cells) from every node to the target.
    row_delta = target_row - node_rows
    col_delta = target_col - node_cols
    euclidean_distance = np.sqrt(row_delta ** 2 + col_delta ** 2)

    if source is None:
        min_value = self.raster_data.min()
    else:
        radius = kwargs.get('buffer_radius', 200)
        source_row, source_col = np.unravel_index(source, shape)

        # Buffered bounding box around both endpoints, clipped to the raster.
        row_lo = max(0, min(source_row, target_row) - radius)
        row_hi = min(shape[0] - 1, max(source_row, target_row) + radius)
        col_lo = max(0, min(source_col, target_col) - radius)
        col_hi = min(shape[1] - 1, max(source_col, target_col) + radius)

        window = self.raster_data[row_lo:row_hi + 1, col_lo:col_hi + 1]
        min_value = np.min(window)

    heuristic = euclidean_distance * min_value

    # Optional user-supplied scaling of the heuristic.
    if 'heu_weight' in kwargs:
        heuristic *= kwargs['heu_weight']

    return nodes, heuristic
[docs]
def get_advanced_a_star_heuristic(
    self,
    target: Node,
    source: Optional[Node] = None,
    **kwargs
) -> tuple[np.ndarray, np.ndarray]:
    """
    Build an A* heuristic: Euclidean cell distance to the target times the
    minimum raster value found along the straight source-target line.

    Without a source the global raster minimum is used. With a source, the cells
    on the rasterized (Bresenham) line between source and target are inspected;
    if ``buffer_radius`` is positive, every line cell is widened to a square
    window of ``2 * buffer_radius + 1`` cells per side (clipped to the raster)
    and the minimum is taken over the union of these windows.

    Parameters:
        target: The index of the target node in the raster data
        source: Optional source node for calculating corridor-specific minimum
            values
        kwargs: Additional parameters:
            "buffer_radius": Half-width (in cells) of the corridor around the
            line, default 0 (line cells only). Missing, None, zero or negative
            values mean "no buffer"; non-integer values are truncated.
            "heu_weight": Optional factor the heuristic is multiplied with

    Returns:
        tuple containing:
        - The node indices in the graph (as returned by ``get_nodes``)
        - An array of heuristic values corresponding to each node
    """
    raster = self.raster_data

    # Flat node indices -> (row, column) positions in the raster.
    nodes = self.get_nodes()
    x_nodes, y_nodes = np.unravel_index(nodes, raster.shape)
    x_target, y_target = np.unravel_index(target, raster.shape)

    # Straight-line distance (in cells) from every node to the target.
    euclidean_distance = np.sqrt(
        np.square(x_target - x_nodes) + np.square(y_target - y_nodes)
    )

    if source is None:
        # No corridor can be defined without a source: use the global minimum.
        min_value = raster.min()
    else:
        x_source, y_source = np.unravel_index(source, raster.shape)
        line_coords = self._vectorized_bresenham(
            x_source, y_source, x_target, y_target
        )

        buffer_radius = int(kwargs.get('buffer_radius', 0) or 0)
        if buffer_radius > 0:
            # Boolean mask of the corridor; overlapping windows simply set the
            # same cells again, so no de-duplication is needed.
            mask = np.zeros(raster.shape, dtype=bool)
            for x, y in line_coords:
                x, y = int(x), int(y)
                # The lower bounds must be clipped by hand (a negative start
                # would wrap around); slicing clips the upper bounds itself.
                mask[
                    max(x - buffer_radius, 0):x + buffer_radius + 1,
                    max(y - buffer_radius, 0):y + buffer_radius + 1,
                ] = True
            min_value = raster[mask].min()
        else:
            min_value = raster[line_coords[:, 0], line_coords[:, 1]].min()

    heuristic = euclidean_distance * min_value

    # Optional user-supplied scaling of the heuristic.
    if 'heu_weight' in kwargs:
        heuristic *= kwargs['heu_weight']

    return nodes, heuristic
[docs]
def _vectorized_bresenham(self, x0: int, y0: int, x1: int, y1: int) -> np.ndarray:
    """
    Rasterize the straight line between two cells with Bresenham's algorithm.

    Exactly one cell is produced per step along the longer ("major") axis, so
    the result contains no duplicate coordinates. The minor-axis coordinates are
    computed for all steps at once with NumPy instead of a per-point loop.

    Parameters:
        x0: x-coordinate of the first point
        y0: y-coordinate of the first point
        x1: x-coordinate of the second point
        y1: y-coordinate of the second point

    Returns:
        int32 array of shape (n, 2) with the (x, y) coordinates of the cells on
        the line, ordered by increasing major-axis coordinate (which may run
        from the second point to the first)
    """
    # Plain Python ints: accepts NumPy scalars and keeps the swaps and
    # differences below free of fixed-width integer arithmetic.
    x0, y0, x1, y1 = int(x0), int(y0), int(x1), int(y1)

    # A steep line (more vertical than horizontal) is handled by swapping the
    # roles of x and y so that we always step along the longer dimension.
    steep = abs(y1 - y0) > abs(x1 - x0)
    if steep:
        x0, y0 = y0, x0
        x1, y1 = y1, x1

    # Always walk in the direction of increasing major coordinate.
    if x0 > x1:
        x0, x1 = x1, x0
        y0, y1 = y1, y0

    dx = x1 - x0
    dy = abs(y1 - y0)
    y_step = 1 if y0 < y1 else -1

    x_coords = np.arange(x0, x1 + 1, dtype=np.int32)

    if dx == 0:
        # Both points coincide (dy <= dx after the swap): a single cell.
        y_coords = np.full(1, y0, dtype=np.int32)
    else:
        # The classic error-term loop advances y at step i exactly when
        # 2 * (i * dy - k * dx) >= dx, hence after i steps y has advanced
        # k_i = floor((2 * i * dy + dx) / (2 * dx)) times. int64 keeps the
        # product 2 * i * dy safe for large rasters.
        steps = np.arange(dx + 1, dtype=np.int64)
        advances = (2 * dy * steps + dx) // (2 * dx)
        y_coords = (y0 + y_step * advances).astype(np.int32)

    # Undo the axis swap for steep lines.
    if steep:
        return np.column_stack((y_coords, x_coords))
    return np.column_stack((x_coords, y_coords))