Source code for pyorps.core.cost_assumptions

"""
PYORPS: An Open-Source Tool for Automated Power Line Routing

Reference:
[1] Hofmann, M., Stetz, T., Kammer, F., Repo, S.: 'PYORPS: An Open-Source Tool for
    Automated Power Line Routing', CIRED 2025 - 28th Conference and Exhibition on
    Electricity Distribution, 16 - 19 June 2025, Geneva, Switzerland
"""
from typing import Union, Optional, Any, Callable
import os
from pathlib import Path
import csv
import json
import numpy as np
import pandas as pd

from geopandas import GeoDataFrame, GeoSeries

from .exceptions import (InvalidSourceError, FileLoadError, FormatError,
                         NoSuitableColumnsError)


[docs] class CostAssumptions: """ A class for handling cost assumptions for rasterization. This class handles: - Loading cost assumptions from files (CSV, Excel, JSON) or generating of cost assumptions from a dictionary or a GeoDataFrame. - Mapping costs to features in a GeoDataFrame - Managing hierarchical cost structures """
[docs] def __init__( self, source: Optional[Union[str, dict]] = None ): """ Initialize the CostAssumptions object. Parameters: source: 1. Path to a cost assumptions file 2. A dictionary of cost values """ self.source = source self.cost_assumptions = {} self.main_feature = None self.side_features = [] if source is not None: if isinstance(source, (dict, str)): self.load(source) else: raise InvalidSourceError( f"Parameter 'source' must be either a string, a dictionary or a " f"GeoDataFrame, not {type(source)}" ) if not self.cost_assumptions: raise FormatError(f"The format of the cost assumptions file or " f"dictionary is invalid. Please check " f"the format of your cost assumptions input: " f"{self.source}")
[docs] def load( self, source: Union[str, dict] ) -> dict: """ Load cost assumptions from a file or dictionary. Parameters: source: Path to a file or a dictionary containing cost assumptions Returns: dictionary of cost assumptions """ if isinstance(source, dict): keys, costs = next(iter(source.items())) if isinstance(keys, tuple): self.main_feature, *self.side_features = keys else: self.main_feature = keys self.cost_assumptions = costs return self.cost_assumptions if isinstance(source, str) and os.path.isfile(source): file_ext = Path(source).suffix.lower() loader_map = { '.csv': self._load_csv_cost_assumptions, '.json': self._load_json_cost_assumptions, '.xlsx': self._load_excel_cost_assumptions, '.xls': self._load_excel_cost_assumptions, } loader: Callable[[str], dict] | None = loader_map.get(file_ext) if not loader: raise InvalidSourceError(f"Unsupported file format: {file_ext}") return loader(source) raise InvalidSourceError("Source must be a dictionary or a valid file path")
def _load_csv_cost_assumptions( self, filepath: str ) -> dict: """ Load cost assumptions from a CSV file with auto-detection of encoding, delimiter, and decimal separator. Parameters: filepath: Path to the CSV file Returns: dictionary of cost assumptions """ encodings = ['utf-8', 'latin-1', 'ISO-8859-1', 'cp1252'] decimal_separators = ['.', ','] common_delimiters = [',', ';', '\t', '|'] # Try using csv.Sniffer to detect the delimiter for encoding in encodings: try: # Read a sample to detect the dialect with open(filepath, 'r', encoding=encoding) as f: sample = f.read(4096) sniffer = csv.Sniffer() dialect = sniffer.sniff(sample) delimiter = dialect.delimiter # Try with detected delimiter and different decimal separators for decimal in decimal_separators: try: df = pd.read_csv( filepath, encoding=encoding, delimiter=delimiter, decimal=decimal ) if df.empty: continue df = self._convert_numeric_columns(df) self.cost_assumptions = self.convert_df_to_cost_dict(df) return self.cost_assumptions except (pd.errors.ParserError, ValueError): continue except (csv.Error, UnicodeDecodeError, IOError): # If auto-detection fails, try common delimiters for delimiter in common_delimiters: for decimal in decimal_separators: try: df = pd.read_csv( filepath, encoding=encoding, delimiter=delimiter, decimal=decimal ) if df.empty: continue df = self._convert_numeric_columns(df) self.cost_assumptions = self.convert_df_to_cost_dict(df) return self.cost_assumptions except (pd.errors.ParserError, ValueError, UnicodeDecodeError): continue raise FileLoadError(f"Could not read CSV file {filepath}. Tried multiple " f"encodings and formats.") def _load_json_cost_assumptions( self, filepath: str ) -> dict: """ Load cost assumptions from a JSON file with auto-detection of encoding. Parameters: filepath: Path to the JSON file Returns: dictionary of cost assumptions """ encodings = ['utf-8', 'latin-1', 'ISO-8859-1', 'cp1252'] last_error = None for encoding in encodings: try: with open(filepath, 'r', encoding=encoding) as f: data = json.load(f) # Check if it's the new format with metadata if (isinstance(data, dict) and 'metadata' in data and 'cost_assumptions' in data): self.main_feature = data['metadata']['main_feature'] self.side_features = data['metadata'].get('side_features', []) # Handle tuple keys if necessary if self.side_features: cost_dict = {} for key_str, value in data['cost_assumptions'].items(): if "__" in key_str: # Convert string representation back to tuple tuple_key = tuple(key_str.split("__")) cost_dict[tuple_key] = value else: cost_dict[key_str] = value self.cost_assumptions = cost_dict else: self.cost_assumptions = data['cost_assumptions'] else: # Legacy format - just a plain dictionary self.cost_assumptions = data if len(self.cost_assumptions) == 0: raise FileLoadError(f"Failed to read json file {filepath}. " f"File contains no data or is not in " f"the correct format!") return self.cost_assumptions except (UnicodeDecodeError, json.JSONDecodeError) as e: last_error = e continue raise FileLoadError(f"Could not read JSON file {filepath}: {last_error}") def _load_excel_cost_assumptions( self, filepath: str ) -> dict: """ Load cost assumptions from an Excel file, handling different decimal separators. Parameters: filepath: Path to the Excel file Returns: dictionary of cost assumptions """ try: # First try default settings df = pd.read_excel(filepath) if df.empty: raise FileLoadError(f"Failed to read Excel file {filepath}. File " f"contains no data or is not in the " f"correct format!") df = self._convert_numeric_columns(df) self.cost_assumptions = self.convert_df_to_cost_dict(df) return self.cost_assumptions except (pd.errors.ParserError, ValueError, IOError) as first_error: # If there's an issue, try reading as strings and convert manually try: df = pd.read_excel(filepath, dtype=str) if df.empty: msg = (f"Failed to read Excel file {filepath}. File contains no " f"data or is not in the correct format!") raise FileLoadError(msg) df = self._convert_numeric_columns(df) self.cost_assumptions = self.convert_df_to_cost_dict(df) return self.cost_assumptions except (pd.errors.ParserError, ValueError, IOError) as e: msg = (f"Failed to read Excel file {filepath}. Original error: " f"{first_error}. Second attempt error: {e}") raise FileLoadError(msg)
[docs] def convert_df_to_cost_dict( self, df: pd.DataFrame ) -> dict: """ Convert a DataFrame to a nested dictionary for cost assumptions. Parameters: df: DataFrame containing cost assumptions with hierarchical structure Returns: dictionary of cost assumptions with nested structure based on DataFrame columns Uses one numeric column for costs, and all other columns as a hierarchical index: - The first column is the 'main_feature' - All additional columns are 'side_features' """ # First ensure numeric columns are properly converted df = self._convert_numeric_columns(df) # Find the numeric column for costs numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist() if not numeric_columns: raise FormatError("No numeric column found for cost values") # Use the first numeric column as the cost column cost_column = numeric_columns[0] # All non-numeric columns form the hierarchical index index_columns = [col for col in df.columns if col != cost_column] if not index_columns: raise FormatError("No columns found for feature hierarchy") # Fill NaN values and assign features for ci, column in enumerate(index_columns): df[column] = df[column].fillna('') if ci == 0: self.main_feature = column else: self.side_features.append(column) # Create a series with a MultiIndex and convert to nested dictionaries cost_series = df.set_index(index_columns)[cost_column] return cost_series.to_dict()
@staticmethod def _convert_numeric_columns( df: pd.DataFrame ) -> pd.DataFrame: """ Convert columns to numeric, handling different decimal separators. Parameters: df: DataFrame with potential numeric columns that might use different decimal separators Returns: DataFrame with properly converted numeric columns """ for col in df.columns: # Skip columns that are already numeric or clearly not numeric if pd.api.types.is_numeric_dtype(df[col]) or df[col].dtype != object: continue # Try to convert using various decimal separators original_values = df[col].copy() # Try standard conversion df[col] = pd.to_numeric(df[col], errors='coerce') # If conversion successful and no NaN values were introduced, continue if df[col].isna().sum() == original_values.isna().sum(): continue # Restore original values for next attempt df[col] = original_values # Try comma as decimal separator try: df[col] = df[col].str.replace(',', '.').astype(float) except (ValueError, AttributeError): # Revert to original if both attempts fail df[col] = original_values return df
[docs] def apply_to_geodataframe( self, gdf: GeoDataFrame, main_feature: Optional[str] = None, side_features: Optional[list[str]] = None ): """ Apply cost assumptions to a GeoDataFrame. Parameters: gdf: GeoDataFrame to apply costs to main_feature: Main feature column name side_features: list of side feature column names or single side feature name Returns: GeoDataFrame with 'cost' column added """ main_feature = main_feature or self.main_feature if side_features is None: side_features = self.side_features elif isinstance(side_features, str): side_features = [side_features] if main_feature is None: raise FormatError("Main feature column not specified") CostAssumptions._init_feature_columns(gdf, main_feature, side_features) self._set_cost_column(gdf, main_feature, side_features) return gdf
def _set_cost_column(self, gdf, main_feature, side_features): # Handle different cost assumption structures first_key = next(iter(self.cost_assumptions), None) if isinstance(first_key, tuple): # Complex tuple keys structure - from multi-index self._apply_tuple_costs(gdf, main_feature, side_features) elif side_features and isinstance(next(iter(self.cost_assumptions.values()), None), dict): # Nested dictionary structure self._apply_nested_costs(gdf, main_feature, side_features) else: # Simple mapping with numeric values gdf['cost'] = gdf[main_feature].map(self.cost_assumptions) @staticmethod def _init_feature_columns(gdf, main_feature, side_features): # Fill NA values gdf[main_feature] = gdf[main_feature].fillna('') for feat in side_features: gdf[feat] = gdf[feat].fillna('') def _apply_tuple_costs( self, gdf: GeoDataFrame, main_feature: Optional[str] = None, side_features: Optional[list[str]] = None ): """ Apply costs to the GeoDataFrame based on tuple keys in cost assumptions. Parameters: gdf: GeoDataFrame to update with cost values main_feature: Column name for the primary feature side_features: List of column names for secondary features Returns: None (modifies gdf in-place) """ # Create wildcard dictionary for default values iter_items = self.cost_assumptions.items() wild_cards = {keys[0]: value for keys, value in iter_items if '' in keys} # Apply specific mappings for keys, value in self.cost_assumptions.items(): main_key, *side_keys = keys mask = gdf[main_feature] == main_key for side_feature, side_key in zip(side_features, side_keys): mask &= gdf[side_feature] == side_key gdf.loc[mask, 'cost'] = value # Apply wildcards for missing values cost_nan = gdf['cost'].isna() for wild_card_key, wild_card_value in wild_cards.items(): mask = (gdf[main_feature] == wild_card_key) & cost_nan gdf.loc[mask, 'cost'] = wild_card_value def _apply_nested_costs( self, gdf: GeoDataFrame, main_feature: Optional[str] = None, side_features: Optional[list[str]] = None ): """ Apply costs to the GeoDataFrame based on nested dictionary cost assumptions. Parameters: gdf: GeoDataFrame to update with cost values main_feature: Column name for the primary feature side_features: List containing a single column name for the secondary feature Returns: None (modifies gdf in-place) """ if len(side_features) != 1: msg = "Multiple side features not supported for nested dictionary structure" raise FormatError(msg) side_feature = side_features[0] # Iterate over each main feature value and its inner dictionary for main_value, inner_dict in self.cost_assumptions.items(): # Create mask for the main feature main_mask = gdf[main_feature] == main_value # Apply costs for each side feature value for side_value, cost in inner_dict.items(): if side_value == "" or pd.isnull(side_value): # Handle wildcard/default values side_mask = (gdf[side_feature].isnull() | (gdf[side_feature] == side_value) | ~gdf[side_feature].isin(inner_dict.keys())) else: # Standard case - exact match side_mask = gdf[side_feature] == side_value # Apply cost where both masks match combined_mask = main_mask & side_mask gdf.loc[combined_mask, 'cost'] = cost
[docs] def to_csv( self, filepath: str, separator: str = ';', decimal: str = '.', encoding: str = 'ISO-8859-1' ) -> None: """ Save the cost assumptions to a CSV file. Parameters: filepath: Path where to save the CSV file separator: Column separator character (default is ';') decimal: Decimal separator character (default is '.') encoding: The encoding of the file (default is 'ISO-8859-1') """ # Convert the nested dictionary to DataFrame df = self.cost_dict_to_df(self.cost_assumptions) # Handle decimal separator conversion if needed if decimal == ',': # Convert numeric columns to use comma as decimal separator for col in df.select_dtypes(include=[np.number]).columns: df[col] = df[col].astype(str).str.replace('.', ',') # Save DataFrame to CSV df.to_csv(filepath, sep=separator, index=False, encoding=encoding)
[docs] def to_json( self, filepath: str, indent: int = 2, encoding: str = 'ISO-8859-1' ) -> None: """ Save the cost assumptions to a JSON file. Parameters: filepath: Path where to save the JSON file indent: Number of spaces for indentation (default is 2) encoding: The encoding of the file (default is 'ISO-8859-1') """ # Create a structure that can be properly serialized to JSON output_dict = { 'metadata': { 'main_feature': self.main_feature, 'side_features': self.side_features }, 'cost_assumptions': {} } # Convert the cost assumptions dictionary to a JSON-serializable format if self.cost_assumptions: first_key = next(iter(self.cost_assumptions)) if isinstance(first_key, tuple): # Handle tuple keys by converting them to string representations for key, value in self.cost_assumptions.items(): key_str = "__".join(str(k) for k in key) output_dict['cost_assumptions'][key_str] = value else: # Regular keys can be directly serialized output_dict['cost_assumptions'] = self.cost_assumptions with open(filepath, mode='w', encoding=encoding) as f: json.dump(output_dict, f, indent=indent)
[docs] def to_excel( self, filepath: str, sheet_name: str = 'CostAssumptions', index: bool = False ) -> None: """ Save the cost assumptions to an Excel file. Parameters: filepath: Path where to save the Excel file sheet_name: Name of the worksheet (default is 'CostAssumptions') index: Whether to write row indices (default is False) """ # Convert the nested dictionary to DataFrame df = self.cost_dict_to_df(self.cost_assumptions) # Save DataFrame to Excel df.to_excel(filepath, sheet_name=sheet_name, index=index)
[docs] def cost_dict_to_df( self, cost_dict: dict ) -> pd.DataFrame: """ Convert cost assumptions dictionary to DataFrame. Parameters: cost_dict: Dictionary of cost assumptions Returns: DataFrame representation of cost assumptions """ if cost_dict is None: cost_dict = self.cost_assumptions # Check if it's a simple or nested dictionary first_key = next(iter(cost_dict), None) if isinstance(first_key, tuple): # Handle tuple-based structure data = [] for keys, cost in cost_dict.items(): main_key, *side_keys = keys row = {self.main_feature: main_key} for side_feature, side_key in zip(self.side_features, side_keys): row[side_feature] = side_key row['cost'] = cost data.append(row) return pd.DataFrame(data) elif (self.side_features and isinstance(next(iter(cost_dict.values()), None), dict)): # Handle nested dictionary structure data = [] for main_value, inner_dict in cost_dict.items(): for side_value, cost in inner_dict.items(): row = { self.main_feature: main_value, self.side_features[0]: side_value, 'cost': cost } data.append(row) return pd.DataFrame(data) else: # Simple mapping return pd.DataFrame({ self.main_feature: list(cost_dict.keys()), 'cost': list(cost_dict.values()) })
[docs] def save_empty_cost_assumptions( geo_dataset: Any, save_path: Union[str, Path], main_feature: Optional[str] = None, side_features: Optional[list[str]] = None, file_type: str = 'csv', **kwargs ) -> dict: """ Generate and save empty cost assumptions with zero values for a geo dataset. This function analyzes the given dataset to detect appropriate feature columns, creates a CostAssumptions object with zero costs for all feature combinations, and saves it to the specified path in the requested format. Parameters: geo_dataset: GeoDataset object with a 'data' attribute containing a GeoDataFrame save_path: File path where the cost assumptions should be saved main_feature: Column name for the primary feature side_features: List containing a single column name for the secondary feature file_type: Output file format - one of 'json', 'csv', or 'excel' (default is 'json') Raises: TypeError: If file_type is not one of the supported formats NoSuitableColumnsError: If no suitable columns can be detected in the dataset Returns: None: This function saves to a file and doesn't return a value """ if main_feature is None or not side_features: # Detect main feature and side features from the GeoDataFrame mf, sf = detect_feature_columns(geo_dataset.data) main_feature = mf if main_feature is None else main_feature side_features = sf if not side_features else side_features # Generate cost assumptions with zero costs for all feature combinations cost_assumptions = get_zero_cost_assumptions(geo_dataset.data, main_feature, side_features) # Save the cost assumptions in the appropriate format if file_type == 'json': cost_assumptions.to_json(save_path, **kwargs) elif file_type == 'csv': cost_assumptions.to_csv(save_path, **kwargs) elif file_type == "excel": cost_assumptions.to_excel(save_path, **kwargs) else: raise TypeError("Parameter file_type must be 'json', 'csv' or 'excel'!") return cost_assumptions.cost_assumptions
[docs] def detect_feature_columns( gdf: GeoDataFrame, max_features_per_column: int = 50 ) -> tuple[str, list[str]]: """ Analyze columns in a geodataframe to identify the best candidates for main_feature and side_features based on statistical metrics. Parameters: gdf: GeoDataFrame to analyze max_features_per_column: Maximum number of unique values allowed in a categorical column Returns: tuple of (main_feature, side_features) Raises: NoSuitableColumnsError: When no suitable columns are found for feature selection """ # Filter out geometry and standard spatial columns ignore_columns = ['geometry', 'id', 'fid', 'gid', 'oid'] non_spatial_cols = [col for col in gdf.columns if col not in ignore_columns] if not non_spatial_cols: msg = "No suitable feature columns found in the geodataframe" raise NoSuitableColumnsError(msg) # Analyze columns by their data characteristics col_stats = calculate_column_statistics(gdf, non_spatial_cols, max_features_per_column) # No good candidates found if not col_stats: msg = "No suitable categorical columns found in the geodataframe" raise NoSuitableColumnsError(msg) # Select main feature column (nutzart) main_feature = select_main_feature(col_stats) # Find suitable side features (bez) side_features = find_side_features(gdf, main_feature, col_stats) return main_feature, side_features
[docs] def find_side_features( gdf: GeoDataFrame, main_feature: str, col_stats: dict[str, dict[str, Any]] ) -> list[str]: """ Find suitable side feature columns that refine the main feature. Parameters: gdf: GeoDataFrame to analyze main_feature: Selected main feature column name col_stats: dictionary with column statistics Returns: list of side feature column names """ def is_column_candidate(column: str) -> bool: return column != main_feature and col_stats[column]['null_ratio'] <= 0.7 # For all columns allow up to 70% nulls general_candidates = [col for col in col_stats if is_column_candidate(col)] side_features = [] # Then process other candidates with stricter criteria for col in general_candidates: if column_shows_relationship_to_main_feature(gdf, main_feature, col): side_features.append(col) # Sort side features by information content def get_entropy(col): return col_stats.get(col, {}).get('count_entropy', 0) side_features.sort(key=get_entropy, reverse=True) return side_features if side_features else None
[docs] def column_shows_relationship_to_main_feature( gdf: GeoDataFrame, main_feature: str, side_feature: str ) -> bool: """ Determine if a column adds meaningful information in relation to the main feature. Parameters: gdf: GeoDataFrame containing the data main_feature: Name of the main feature column side_feature: Name of the potential side feature column Returns: True if the column shows a meaningful relationship, False otherwise """ try: # Create a cross-tabulation of the two columns crosstab = pd.crosstab(gdf[main_feature], gdf[side_feature]) # Skip columns with too many unique values if len(crosstab.columns) > 100: return False # Check for non-empty cells density non_empty_cells = pd.DataFrame(crosstab > 0).sum().sum() total_cells = crosstab.size # If there's a good density of non-empty combinations, that's a good sign if non_empty_cells / total_cells > 0.05: return True # Even with many nulls, check if there's a pattern to the non-nulls for _, row in crosstab.iterrows(): non_zero_vals = row[row > 0] # Skip rows with only one value if len(non_zero_vals) <= 1: continue # Check if there's diversity in the values if len(non_zero_vals) >= 2: return True # Special check for columns with many nulls: # If certain main values have side values while others don't, that's meaningful null_cols = [col for col in crosstab.columns if pd.isna(col) or col == ''] if null_cols: non_null_main_values = 0 for _, row in crosstab.iterrows(): if row.drop(null_cols, errors='ignore').sum() > 0: non_null_main_values += 1 # If some main values have side values and others don't, that's meaningful if 0 < non_null_main_values < len(crosstab.index): return True return False except (ValueError, TypeError): # If analysis fails, be conservative and return False return False
[docs] def get_zero_cost_assumptions( gdf: GeoDataFrame, main_feature: str, side_features: list[str] ) -> CostAssumptions: """ Generate cost assumptions with zero values for all feature combinations. Creates structures matching format for CostAssumptions: - Without side features: {main_feature: {val1: 0, val2: 0, ...}} - With side features: {(main_feature, side_feature1, ...): {(val1, val2, ...): 0, ...}} Parameters: gdf: GeoDataFrame with feature columns main_feature: Primary feature column name side_features: List of secondary feature column names Returns: CostAssumptions: Instacne of zero-cost assumptions """ if not side_features: # For simple case with only main feature unique_values = gdf[main_feature].unique() cost_dict = {main_feature: dict(zip(unique_values, unique_values.size * [0]))} else: # For complex case with side features columns = [main_feature] + side_features keys = pd.MultiIndex.from_frame(gdf.loc[:, columns]).values def key_valid(key): return not isinstance(key, str) and np.isnan(key) keys = [tuple(['' if key_valid(key) else key for key in row]) for row in keys] cost_dict = {tuple(columns): dict(zip(keys, len(keys) * [0]))} return CostAssumptions(cost_dict)
[docs] def calculate_geometry_area( geometries: Union[GeoSeries,] ) -> float: """ Calculate the sum of areas for a collection of geometries. Parameters: geometries: Collection of geometry objects Returns: Sum of areas of all geometries with area attribute """ total_area = 0 for geom in geometries: if hasattr(geom, 'area'): total_area += geom.area return total_area
[docs] def calculate_column_statistics( gdf: GeoDataFrame, columns: list[str], max_features_per_column: int = 50 ) -> dict[str, dict[str, Any]]: """ Calculate statistical properties of columns for feature selection. Parameters: gdf: GeoDataFrame to analyze columns: list of column names to analyze max_features_per_column: Maximum number of unique values for a column to be considered categorical Returns: dictionary with column statistics Raises: ColumnAnalysisError: When column analysis fails unexpectedly """ col_stats = {} # First pass: filter columns and calculate basic stats candidate_columns = [] for col in columns: # Skip numeric columns with many unique values if pd.api.types.is_numeric_dtype(gdf[col]) and gdf[col].nunique() > 20: continue # Calculate value counts value_counts = gdf[col].value_counts() # Skip columns with too many unique values (likely not categorical) if len(value_counts) > max_features_per_column: continue _get_column_statistics(gdf, col, col_stats, candidate_columns, value_counts) # Second pass: calculate area-based statistics only for candidates for col in candidate_columns: # Initialize area-based statistics area_entropy = 0 area_by_value = None area_fraction = None area_by_value, area_entropy, area_fraction = _get_column_metrics(gdf, col, area_by_value, area_entropy, area_fraction) # Update with area-based statistics col_stats[col].update({ 'area_by_value': area_by_value, 'area_fraction': area_fraction, 'area_entropy': area_entropy, }) return col_stats
def _get_column_statistics(gdf, col, col_stats, candidate_columns, value_counts): """ Calculate basic statistical properties for a single column. Parameters: gdf: GeoDataFrame containing the data col: Column name to analyze col_stats: Dictionary to store calculated statistics candidate_columns: List to append good candidate columns value_counts: Pre-calculated value counts for the column """ # Calculate basic statistics null_ratio = gdf[col].isna().mean() # Proportion of missing values # Determine if column is a good candidate for analysis is_good_candidate = ( len(value_counts) > 1 and # More than one unique value (len(value_counts) < len(gdf) * 0.3) and # Not too many categories null_ratio < 0.2 # Low missing data rate ) # Calculate entropy of count distribution (measures diversity) count_fractions = value_counts / len(gdf) # Convert counts to proportions count_entropy = -sum( (count_fractions * np.log2(count_fractions)).dropna()) # Shannon entropy # Store basic stats col_stats[col] = { 'unique_values': len(value_counts), 'max_count': value_counts.max() if len(value_counts) > 0 else 0, 'min_count': value_counts.min() if len(value_counts) > 0 else 0, 'count_entropy': count_entropy, 'null_ratio': null_ratio, 'is_good': is_good_candidate } candidate_columns.append(col) def _get_column_metrics(gdf, col, area_by_value, area_entropy, area_fraction): """ Calculate area-based metrics for a column using geometric information. Parameters: gdf: GeoDataFrame with geometry column col: Column name to calculate metrics for area_by_value: Initial area by value (will be calculated) area_entropy: Initial area entropy (will be calculated) area_fraction: Initial area fraction (will be calculated) Returns: tuple: (area_by_value, area_entropy, area_fraction) with calculated metrics """ try: # Group by column and calculate total area for each value area_by_value = gdf.groupby(col)['geometry'].apply(calculate_geometry_area) total_area = area_by_value.sum() # Calculate area-based statistics if total area is positive if total_area > 0: # Calculate what fraction of total area each value represents area_fraction = area_by_value / total_area # Calculate entropy of area distribution (measures spatial diversity) if not area_fraction.isna().all(): # Shannon entropy formula: -sum(p * log2(p)) where p is proportion entropies = (area_fraction * np.log2(area_fraction)).dropna() area_entropy = -sum(entropies) except (AttributeError, ValueError, TypeError): # Continue with default values for area statistics if geometry operations fail # This handles cases where geometry might be invalid or missing pass return area_by_value, area_entropy, area_fraction
[docs] def calculate_entropy_score( column_name: str, col_stats: dict[str, dict[str, Any]] ) -> float: """ Calculate combined entropy score for a column, weighing area entropy more heavily. Parameters: column_name: Name of the column to calculate score for col_stats: dictionary with column statistics Returns: Combined entropy score """ stats = col_stats[column_name] return stats['area_entropy'] * 0.7 + stats['count_entropy'] * 0.3
[docs] def select_main_feature( col_stats: dict[str, dict[str, Any]] ) -> str: """ Select the best main feature column based on statistics. Parameters: col_stats: dictionary with column statistics Returns: Name of the best main feature column """ # Select the best main feature column main_candidates = [col for col, stats in col_stats.items() if stats['is_good']] if not main_candidates: # Fall back to any column if no good candidates main_candidates = list(col_stats.keys()) # Sort by entropy score (higher is better) sorted_candidates = sorted( main_candidates, key=lambda c: calculate_entropy_score(c, col_stats), reverse=True ) return sorted_candidates[0]