import os import json import requests from cryptography.fernet import Fernet import math import datetime RESOURCES_FOLDER = "resources" CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc") KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key") SERVER_SETTINGS_FILE = os.path.join(RESOURCES_FOLDER, "server_settings.enc") # --- Encryption Utilities --- def generate_key(): """Generate and save a key for encryption.""" if not os.path.exists(KEY_FILE): key = Fernet.generate_key() with open(KEY_FILE, "wb") as key_file: key_file.write(key) def load_key(): """Load the encryption key.""" with open(KEY_FILE, "rb") as key_file: return key_file.read() def encrypt_data(data): """Encrypt data using the encryption key.""" key = load_key() fernet = Fernet(key) return fernet.encrypt(data.encode()) def decrypt_data(data): """Decrypt data using the encryption key.""" key = load_key() fernet = Fernet(key) return fernet.decrypt(data).decode() # --- Server Settings --- def check_server_settings(): """Load and decrypt server settings from file.""" if not os.path.exists(SERVER_SETTINGS_FILE): return None try: with open(SERVER_SETTINGS_FILE, "rb") as file: encrypted_data = file.read() decrypted_data = decrypt_data(encrypted_data) settings = json.loads(decrypted_data) return settings except Exception as e: print(f"Failed to load server settings: {e}") return None def save_server_settings(settings_data): """Encrypt and save server settings.""" encrypted_data = encrypt_data(json.dumps(settings_data)) with open(SERVER_SETTINGS_FILE, "wb") as file: file.write(encrypted_data) # --- Traccar Server Connection --- def test_connection(server_url, username=None, password=None, token=None): """ Test the connection with the Traccar server. Returns: dict with 'status' (bool) and 'message' (str) """ if not server_url: return {"status": False, "message": "Please provide the server URL."} if not token and (not username or not password): return {"status": False, "message": "Please provide either a token or username and password."} try: headers = {"Authorization": f"Bearer {token}"} if token else None auth = None if token else (username, password) response = requests.get(f"{server_url}/api/server", headers=headers, auth=auth, timeout=10) if response.status_code == 200: return {"status": True, "message": "Connection successful! Server is reachable."} else: return {"status": False, "message": f"Error: {response.status_code} - {response.reason}"} except requests.exceptions.Timeout: return {"status": False, "message": "Connection timed out. Please try again."} except requests.exceptions.RequestException as e: return {"status": False, "message": f"Connection failed: {str(e)}"} # --- Device Fetching --- def get_devices_from_server(): """Retrieve a mapping of device names to IDs from the Traccar server.""" settings = check_server_settings() if not settings: return None server_url = settings.get("server_url") token = settings.get("token") if not server_url or not token: return None headers = {"Authorization": f"Bearer {token}"} try: response = requests.get(f"{server_url}/api/devices", headers=headers) if response.status_code == 200: devices = response.json() return {device.get("name", "Unnamed Device"): device.get("id", "Unknown ID") for device in devices} else: print(f"Error: {response.status_code} - {response.reason}") return None except Exception as e: print(f"Error retrieving devices: {str(e)}") return None # --- Route Saving --- def save_route_to_file(route_name, positions, base_folder="resources/projects"): """ Save the given positions as a route in resources/projects//positions.json. Returns (success, message, file_path) """ if not route_name: return False, "Please enter a route name.", None if not positions: return False, "No positions to save.", None folder_path = os.path.join(base_folder, route_name) os.makedirs(folder_path, exist_ok=True) file_path = os.path.join(folder_path, "positions.json") try: with open(file_path, "w") as f: json.dump(positions, f, indent=2) return True, f"Route '{route_name}' saved!", file_path except Exception as e: return False, f"Failed to save route: {str(e)}", None def fetch_positions(server_url, token, device_id, from_time, to_time): """ Fetch positions from the Traccar API. Returns (positions, error_message) """ url = f"{server_url}/api/reports/route" headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} params = { "deviceId": device_id, "from": from_time, "to": to_time } try: response = requests.get(url, params=params, headers=headers, timeout=15) if response.status_code == 200: return response.json(), None elif response.status_code == 400: return None, "Bad Request: Please check the request payload and token." else: return None, f"Failed: {response.status_code} - {response.reason}" except requests.exceptions.RequestException as e: return None, f"Error fetching positions: {str(e)}" def fetch_positions_for_selected_day(settings, device_mapping, device_name, start_date, end_date, start_hour, end_hour): """ Fetch positions for the selected day/device using Traccar API. Returns (positions, error_message) """ if not settings: return [], "Server settings not found." server_url = settings.get("server_url") token = settings.get("token") device_id = device_mapping.get(device_name) if not device_id: return [], "Device ID not found." from_time = f"{start_date}T{start_hour}:00:00Z" to_time = f"{end_date}T{end_hour}:59:59Z" positions, error = fetch_positions(server_url, token, device_id, from_time, to_time) if error: return [], error return positions, None def html_to_image(html_path, img_path, width=1280, height=720, delay=2, driver_path='/usr/bin/chromedriver'): from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from PIL import Image import time import os selenium_height = int(height * 1.2) # 10% taller for compensation chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument(f"--window-size={width},{selenium_height}") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") service = Service(driver_path) driver = webdriver.Chrome(service=service, options=chrome_options) try: driver.set_window_size(width, selenium_height) driver.get("file://" + os.path.abspath(html_path)) time.sleep(delay) tmp_img = img_path + ".tmp.png" driver.save_screenshot(tmp_img) driver.quit() img = Image.open(tmp_img) img = img.crop((0, 0, width, height)) # Crop to original map size img.save(img_path) os.remove(tmp_img) print(f"Image saved to: {img_path} ({width}x{height})") except Exception as e: print(f"Error converting HTML to image: {e}") driver.quit() def process_preview_util( project_name, RESOURCES_FOLDER, label, progress, popup, preview_image_widget, set_preview_image_path, Clock, width=800, height=600 ): import folium import os import json # Import html_to_image function from within the same module # (it's defined later in this file) try: project_folder = os.path.join(RESOURCES_FOLDER, "projects", project_name) positions_path = os.path.join(project_folder, "positions.json") html_path = os.path.join(project_folder, "preview.html") img_path = os.path.join(project_folder, "preview.png") if not os.path.exists(positions_path): label.text = "positions.json not found!" progress.value = 100 return with open(positions_path, "r") as f: positions = json.load(f) if not positions: label.text = "No positions to preview." progress.value = 100 return coords = [(pos['latitude'], pos['longitude']) for pos in positions] width, height = 1280, 720 # 16:9 HD m = folium.Map( location=coords[0], width=width, height=height, control_scale=True ) folium.PolyLine(coords, color="blue", weight=4.5, opacity=1).add_to(m) folium.Marker(coords[0], tooltip="Start", icon=folium.Icon(color="green")).add_to(m) folium.Marker(coords[-1], tooltip="End", icon=folium.Icon(color="red")).add_to(m) # --- Add pause markers if pauses.json exists --- pauses_path = os.path.join(project_folder, "pauses.json") if os.path.exists(pauses_path): with open(pauses_path, "r") as pf: pauses = json.load(pf) for pause in pauses: lat = pause["location"]["latitude"] lon = pause["location"]["longitude"] duration = pause["duration_seconds"] start = pause["start_time"] end = pause["end_time"] folium.Marker( [lat, lon], tooltip=f"Pause: {duration//60} min {duration%60} sec", popup=f"Pause from {start} to {end} ({duration//60} min {duration%60} sec)", icon=folium.Icon(color="orange", icon="pause", prefix="fa") ).add_to(m) m.fit_bounds(coords, padding=(80, 80)) m.get_root().html.add_child(folium.Element(f""" """)) m.save(html_path) html_to_image(html_path, img_path, width=width, height=height) set_preview_image_path(img_path) preview_image_widget.reload() label.text = "Preview ready!" progress.value = 100 def close_popup(dt): popup.dismiss() Clock.schedule_once(close_popup, 1) except Exception as e: label.text = f"Error: {e}" progress.value = 100 def close_popup(dt): popup.dismiss() Clock.schedule_once(close_popup, 2) def haversine(lat1, lon1, lat2, lon2): # Returns distance in meters between two lat/lon points R = 6371000 # Earth radius in meters phi1 = math.radians(lat1) phi2 = math.radians(lat2) dphi = math.radians(lat2 - lat1) dlambda = math.radians(lon2 - lon1) a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2 return 2 * R * math.asin(math.sqrt(a)) def optimize_route_entries_util( project_name, RESOURCES_FOLDER, label, progress, popup, Clock, on_save=None ): def process_entries(dt): project_folder = os.path.join(RESOURCES_FOLDER, "projects", project_name) positions_path = os.path.join(project_folder, "positions.json") pauses_path = os.path.join(project_folder, "pauses.json") if not os.path.exists(positions_path): label.text = "positions.json not found!" progress.value = 100 return with open(positions_path, "r") as f: positions = json.load(f) # Detect duplicate positions at the start start_remove = 0 if positions: first = positions[0] for pos in positions: if pos['latitude'] == first['latitude'] and pos['longitude'] == first['longitude']: start_remove += 1 else: break if start_remove > 0: start_remove -= 1 # Detect duplicate positions at the end end_remove = 0 if positions: last = positions[-1] for pos in reversed(positions): if pos['latitude'] == last['latitude'] and pos['longitude'] == last['longitude']: end_remove += 1 else: break if end_remove > 0: end_remove -= 1 # Shorten the positions list new_positions = positions[start_remove:len(positions)-end_remove if end_remove > 0 else None] # --- PAUSE DETECTION --- pauses = [] if new_positions: pause_start = None pause_end = None pause_location = None for i in range(1, len(new_positions)): prev = new_positions[i-1] curr = new_positions[i] # Check if stopped (same location) if curr['latitude'] == prev['latitude'] and curr['longitude'] == prev['longitude']: if pause_start is None: pause_start = prev['deviceTime'] pause_location = (prev['latitude'], prev['longitude']) pause_end = curr['deviceTime'] else: if pause_start and pause_end: # Calculate pause duration t1 = datetime.datetime.fromisoformat(pause_start.replace('Z', '+00:00')) t2 = datetime.datetime.fromisoformat(pause_end.replace('Z', '+00:00')) duration = (t2 - t1).total_seconds() if duration >= 120: pauses.append({ "start_time": pause_start, "end_time": pause_end, "duration_seconds": int(duration), "location": {"latitude": pause_location[0], "longitude": pause_location[1]} }) pause_start = None pause_end = None pause_location = None # Check for pause at the end if pause_start and pause_end: t1 = datetime.datetime.fromisoformat(pause_start.replace('Z', '+00:00')) t2 = datetime.datetime.fromisoformat(pause_end.replace('Z', '+00:00')) duration = (t2 - t1).total_seconds() if duration >= 120: pauses.append({ "start_time": pause_start, "end_time": pause_end, "duration_seconds": int(duration), "location": {"latitude": pause_location[0], "longitude": pause_location[1]} }) # --- FILTER PAUSES --- # 1. Remove pauses near start/end filtered_pauses = [] if new_positions and pauses: start_lat, start_lon = new_positions[0]['latitude'], new_positions[0]['longitude'] end_lat, end_lon = new_positions[-1]['latitude'], new_positions[-1]['longitude'] for pause in pauses: plat = pause["location"]["latitude"] plon = pause["location"]["longitude"] dist_start = haversine(start_lat, start_lon, plat, plon) dist_end = haversine(end_lat, end_lon, plat, plon) if dist_start < 50 or dist_end < 50: continue # Skip pauses near start or end filtered_pauses.append(pause) else: filtered_pauses = pauses # 2. Merge pauses close in time and space merged_pauses = [] filtered_pauses.sort(key=lambda p: p["start_time"]) for pause in filtered_pauses: if not merged_pauses: merged_pauses.append(pause) else: last = merged_pauses[-1] # Time difference in seconds t1 = datetime.datetime.fromisoformat(last["end_time"].replace('Z', '+00:00')) t2 = datetime.datetime.fromisoformat(pause["start_time"].replace('Z', '+00:00')) time_diff = (t2 - t1).total_seconds() # Distance in meters last_lat = last["location"]["latitude"] last_lon = last["location"]["longitude"] plat = pause["location"]["latitude"] plon = pause["location"]["longitude"] dist = haversine(last_lat, last_lon, plat, plon) if time_diff < 300 and dist < 50: # Merge: extend last pause's end_time and duration last["end_time"] = pause["end_time"] last["duration_seconds"] += pause["duration_seconds"] else: merged_pauses.append(pause) pauses = merged_pauses progress.value = 100 label.text = ( f"Entries removable at start: {start_remove}\n" f"Entries removable at end: {end_remove}\n" f"Detected pauses: {len(pauses)}" ) from kivy.uix.button import Button from kivy.uix.boxlayout import BoxLayout btn_save = Button(text="Save optimized file", background_color=(0.008, 0.525, 0.290, 1)) btn_cancel = Button(text="Cancel") btn_box = BoxLayout(orientation='horizontal', spacing=10, size_hint_y=None, height=44) btn_box.add_widget(btn_save) btn_box.add_widget(btn_cancel) popup.content.add_widget(btn_box) def save_optimized(instance): with open(positions_path, "w") as f: json.dump(new_positions, f, indent=2) with open(pauses_path, "w") as f: json.dump(pauses, f, indent=2) label.text = "File optimized and pauses saved!" btn_save.disabled = True btn_cancel.disabled = True def close_and_refresh(dt): popup.dismiss() if on_save: on_save() Clock.schedule_once(close_and_refresh, 1) btn_save.bind(on_press=save_optimized) btn_cancel.bind(on_press=lambda x: popup.dismiss()) Clock.schedule_once(process_entries, 0.5)