"""
PYORPS: An Open-Source Tool for Automated Power Line Routing
Reference:
[1] Hofmann, M., Stetz, T., Kammer, F., Repo, S.: 'PYORPS: An Open-Source Tool for
Automated Power Line Routing', CIRED 2025 - 28th Conference and Exhibition on
Electricity Distribution, 16 - 19 June 2025, Geneva, Switzerland
"""
from typing import Optional, Any
from dataclasses import dataclass
from numpy import ndarray
from shapely.geometry import LineString
from pyorps.core.types import CoordinateTuple, NodeList, CoordinateList
[docs]
@dataclass
class Path:
"""
Dataclass representing a path in a raster graph. Used as container for all path
metrics and information.
"""
source: CoordinateTuple
target: CoordinateTuple
algorithm: str
graph_api: str
path_indices: NodeList
path_coords: CoordinateList
path_geometry: LineString
euclidean_distance: float
runtimes: dict[str, float]
path_id: int
search_space_buffer_m: float
neighborhood: str
# Optional metrics that may be calculated
total_length: Optional[float] = None
total_cost: Optional[float] = None
length_by_category: Optional[dict[float, float]] = None
length_by_category_percent: Optional[dict[float, float]] = None
[docs]
def to_geodataframe_dict(self) -> dict:
"""
Convert Path object to a dictionary suitable for GeoDataFrame creation.
Returns:
dictionary with path data formatted for GeoDataFrame
"""
# Add runtime information
result = {f"runtime_{key}": value for key, value in self.runtimes.items()}
# Basic path information
result.update({
"path_id": self.path_id,
"source": str(self.source),
"target": str(self.target),
"algorithm": self.algorithm,
"graph_api": self.graph_api,
"geometry": self.path_geometry,
"search_space_buffer_m": self.search_space_buffer_m,
"euclidean_distance": self.euclidean_distance,
"neighborhood": self.neighborhood,
})
# Add metrics if they exist
if self.total_length is not None:
result["path_length"] = self.total_length
result["path_cost"] = self.total_cost
# Add length by category columns if available
if self.length_by_category:
for category, length in self.length_by_category.items():
result[f"length_cost_{category}"] = length
lbc = self.length_by_category_percent[category]
result[f"percent_cost_{category}"] = lbc
return result
[docs]
def __str__(self) -> str:
"""
Return a string representation of the path including the path_id, source and
target, as well as the path's total length and total cost.
Returns:
A string representation of the path.
"""
result = f"Path(id={self.path_id}, source={self.source}, target={self.target}"
if self.total_length is not None:
result += f", length_m={self.total_length:.2f}"
if self.total_cost is not None:
result += f", cost={self.total_cost:.2f}"
if "runtime_total" in self.runtimes:
result += f", runtime_total={self.total_cost:.2f}"
result += ")"
return result
[docs]
def __repr__(self) -> str:
"""
Return a detailed string representation of the path.
"""
return str(self)
[docs]
def __eq__(self, other: Any) -> bool:
"""
Check for equality between two paths.
"""
equal = True
same_source = self.source.__eq__(other.source)
if isinstance(same_source, ndarray):
equal &= same_source.all()
else:
equal &= same_source
same_target = self.target.__eq__(other.target)
if isinstance(same_target, ndarray):
equal &= same_target.all()
else:
equal &= same_target
equal &= all(pi in self.path_indices for pi in other.path_indices)
equal &= self.euclidean_distance.__eq__(other.euclidean_distance)
equal &= self.search_space_buffer_m.__eq__(other.search_space_buffer_m)
equal &= self.neighborhood.__eq__(other.neighborhood)
return equal
[docs]
class PathCollection:
"""
Container for Path objects with O(1) retrieval by path ID and O(n) lookup for
source and target information. Paths can be added with new id by replacing a Path
object with the same ID already existing in th PathCollection.
"""
_paths: dict[int, Path]
_next_id: int
[docs]
def __init__(self):
"""
Create an empty PathCollection for collecting Paths with their IDs in a
dictionary.
"""
self._paths = {} # dictionary with path_id as keys for O(1) lookup
self._next_id = 0 # Track the next available path ID
[docs]
def add(self, path: Path, replace: bool = False) -> None:
"""
Add a path to the PathCollection. If the Path's path_id is None or if replace is
False, the path_id of the Path object will set to self._next_id and
self._next_id will be incremented. If the Path's path_id is not None and
replace is True, a Path with the same path_id (if present) will be replaced
with the new Path object.
Parameters:
path: A Path object which should be added to the PathCollection.
replace: Whether to replace an existing Path object with the same path_id
(if present) or not.
"""
if path.path_id is None or not replace:
path.path_id = self._next_id
self._next_id += 1
else:
# If an explicit path_id is provided, update _next_id if needed
self._next_id = max(self._next_id, path.path_id + 1)
self._paths[path.path_id] = path
[docs]
def get(
self,
path_id: int = None,
source: Any = None,
target: Any = None
) -> Optional[Path]:
"""
Retrieve a stored path by ID, or by source AND target.
Parameters:
path_id: The ID of the Path object to retrieve (must be None if path
should be found by source and target)
source: The source Path object to retrieve (only used if path_id is None
and target os set too; neglected otherwise)
target: The target Path object to retrieve (only used if path_id is None
and target os set too; neglected otherwise)
Returns:
The Path object with the specified ID or source/target pair. None if no
such path exists.
"""
if path_id is not None:
# O(1) lookup by ID
return self._paths.get(path_id)
if source is not None and target is not None:
# O(n) lookup by source AND target - still need to iterate
for path in self._paths.values():
if path.source == source and path.target == target:
return path
# If criteria not met or path not found, return None
return None
[docs]
def to_geodataframe_records(self) -> list:
"""
Convert all paths to a list of dictionaries suitable for a GeoDataFrame.
Returns:
List of dictionaries with path data formatted for a GeoDataFrame
"""
return [path.to_geodataframe_dict() for path in self._paths.values()]
[docs]
def __iter__(self):
"""
Iterate through all paths in the PathCollection.
"""
return iter(self._paths.values())
[docs]
def __len__(self):
"""
Return the number of paths in the PathCollection.
"""
return len(self._paths)
[docs]
def __getitem__(self, path_id):
"""
Get path by path_id of the Path object from the PathCollection.
"""
return self._paths[path_id]
[docs]
def __str__(self) -> str:
"""
Return a string representation of the path collection.
"""
if len(self._paths) <= 5:
paths_str = ""
for path in self._paths:
if paths_str != "":
paths_str += ",\n"
paths_str += str(path)
else:
# Show first 2 paths and last path for large collections
paths_str = (f"\n\t{str(self.all[0])},"
f"\n\t{str(self.all[1])},"
f"\n\t ..., "
f"\n\t{str(self.all[-1])}")
return f"PathCollection(paths=[{paths_str}], count={len(self._paths)})"
[docs]
def __repr__(self) -> str:
"""
Return a detailed string representation of the path collection.
"""
if len(self._paths) <= 5:
paths_repr = ""
for path in self._paths:
if paths_repr != "":
paths_repr += ",\n"
paths_repr += repr(path)
else:
# Show first 2 paths and last path for large collections
paths_repr = (f"\n\t{repr(self.all[0])},"
f"\n\t{repr(self.all[1])},"
f"\n\t ..., "
f"\n\t{repr(self.all[-1])}")
return f"PathCollection(paths=[{paths_repr}], count={len(self._paths)})"
@property
def all(self):
"""
Return all Path objects from the values of the PathCollection's _paths
dictionary as a list.
Returns:
A list of all Path objects in the PathCollection.
"""
return list(self._paths.values())
[docs]
def __eq__(self, other) -> bool:
"""
Check if PathCollections are equal. They do not have to be in the same order
to be equal!
"""
return all(any(o == p for p in self.all) for o in other.all)