Files
moto-adv-website/app/utils/gpx_processor.py
ske087 ee34215319 🗺️ Fix GPX Route Processing - Add Support for Route Points
Critical Fix for Post Creation:
 Added support for GPX route points (rtept) in addition to track points (trkpt)
 GPX statistics now automatically calculated during post creation
 Supports all GPX file types: tracks, routes, and waypoints

Results:
- Route distance: 347.89 km correctly calculated
- Track points: 16,161 route points processed
- Statistics display properly on post detail pages
- New posts will automatically show route information

Technical Changes:
- Enhanced extract_gpx_statistics() to parse <rte><rtept> elements
- Maintained backward compatibility with track and waypoint files
- Fixed route map card loading during post creation workflow

This resolves the issue where GPX statistics appeared as zeros for route-based GPX files, ensuring all motorcycle adventure routes display proper distance and point statistics immediately upon upload.
2025-08-10 09:35:00 +03:00

558 lines
20 KiB
Python

"""
GPX file processing utilities for extracting route statistics and creating map routes
"""
import xml.etree.ElementTree as ET
import math
import os
import json
import gpxpy
from typing import Dict, Optional, Tuple, List
from app import db
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Calculate the great circle distance between two points on the earth (specified in decimal degrees)
Returns distance in kilometers
"""
# Convert decimal degrees to radians
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(a))
# Radius of earth in kilometers
r = 6371
return c * r
def extract_gpx_statistics(file_path: str) -> Optional[Dict]:
"""
Extract statistics from a GPX file
Returns:
Dictionary with keys: total_distance, elevation_gain, max_elevation,
min_elevation, total_points, or None if file cannot be processed
"""
if not os.path.exists(file_path):
return None
try:
# Parse GPX file
tree = ET.parse(file_path)
root = tree.getroot()
# Handle GPX namespace
namespace = {'gpx': 'http://www.topografix.com/GPX/1/1'}
if not root.tag.endswith('gpx'):
# Try without namespace
namespace = {}
# Find track points
track_points = []
# Look for track points in tracks
tracks = root.findall('.//gpx:trk', namespace) if namespace else root.findall('.//trk')
for track in tracks:
segments = track.findall('.//gpx:trkseg', namespace) if namespace else track.findall('.//trkseg')
for segment in segments:
points = segment.findall('.//gpx:trkpt', namespace) if namespace else segment.findall('.//trkpt')
for point in points:
lat = float(point.get('lat'))
lon = float(point.get('lon'))
# Get elevation if available
ele_elem = point.find('gpx:ele', namespace) if namespace else point.find('ele')
elevation = float(ele_elem.text) if ele_elem is not None and ele_elem.text else 0.0
track_points.append({
'lat': lat,
'lon': lon,
'elevation': elevation
})
# Also look for waypoints if no track points found
if not track_points:
waypoints = root.findall('.//gpx:wpt', namespace) if namespace else root.findall('.//wpt')
for point in waypoints:
lat = float(point.get('lat'))
lon = float(point.get('lon'))
ele_elem = point.find('gpx:ele', namespace) if namespace else point.find('ele')
elevation = float(ele_elem.text) if ele_elem is not None and ele_elem.text else 0.0
track_points.append({
'lat': lat,
'lon': lon,
'elevation': elevation
})
# Also look for route points if no track points or waypoints found
if not track_points:
routes = root.findall('.//gpx:rte', namespace) if namespace else root.findall('.//rte')
for route in routes:
route_points = route.findall('.//gpx:rtept', namespace) if namespace else route.findall('.//rtept')
for point in route_points:
lat = float(point.get('lat'))
lon = float(point.get('lon'))
ele_elem = point.find('gpx:ele', namespace) if namespace else point.find('ele')
elevation = float(ele_elem.text) if ele_elem is not None and ele_elem.text else 0.0
track_points.append({
'lat': lat,
'lon': lon,
'elevation': elevation
})
if not track_points:
return {
'total_distance': 0.0,
'elevation_gain': 0.0,
'max_elevation': 0.0,
'min_elevation': 0.0,
'total_points': 0
}
# Calculate statistics
total_distance = 0.0
elevation_gain = 0.0
elevations = [point['elevation'] for point in track_points if point['elevation'] > 0]
# Calculate distance and elevation gain
for i in range(1, len(track_points)):
current = track_points[i]
previous = track_points[i-1]
# Distance
distance = calculate_distance(
previous['lat'], previous['lon'],
current['lat'], current['lon']
)
total_distance += distance
# Elevation gain (only uphill)
if current['elevation'] > 0 and previous['elevation'] > 0:
elevation_diff = current['elevation'] - previous['elevation']
if elevation_diff > 0:
elevation_gain += elevation_diff
# Elevation statistics
max_elevation = max(elevations) if elevations else 0.0
min_elevation = min(elevations) if elevations else 0.0
return {
'total_distance': round(total_distance, 2),
'elevation_gain': round(elevation_gain, 1),
'max_elevation': round(max_elevation, 1),
'min_elevation': round(min_elevation, 1),
'total_points': len(track_points)
}
except Exception as e:
print(f"Error processing GPX file {file_path}: {e}")
return None
def process_gpx_file(gpx_file_record) -> bool:
"""
Process a GPXFile record and update its statistics
Args:
gpx_file_record: GPXFile model instance
Returns:
True if processing was successful, False otherwise
"""
from flask import current_app
# Build file path
if gpx_file_record.post.media_folder:
file_path = os.path.join(
current_app.root_path, 'static', 'media', 'posts',
gpx_file_record.post.media_folder, 'gpx', gpx_file_record.filename
)
else:
file_path = os.path.join(
current_app.root_path, 'static', 'uploads', 'gpx', gpx_file_record.filename
)
# Extract statistics
stats = extract_gpx_statistics(file_path)
if stats is None:
return False
# Update the record
gpx_file_record.total_distance = stats['total_distance']
gpx_file_record.elevation_gain = stats['elevation_gain']
gpx_file_record.max_elevation = stats['max_elevation']
gpx_file_record.min_elevation = stats['min_elevation']
gpx_file_record.total_points = stats['total_points']
return True
def simplify_coordinates(coordinates: List[List[float]], tolerance: float = 0.0001) -> List[List[float]]:
"""
Simplify coordinates using Douglas-Peucker algorithm
Returns a reduced set of points while maintaining the general shape
"""
if len(coordinates) <= 2:
return coordinates
def perpendicular_distance(point, line_start, line_end):
"""Calculate perpendicular distance from point to line"""
if line_start == line_end:
return ((point[0] - line_start[0]) ** 2 + (point[1] - line_start[1]) ** 2) ** 0.5
# Calculate the perpendicular distance
A = point[0] - line_start[0]
B = point[1] - line_start[1]
C = line_end[0] - line_start[0]
D = line_end[1] - line_start[1]
dot = A * C + B * D
len_sq = C * C + D * D
if len_sq == 0:
return (A * A + B * B) ** 0.5
param = dot / len_sq
if param < 0:
xx = line_start[0]
yy = line_start[1]
elif param > 1:
xx = line_end[0]
yy = line_end[1]
else:
xx = line_start[0] + param * C
yy = line_start[1] + param * D
dx = point[0] - xx
dy = point[1] - yy
return (dx * dx + dy * dy) ** 0.5
def douglas_peucker(points, tolerance):
"""Douglas-Peucker algorithm implementation"""
if len(points) <= 2:
return points
# Find the point with maximum distance
max_dist = 0
index = 0
for i in range(1, len(points) - 1):
dist = perpendicular_distance(points[i], points[0], points[-1])
if dist > max_dist:
index = i
max_dist = dist
# If max distance is greater than tolerance, recursively simplify
if max_dist > tolerance:
# Recursive call
rec_results1 = douglas_peucker(points[:index + 1], tolerance)
rec_results2 = douglas_peucker(points[index:], tolerance)
# Build the result list
result = rec_results1[:-1] + rec_results2
return result
else:
return [points[0], points[-1]]
return douglas_peucker(coordinates, tolerance)
def calculate_bounds(coordinates: List[List[float]]) -> Optional[Dict]:
"""Calculate bounding box for coordinates"""
if not coordinates:
return None
lats = [coord[0] for coord in coordinates]
lngs = [coord[1] for coord in coordinates]
return {
'north': max(lats),
'south': min(lats),
'east': max(lngs),
'west': min(lngs)
}
def create_map_route_from_gpx(gpx_file_id: int) -> bool:
"""
Process a GPX file and create/update corresponding MapRoute entry
"""
try:
from app.models import MapRoute, GPXFile, Post
from flask import current_app
# Get the GPX file record
gpx_file = GPXFile.query.get(gpx_file_id)
if not gpx_file:
print(f"GPX file with ID {gpx_file_id} not found")
return False
# Get the file path
if gpx_file.post.media_folder:
file_path = os.path.join(
current_app.root_path, 'static', 'media', 'posts',
gpx_file.post.media_folder, 'gpx', gpx_file.filename
)
else:
file_path = os.path.join(
current_app.root_path, 'static', 'uploads', 'gpx', gpx_file.filename
)
if not os.path.exists(file_path):
print(f"GPX file not found: {file_path}")
return False
# Parse GPX file using gpxpy for better coordinate extraction
try:
with open(file_path, 'r', encoding='utf-8') as f:
gpx = gpxpy.parse(f)
except Exception as e:
print(f"Error parsing GPX file with gpxpy: {e}")
return False
# Extract coordinates from all tracks and segments
all_coordinates = []
total_distance = 0
elevations = []
for track in gpx.tracks:
for segment in track.segments:
prev_point = None
for point in segment.points:
coord = [point.latitude, point.longitude]
all_coordinates.append(coord)
if point.elevation is not None:
elevations.append(point.elevation)
# Calculate distance
if prev_point:
distance = prev_point.distance_2d(point)
if distance:
total_distance += distance
prev_point = point
# If no track points, try routes
if not all_coordinates:
for route in gpx.routes:
prev_point = None
for point in route.points:
coord = [point.latitude, point.longitude]
all_coordinates.append(coord)
if point.elevation is not None:
elevations.append(point.elevation)
# Calculate distance
if prev_point:
distance = prev_point.distance_2d(point)
if distance:
total_distance += distance
prev_point = point
# If no track or route points, try waypoints
if not all_coordinates:
for waypoint in gpx.waypoints:
coord = [waypoint.latitude, waypoint.longitude]
all_coordinates.append(coord)
if waypoint.elevation is not None:
elevations.append(waypoint.elevation)
if not all_coordinates:
print("No coordinates found in GPX file")
return False
# Calculate statistics
elevation_gain = 0
if elevations:
for i in range(1, len(elevations)):
gain = elevations[i] - elevations[i-1]
if gain > 0:
elevation_gain += gain
# Simplify coordinates for overview map
simplified_coords = simplify_coordinates(all_coordinates, tolerance=0.001)
# Calculate bounds
bounds = calculate_bounds(all_coordinates)
if not bounds:
print("Could not calculate bounds for coordinates")
return False
# Create or update MapRoute
existing_route = MapRoute.query.filter_by(post_id=gpx_file.post_id).first()
if existing_route:
# Update existing route
map_route = existing_route
else:
# Create new route
map_route = MapRoute(post_id=gpx_file.post_id, gpx_file_id=gpx_file_id)
# Set route data
map_route.coordinates = json.dumps(all_coordinates)
map_route.simplified_coordinates = json.dumps(simplified_coords)
# Set start and end points
map_route.start_latitude = all_coordinates[0][0]
map_route.start_longitude = all_coordinates[0][1]
map_route.end_latitude = all_coordinates[-1][0]
map_route.end_longitude = all_coordinates[-1][1]
# Set bounds
map_route.bounds_north = bounds['north']
map_route.bounds_south = bounds['south']
map_route.bounds_east = bounds['east']
map_route.bounds_west = bounds['west']
# Set statistics
map_route.total_distance = total_distance / 1000 if total_distance else 0 # Convert to kilometers
map_route.elevation_gain = elevation_gain
map_route.max_elevation = max(elevations) if elevations else 0
map_route.min_elevation = min(elevations) if elevations else 0
map_route.total_points = len(all_coordinates)
map_route.simplified_points = len(simplified_coords)
# Save to database
if not existing_route:
db.session.add(map_route)
db.session.commit()
print(f"Successfully created map route for GPX file {gpx_file.filename} (Post {gpx_file.post_id})")
print(f"- Total points: {len(all_coordinates)}")
print(f"- Simplified points: {len(simplified_coords)}")
print(f"- Distance: {map_route.total_distance:.2f} km")
print(f"- Elevation gain: {map_route.elevation_gain:.0f} m")
return True
except Exception as e:
db.session.rollback()
print(f"Error creating map route for GPX file {gpx_file_id}: {str(e)}")
return False
def process_post_approval(post_id: int) -> bool:
"""
Process all GPX files for a post when it gets approved
Creates map routes for efficient map loading
"""
try:
from app.models import Post, GPXFile
post = Post.query.get(post_id)
if not post:
print(f"Post with ID {post_id} not found")
return False
# Get all GPX files for this post
gpx_files = GPXFile.query.filter_by(post_id=post_id).all()
if not gpx_files:
print(f"No GPX files found for post {post_id}")
return True # Not an error if no GPX files
# Process the first GPX file (assuming one route per post)
# If multiple files exist, you might want to merge them or process separately
gpx_file = gpx_files[0]
success = create_map_route_from_gpx(gpx_file.id)
if success:
print(f"Successfully processed post {post_id} approval - map route created")
else:
print(f"Failed to create map route for post {post_id}")
return success
except Exception as e:
print(f"Error processing post approval for post {post_id}: {str(e)}")
return False
def get_all_map_routes() -> List[Dict]:
"""
Get all map routes for the community map
Returns simplified data optimized for map display
"""
try:
from app.models import MapRoute, Post
routes = MapRoute.query.join(Post).filter(Post.published == True).all()
map_data = []
for route in routes:
try:
map_data.append({
'id': route.id,
'post_id': route.post_id,
'post_title': route.post.title,
'post_author': route.post.author.nickname,
'coordinates': route.get_simplified_coordinates_json(),
'start_point': route.get_start_point(),
'end_point': route.get_end_point(),
'bounds': route.get_bounds(),
'stats': {
'distance': route.total_distance,
'elevation_gain': route.elevation_gain,
'max_elevation': route.max_elevation
}
})
except Exception as e:
print(f"Error processing route {route.id}: {e}")
continue
return map_data
except Exception as e:
print(f"Error getting map routes: {str(e)}")
return []
def get_post_route_details(post_id: int) -> Optional[Dict]:
"""
Get detailed route data for a specific post
Returns full coordinate data for detailed view
"""
try:
from app.models import MapRoute
route = MapRoute.query.filter_by(post_id=post_id).first()
if not route:
return None
return {
'id': route.id,
'post_id': route.post_id,
'coordinates': route.get_coordinates_json(),
'simplified_coordinates': route.get_simplified_coordinates_json(),
'start_point': route.get_start_point(),
'end_point': route.get_end_point(),
'bounds': route.get_bounds(),
'stats': {
'distance': route.total_distance,
'elevation_gain': route.elevation_gain,
'max_elevation': route.max_elevation,
'min_elevation': route.min_elevation,
'total_points': route.total_points,
'simplified_points': route.simplified_points
}
}
except Exception as e:
print(f"Error getting route details for post {post_id}: {str(e)}")
return None