""" GPX file processing utilities for extracting route statistics and creating map routes """ import xml.etree.ElementTree as ET import math import os import json import gpxpy from typing import Dict, Optional, Tuple, List from app import db def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) Returns distance in kilometers """ # Convert decimal degrees to radians lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) # Haversine formula dlat = lat2 - lat1 dlon = lon2 - lon1 a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 c = 2 * math.asin(math.sqrt(a)) # Radius of earth in kilometers r = 6371 return c * r def extract_gpx_statistics(file_path: str) -> Optional[Dict]: """ Extract statistics from a GPX file Returns: Dictionary with keys: total_distance, elevation_gain, max_elevation, min_elevation, total_points, or None if file cannot be processed """ if not os.path.exists(file_path): return None try: # Parse GPX file tree = ET.parse(file_path) root = tree.getroot() # Handle GPX namespace namespace = {'gpx': 'http://www.topografix.com/GPX/1/1'} if not root.tag.endswith('gpx'): # Try without namespace namespace = {} # Find track points track_points = [] # Look for track points in tracks tracks = root.findall('.//gpx:trk', namespace) if namespace else root.findall('.//trk') for track in tracks: segments = track.findall('.//gpx:trkseg', namespace) if namespace else track.findall('.//trkseg') for segment in segments: points = segment.findall('.//gpx:trkpt', namespace) if namespace else segment.findall('.//trkpt') for point in points: lat = float(point.get('lat')) lon = float(point.get('lon')) # Get elevation if available ele_elem = point.find('gpx:ele', namespace) if namespace else point.find('ele') elevation = float(ele_elem.text) if ele_elem is not None and ele_elem.text else 0.0 track_points.append({ 'lat': lat, 'lon': lon, 'elevation': elevation }) # Also look for waypoints if no track points found if not track_points: waypoints = root.findall('.//gpx:wpt', namespace) if namespace else root.findall('.//wpt') for point in waypoints: lat = float(point.get('lat')) lon = float(point.get('lon')) ele_elem = point.find('gpx:ele', namespace) if namespace else point.find('ele') elevation = float(ele_elem.text) if ele_elem is not None and ele_elem.text else 0.0 track_points.append({ 'lat': lat, 'lon': lon, 'elevation': elevation }) if not track_points: return { 'total_distance': 0.0, 'elevation_gain': 0.0, 'max_elevation': 0.0, 'min_elevation': 0.0, 'total_points': 0 } # Calculate statistics total_distance = 0.0 elevation_gain = 0.0 elevations = [point['elevation'] for point in track_points if point['elevation'] > 0] # Calculate distance and elevation gain for i in range(1, len(track_points)): current = track_points[i] previous = track_points[i-1] # Distance distance = calculate_distance( previous['lat'], previous['lon'], current['lat'], current['lon'] ) total_distance += distance # Elevation gain (only uphill) if current['elevation'] > 0 and previous['elevation'] > 0: elevation_diff = current['elevation'] - previous['elevation'] if elevation_diff > 0: elevation_gain += elevation_diff # Elevation statistics max_elevation = max(elevations) if elevations else 0.0 min_elevation = min(elevations) if elevations else 0.0 return { 'total_distance': round(total_distance, 2), 'elevation_gain': round(elevation_gain, 1), 'max_elevation': round(max_elevation, 1), 'min_elevation': round(min_elevation, 1), 'total_points': len(track_points) } except Exception as e: print(f"Error processing GPX file {file_path}: {e}") return None def process_gpx_file(gpx_file_record) -> bool: """ Process a GPXFile record and update its statistics Args: gpx_file_record: GPXFile model instance Returns: True if processing was successful, False otherwise """ from flask import current_app # Build file path if gpx_file_record.post.media_folder: file_path = os.path.join( current_app.root_path, 'static', 'media', 'posts', gpx_file_record.post.media_folder, 'gpx', gpx_file_record.filename ) else: file_path = os.path.join( current_app.root_path, 'static', 'uploads', 'gpx', gpx_file_record.filename ) # Extract statistics stats = extract_gpx_statistics(file_path) if stats is None: return False # Update the record gpx_file_record.total_distance = stats['total_distance'] gpx_file_record.elevation_gain = stats['elevation_gain'] gpx_file_record.max_elevation = stats['max_elevation'] gpx_file_record.min_elevation = stats['min_elevation'] gpx_file_record.total_points = stats['total_points'] return True def simplify_coordinates(coordinates: List[List[float]], tolerance: float = 0.0001) -> List[List[float]]: """ Simplify coordinates using Douglas-Peucker algorithm Returns a reduced set of points while maintaining the general shape """ if len(coordinates) <= 2: return coordinates def perpendicular_distance(point, line_start, line_end): """Calculate perpendicular distance from point to line""" if line_start == line_end: return ((point[0] - line_start[0]) ** 2 + (point[1] - line_start[1]) ** 2) ** 0.5 # Calculate the perpendicular distance A = point[0] - line_start[0] B = point[1] - line_start[1] C = line_end[0] - line_start[0] D = line_end[1] - line_start[1] dot = A * C + B * D len_sq = C * C + D * D if len_sq == 0: return (A * A + B * B) ** 0.5 param = dot / len_sq if param < 0: xx = line_start[0] yy = line_start[1] elif param > 1: xx = line_end[0] yy = line_end[1] else: xx = line_start[0] + param * C yy = line_start[1] + param * D dx = point[0] - xx dy = point[1] - yy return (dx * dx + dy * dy) ** 0.5 def douglas_peucker(points, tolerance): """Douglas-Peucker algorithm implementation""" if len(points) <= 2: return points # Find the point with maximum distance max_dist = 0 index = 0 for i in range(1, len(points) - 1): dist = perpendicular_distance(points[i], points[0], points[-1]) if dist > max_dist: index = i max_dist = dist # If max distance is greater than tolerance, recursively simplify if max_dist > tolerance: # Recursive call rec_results1 = douglas_peucker(points[:index + 1], tolerance) rec_results2 = douglas_peucker(points[index:], tolerance) # Build the result list result = rec_results1[:-1] + rec_results2 return result else: return [points[0], points[-1]] return douglas_peucker(coordinates, tolerance) def calculate_bounds(coordinates: List[List[float]]) -> Optional[Dict]: """Calculate bounding box for coordinates""" if not coordinates: return None lats = [coord[0] for coord in coordinates] lngs = [coord[1] for coord in coordinates] return { 'north': max(lats), 'south': min(lats), 'east': max(lngs), 'west': min(lngs) } def create_map_route_from_gpx(gpx_file_id: int) -> bool: """ Process a GPX file and create/update corresponding MapRoute entry """ try: from app.models import MapRoute, GPXFile, Post from flask import current_app # Get the GPX file record gpx_file = GPXFile.query.get(gpx_file_id) if not gpx_file: print(f"GPX file with ID {gpx_file_id} not found") return False # Get the file path if gpx_file.post.media_folder: file_path = os.path.join( current_app.root_path, 'static', 'media', 'posts', gpx_file.post.media_folder, 'gpx', gpx_file.filename ) else: file_path = os.path.join( current_app.root_path, 'static', 'uploads', 'gpx', gpx_file.filename ) if not os.path.exists(file_path): print(f"GPX file not found: {file_path}") return False # Parse GPX file using gpxpy for better coordinate extraction try: with open(file_path, 'r', encoding='utf-8') as f: gpx = gpxpy.parse(f) except Exception as e: print(f"Error parsing GPX file with gpxpy: {e}") return False # Extract coordinates from all tracks and segments all_coordinates = [] total_distance = 0 elevations = [] for track in gpx.tracks: for segment in track.segments: prev_point = None for point in segment.points: coord = [point.latitude, point.longitude] all_coordinates.append(coord) if point.elevation is not None: elevations.append(point.elevation) # Calculate distance if prev_point: distance = prev_point.distance_2d(point) if distance: total_distance += distance prev_point = point # If no track points, try routes if not all_coordinates: for route in gpx.routes: prev_point = None for point in route.points: coord = [point.latitude, point.longitude] all_coordinates.append(coord) if point.elevation is not None: elevations.append(point.elevation) # Calculate distance if prev_point: distance = prev_point.distance_2d(point) if distance: total_distance += distance prev_point = point # If no track or route points, try waypoints if not all_coordinates: for waypoint in gpx.waypoints: coord = [waypoint.latitude, waypoint.longitude] all_coordinates.append(coord) if waypoint.elevation is not None: elevations.append(waypoint.elevation) if not all_coordinates: print("No coordinates found in GPX file") return False # Calculate statistics elevation_gain = 0 if elevations: for i in range(1, len(elevations)): gain = elevations[i] - elevations[i-1] if gain > 0: elevation_gain += gain # Simplify coordinates for overview map simplified_coords = simplify_coordinates(all_coordinates, tolerance=0.001) # Calculate bounds bounds = calculate_bounds(all_coordinates) if not bounds: print("Could not calculate bounds for coordinates") return False # Create or update MapRoute existing_route = MapRoute.query.filter_by(post_id=gpx_file.post_id).first() if existing_route: # Update existing route map_route = existing_route else: # Create new route map_route = MapRoute(post_id=gpx_file.post_id, gpx_file_id=gpx_file_id) # Set route data map_route.coordinates = json.dumps(all_coordinates) map_route.simplified_coordinates = json.dumps(simplified_coords) # Set start and end points map_route.start_latitude = all_coordinates[0][0] map_route.start_longitude = all_coordinates[0][1] map_route.end_latitude = all_coordinates[-1][0] map_route.end_longitude = all_coordinates[-1][1] # Set bounds map_route.bounds_north = bounds['north'] map_route.bounds_south = bounds['south'] map_route.bounds_east = bounds['east'] map_route.bounds_west = bounds['west'] # Set statistics map_route.total_distance = total_distance / 1000 if total_distance else 0 # Convert to kilometers map_route.elevation_gain = elevation_gain map_route.max_elevation = max(elevations) if elevations else 0 map_route.min_elevation = min(elevations) if elevations else 0 map_route.total_points = len(all_coordinates) map_route.simplified_points = len(simplified_coords) # Save to database if not existing_route: db.session.add(map_route) db.session.commit() print(f"Successfully created map route for GPX file {gpx_file.filename} (Post {gpx_file.post_id})") print(f"- Total points: {len(all_coordinates)}") print(f"- Simplified points: {len(simplified_coords)}") print(f"- Distance: {map_route.total_distance:.2f} km") print(f"- Elevation gain: {map_route.elevation_gain:.0f} m") return True except Exception as e: db.session.rollback() print(f"Error creating map route for GPX file {gpx_file_id}: {str(e)}") return False def process_post_approval(post_id: int) -> bool: """ Process all GPX files for a post when it gets approved Creates map routes for efficient map loading """ try: from app.models import Post, GPXFile post = Post.query.get(post_id) if not post: print(f"Post with ID {post_id} not found") return False # Get all GPX files for this post gpx_files = GPXFile.query.filter_by(post_id=post_id).all() if not gpx_files: print(f"No GPX files found for post {post_id}") return True # Not an error if no GPX files # Process the first GPX file (assuming one route per post) # If multiple files exist, you might want to merge them or process separately gpx_file = gpx_files[0] success = create_map_route_from_gpx(gpx_file.id) if success: print(f"Successfully processed post {post_id} approval - map route created") else: print(f"Failed to create map route for post {post_id}") return success except Exception as e: print(f"Error processing post approval for post {post_id}: {str(e)}") return False def get_all_map_routes() -> List[Dict]: """ Get all map routes for the community map Returns simplified data optimized for map display """ try: from app.models import MapRoute, Post routes = MapRoute.query.join(Post).filter(Post.published == True).all() map_data = [] for route in routes: try: map_data.append({ 'id': route.id, 'post_id': route.post_id, 'post_title': route.post.title, 'post_author': route.post.author.nickname, 'coordinates': route.get_simplified_coordinates_json(), 'start_point': route.get_start_point(), 'end_point': route.get_end_point(), 'bounds': route.get_bounds(), 'stats': { 'distance': route.total_distance, 'elevation_gain': route.elevation_gain, 'max_elevation': route.max_elevation } }) except Exception as e: print(f"Error processing route {route.id}: {e}") continue return map_data except Exception as e: print(f"Error getting map routes: {str(e)}") return [] def get_post_route_details(post_id: int) -> Optional[Dict]: """ Get detailed route data for a specific post Returns full coordinate data for detailed view """ try: from app.models import MapRoute route = MapRoute.query.filter_by(post_id=post_id).first() if not route: return None return { 'id': route.id, 'post_id': route.post_id, 'coordinates': route.get_coordinates_json(), 'simplified_coordinates': route.get_simplified_coordinates_json(), 'start_point': route.get_start_point(), 'end_point': route.get_end_point(), 'bounds': route.get_bounds(), 'stats': { 'distance': route.total_distance, 'elevation_gain': route.elevation_gain, 'max_elevation': route.max_elevation, 'min_elevation': route.min_elevation, 'total_points': route.total_points, 'simplified_points': route.simplified_points } } except Exception as e: print(f"Error getting route details for post {post_id}: {str(e)}") return None