"""
PYORPS: An Open-Source Tool for Automated Power Line Routing
Reference:
[1] Hofmann, M., Stetz, T., Kammer, F., Repo, S.: 'PYORPS: An Open-Source Tool for
Automated Power Line Routing', CIRED 2025 - 28th Conference and Exhibition on
Electricity Distribution, 16 - 19 June 2025, Geneva, Switzerland
"""
from typing import Tuple, Union
import numpy as np
import numba as nb
# Define Numba types for clarity and performance optimization
pyint_type = nb.types.intp
int8_type = nb.types.int8
uint16_type = nb.types.uint16
int32_type = nb.types.int32
uint32_type = nb.types.uint32
float64_type = nb.types.float64
boolean_type = nb.types.boolean
# Array types - using contiguous arrays for best performance
int8_2d_array = nb.types.Array(int8_type, 2, 'A')
uint16_2d_array = nb.types.Array(uint16_type, 2, 'A')
uint8_2d_array = nb.types.Array(nb.types.uint8, 2, 'A')
int32_1d_array = nb.types.Array(int32_type, 1, 'A')
uint32_1d_array = nb.types.Array(uint32_type, 1, 'A')
float64_1d_array = nb.types.Array(float64_type, 1, 'A')
uint16_1d_array_c = nb.types.Array(uint16_type, 1, 'C')
float64_1d_array_c = nb.types.Array(float64_type, 1, 'C')
[docs]
@nb.njit(float64_type(int8_type, int8_type, pyint_type), cache=True,
fastmath=True)
def get_cost_factor_numba(dr: int8_type, dc: int8_type,
intermediates_count: pyint_type) -> float64_type:
"""
Calculate the cost factor for an edge based on its geometric length.
The cost factor normalizes edge weights by distributing the Euclidean
distance over all cells involved in the traversal (source, target, and
intermediates).
Parameters:
dr (int): Row difference between source and target
dc (int): Column difference between source and target
intermediates_count (int): Number of intermediate cells traversed
Returns:
float: Cost factor for edge weight calculation
References:
[1]
"""
# Calculate Euclidean distance using Pythagorean theorem
distance = np.sqrt(dr * dr + dc * dc)
# Normalize by total number of cells (source + target + intermediates)
divisor = 2.0 + intermediates_count
return distance / divisor
[docs]
@nb.njit(uint32_type(pyint_type, pyint_type, pyint_type),
cache=True, fastmath=True)
def ravel_index(row: pyint_type,
col: pyint_type,
cols: pyint_type) -> uint32_type:
"""
Convert 2D grid coordinates to 1D linear index.
This is a high-performance replacement for np.ravel_multi_index optimized
for regular grid indexing operations in graph construction.
Parameters:
row (int): Row coordinate in the grid
col (int): Column coordinate in the grid
cols (int): Total number of columns in the grid
Returns:
int: Linear index corresponding to the 2D coordinates
"""
return uint32_type(row * cols + col)
@nb.njit(uint32_1d_array(int8_type, pyint_type, int8_type, pyint_type),
cache=True, fastmath=True)
def _calculate_target_region_bounds(dr, rows, dc, cols):
# Calculate target region bounds based on step direction
if dr > 0:
t_rows_start, t_rows_end = dr, rows
else:
t_rows_start, t_rows_end = 0, rows + dr if dr != 0 else rows
if dc > 0:
t_cols_start, t_cols_end = dc, cols
else:
t_cols_start, t_cols_end = 0, cols + dc if dc != 0 else cols
return np.array([t_rows_start, t_rows_end, t_cols_start, t_cols_end],
dtype=np.uint32)
@nb.njit(uint32_1d_array(int8_type, pyint_type, int8_type, pyint_type),
cache=True, fastmath=True)
def _calculate_source_region_bounds(dr, rows, dc, cols):
# Calculate source region bounds based on step direction
if dr > 0:
s_rows_start, s_rows_end = 0, rows - dr
else:
s_rows_start, s_rows_end = abs(dr) if dr != 0 else 0, rows
if dc > 0:
s_cols_start, s_cols_end = 0, cols - dc
else:
s_cols_start, s_cols_end = abs(dc) if dc != 0 else 0, cols
return np.array([s_rows_start, s_rows_end, s_cols_start, s_cols_end],
dtype=np.uint32)
[docs]
@nb.njit(uint32_1d_array(int8_type, int8_type, pyint_type, pyint_type),
cache=True, fastmath=True)
def calculate_region_bounds(dr: int8_type, dc: int8_type,
rows: pyint_type, cols: pyint_type) -> uint32_1d_array:
"""
Calculate valid region bounds for source and target areas in graph construction.
This function determines the valid coordinate ranges for source and target
nodes when creating edges with the given step direction, ensuring all
coordinates remain within grid boundaries.
Parameters:
dr (int): Row step direction
dc (int): Column step direction
rows (int): Total number of rows in the grid
cols (int): Total number of columns in the grid
Returns:
np.ndarray: Array containing bounds for source and target regions
References:
[1]
"""
source_region_bounds = _calculate_source_region_bounds(dr, rows, dc, cols)
return source_region_bounds
[docs]
@nb.njit(boolean_type(
pyint_type, pyint_type, pyint_type, pyint_type,
uint8_2d_array, int8_2d_array, uint16_2d_array,
pyint_type, pyint_type, float64_1d_array), cache=True, fastmath=True)
def is_valid_node(sr: pyint_type, sc: pyint_type, tr: pyint_type, tc: pyint_type,
exclude_mask: uint8_2d_array, intermediates: int8_2d_array,
raster: uint16_2d_array, rows: uint8_2d_array, cols: uint8_2d_array,
out_cost: float64_1d_array) -> boolean_type:
"""
Check if a node transition is valid and calculate its traversal cost.
This function validates that a path from source to target coordinates is
feasible by checking boundary conditions, exclusion masks, and intermediate
cell validity. It also calculates the total cost for traversing this path.
Parameters:
sr (int): Source row coordinate
sc (int): Source column coordinate
tr (int): Target row coordinate
tc (int): Target column coordinate
exclude_mask (np.ndarray): Binary mask indicating forbidden areas
intermediates (np.ndarray): Array of intermediate step coordinates
raster (np.ndarray): Cost raster with terrain/construction costs
rows (int): Total number of rows in the grid
cols (int): Total number of columns in the grid
out_cost (np.ndarray): Output array to store calculated cost
Returns:
bool: True if the node transition is valid, False otherwise
References:
[1]
"""
# Check if source or target coordinates are out of bounds
if (sr < 0 or sr >= rows or sc < 0 or sc >= cols or tr < 0 or tr >= rows or
tc < 0 or tc >= cols):
return False
# Skip if source or target is in forbidden area
if exclude_mask[sr, sc] == 0 or exclude_mask[tr, tc] == 0:
return False
cost = 0.0
# Check intermediate points and accumulate cost
for i in range(intermediates.shape[0]):
ir = sr + intermediates[i, 0]
ic = sc + intermediates[i, 1]
# Check if intermediate point is valid
if (ir < 0 or ir >= rows or ic < 0 or ic >= cols or
exclude_mask[ir, ic] == 0):
return False
# Add intermediate traversal cost
cost += raster[ir, ic]
# Add source and target costs to total
cost += raster[sr, sc] + raster[tr, tc]
# Store the calculated cost
out_cost[0] = cost
return True
[docs]
@nb.njit(nb.types.Tuple((uint32_1d_array, uint32_1d_array,
float64_1d_array, pyint_type))
(int8_type, int8_type, pyint_type, pyint_type, pyint_type, pyint_type,
uint8_2d_array, uint16_2d_array, int8_2d_array, pyint_type, pyint_type,
float64_type, pyint_type),
cache=True, parallel=True, fastmath=True)
def find_valid_nodes(dr: int8_type, dc: int8_type,
s_rows_start: pyint_type, s_rows_end: pyint_type,
s_cols_start: pyint_type, s_cols_end: pyint_type,
exclude_mask: uint8_2d_array, raster: uint16_2d_array,
intermediates: int8_2d_array, rows: pyint_type, cols: pyint_type,
cost_factor: float64_type, max_nodes: pyint_type
) -> nb.types.Tuple((uint32_1d_array, uint32_1d_array,
float64_1d_array, pyint_type)):
"""
Find all valid node transitions for a given step direction within bounds.
This function systematically searches through a defined region to identify
all valid edges (node transitions) for a specific step direction,
calculating their costs and storing them for graph construction.
Parameters:
dr (int): Row step direction
dc (int): Column step direction
s_rows_start (int): Starting row for source region search
s_rows_end (int): Ending row for source region search
s_cols_start (int): Starting column for source region search
s_cols_end (int): Ending column for source region search
exclude_mask (np.ndarray): Binary mask indicating forbidden areas
raster (np.ndarray): Cost raster with terrain/construction costs
intermediates (np.ndarray): Array of intermediate step coordinates
rows (int): Total number of rows in the grid
cols (int): Total number of columns in the grid
cost_factor (float): Cost normalization factor for this step direction
max_nodes (int): Maximum number of valid nodes to find
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray, int]: Edge data and count
References:
[1]
"""
# Pre-allocate arrays for maximum possible valid nodes
max_valid_nodes = min((s_rows_end - s_rows_start) *
(s_cols_end - s_cols_start), max_nodes)
from_nodes = np.zeros(max_valid_nodes, dtype=np.uint32)
to_nodes = np.zeros(max_valid_nodes, dtype=np.uint32)
costs = np.zeros(max_valid_nodes, dtype=np.float64)
valid_count = 0
cost_temp = np.zeros(1, dtype=np.float64)
dr_int, dc_int = int(dr), int(dc)
# Systematically search through the defined region
for sr in range(s_rows_start, s_rows_end):
for sc in range(s_cols_start, s_cols_end):
tr = sr + dr_int
tc = sc + dc_int
# Check validity and get cost for this node transition
if is_valid_node(sr, sc, tr, tc, exclude_mask, intermediates,
raster, rows, cols, cost_temp):
if valid_count < max_valid_nodes: # Prevent array overflow
# Store linear indices and normalized cost
from_nodes[valid_count] = ravel_index(sr, sc, cols)
to_nodes[valid_count] = ravel_index(tr, tc, cols)
costs[valid_count] = cost_temp[0] * cost_factor
valid_count += 1
else:
break
# Return only the valid entries
return (from_nodes[:valid_count], to_nodes[:valid_count],
costs[:valid_count], valid_count)
[docs]
@nb.njit(uint32_type(uint32_type, uint32_type, int8_2d_array), fastmath=True,
cache=True)
def get_max_number_of_edges(n: uint32_type,
m: uint32_type,
steps: int8_2d_array) -> uint32_type:
"""
Calculate the maximum number of edges for a given raster shape and neighborhood.
This function estimates the upper bound on the number of edges that will be
created when converting a raster into a graph using the specified
neighborhood steps. This is used for memory pre-allocation optimization.
Parameters:
n (int): Number of rows in the raster
m (int): Number of columns in the raster
steps (np.ndarray): Array of neighborhood step directions
Returns:
int: Maximum possible number of edges
References:
[1]
"""
max_nr_of_edges = 0
for step_idx in range(steps.shape[0]):
dr = steps[step_idx, 0]
dc = steps[step_idx, 1]
# Calculate valid region size for this step direction
max_nr_of_edges = (max_nr_of_edges + (n - uint32_type(abs(dr))) *
(m - uint32_type(abs(dc))))
return max_nr_of_edges
[docs]
@nb.njit(nb.types.Tuple((uint32_1d_array, uint32_1d_array, float64_1d_array))
(uint16_2d_array, int8_2d_array, nb.types.boolean),
parallel=True, cache=True, fastmath=True)
def construct_edges(raster: uint16_2d_array,
steps: int8_2d_array,
ignore_max: nb.types.boolean = True
) -> nb.types.Tuple((uint32_1d_array,
uint32_1d_array,
float64_1d_array)):
"""
Construct graph edges from rasterized geodata using specified neighborhood steps.
This is the main function for converting rasterized cost data into a
weighted graph representation suitable for least-cost path analysis. It
processes each step direction in the neighborhood to create edges between
valid grid cells.
Parameters:
raster (np.ndarray): 2D cost raster representing terrain/construction costs
steps (np.ndarray): Array of neighborhood step directions (Rk neighborhood)
ignore_max (bool): If True, treats maximum cost values as forbidden areas
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: Complete edge list for graph
References:
[1]
"""
rows, cols = raster.shape
nr_of_edges = get_max_number_of_edges(rows, cols, steps)
# Pre-allocate result arrays for optimal performance
from_nodes_edges = np.zeros(nr_of_edges, dtype=np.uint32)
to_nodes_edges = np.zeros(nr_of_edges, dtype=np.uint32)
cost_edges = np.zeros(nr_of_edges, dtype=np.float64)
last_index = 0
# Create exclusion mask for forbidden areas
if ignore_max:
# Maximum uint16 value represents forbidden areas
max_cost = np.iinfo(np.uint16).max
exclude_mask = np.zeros((rows, cols), dtype=np.uint8)
for i in range(rows):
for j in range(cols):
exclude_mask[i, j] = 1 if raster[i, j] != max_cost else 0
else:
exclude_mask = np.ones((rows, cols), dtype=np.uint8)
# Process each step direction in the neighborhood
for step_idx in range(steps.shape[0]):
dr = steps[step_idx, 0]
dc = steps[step_idx, 1]
# Calculate intermediate steps for this direction
intermediates = intermediate_steps_numba(dr, dc)
# Determine valid regions for source and target nodes
bounds = calculate_region_bounds(dr, dc, rows, cols)
s_rows_start, s_rows_end, s_cols_start, s_cols_end = bounds[:4]
# Calculate cost normalization factor
cost_factor = get_cost_factor_numba(dr, dc, intermediates.shape[0])
# Calculate remaining array capacity
remaining = nr_of_edges - last_index
# Find all valid edges for this step direction
from_nodes, to_nodes, costs, valid_count = find_valid_nodes(
dr, dc, s_rows_start, s_rows_end, s_cols_start, s_cols_end,
exclude_mask, raster, intermediates, rows, cols, cost_factor,
remaining)
# Add valid edges to result arrays
if valid_count > 0:
end_idx = last_index + valid_count
from_nodes_edges[last_index:end_idx] = from_nodes
to_nodes_edges[last_index:end_idx] = to_nodes
cost_edges[last_index:end_idx] = costs
last_index = end_idx
# Return arrays trimmed to actual edge count
return (from_nodes_edges[:last_index], to_nodes_edges[:last_index],
cost_edges[:last_index])
[docs]
@nb.njit(cache=True)
def calculate_segment_length(abs_dr: int, abs_dc: int) -> float:
"""
Calculate the geometric length of a path segment between grid cells.
This function provides optimized calculations for common step patterns
and falls back to the Pythagorean theorem for arbitrary steps.
Parameters:
abs_dr (int): Absolute row difference
abs_dc (int): Absolute column difference
Returns:
float: Euclidean length of the segment
"""
# Optimized calculations for common patterns
if abs_dr <= 1 and abs_dc <= 1:
# sqrt(2) or 1
return 1.4142135623730951 if (abs_dr == 1 and abs_dc == 1) else 1.0
elif (abs_dr == 2 and abs_dc == 1) or (abs_dr == 1 and abs_dc == 2):
return 2.236067977499789 # sqrt(5)
elif (abs_dr == 3 and abs_dc == 1) or (abs_dr == 1 and abs_dc == 3):
return 3.1622776601683795 # sqrt(10)
elif (abs_dr == 3 and abs_dc == 2) or (abs_dr == 2 and abs_dc == 3):
return 3.605551275463989 # sqrt(13)
else:
# General case using Pythagorean theorem
return np.sqrt(abs_dr * abs_dr + abs_dc * abs_dc)
[docs]
@nb.njit(nb.types.Tuple((float64_type, uint16_1d_array_c, float64_1d_array_c))
(uint16_2d_array, uint32_1d_array),
fastmath=True, parallel=True)
def calculate_path_metrics_numba(raster:uint16_2d_array,
path_indices: uint32_1d_array
) -> nb.types.Tuple((float64_type,
uint16_1d_array_c,
float64_1d_array_c)):
"""
Calculate comprehensive metrics for a power line path.
This function analyzes an optimal path found by the routing algorithm to
provide detailed statistics about path length, terrain traversed, and cost
distribution. This information is essential for power line planning and
cost estimation.
Parameters:
raster (np.ndarray): 2D cost raster representing terrain/construction costs
path_indices (np.ndarray): Array of linear indices representing the path
Returns:
Tuple[float, np.ndarray, np.ndarray]: Total length, categories, lengths
References:
[1]
"""
# Get raster dimensions for coordinate conversion
rows, cols = raster.shape
n_segments = len(path_indices) - 1
# Convert linear indices to 2D coordinates for path analysis
path_2d = np.empty((len(path_indices), 2), dtype=np.int64)
for i in nb.prange(len(path_indices)):
path_2d[i, 0] = path_indices[i] // cols # Row coordinate
path_2d[i, 1] = path_indices[i] % cols # Column coordinate
# Identify unique cost categories in the raster
categories_array = np.sort(np.unique(raster))
num_categories = len(categories_array)
# Create efficient mapping from category values to array indices
min_category = categories_array[0]
max_category = categories_array[-1]
range_size = max_category - min_category + 1
category_to_index = np.full(range_size, -1, dtype=np.int32)
for i in range(num_categories):
category_to_index[categories_array[i] - min_category] = i
# Thread-local storage to avoid race conditions in parallel processing
num_threads = nb.get_num_threads()
thread_local_lengths = np.zeros((num_threads, num_categories),
dtype=np.float64)
thread_local_total_lengths = np.zeros(num_threads, dtype=np.float64)
# Process each path segment in parallel for performance
for i in nb.prange(n_segments):
thread_id = nb.get_thread_id()
# Get segment endpoints
row, col = path_2d[i, 0], path_2d[i, 1]
next_row, next_col = path_2d[i + 1, 0], path_2d[i + 1, 1]
# Calculate step direction and segment length
dr = next_row - row
dc = next_col - col
abs_dr = abs(dr)
abs_dc = abs(dc)
segment_length = calculate_segment_length(abs_dr, abs_dc)
thread_local_total_lengths[thread_id] += segment_length
# Get all cells traversed by this segment (including intermediates)
intermediates = intermediate_steps_numba(np.int8(dr), np.int8(dc))
all_cells = np.empty((intermediates.shape[0] + 2, 2), dtype=np.int64)
# Source cell
all_cells[0, 0] = row
all_cells[0, 1] = col
# Intermediate cells
for j in range(intermediates.shape[0]):
all_cells[j + 1, 0] = row + intermediates[j, 0]
all_cells[j + 1, 1] = col + intermediates[j, 1]
# Target cell
all_cells[-1, 0] = next_row
all_cells[-1, 1] = next_col
# Distribute segment length proportionally among traversed cells
cell_length = segment_length / all_cells.shape[0]
# Accumulate length for each terrain category
for j in range(all_cells.shape[0]):
r, c = all_cells[j, 0], all_cells[j, 1]
if 0 <= r < rows and 0 <= c < cols:
category = raster[r, c]
if min_category <= category <= max_category:
map_idx = category - min_category
idx = category_to_index[map_idx]
if idx >= 0:
thread_local_lengths[thread_id, idx] += cell_length
# Combine results from all threads
total_length = 0.0
for i in range(num_threads):
total_length += thread_local_total_lengths[i]
lengths_array = np.zeros(num_categories, dtype=np.float64)
for i in range(num_threads):
for j in range(num_categories):
lengths_array[j] += thread_local_lengths[i, j]
return total_length, categories_array, lengths_array
[docs]
@nb.njit(fastmath=True, parallel=True)
def euclidean_distances_numba(raster: np.ndarray,
target_point: np.ndarray) -> np.ndarray:
"""
Calculate Euclidean distances from all points to a target point.
This function is optimized for spatial analysis and can be used for various
distance-based calculations in power line routing applications.
Parameters:
raster (np.ndarray): Array of coordinate points
target_point (np.ndarray): Target point coordinates
Returns:
np.ndarray: Array of distances from each point to the target
"""
n_points = raster.shape[0]
distances = np.empty(n_points, dtype=np.float64)
# Optimized implementation for 2D coordinates (most common case)
if raster.shape[1] == 2:
for i in nb.prange(n_points):
dx = raster[i, 0] - target_point[0]
dy = raster[i, 1] - target_point[1]
distances[i] = np.sqrt(dx * dx + dy * dy)
else:
# General case for any number of dimensions
n_dims = raster.shape[1]
for i in nb.prange(n_points):
squared_dist = 0.0
for j in range(n_dims):
diff = raster[i, j] - target_point[j]
squared_dist += diff * diff
distances[i] = np.sqrt(squared_dist)
return distances
[docs]
@nb.njit(cache=True)
def get_outgoing_edges(node_idx: int, raster: np.ndarray, steps: np.ndarray,
rows: int, cols: int,
exclude_mask: Union[np.ndarray, None] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""
Get outgoing edges from a specific node for dynamic graph traversal.
This function calculates outgoing edges on-demand rather than pre-computing
the entire graph, which can be memory-efficient for large rasters or
specialized pathfinding algorithms.
Parameters:
node_idx (int): Linear index of the source node
raster (np.ndarray): 2D cost raster
steps (np.ndarray): Array of neighborhood step directions
rows (int): Number of rows in the raster
cols (int): Number of columns in the raster
exclude_mask (Union[np.ndarray, None]): Optional exclusion mask
Returns:
Tuple[np.ndarray, np.ndarray]: Target nodes and edge costs
References:
[1]
"""
# Convert linear index to 2D coordinates
row = node_idx // cols
col = node_idx % cols
# Prepare result arrays for maximum possible edges
max_edges = steps.shape[0]
to_nodes = np.zeros(max_edges, dtype=np.uint32)
costs = np.zeros(max_edges, dtype=np.float64)
edge_count = 0
# Create default exclusion mask if not provided
if exclude_mask is None:
exclude_mask = np.ones((rows, cols), dtype=np.uint8)
max_cost = np.iinfo(np.uint16).max
for i in range(rows):
for j in range(cols):
if raster[i, j] == max_cost:
exclude_mask[i, j] = 0
# Process each possible step direction
for step_idx in range(steps.shape[0]):
dr = steps[step_idx, 0]
dc = steps[step_idx, 1]
# Calculate target coordinates
tr = row + dr
tc = col + dc
# Check boundary conditions
if tr < 0 or tr >= rows or tc < 0 or tc >= cols:
continue
# Check if target is accessible
if exclude_mask[tr, tc] == 0:
continue
# Get intermediate steps and validate path
intermediates = intermediate_steps_numba(dr, dc)
valid = True
cost = raster[row, col] # Start with source cost
# Check all intermediate cells
for i in range(intermediates.shape[0]):
ir = row + intermediates[i, 0]
ic = col + intermediates[i, 1]
if (ir < 0 or ir >= rows or ic < 0 or ic >= cols or
exclude_mask[ir, ic] == 0):
valid = False
break
cost += raster[ir, ic]
if not valid:
continue
# Add target cost and calculate final edge weight
cost += raster[tr, tc]
cost_factor = get_cost_factor_numba(dr, dc, intermediates.shape[0])
# Store edge information
to_nodes[edge_count] = tr * cols + tc
costs[edge_count] = cost * cost_factor
edge_count += 1
return to_nodes[:edge_count], costs[:edge_count]