Source code for pyorps.raster.rasterizer


from typing import Union, Optional
from copy import deepcopy

import numpy as np
from geopandas import GeoDataFrame
from rasterio import open as rio_open
from rasterio.features import rasterize, geometry_mask
from rasterio.transform import Affine, from_bounds
from shapely.geometry import Polygon, box

# Changed to relative imports from other modules
from pyorps.io.geo_dataset import (initialize_geo_dataset, VectorDataset, 
                                   RasterDataset, InMemoryRasterDataset, GeoDataset)
from pyorps.core.types import (InputDataType, CostAssumptionsType, BboxType, 
                               GeometryMaskType)
from pyorps.core.cost_assumptions import CostAssumptions


[docs] class GeoRasterizer: """ A class for preparing and rasterizing geospatial data with cost assumptions. This class integrates: - GeoDataset for representing datasets with metadata - CostAssumptions for handling cost mappings - Rasterization functionality for converting vector data to rasters """ def __init__( self, input_data: GeoDataset, cost_assumptions: CostAssumptionsType, bbox: Optional[BboxType] = None, mask: Optional[GeometryMaskType] = None, default_crs: Optional[str] = None, **kwargs ): """ Initialize the GeoRasterizer with a base dataset and optional parameters. Parameters: input_data: The base dataset to rasterize (file path, GeoDataFrame, dict with web params, or GeoDataset) mask: Window or polygon mask to limit data reading cost_assumptions: Cost values for rasterization (dict, file path, or CostAssumptions object) default_crs: Default coordinate reference system to use **kwargs: Additional parameters passed to load function of the GeoDataset if base_dataset is not a GeoDataset """ self.base_dataset = input_data self.additional_datasets = [] self.default_crs = default_crs self.bbox = bbox self.mask = mask if isinstance(cost_assumptions, CostAssumptions): self.cost_manager = cost_assumptions else: self.cost_manager = CostAssumptions(cost_assumptions) if self.base_dataset.data is None: self.base_dataset.load_data(**kwargs) if isinstance(self.base_dataset, RasterDataset): self.raster = self.base_dataset.data self.transform = self.base_dataset.transform self.raster_dataset = self.base_dataset else: self.raster = None self.transform = None self.raster_dataset = None @property def base_data(self) -> GeoDataFrame: """ Property to directly access the data attribute of the base_dataset. Returns: The base dataset (GeoDataFrame """ return self.base_dataset.data
[docs] def clip_to_area( self, clip_geometry: Union[GeoDataFrame, Polygon] ) -> GeoDataset: """ Clip the base dataset to a specific area. Parameters: clip_geometry: The geometry to clip by Returns: The clipped base dataset """ if self.base_data is None: raise ValueError("No base data loaded to clip") self.base_dataset.data = self.base_data.clip(clip_geometry) return self.base_dataset
[docs] @staticmethod def create_buffer( dataset: Union[VectorDataset, GeoDataFrame], geometry_buffer_m: float, inplace: bool = True ) -> Union[VectorDataset, GeoDataFrame]: """ Add a buffer to geometries in a dataset. Parameters: dataset: The dataset to buffer (GeoDataset or GeoDataFrame) geometry_buffer_m: Distance to buffer in dataset's CRS units inplace: If True, modify the dataset in place Returns: The buffered dataset """ if isinstance(dataset, VectorDataset): data = dataset.data else: data = dataset if data is None: raise ValueError("Dataset has no data to buffer") if geometry_buffer_m <= 0: return dataset if isinstance(dataset, VectorDataset) else data if inplace: data['geometry'] = data.buffer(geometry_buffer_m) return dataset if isinstance(dataset, VectorDataset) else data else: buffered_data = data.copy() buffered_data['geometry'] = buffered_data.buffer(geometry_buffer_m) if isinstance(dataset, VectorDataset): # Create a new GeoDataset with the buffered data buffered_dataset = deepcopy(dataset) buffered_dataset.data = buffered_data return buffered_dataset else: return buffered_data
[docs] def create_bounds_geodataframe( self, target_crs: Optional[str] = None ) -> GeoDataFrame: """ Creates a GeoDataFrame from the bounds of the base data in a specified CRS. Parameters: target_crs: The desired CRS for the new GeoDataFrame Returns: A new GeoDataFrame containing the bounds of the base data """ if self.base_dataset is None or self.base_dataset.data is None: raise ValueError("No base data loaded to create bounds from") # Calculate the bounds of the source GeoDataFrame minx, miny, maxx, maxy = self.base_dataset.data.total_bounds # Create a bounding box geometry bounding_box = box(minx, miny, maxx, maxy) # Create a new GeoDataFrame with the bounding box bounds_gdf = GeoDataFrame(geometry=[bounding_box], crs=self.crs) # Set the CRS of the new GeoDataFrame to the target CRS if specified if target_crs: bounds_gdf = bounds_gdf.to_crs(target_crs) return bounds_gdf
@property def crs(self): """ Passing crs property of base_dataset. Returns: The desired CRS of the base dataset """ return self.base_dataset.crs
[docs] def rasterize( self, field_name: str = 'cost', resolution_in_m: float = 1.0, fill_value: int = 65535, save_path: Optional[str] = None, dtype: str = "uint16", geometry_buffer_m: float = 0, bounding_box: Optional[Polygon] = None ) -> RasterDataset: """ Rasterize the base dataset based on a specified field. Parameters: field_name: The field to use for rasterization values resolution_in_m: The resolution of the output raster in meters fill_value: Value to use for areas with no data save_path: Path to save the rasterized output dtype: Data type for the output raster geometry_buffer_m: Buffer to apply to the dataset geometries bounding_box: Bounding box to define the rasterization extent Returns: tuple of (raster_data, transform) """ if self.base_data.shape[0] == 0: raise ValueError("Base data is empty - nothing to rasterize!") if self.base_dataset is None or self.base_dataset.data is None: raise ValueError("No base dataset loaded to rasterize") # Add cost field if field_name == 'cost': self.cost_manager.apply_to_geodataframe(self.base_data) # Fill NA values in the field if field_name not in self.base_data.columns: raise ValueError(f"Field '{field_name}' not found in the dataset") if self.base_data[field_name].isna().any(): self.base_data[field_name] = self.base_data[field_name].fillna(fill_value) # Round the values in the specified field and convert to the desired data type self.base_data[field_name] = self.base_data[field_name].round().astype(dtype) # Sort values by field to ensure higher cost values have higher priority data = self.base_data.sort_values(by=field_name, ascending=True) # Apply buffer if needed if geometry_buffer_m > 0: buffered = self.create_buffer(data, geometry_buffer_m, inplace=False) else: buffered = data if bounding_box is None: # Calculate the output shape based on the GeoDataFrame's bounds and the # specified resolution out_shape = self._calculate_out_shape_from_geodataframe(buffered, resolution_in_m) # Create a transformation object to convert between coordinate systems self.transform = from_bounds(*buffered.total_bounds, *out_shape[::-1]) # Create a generator of shapes (geometry, value) pairs for rasterization geometry_field_name = zip(buffered['geometry'], buffered[field_name]) shapes = ((geom, value) for geom, value in geometry_field_name) # Rasterize the shapes into a 2D array self.raster = rasterize( shapes, out_shape=out_shape, fill=fill_value, dtype=dtype, transform=self.transform ) else: # Calculate the output shape based on the bounding box out_shape = self._calculate_out_shape_from_bounding_box(bounding_box, resolution_in_m) # Create a transformation object self.transform = from_bounds(*bounding_box.bounds, *out_shape[::-1]) # Create initial raster with fill value self.raster = rasterize( [(bounding_box, fill_value)], out_shape=out_shape, fill=fill_value, dtype=dtype, transform=self.transform ) # Override with dataset values for unique_value in buffered[field_name].unique(): value_geoms = buffered.loc[buffered[field_name] == unique_value] geoms_field_name = zip(value_geoms['geometry'], value_geoms[field_name]) shapes = ((geom, value) for geom, value in geoms_field_name) rasterize( shapes, out_shape=out_shape, fill=fill_value, out=self.raster, dtype=dtype, transform=self.transform ) self.raster_dataset = InMemoryRasterDataset(self.raster, self.crs, self.transform) # Write the rasterized data to a new raster file if a save path is provided if save_path is not None: self.save_raster(save_path) return self.raster_dataset
[docs] def _calculate_out_shape_from_bounding_box( self, bounding_box: Polygon, resolution_m2: float = 1.0 ) -> tuple[int, int]: """ Calculate the output shape (rows, columns) based on a bounding box and resolution. Parameters: bounding_box: The bounding box defining the output shape in a planar CRS resolution_m2: The resolution in square meters Returns: tuple of (rows, columns) representing the output shape """ # Calculate the bounding box dimensions bounds = bounding_box.bounds # (minx, miny, maxx, maxy) width = bounds[2] - bounds[0] # maxx - minx height = bounds[3] - bounds[1] # maxy - miny # Calculate the total area of the bounding box in square meters total_area_m2 = width * height return self._get_rows_and_columns(width, height, resolution_m2, total_area_m2)
[docs] def _calculate_out_shape_from_geodataframe( self, gdf: GeoDataFrame, resolution_m2: float = 1.0, bounding_box: Optional[Polygon] = None ) -> tuple[int, int]: """ Calculate the output shape (rows, columns) based on a GeoDataFrame and resolution. Parameters: gdf: The GeoDataFrame containing the geometries to cover resolution_m2: The resolution in square meters bounding_box: Optional bounding box defining the output shape Returns: tuple of (rows, columns) representing the output shape """ # Ensure the GeoDataFrame is in a projected CRS that uses meters if gdf.crs.is_geographic: # Transform to a planar CRS (e.g., Web Mercator) gdf = gdf.to_crs(epsg=3857) # Calculate the bounding box of the GeoDataFrame bounds = gdf.total_bounds # (minx, miny, maxx, maxy) width = bounds[2] - bounds[0] # maxx - minx height = bounds[3] - bounds[1] # maxy - miny # Calculate the total area of the GeoDataFrame in square meters if bounding_box is None: total_area_m2 = width * height else: bounds_bbox = bounding_box.bounds bbox_width = bounds_bbox[2] - bounds_bbox[0] bbox_height = bounds_bbox[3] - bounds_bbox[1] total_area_m2 = bbox_width * bbox_height return self._get_rows_and_columns(width, height, resolution_m2, total_area_m2)
[docs] @staticmethod def _get_rows_and_columns(width, height, resolution_m2, total_area_m2): """ Calculate rows and columns based on width, height, and resolution. Parameters: width: Width of the area height: Height of the area resolution_m2: Resolution in square meters total_area_m2: Total area in square meters Returns: tuple of (rows, columns) """ # Calculate the aspect ratio aspect_ratio = width / height if height != 0 else 1.0 # Calculate the area of each pixel pixel_area = resolution_m2 # Calculate the total number of pixels needed total_pixels = total_area_m2 / pixel_area # Calculate the height and width based on the aspect ratio calculated_height = (total_pixels / aspect_ratio) ** 0.5 calculated_width = calculated_height * aspect_ratio # Convert to integers for output shape rows = int(calculated_height) columns = int(calculated_width) # Adjusting to ensure the total area is covered if rows * columns < total_pixels: # Increase columns if needed if (calculated_width - columns) > (calculated_height - rows): columns += 1 else: rows += 1 return rows, columns
[docs] def modify_raster_with_geodataframe( self, gdf: GeoDataFrame, value: float, ignore_value: Optional[float] = 65535, multiply: bool = False) -> np.ndarray: """ Modifies the raster cells inside the polygons of a GeoDataFrame. Parameters: gdf: The GeoDataFrame containing polygons to use for masking value: The value to set for the raster cells inside the polygons ignore_value: Value in the raster to ignore during modification multiply: If True, multiply the raster values by the given value Returns: The modified raster """ if self.raster is None or self.transform is None: raise ValueError("No raster data available to modify") # Create a mask from the geometries in the GeoDataFrame mask_array = geometry_mask( gdf['geometry'].values, transform=self.transform, invert=True, # Invert the mask to keep the area inside the polygons out_shape=self.raster.shape ) if ignore_value is None: ignore_value_mask = np.ones_like(self.raster, dtype=bool) else: ignore_value_mask = self.raster != ignore_value mask = mask_array & ignore_value_mask # Modify the raster values based on the specified parameters if multiply: # Set the raster cells to a multiple of the existing values self.raster[mask] = self.raster[mask] * value else: # Set the raster cells to the new value self.raster[mask] = value return self.raster
[docs] def modify_raster_from_dataset( self, input_data: InputDataType, cost_assumptions: Optional[Union[CostAssumptionsType, int, float]] = None, bbox: Optional[BboxType] = None, mask: Optional[GeometryMaskType] = None, transform: Optional[Affine] = None, geometry_buffer_m: float = 0, ignore_value: Optional[float] = 65535, multiply: bool = False, zone_field: Optional[str] = None, forbidden_zone: Optional[str] = None, forbidden_value: int = 65535, **kwargs ) -> np.ndarray: """ Modify the raster with an additional dataset. Parameters: input_data: Path to the additional dataset file cost_assumptions: The CostAssumptionsType or numeric to apply as cost values to the base_dataset bbox: The bounding box to apply to the input data mask: The geometry mask to apply to the input data transform: The transform describing the input data geometry_buffer_m: Buffer to apply to the dataset geometries ignore_value: Value in the raster to ignore multiply: If True, multiply the raster values by the given value (in cost_assumptions) zone_field: Field name for zones in the dataset forbidden_zone: Zone value that should be treated as forbidden forbidden_value: Value to use for forbidden areas **kwargs: Additional keyword arguments, passed to the loading function of the GeoDataset Returns: The modified raster """ if self.raster is None or self.transform is None: msg = "No raster data available to modify. Call rasterize() first." raise ValueError(msg) # Create bounds for data reading if bbox is None: bbox = self.create_bounds_geodataframe() if mask is None: mask = self.mask dataset = initialize_geo_dataset(input_data, crs=self.crs, bbox=bbox, mask=mask, transform=transform) dataset.load_data(**kwargs) gdf = dataset.data # Apply buffer if needed gdf = self.create_buffer(gdf, geometry_buffer_m) if isinstance(cost_assumptions, float) or isinstance(cost_assumptions, int): self._modify_raster_from_dataset_simple_cost_assumptions(gdf, cost_assumptions, ignore_value, multiply, zone_field, forbidden_zone, forbidden_value) else: if isinstance(cost_assumptions, str) or isinstance(cost_assumptions, dict): ca = CostAssumptions(source=cost_assumptions) else: ca = cost_assumptions ca.apply_to_geodataframe(gdf) for unique_value in gdf['cost'].unique(): value_geoms = gdf.loc[gdf['cost'] == unique_value] if value_geoms.empty: continue geometry_field_names = zip(value_geoms['geometry'], value_geoms['cost']) shapes = ((geom, value) for geom, value in geometry_field_names) # Create a mask from the geometries in the GeoDataFrame mask_array = geometry_mask( shapes, transform=self.transform, invert=True, # Invert the mask to keep the area inside the polygons out_shape=self.raster.shape ) if ignore_value is None: ignore_value_mask = np.ones_like(self.raster, dtype=bool) else: ignore_value_mask = self.raster != ignore_value mask = mask_array & ignore_value_mask self.raster[mask] = unique_value return self.raster
[docs] def _modify_raster_from_dataset_simple_cost_assumptions( self, gdf: GeoDataFrame, cost_assumptions: Optional[Union[CostAssumptionsType, int, float]] = None, ignore_value: Optional[float] = 65535, multiply: bool = False, zone_field: Optional[str] = None, forbidden_zone: Optional[str] = None, forbidden_value: int = 65535, ): """ Modify the raster with an additional GeoDataFrame. Parameters: gdf: GeoDataFrame, used to modify the raster dataset cost_assumptions: The CostAssumptionsType or numeric to apply as cost values to the base_dataset ignore_value: Value in the raster to ignore multiply: If True, multiply the raster values by the given value (in cost_assumptions) zone_field: Field name for zones in the dataset forbidden_zone: Zone value that should be treated as forbidden forbidden_value: Value to use for forbidden areas Returns: The modified raster """ # Handle zoning if specified if zone_field and forbidden_zone: forbidden_areas = gdf.loc[gdf[zone_field] == forbidden_zone] other_areas = gdf.loc[gdf[zone_field] != forbidden_zone] # Apply multiplication factor to non-forbidden zones if not other_areas.empty: self.modify_raster_with_geodataframe( gdf=other_areas, value=cost_assumptions, ignore_value=ignore_value, multiply=multiply ) # Set forbidden zones to forbidden value if not forbidden_areas.empty: self.modify_raster_with_geodataframe( gdf=forbidden_areas, value=forbidden_value, ignore_value=ignore_value, multiply=False ) else: # Standard modification for the entire dataset self.modify_raster_with_geodataframe( gdf=gdf, value=cost_assumptions, ignore_value=ignore_value, multiply=multiply )
[docs] def save_raster(self, save_path: str) -> None: """ Save the rasterized data to a file. Parameters: save_path: Path to save the raster file """ if self.raster is None or self.transform is None: msg = "No raster data available to save. Call rasterize() first." raise ValueError(msg) with rio_open( save_path, 'w', # Specify the output format as GeoTIFF driver='GTiff', # Height of the raster height=self.raster_dataset.shape[0], # Width of the raster width=self.raster_dataset.shape[1], # Number of bands in the output raster count=1, # Data type of the raster dtype=self.raster_dataset.dtype, # Coordinate reference system of the raster crs=self.raster_dataset.crs, # Transformation for the raster transform=self.raster_dataset.transform ) as dst: # Write the raster data to the first band dst.write(self.raster_dataset.data, 1)
[docs] def shrink_raster(self, exclude_value: int) -> np.ndarray: """ Shrink the raster by removing outer bounds with a specific value. Parameters: exclude_value: Value to exclude from the outer bounds Returns: The shrunk raster """ if self.raster is None: msg = "No raster data available to shrink. Call rasterize() first." raise ValueError(msg) # Create a mask where the raster does not equal the exclude_value mask = self.raster != exclude_value # Find the first and last rows and columns that contain non-excluded values rows = np.any(mask, axis=1) cols = np.any(mask, axis=0) if not np.any(rows) or not np.any(cols): return self.raster # Return original if no non-excluded values # Determine the indices for the outer bounds to be excluded # First row with a non-excluded value first_row = np.argmax(rows) # Last row with a non-excluded value last_row = len(rows) - np.argmax(rows[::-1]) # First column with a non-excluded value first_col = np.argmax(cols) # Last column with a non-excluded value last_col = len(cols) - np.argmax(cols[::-1]) # Use the indices to slice the array and return the shrunk raster self.raster = self.raster[first_row:last_row, first_col:last_col] # Update the transform to account for the change in origin self.transform = Affine( self.transform.a, self.transform.b, self.transform.c + first_col * self.transform.a, self.transform.d, self.transform.e, self.transform.f + first_row * self.transform.e ) return self.raster