Files
2025-07-07 12:20:16 +03:00

493 lines
18 KiB
Python

import os
import json
import requests
from cryptography.fernet import Fernet
import math
import datetime
RESOURCES_FOLDER = "resources"
CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc")
KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key")
SERVER_SETTINGS_FILE = os.path.join(RESOURCES_FOLDER, "server_settings.enc")
# --- Encryption Utilities ---
def generate_key():
"""Generate and save a key for encryption."""
if not os.path.exists(KEY_FILE):
key = Fernet.generate_key()
with open(KEY_FILE, "wb") as key_file:
key_file.write(key)
def load_key():
"""Load the encryption key."""
with open(KEY_FILE, "rb") as key_file:
return key_file.read()
def encrypt_data(data):
"""Encrypt data using the encryption key."""
key = load_key()
fernet = Fernet(key)
return fernet.encrypt(data.encode())
def decrypt_data(data):
"""Decrypt data using the encryption key."""
key = load_key()
fernet = Fernet(key)
return fernet.decrypt(data).decode()
# --- Server Settings ---
def check_server_settings():
"""Load and decrypt server settings from file."""
if not os.path.exists(SERVER_SETTINGS_FILE):
return None
try:
with open(SERVER_SETTINGS_FILE, "rb") as file:
encrypted_data = file.read()
decrypted_data = decrypt_data(encrypted_data)
settings = json.loads(decrypted_data)
return settings
except Exception as e:
print(f"Failed to load server settings: {e}")
return None
def save_server_settings(settings_data):
"""Encrypt and save server settings."""
encrypted_data = encrypt_data(json.dumps(settings_data))
with open(SERVER_SETTINGS_FILE, "wb") as file:
file.write(encrypted_data)
# --- Traccar Server Connection ---
def test_connection(server_url, username=None, password=None, token=None):
"""
Test the connection with the Traccar server.
Returns: dict with 'status' (bool) and 'message' (str)
"""
if not server_url:
return {"status": False, "message": "Please provide the server URL."}
if not token and (not username or not password):
return {"status": False, "message": "Please provide either a token or username and password."}
try:
headers = {"Authorization": f"Bearer {token}"} if token else None
auth = None if token else (username, password)
response = requests.get(f"{server_url}/api/server", headers=headers, auth=auth, timeout=10)
if response.status_code == 200:
return {"status": True, "message": "Connection successful! Server is reachable."}
else:
return {"status": False, "message": f"Error: {response.status_code} - {response.reason}"}
except requests.exceptions.Timeout:
return {"status": False, "message": "Connection timed out. Please try again."}
except requests.exceptions.RequestException as e:
return {"status": False, "message": f"Connection failed: {str(e)}"}
# --- Device Fetching ---
def get_devices_from_server():
"""Retrieve a mapping of device names to IDs from the Traccar server."""
settings = check_server_settings()
if not settings:
return None
server_url = settings.get("server_url")
token = settings.get("token")
if not server_url or not token:
return None
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get(f"{server_url}/api/devices", headers=headers)
if response.status_code == 200:
devices = response.json()
return {device.get("name", "Unnamed Device"): device.get("id", "Unknown ID") for device in devices}
else:
print(f"Error: {response.status_code} - {response.reason}")
return None
except Exception as e:
print(f"Error retrieving devices: {str(e)}")
return None
# --- Route Saving ---
def save_route_to_file(route_name, positions, base_folder="resources/projects"):
"""
Save the given positions as a route in resources/projects/<route_name>/positions.json.
Returns (success, message, file_path)
"""
if not route_name:
return False, "Please enter a route name.", None
if not positions:
return False, "No positions to save.", None
folder_path = os.path.join(base_folder, route_name)
os.makedirs(folder_path, exist_ok=True)
file_path = os.path.join(folder_path, "positions.json")
try:
with open(file_path, "w") as f:
json.dump(positions, f, indent=2)
return True, f"Route '{route_name}' saved!", file_path
except Exception as e:
return False, f"Failed to save route: {str(e)}", None
def fetch_positions(server_url, token, device_id, from_time, to_time):
"""
Fetch positions from the Traccar API.
Returns (positions, error_message)
"""
url = f"{server_url}/api/reports/route"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
params = {
"deviceId": device_id,
"from": from_time,
"to": to_time
}
try:
response = requests.get(url, params=params, headers=headers, timeout=15)
if response.status_code == 200:
return response.json(), None
elif response.status_code == 400:
return None, "Bad Request: Please check the request payload and token."
else:
return None, f"Failed: {response.status_code} - {response.reason}"
except requests.exceptions.RequestException as e:
return None, f"Error fetching positions: {str(e)}"
def fetch_positions_for_selected_day(settings, device_mapping, device_name, start_date, end_date, start_hour, end_hour):
"""
Fetch positions for the selected day/device using Traccar API.
Returns (positions, error_message)
"""
if not settings:
return [], "Server settings not found."
server_url = settings.get("server_url")
token = settings.get("token")
device_id = device_mapping.get(device_name)
if not device_id:
return [], "Device ID not found."
from_time = f"{start_date}T{start_hour}:00:00Z"
to_time = f"{end_date}T{end_hour}:59:59Z"
positions, error = fetch_positions(server_url, token, device_id, from_time, to_time)
if error:
return [], error
return positions, None
def html_to_image(html_path, img_path, width=1280, height=720, delay=2, driver_path='/usr/bin/chromedriver'):
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from PIL import Image
import time
import os
selenium_height = int(height * 1.2) # 10% taller for compensation
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument(f"--window-size={width},{selenium_height}")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
service = Service(driver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
driver.set_window_size(width, selenium_height)
driver.get("file://" + os.path.abspath(html_path))
time.sleep(delay)
tmp_img = img_path + ".tmp.png"
driver.save_screenshot(tmp_img)
driver.quit()
img = Image.open(tmp_img)
img = img.crop((0, 0, width, height)) # Crop to original map size
img.save(img_path)
os.remove(tmp_img)
print(f"Image saved to: {img_path} ({width}x{height})")
except Exception as e:
print(f"Error converting HTML to image: {e}")
driver.quit()
def process_preview_util(
project_name,
RESOURCES_FOLDER,
label,
progress,
popup,
preview_image_widget,
set_preview_image_path,
Clock,
width=800,
height=600
):
import folium
import os
import json
# Import html_to_image function from within the same module
# (it's defined later in this file)
try:
project_folder = os.path.join(RESOURCES_FOLDER, "projects", project_name)
positions_path = os.path.join(project_folder, "positions.json")
html_path = os.path.join(project_folder, "preview.html")
img_path = os.path.join(project_folder, "preview.png")
if not os.path.exists(positions_path):
label.text = "positions.json not found!"
progress.value = 100
return
with open(positions_path, "r") as f:
positions = json.load(f)
if not positions:
label.text = "No positions to preview."
progress.value = 100
return
coords = [(pos['latitude'], pos['longitude']) for pos in positions]
width, height = 1280, 720 # 16:9 HD
m = folium.Map(
location=coords[0],
width=width,
height=height,
control_scale=True
)
folium.PolyLine(coords, color="blue", weight=4.5, opacity=1).add_to(m)
folium.Marker(coords[0], tooltip="Start", icon=folium.Icon(color="green")).add_to(m)
folium.Marker(coords[-1], tooltip="End", icon=folium.Icon(color="red")).add_to(m)
# --- Add pause markers if pauses.json exists ---
pauses_path = os.path.join(project_folder, "pauses.json")
if os.path.exists(pauses_path):
with open(pauses_path, "r") as pf:
pauses = json.load(pf)
for pause in pauses:
lat = pause["location"]["latitude"]
lon = pause["location"]["longitude"]
duration = pause["duration_seconds"]
start = pause["start_time"]
end = pause["end_time"]
folium.Marker(
[lat, lon],
tooltip=f"Pause: {duration//60} min {duration%60} sec",
popup=f"Pause from {start} to {end} ({duration//60} min {duration%60} sec)",
icon=folium.Icon(color="orange", icon="pause", prefix="fa")
).add_to(m)
m.fit_bounds(coords, padding=(80, 80))
m.get_root().html.add_child(folium.Element(f"""
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
overflow: hidden;
}}
#{m.get_name()} {{
position: absolute;
top: 0; bottom: 0; left: 0; right: 0;
width: 100vw;
height: 100vh;
}}
</style>
"""))
m.save(html_path)
html_to_image(html_path, img_path, width=width, height=height)
set_preview_image_path(img_path)
preview_image_widget.reload()
label.text = "Preview ready!"
progress.value = 100
def close_popup(dt):
popup.dismiss()
Clock.schedule_once(close_popup, 1)
except Exception as e:
label.text = f"Error: {e}"
progress.value = 100
def close_popup(dt):
popup.dismiss()
Clock.schedule_once(close_popup, 2)
def haversine(lat1, lon1, lat2, lon2):
# Returns distance in meters between two lat/lon points
R = 6371000 # Earth radius in meters
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return 2 * R * math.asin(math.sqrt(a))
def optimize_route_entries_util(
project_name,
RESOURCES_FOLDER,
label,
progress,
popup,
Clock,
on_save=None
):
def process_entries(dt):
project_folder = os.path.join(RESOURCES_FOLDER, "projects", project_name)
positions_path = os.path.join(project_folder, "positions.json")
pauses_path = os.path.join(project_folder, "pauses.json")
if not os.path.exists(positions_path):
label.text = "positions.json not found!"
progress.value = 100
return
with open(positions_path, "r") as f:
positions = json.load(f)
# Detect duplicate positions at the start
start_remove = 0
if positions:
first = positions[0]
for pos in positions:
if pos['latitude'] == first['latitude'] and pos['longitude'] == first['longitude']:
start_remove += 1
else:
break
if start_remove > 0:
start_remove -= 1
# Detect duplicate positions at the end
end_remove = 0
if positions:
last = positions[-1]
for pos in reversed(positions):
if pos['latitude'] == last['latitude'] and pos['longitude'] == last['longitude']:
end_remove += 1
else:
break
if end_remove > 0:
end_remove -= 1
# Shorten the positions list
new_positions = positions[start_remove:len(positions)-end_remove if end_remove > 0 else None]
# --- PAUSE DETECTION ---
pauses = []
if new_positions:
pause_start = None
pause_end = None
pause_location = None
for i in range(1, len(new_positions)):
prev = new_positions[i-1]
curr = new_positions[i]
# Check if stopped (same location)
if curr['latitude'] == prev['latitude'] and curr['longitude'] == prev['longitude']:
if pause_start is None:
pause_start = prev['deviceTime']
pause_location = (prev['latitude'], prev['longitude'])
pause_end = curr['deviceTime']
else:
if pause_start and pause_end:
# Calculate pause duration
t1 = datetime.datetime.fromisoformat(pause_start.replace('Z', '+00:00'))
t2 = datetime.datetime.fromisoformat(pause_end.replace('Z', '+00:00'))
duration = (t2 - t1).total_seconds()
if duration >= 120:
pauses.append({
"start_time": pause_start,
"end_time": pause_end,
"duration_seconds": int(duration),
"location": {"latitude": pause_location[0], "longitude": pause_location[1]}
})
pause_start = None
pause_end = None
pause_location = None
# Check for pause at the end
if pause_start and pause_end:
t1 = datetime.datetime.fromisoformat(pause_start.replace('Z', '+00:00'))
t2 = datetime.datetime.fromisoformat(pause_end.replace('Z', '+00:00'))
duration = (t2 - t1).total_seconds()
if duration >= 120:
pauses.append({
"start_time": pause_start,
"end_time": pause_end,
"duration_seconds": int(duration),
"location": {"latitude": pause_location[0], "longitude": pause_location[1]}
})
# --- FILTER PAUSES ---
# 1. Remove pauses near start/end
filtered_pauses = []
if new_positions and pauses:
start_lat, start_lon = new_positions[0]['latitude'], new_positions[0]['longitude']
end_lat, end_lon = new_positions[-1]['latitude'], new_positions[-1]['longitude']
for pause in pauses:
plat = pause["location"]["latitude"]
plon = pause["location"]["longitude"]
dist_start = haversine(start_lat, start_lon, plat, plon)
dist_end = haversine(end_lat, end_lon, plat, plon)
if dist_start < 50 or dist_end < 50:
continue # Skip pauses near start or end
filtered_pauses.append(pause)
else:
filtered_pauses = pauses
# 2. Merge pauses close in time and space
merged_pauses = []
filtered_pauses.sort(key=lambda p: p["start_time"])
for pause in filtered_pauses:
if not merged_pauses:
merged_pauses.append(pause)
else:
last = merged_pauses[-1]
# Time difference in seconds
t1 = datetime.datetime.fromisoformat(last["end_time"].replace('Z', '+00:00'))
t2 = datetime.datetime.fromisoformat(pause["start_time"].replace('Z', '+00:00'))
time_diff = (t2 - t1).total_seconds()
# Distance in meters
last_lat = last["location"]["latitude"]
last_lon = last["location"]["longitude"]
plat = pause["location"]["latitude"]
plon = pause["location"]["longitude"]
dist = haversine(last_lat, last_lon, plat, plon)
if time_diff < 300 and dist < 50:
# Merge: extend last pause's end_time and duration
last["end_time"] = pause["end_time"]
last["duration_seconds"] += pause["duration_seconds"]
else:
merged_pauses.append(pause)
pauses = merged_pauses
progress.value = 100
label.text = (
f"Entries removable at start: {start_remove}\n"
f"Entries removable at end: {end_remove}\n"
f"Detected pauses: {len(pauses)}"
)
from kivy.uix.button import Button
from kivy.uix.boxlayout import BoxLayout
btn_save = Button(text="Save optimized file", background_color=(0.008, 0.525, 0.290, 1))
btn_cancel = Button(text="Cancel")
btn_box = BoxLayout(orientation='horizontal', spacing=10, size_hint_y=None, height=44)
btn_box.add_widget(btn_save)
btn_box.add_widget(btn_cancel)
popup.content.add_widget(btn_box)
def save_optimized(instance):
with open(positions_path, "w") as f:
json.dump(new_positions, f, indent=2)
with open(pauses_path, "w") as f:
json.dump(pauses, f, indent=2)
label.text = "File optimized and pauses saved!"
btn_save.disabled = True
btn_cancel.disabled = True
def close_and_refresh(dt):
popup.dismiss()
if on_save:
on_save()
Clock.schedule_once(close_and_refresh, 1)
btn_save.bind(on_press=save_optimized)
btn_cancel.bind(on_press=lambda x: popup.dismiss())
Clock.schedule_once(process_entries, 0.5)