Source code for pyorps.graph.path_finder

"""
PYORPS: An Open-Source Tool for Automated Power Line Routing

Reference:
[1] Hofmann, M., Stetz, T., Kammer, F., Repo, S.: 'PYORPS: An Open-Source Tool for
    Automated Power Line Routing', CIRED 2025 - 28th Conference and Exhibition on
    Electricity Distribution, 16 - 19 June 2025, Geneva, Switzerland
"""
from time import time
from typing import Optional, Union, Any, Generator
from contextlib import contextmanager
from warnings import warn

from numpy import (array, ndarray, ravel_multi_index, unravel_index, sqrt, uint32,
                   int32, asarray, iinfo, uint16)
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry import LineString, Point, MultiPoint
from rasterio.transform import Affine

# Project imports
from pyorps.core.path import Path, PathCollection
from pyorps.core.types import (BboxType, GeometryMaskType, InputDataType,
                               CostAssumptionsType, CoordinateInput, Node, NodeList,
                               NodePathList, NormalizedCoordinate, CoordinateTuple,
                               CoordinateList)
from pyorps.core.exceptions import NoPathFoundError, RasterShapeError
from pyorps.graph.api.graph_api import GraphAPI
from pyorps.raster.rasterizer import GeoRasterizer
from pyorps.raster.handler import RasterHandler
from pyorps.utils.neighborhood import get_neighborhood_steps
from pyorps.io.geo_dataset import initialize_geo_dataset, VectorDataset, RasterDataset
from pyorps.utils.traversal import (calculate_path_metrics_numba,
                                    find_nearest_valid_positions_numba,
                                    check_max_values)


[docs] @contextmanager def timed(name: str, timings_dict: Optional[dict[str, float]]) -> Generator: """ Simple context manager for timing code blocks. Parameters: name: The name of the code block to be timed and used as a key within the timings_dict timings_dict: Dictionary to add the timing information to with the specified name as key Returns: A context manager that times the code block """ start_time = time() try: yield finally: timings_dict[name] = time() - start_time
[docs] def get_graph_api_class(graph_api: str) -> type: """ Return the graph API class based on the selected graph API using pattern matching. Parameters: graph_api (str): The name of the graph API to use ("networkit", "igraph", "networkx" or "rustworkx). Respective graph library must be installed! Networkit is a dependency of pyorps and will be installed automatically. Returns: class: The corresponding graph API class. Raises: ImportError: If the specified graph API module cannot be imported. ValueError: If the specified graph API is not supported. """ match graph_api.lower(): case "networkit": from pyorps.graph.api.networkit_api import NetworkitAPI return NetworkitAPI case "igraph": from pyorps.graph.api.igraph_api import IGraphAPI return IGraphAPI case "rustworkx": from pyorps.graph.api.rustworkx_api import RustworkxAPI return RustworkxAPI case "networkx": from pyorps.graph.api.networkx_api import NetworkxAPI return NetworkxAPI case "cython": from pyorps.graph.api.cython_api import CythonAPI return CythonAPI case _: raise ValueError(f"Unsupported graph API: {graph_api}")
[docs] class PathFinder: """ A class that encapsulates RasterReader and graph-based routing capabilities. This class provides functionality to: 1. Read raster data using RasterReader or create a raster using GeoRasterizer 2. Create a graph representation of the raster with a defined Graph library 3. Find the shortest paths between coordinates 4. Convert resulting paths of graph node indices back to coordinates 5. Create GeoDataFrames of paths and export to other geo-formats for further analysis The class supports various graph APIs to create a graph from a raster. """
[docs] def __init__( self, dataset_source: InputDataType, source_coords: Optional[CoordinateInput], target_coords: Optional[CoordinateInput], search_space_buffer_m: Optional[float] = None, neighborhood_str: Optional[Union[str, int]] = "r2", steps: Optional[ndarray[int]] = None, ignore_max_cost: bool = True, graph_api: str = "cython", cost_assumptions: Optional[CostAssumptionsType] = None, datasets_to_modify: Optional[list[dict[str, Any]]] = None, crs: Optional[str] = None, bbox: Optional[BboxType] = None, mask: Optional[GeometryMaskType] = None, transform: Optional[Affine] = None, raster_save_path: Optional[str] = None, **kwargs ): """ Initialize the RasterGraph with a dataset source and routing parameters. Parameters: dataset_source: Either: - Path to a file (str) - Tuple of (data_array, crs, transform) - GeoDataset object - Dictionary with url/layer for WFS source_coords: CoordinateInput Can be: tuple, list of tuples, array of arrays, shapely Point, shapely MultiPoint, GeoSeries of points, or GeoDataFrame of points. target_coords: CoordinateInput Can be: tuple, list of tuples, array of arrays, shapely Point, shapely MultiPoint, GeoSeries of points, or GeoDataFrame of points. search_space_buffer_m: Buffer around the source and target coordinates in meters. If set to 0, the entire Raster will be considered! neighborhood_str: Neighborhood type. Defaults to "r2". steps: Steps which define the neighborhood. If None, will be created from neighborhood_str. ignore_max_cost: Whether to ignore all cells in the raster which have the maximum cost value or not graph_api: Graph API to use. Available graph libraries: "networkit" (default), "rustworkx", "igraph", "networkx" cost_assumptions: Cost assumptions to use for rasterization. Required if dataset_source is vector data. datasets_to_modify: List of datasets to use to modify the raster using GeoRasterizer.modify_raster_from_dataset crs: The coordinate reference system to be used as project crs (crs of the dataset_source and all other datasets will be converted to this crs) bbox: The bounding box to be used as project bounding box. Defines the area in which path finding is processed. mask: Defines the area in which path finding is processed similar to the bbox parameter. In this case a more complex Polygon, a Multipolygon or even a GeoSeries/GeoDataFrame with multiple Polygons can be used to define the search space for path finding. transform: Affine transformation describing the transform of a RasterDataset. Can be used ia a raster dataset is passed directly to dataset_source. raster_save_path: Path to save the raster dataset to. **kwargs: Additional keyword arguments to pass to the rasterize function of the RasterHandler (if a VectorDataset or a source to a VectorDataset has been provided with dataset_source) or to the load function of the RasterDataset (if a source to a RasterDataset has been provided with dataset_source). Minimal example: >>> from pyorps import PathFinder >>> source = (472000, 5593400) >>> target = (472800, 5594000) >>> raster_path = r"./data/raster/sample_raster.tiff" >>> path_finder = PathFinder( >>> dataset_source=raster_path, >>> source_coords=source, >>> target_coords=target, >>> ) >>> path_finder.find_route() Path(path_id=0, source=(472000, 5593400), target=(472800, 5594000), total_length=1192.43, total_cost=133578.05) """ self.source_coords = PathFinder.normalize_coordinates(source_coords) self.target_coords = PathFinder.normalize_coordinates(target_coords) self.search_space_buffer_m = search_space_buffer_m self.neighborhood_str = neighborhood_str self.graph_api_name = graph_api self.ignore_max_cost = ignore_max_cost if steps is None and neighborhood_str: directed = True if self.graph_api_name == "cython" else False self.steps = get_neighborhood_steps(neighborhood_str, directed=directed) else: self.steps = steps self.runtimes = {} self.paths = PathCollection() # Initialize PathCollection instead of list # Initialize as None (to be lazily loaded/created) self.raster_handler = None self.geo_rasterizer = None self._graph_api = None self.path_gdf = None # Load the dataset self.dataset = initialize_geo_dataset(dataset_source, crs, bbox, mask, transform) if self.source_coords is not None and self.target_coords is not None: self.create_raster_handler(cost_assumptions, datasets_to_modify, raster_save_path, **kwargs)
[docs] @staticmethod def normalize_coordinates( input_data: Optional[CoordinateInput] ) -> Optional[NormalizedCoordinate]: """ Normalize different coordinate formats into tuples or lists of tuples. Parameters: input_data: Can be a tuple, a list of tuples, an array of arrays, a shapely Point, a shapely MultiPoint, a GeoSeries of points, or a GeoDataFrame of points. Returns: CoordinateOutput: A single coordinate tuple (x, y) or list of coordinate tuples [(x1, y1), (x2, y2), ...] """ if input_data is None: coordinate_output = None # Case: Input is a tuple with two elements elif isinstance(input_data, tuple) and len(input_data) == 2: coordinate_output = input_data # Case: Input is a shapely Point elif isinstance(input_data, Point): coordinate_output = input_data.x, input_data.y # Case: Input is a shapely MultiPoint elif isinstance(input_data, MultiPoint): coordinate_output = [(p.x, p.y) for p in input_data.geoms] # Case: Input is a GeoSeries elif isinstance(input_data, GeoSeries): coordinate_output = PathFinder._point_or_multipoints(input_data) # Case: Input is a GeoDataFrame elif isinstance(input_data, GeoDataFrame): coordinate_output = PathFinder._point_or_multipoints(input_data.geometry) # Case: Input is a list of tuples elif isinstance(input_data, list): if all(isinstance(item, tuple) and len(item) == 2 for item in input_data): coordinate_output = input_data elif all(isinstance(item, list) and len(item) == 2 for item in input_data): coordinate_output = [(float(i[0]), float(i[1])) for i in input_data] else: coordinate_output = PathFinder._point_or_multipoints(input_data) # Case: Input is a numpy array elif isinstance(input_data, ndarray): if len(input_data.shape) == 2 and input_data.shape[1] == 2: coordinate_output = [(float(c[0]), float(c[1])) for c in input_data] else: coordinate_output = PathFinder._point_or_multipoints(input_data) else: # If input doesn't match any expected format raise ValueError("Input data cannot be interpreted as coordinates") if isinstance(coordinate_output, list) and len(coordinate_output) == 1: return coordinate_output[0] else: return coordinate_output
@staticmethod def _point_or_multipoints(input_data: CoordinateInput) -> NormalizedCoordinate: """ Converts a Points or a Multipoint to a NormalizedCoordinate Parameters: input_data: Point or Multipoint to be converted to a NormalizedCoordinate Returns: A NormalizedCoordinate of the input_data """ if len(input_data) == 0: return [] elif all(isinstance(item, Point) for item in input_data): return PathFinder._get_point_coordinates(input_data) elif all(isinstance(item, MultiPoint) for item in input_data): return PathFinder._get_multipoint_coordinates(input_data) else: raise ValueError("Input data cannot be interpreted as coordinates") @staticmethod def _get_multipoint_coordinates(input_data): """ Extracts coordinates from a collection of MultiPoint geometries Parameters: input_data: Collection of MultiPoint objects Returns: List of (x, y) coordinate tuples extracted from all points within all MultiPoint geometries """ # Iterate through each MultiPoint item and extract coordinates from each # point geometry return [(p.x, p.y) for item in input_data for p in item.geoms] @staticmethod def _get_point_coordinates(input_data): """ Extracts coordinates from a collection of Point geometries Parameters: input_data: Collection of Point objects Returns: List of (x, y) coordinate tuples from the Point objects """ # Extract x, y coordinates from each Point object return [(point.x, point.y) for point in input_data]
[docs] def create_raster_handler( self, cost_assumptions: Optional[CostAssumptionsType] = None, datasets_to_modify: Optional[list[dict[str, Any]]] = None, raster_save_path: Optional[str] = None, **kwargs ) -> RasterHandler: """ Create a RasterReader object for the specified file and parameters. Parameters: cost_assumptions: Cost assumptions to use for rasterization. Required if dataset_source is vector data datasets_to_modify: List of datasets to use to modify the raster using GeoRasterizer.modify_raster_from_dataset raster_save_path: Path to save the raster dataset to. Returns: RasterReader: The created RasterReader object """ # Using timed context manager instead of manual timing with timed("raster_loading", self.runtimes): # Check if we have vector data but no cost_assumptions if isinstance(self.dataset, VectorDataset) and cost_assumptions is None: msg = "Cost assumptions must be provided when using vector data" raise ValueError(msg) # Process the dataset based on its type and parameters if isinstance(self.dataset, VectorDataset) and cost_assumptions is not None: # Create a GeoRasterizer and rasterize the vector data self.geo_rasterizer = GeoRasterizer(self.dataset, cost_assumptions) self.geo_rasterizer.rasterize(**kwargs) # Apply any additional dataset modifications if datasets_to_modify: for dataset_params in datasets_to_modify: self.geo_rasterizer.modify_raster_from_dataset(**dataset_params) if raster_save_path is not None: self.geo_rasterizer.save_raster(save_path=raster_save_path) # Create RasterHandler with the rasterized data self.raster_handler = RasterHandler( self.geo_rasterizer.raster_dataset, self.source_coords, self.target_coords, self.search_space_buffer_m ) elif isinstance(self.dataset, RasterDataset): if cost_assumptions is not None: # If we have a raster but also cost assumptions, use GeoRasterizer # to modify it self.dataset.load_data(**kwargs) self.geo_rasterizer = GeoRasterizer(self.dataset, cost_assumptions) # Apply any additional dataset modifications if datasets_to_modify: for params in datasets_to_modify: self.geo_rasterizer.modify_raster_from_dataset(**params) if raster_save_path is not None: self.geo_rasterizer.save_raster(raster_save_path) # Create RasterHandler with the modified raster self.raster_handler = RasterHandler( self.geo_rasterizer.raster_dataset, self.source_coords, self.target_coords, self.search_space_buffer_m ) else: # Direct use of the raster without modifications self.dataset.load_data(**kwargs) self.raster_handler = RasterHandler( self.dataset, self.source_coords, self.target_coords, self.search_space_buffer_m ) if raster_save_path is not None: self.raster_handler.save_section_as_raster(raster_save_path) else: raise ValueError(f"Unsupported dataset type: {type(self.dataset)}") if self.search_space_buffer_m is None: self.search_space_buffer_m = self.raster_handler.search_space_buffer_m return self.raster_handler
[docs] def create_graph(self, band_index: int = 0) -> Any: """ Create a graph from the raster data. Parameters: band_index: Index of the raster band to use. Defaults to 0. Returns: The created graph object. """ # Importing the specified graph API using the timed context manager with timed("import_time_graph_api", self.runtimes): graph_api_class_constructor = get_graph_api_class(self.graph_api_name) # Get raster data for the specified band raster_data = self.raster_handler.data[band_index] # Create graph using the graph API self._graph_api = graph_api_class_constructor(raster_data, self.steps, ignore_max=self.ignore_max_cost) # Save edge construction and graph creation times if (hasattr(self._graph_api, 'edge_construction_time') and hasattr(self._graph_api, 'graph_creation_time')): self.runtimes["edge_construction"] = self._graph_api.edge_construction_time self.runtimes["graph_creation"] = self._graph_api.graph_creation_time return self._graph_api.graph else: self.runtimes["edge_construction"] = 0.0 self.runtimes["graph_creation"] = 0.0 return None
@property def graph_api(self) -> GraphAPI: if self._graph_api is None: self.create_graph() # Overwrite the shortest_path_start_time, to make sure, that graph creation # is not part of it self.runtimes["shortest_path_start_time"] = time() return self._graph_api
[docs] def get_node_indices_from_coords( self, coords: Union[CoordinateTuple, CoordinateList] ) -> Union[Node, NodeList, NodePathList]: """ Convert coordinates to node indices. Parameters: coords: Either: - A single coordinate pair (x, y) - A list of coordinate pairs [(x1, y1), (x2, y2), ...] Returns: List of node indices. """ # Check if coords is a single coordinate pair and not a list if not isinstance(coords, list): coords = [coords] # Convert coordinates to 2D indices indices_2d = self.raster_handler.coords_to_indices(coords) # Correct positions with max cost if needed (using Numba-optimized method) indices_2d = self._correct_max_cost_positions(indices_2d) # Get shape of raster if len(self.raster_handler.data.shape) == 3: _, rows, cols = self.raster_handler.data.shape elif len(self.raster_handler.data.shape) == 2: rows, cols = self.raster_handler.data.shape else: raise RasterShapeError(self.raster_handler.data.shape) # Convert 2D indices to 1D node indices using ravel_multi_index node_indices = ravel_multi_index( (indices_2d[:, 0], indices_2d[:, 1]), (rows, cols)) if len(coords) == 1: result = node_indices[0] else: result = node_indices return result
def _correct_max_cost_positions(self, indices_2d: ndarray) -> ndarray: """ Check and correct positions that have maximum cost value (uint16 max) using Numba-optimized functions. If positions in the raster have the maximum value (65535 for uint16), find the nearest position that doesn't have the maximum value. Parameters: indices_2d: Array of (row, col) indices to check and potentially correct Returns: Corrected array of (row, col) indices """ if not self.ignore_max_cost: return indices_2d # Get the maximum value for uint16 max_value = iinfo(uint16).max # 65535 # Get raster data (handle different shapes) if len(self.raster_handler.data.shape) == 3: raster_data = self.raster_handler.data[0] # Use first band _, rows, cols = self.raster_handler.data.shape elif len(self.raster_handler.data.shape) == 2: raster_data = self.raster_handler.data rows, cols = self.raster_handler.data.shape else: raise RasterShapeError(self.raster_handler.data.shape) # Ensure indices are in the right format for Numba indices_2d = asarray(indices_2d, dtype=int32) # Check which positions have max values using Numba function [[11]] has_max_values, invalid_mask, invalid_indices = check_max_values( raster_data, indices_2d, max_value ) if not has_max_values: return indices_2d # Find nearest valid positions for all invalid positions at once corrected_positions = find_nearest_valid_positions_numba( raster_data, invalid_indices, max_value ) # Create corrected indices array corrected_indices = indices_2d.copy() # Update the corrected positions and print warnings invalid_idx = 0 for i in range(len(indices_2d)): if invalid_mask[i]: original_row, original_col = indices_2d[i] new_row, new_col = corrected_positions[invalid_idx] if new_row != original_row or new_col != original_col: # Convert back to coordinates for warning message original_coords = self.raster_handler.indices_to_coords( [(original_row, original_col)] )[0] corrected_coords = self.raster_handler.indices_to_coords( [(new_row, new_col)] )[0] warning_msg = (f"Position at coordinates {original_coords} " f"(indices [{original_row}, {original_col}]) has " f"maximum cost value ({max_value}). Correcting to " f"nearest valid position at {corrected_coords} " f"(indices [{new_row}, {new_col}]).") warn(warning_msg, UserWarning) corrected_indices[i] = [new_row, new_col] invalid_idx += 1 return corrected_indices
[docs] def get_coords_from_node_indices( self, node_indices: Union[Node, NodeList], ) -> CoordinateList: """ Convert node indices to coordinates. Parameters: node_indices: List of node indices. Returns: List of coordinates (x, y). """ # Get shape of raster _, rows, cols = self.raster_handler.data.shape # Convert 1D indices to 2D indices using unravel_index indices_2d = array(unravel_index(node_indices, (rows, cols))).T # Convert 2D indices to coordinates coords = self.raster_handler.indices_to_coords(indices_2d) return coords
[docs] def find_route( self, source: Optional[CoordinateInput] = None, target: Optional[CoordinateInput] = None, algorithm: str = "dijkstra", calculate_metrics: bool = True, pairwise: bool = False, raster_parameters: Optional[dict[str, Any]] = None, **kwargs ) -> Union[Path, PathCollection]: """ Find the shortest path between source and target coordinates. Parameter: source: CoordinateInput - Source coordinates. If None, uses the source_coords provided at initialization. Can be: tuple, list of tuples, array of arrays, shapely Point, shapely MultiPoint, GeoSeries of points, or GeoDataFrame of points. target: Target coordinates. If None, uses the target_coords provided at initialization. Can be a single pair (x, y) or a list of pairs [(x1, y1), (x2, y2), ...]. algorithm: Algorithm to use for shortest path. Defaults to "dijkstra". calculate_metrics: Whether to calculate path metrics. Defaults to True. pairwise: Whether to calculate paths pairwise (requires equal number of sources and targets). Default is False. Returns: Dictionary or list of dictionaries containing path information """ # Get source and target coords if source is None: source = self.source_coords else: source = PathFinder.normalize_coordinates(source) if target is None: target = self.target_coords else: target = PathFinder.normalize_coordinates(target) if source is None or target is None: raise ValueError(f"Source and target coordinates must not be None!") if self.raster_handler is None: self.create_raster_handler(**raster_parameters) # Convert coordinates to node indices source_indices = self.get_node_indices_from_coords(source) target_indices = self.get_node_indices_from_coords(target) # Time the shortest path calculation self.runtimes["shortest_path_start_time"] = time() # Find the shortest path using the graph API with timed("shortest_path", self.runtimes): path_indices = self.graph_api.shortest_path( source_indices=source_indices, target_indices=target_indices, algorithm=algorithm, pairwise=pairwise, **kwargs ) if len(path_indices) == 0: msg = (f" In some cases, this happens if source or target are within a " f"pixel with max cost and ignore_max is set to True! " f"Either change the coordinates of source or target, change the " f"cost value to a vlue smaller than the maximum or set ignore_max " f"to False!") raise NoPathFoundError(source_indices, target_indices, msg) # Case 1: Single source, single target -> single path if (not isinstance(path_indices[0], list) and not isinstance(path_indices[0], ndarray)): return self._create_path_result(path_indices, source, target, algorithm, calculate_metrics) else: # Case 2 & 3: Multiple paths # For single source + multiple targets OR multiple sources + # multiple targets results = self._extract_path_results(path_indices, algorithm, calculate_metrics) return results
def _extract_path_results(self, path_indices, algorithm, calculate_metrics): results = PathCollection() for path in path_indices: if not path: continue source = self.get_coords_from_node_indices(path[0])[0] target = self.get_coords_from_node_indices(path[-1])[0] path = self._create_path_result(path, source, target, algorithm, calculate_metrics) results.add(path) return results def _create_path_result(self, path_indices, source, target, algorithm, calculate_metrics): """ Helper method to create a path result dictionary from path indices. Parameters: path_indices: List of node indices for the path source: Source coordinate(s) target: Target coordinate(s) algorithm: The routing algorithm used calculate_metrics: Whether to calculate metrics Returns: Dictionary containing path information """ # Convert path indices to coordinates path_coords = self.get_coords_from_node_indices(path_indices) # Calculate the Euclidean distance euclidean_distance = sqrt((path_coords[0][0] - path_coords[-1][0]) ** 2 + (path_coords[0][1] - path_coords[-1][1]) ** 2) # Create LineString from path coordinates path_geometry = LineString(path_coords) # Calculate total runtime based on the graph API used if self.graph_api_name == "cython": self.runtimes["total"] = self.runtimes.get("raster_loading", 0) + \ self.runtimes.get("shortest_path", 0.0) else: self.runtimes["total"] = self.runtimes.get("raster_loading", 0) + \ self.runtimes.get("graph_creation", 0) + \ self.runtimes.get("edge_construction", 0) + \ self.runtimes.get("import_time_graph_api", 0) + \ self.runtimes.get("shortest_path", 0.0) # Create path object using the Path dataclass path_id = len(self.paths) path = Path( source=source, target=target, algorithm=algorithm, graph_api=self.graph_api_name, path_indices=path_indices, path_coords=path_coords, path_geometry=path_geometry, euclidean_distance=euclidean_distance, runtimes=self.runtimes.copy(), path_id=path_id, search_space_buffer_m=self.search_space_buffer_m, neighborhood=self.neighborhood_str ) # Calculate path metrics if requested if calculate_metrics: with timed("path_metrics", self.runtimes): self.calculate_path_metrics(path_indices, path) # Store path in PathCollection self.paths.add(path) return path
[docs] def calculate_path_metrics(self, path_indices, path): """ Calculate metrics about the path and add directly to the Path object. Parameters: path_indices: List of node indices for the path. path: Path object to update with metrics. """ # Ensure path_indices is a numpy array path_indices = array(path_indices, dtype=uint32) # Get the raster data (costs) raster_data = self.raster_handler.data[0] # Calculate metrics using Numba-accelerated function path.total_length, cat, length = calculate_path_metrics_numba(raster_data, path_indices) # Convert to regular Python dictionary path.length_by_category = dict(zip(cat, length)) tot = path.total_length l_by_cat = path.length_by_category.items() # Calculate percentages path.length_by_category_percent = {k: (v / tot) * 100 if tot > 0 else 0 for k, v in l_by_cat} # Calculate total cost path.total_cost = sum(cat * length for cat, length in l_by_cat)
[docs] def get_path(self, path_id=None, source=None, target=None): """ Retrieve a stored path by ID, or by source AND target. Parameters: path_id: Numerical ID of the path source: Source coordinates to search for target: Target coordinates to search for Returns: Path object or None if not found """ return self.paths.get(path_id, source, target)
[docs] def create_path_geodataframe(self): """ Create a GeoDataFrame containing all stored paths. Returns: GeoDataFrame containing path data, or None if no paths available """ # Check if there are any paths if not self.paths: return None # Use the PathCollection method to get all path records records = self.paths.to_geodataframe_records() # Create GeoDataFrame directly from records self.path_gdf = GeoDataFrame(records, geometry="geometry", crs=self.dataset.crs) return self.path_gdf
[docs] def save_paths(self, save_file_path: Optional[str] = None) -> None: """ Save all calculated paths to a file in a GIS-compatible format. This method creates a GeoDataFrame containing all paths from the PathCollection and saves it to the specified file. The file format is automatically determined from the file extension (e.g., '.shp' for Shapefile, '.gpkg' for GeoPackage). Parameters: save_file_path: Path to save the paths file. If None, no file is saved. Common formats include: - Shapefile (.shp) - GeoPackage (.gpkg) - GeoJSON (.geojson) - CSV (.csv) Returns: None Notes: - The saved file includes all path attributes (ID, length, cost data) - The geometries are saved as LineString features with the CRS from the source dataset - If no paths have been calculated, an empty GeoDataFrame will be created first """ if self.path_gdf is None: self.create_path_geodataframe() if save_file_path is not None and save_file_path != '': self.path_gdf.to_file(save_file_path)
[docs] def save_raster(self, save_path: Optional[str] = None) -> None: """ Save the raster data used for path calculations to a GeoTIFF file. This method exports the current raster data to the specified file location. The raster contains the cost values used for path calculations, including any modifications from additional datasets. The exported file includes complete geo referencing information and preserves the original CRS. Parameters: save_path: Path where the raster file should be saved. If None, uses the default filename "pyorps_raster.tiff" in the current directory. Returns: None Notes: - The saved raster includes all cost modifications from additional datasets - The file is saved in GeoTIFF format which preserves geo referencing information - If the PathFinder uses a GeoRasterizer, the complete raster is saved - Otherwise, only the section loaded in the RasterHandler is saved - For large areas, the resulting file size may be substantial """ if save_path is None: save_path = "pyorps_raster.tiff" if self.geo_rasterizer is not None: self.geo_rasterizer.save_raster(save_path) else: self.raster_handler.save_section_as_raster(save_path)
[docs] def plot_paths(self, paths: Optional[Union[Path, PathCollection, list[Path]]] = None, plot_all: bool = True, subplots: bool = True, subplot_size: tuple[int, int] = (10, 8), source_color: str = 'green', target_color: str = 'red', path_colors: Optional[Union[str, list[str]]] = None, source_marker: str = 'o', target_marker: str = 'x', path_line_width: int = 2, show_raster: bool = True, title: Optional[Union[str, list[str]]] = None, sup_title: Optional[str] = None, path_id: Optional[Union[int, list[int]]] = None, reverse_colors: bool = False) -> Union[Any, list[Any]]: """ Plot paths with customizable styling and layout options. This method visualizes the calculated paths, allowing for detailed customization of the plot appearance. It delegates to the PathPlotter class to handle the actual visualization. Parameters: paths: Specific path(s) to plot. If None, uses all paths in this PathFinder instance. Can be a single Path object, a list of Path objects, or a PathCollection. plot_all: If True, plots all paths. If False, plots only the path with path_id. subplots: If True and multiple paths are plotted, creates separate subplots for each path. subplot_size: Size of each individual subplot in inches (width, height). source_color: Color for source markers. target_color: Color for target markers. path_colors: Colors for path lines. Can be a single color or a list of colors. If None, default color scheme is used. source_marker: Marker style for source points. target_marker: Marker style for target points. path_line_width: Line width for the paths. show_raster: Whether to display the raster data as background. title: Title for the plot or individual subplot titles if a list is provided. sup_title: Overall title for the figure (when using multiple subplots). path_id: ID of specific path to plot when plot_all is False. Can be a single ID or a list of IDs. reverse_colors: Whether to reverse the color scheme for raster data (dark=low cost, bright=high cost). Returns: The matplotlib axes object(s) for the plot. Returns a list of axes if multiple subplots are created, otherwise returns a single axes object. Runtime Notes: - The plotting operation itself is generally quick (0.1-0.5 seconds) - Most time is spent on data preparation in the initial PathFinder setup - When plotting many paths, using subplots=True can improve readability - Displaying the raster background (show_raster=True) adds minimal overhead once the PathFinder is initialized """ from pyorps.utils.plotting import PathPlotter # Determine which paths to plot based on the input if paths is None: # Use all paths from this PathFinder instance path_collection = self.paths elif isinstance(paths, Path): # Create a collection with a single path path_collection = PathCollection() path_collection.add(paths) elif isinstance(paths, list): # Create a collection from a list of paths path_collection = PathCollection() for path in paths: path_collection.add(path, replace=False) else: # Assume it's already a PathCollection path_collection = paths # Create PathPlotter and delegate the plotting plotter = PathPlotter(paths=path_collection, raster_handler=self.raster_handler) return plotter.plot_paths( plot_all=plot_all, subplots=subplots, subplotsize=subplot_size, source_color=source_color, target_color=target_color, path_colors=path_colors, source_marker=source_marker, target_marker=target_marker, path_linewidth=path_line_width, show_raster=show_raster, title=title, suptitle=sup_title, path_id=path_id, reverse_colors=reverse_colors )