updated app
This commit is contained in:
492
py_scripts/utils.py
Normal file
492
py_scripts/utils.py
Normal file
@@ -0,0 +1,492 @@
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
from cryptography.fernet import Fernet
|
||||
import math
|
||||
import datetime
|
||||
RESOURCES_FOLDER = "resources"
|
||||
CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc")
|
||||
KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key")
|
||||
SERVER_SETTINGS_FILE = os.path.join(RESOURCES_FOLDER, "server_settings.enc")
|
||||
|
||||
# --- Encryption Utilities ---
|
||||
|
||||
def generate_key():
|
||||
"""Generate and save a key for encryption."""
|
||||
if not os.path.exists(KEY_FILE):
|
||||
key = Fernet.generate_key()
|
||||
with open(KEY_FILE, "wb") as key_file:
|
||||
key_file.write(key)
|
||||
|
||||
def load_key():
|
||||
"""Load the encryption key."""
|
||||
with open(KEY_FILE, "rb") as key_file:
|
||||
return key_file.read()
|
||||
|
||||
def encrypt_data(data):
|
||||
"""Encrypt data using the encryption key."""
|
||||
key = load_key()
|
||||
fernet = Fernet(key)
|
||||
return fernet.encrypt(data.encode())
|
||||
|
||||
def decrypt_data(data):
|
||||
"""Decrypt data using the encryption key."""
|
||||
key = load_key()
|
||||
fernet = Fernet(key)
|
||||
return fernet.decrypt(data).decode()
|
||||
|
||||
# --- Server Settings ---
|
||||
def check_server_settings():
|
||||
"""Load and decrypt server settings from file."""
|
||||
if not os.path.exists(SERVER_SETTINGS_FILE):
|
||||
return None
|
||||
try:
|
||||
with open(SERVER_SETTINGS_FILE, "rb") as file:
|
||||
encrypted_data = file.read()
|
||||
decrypted_data = decrypt_data(encrypted_data)
|
||||
settings = json.loads(decrypted_data)
|
||||
return settings
|
||||
except Exception as e:
|
||||
print(f"Failed to load server settings: {e}")
|
||||
return None
|
||||
|
||||
def save_server_settings(settings_data):
|
||||
"""Encrypt and save server settings."""
|
||||
encrypted_data = encrypt_data(json.dumps(settings_data))
|
||||
with open(SERVER_SETTINGS_FILE, "wb") as file:
|
||||
file.write(encrypted_data)
|
||||
|
||||
# --- Traccar Server Connection ---
|
||||
def test_connection(server_url, username=None, password=None, token=None):
|
||||
"""
|
||||
Test the connection with the Traccar server.
|
||||
Returns: dict with 'status' (bool) and 'message' (str)
|
||||
"""
|
||||
if not server_url:
|
||||
return {"status": False, "message": "Please provide the server URL."}
|
||||
if not token and (not username or not password):
|
||||
return {"status": False, "message": "Please provide either a token or username and password."}
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {token}"} if token else None
|
||||
auth = None if token else (username, password)
|
||||
response = requests.get(f"{server_url}/api/server", headers=headers, auth=auth, timeout=10)
|
||||
if response.status_code == 200:
|
||||
return {"status": True, "message": "Connection successful! Server is reachable."}
|
||||
else:
|
||||
return {"status": False, "message": f"Error: {response.status_code} - {response.reason}"}
|
||||
except requests.exceptions.Timeout:
|
||||
return {"status": False, "message": "Connection timed out. Please try again."}
|
||||
except requests.exceptions.RequestException as e:
|
||||
return {"status": False, "message": f"Connection failed: {str(e)}"}
|
||||
|
||||
# --- Device Fetching ---
|
||||
def get_devices_from_server():
|
||||
"""Retrieve a mapping of device names to IDs from the Traccar server."""
|
||||
settings = check_server_settings()
|
||||
if not settings:
|
||||
return None
|
||||
server_url = settings.get("server_url")
|
||||
token = settings.get("token")
|
||||
if not server_url or not token:
|
||||
return None
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
try:
|
||||
response = requests.get(f"{server_url}/api/devices", headers=headers)
|
||||
if response.status_code == 200:
|
||||
devices = response.json()
|
||||
return {device.get("name", "Unnamed Device"): device.get("id", "Unknown ID") for device in devices}
|
||||
else:
|
||||
print(f"Error: {response.status_code} - {response.reason}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"Error retrieving devices: {str(e)}")
|
||||
return None
|
||||
|
||||
# --- Route Saving ---
|
||||
def save_route_to_file(route_name, positions, base_folder="resources/projects"):
|
||||
"""
|
||||
Save the given positions as a route in resources/projects/<route_name>/positions.json.
|
||||
Returns (success, message, file_path)
|
||||
"""
|
||||
if not route_name:
|
||||
return False, "Please enter a route name.", None
|
||||
if not positions:
|
||||
return False, "No positions to save.", None
|
||||
|
||||
folder_path = os.path.join(base_folder, route_name)
|
||||
os.makedirs(folder_path, exist_ok=True)
|
||||
file_path = os.path.join(folder_path, "positions.json")
|
||||
try:
|
||||
with open(file_path, "w") as f:
|
||||
json.dump(positions, f, indent=2)
|
||||
return True, f"Route '{route_name}' saved!", file_path
|
||||
except Exception as e:
|
||||
return False, f"Failed to save route: {str(e)}", None
|
||||
|
||||
def fetch_positions(server_url, token, device_id, from_time, to_time):
|
||||
"""
|
||||
Fetch positions from the Traccar API.
|
||||
Returns (positions, error_message)
|
||||
"""
|
||||
url = f"{server_url}/api/reports/route"
|
||||
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
|
||||
params = {
|
||||
"deviceId": device_id,
|
||||
"from": from_time,
|
||||
"to": to_time
|
||||
}
|
||||
try:
|
||||
response = requests.get(url, params=params, headers=headers, timeout=15)
|
||||
if response.status_code == 200:
|
||||
return response.json(), None
|
||||
elif response.status_code == 400:
|
||||
return None, "Bad Request: Please check the request payload and token."
|
||||
else:
|
||||
return None, f"Failed: {response.status_code} - {response.reason}"
|
||||
except requests.exceptions.RequestException as e:
|
||||
return None, f"Error fetching positions: {str(e)}"
|
||||
|
||||
def fetch_positions_for_selected_day(settings, device_mapping, device_name, start_date, end_date, start_hour, end_hour):
|
||||
"""
|
||||
Fetch positions for the selected day/device using Traccar API.
|
||||
Returns (positions, error_message)
|
||||
"""
|
||||
if not settings:
|
||||
return [], "Server settings not found."
|
||||
|
||||
server_url = settings.get("server_url")
|
||||
token = settings.get("token")
|
||||
device_id = device_mapping.get(device_name)
|
||||
if not device_id:
|
||||
return [], "Device ID not found."
|
||||
|
||||
from_time = f"{start_date}T{start_hour}:00:00Z"
|
||||
to_time = f"{end_date}T{end_hour}:59:59Z"
|
||||
|
||||
positions, error = fetch_positions(server_url, token, device_id, from_time, to_time)
|
||||
if error:
|
||||
return [], error
|
||||
return positions, None
|
||||
|
||||
def html_to_image(html_path, img_path, width=1280, height=720, delay=2, driver_path='/usr/bin/chromedriver'):
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from PIL import Image
|
||||
import time
|
||||
import os
|
||||
|
||||
selenium_height = int(height * 1.2) # 10% taller for compensation
|
||||
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--headless")
|
||||
chrome_options.add_argument(f"--window-size={width},{selenium_height}")
|
||||
chrome_options.add_argument("--no-sandbox")
|
||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||
|
||||
service = Service(driver_path)
|
||||
driver = webdriver.Chrome(service=service, options=chrome_options)
|
||||
|
||||
try:
|
||||
driver.set_window_size(width, selenium_height)
|
||||
driver.get("file://" + os.path.abspath(html_path))
|
||||
time.sleep(delay)
|
||||
tmp_img = img_path + ".tmp.png"
|
||||
driver.save_screenshot(tmp_img)
|
||||
driver.quit()
|
||||
|
||||
img = Image.open(tmp_img)
|
||||
img = img.crop((0, 0, width, height)) # Crop to original map size
|
||||
img.save(img_path)
|
||||
os.remove(tmp_img)
|
||||
print(f"Image saved to: {img_path} ({width}x{height})")
|
||||
except Exception as e:
|
||||
print(f"Error converting HTML to image: {e}")
|
||||
driver.quit()
|
||||
|
||||
def process_preview_util(
|
||||
project_name,
|
||||
RESOURCES_FOLDER,
|
||||
label,
|
||||
progress,
|
||||
popup,
|
||||
preview_image_widget,
|
||||
set_preview_image_path,
|
||||
Clock,
|
||||
width=800,
|
||||
height=600
|
||||
):
|
||||
import folium
|
||||
import os
|
||||
import json
|
||||
# Import html_to_image function from within the same module
|
||||
# (it's defined later in this file)
|
||||
|
||||
try:
|
||||
project_folder = os.path.join(RESOURCES_FOLDER, "projects", project_name)
|
||||
positions_path = os.path.join(project_folder, "positions.json")
|
||||
html_path = os.path.join(project_folder, "preview.html")
|
||||
img_path = os.path.join(project_folder, "preview.png")
|
||||
|
||||
if not os.path.exists(positions_path):
|
||||
label.text = "positions.json not found!"
|
||||
progress.value = 100
|
||||
return
|
||||
|
||||
with open(positions_path, "r") as f:
|
||||
positions = json.load(f)
|
||||
|
||||
if not positions:
|
||||
label.text = "No positions to preview."
|
||||
progress.value = 100
|
||||
return
|
||||
|
||||
coords = [(pos['latitude'], pos['longitude']) for pos in positions]
|
||||
width, height = 1280, 720 # 16:9 HD
|
||||
|
||||
m = folium.Map(
|
||||
location=coords[0],
|
||||
width=width,
|
||||
height=height,
|
||||
control_scale=True
|
||||
)
|
||||
folium.PolyLine(coords, color="blue", weight=4.5, opacity=1).add_to(m)
|
||||
folium.Marker(coords[0], tooltip="Start", icon=folium.Icon(color="green")).add_to(m)
|
||||
folium.Marker(coords[-1], tooltip="End", icon=folium.Icon(color="red")).add_to(m)
|
||||
|
||||
# --- Add pause markers if pauses.json exists ---
|
||||
pauses_path = os.path.join(project_folder, "pauses.json")
|
||||
if os.path.exists(pauses_path):
|
||||
with open(pauses_path, "r") as pf:
|
||||
pauses = json.load(pf)
|
||||
for pause in pauses:
|
||||
lat = pause["location"]["latitude"]
|
||||
lon = pause["location"]["longitude"]
|
||||
duration = pause["duration_seconds"]
|
||||
start = pause["start_time"]
|
||||
end = pause["end_time"]
|
||||
folium.Marker(
|
||||
[lat, lon],
|
||||
tooltip=f"Pause: {duration//60} min {duration%60} sec",
|
||||
popup=f"Pause from {start} to {end} ({duration//60} min {duration%60} sec)",
|
||||
icon=folium.Icon(color="orange", icon="pause", prefix="fa")
|
||||
).add_to(m)
|
||||
|
||||
m.fit_bounds(coords, padding=(80, 80))
|
||||
m.get_root().html.add_child(folium.Element(f"""
|
||||
<style>
|
||||
html, body {{
|
||||
height: 100%;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
overflow: hidden;
|
||||
}}
|
||||
#{m.get_name()} {{
|
||||
position: absolute;
|
||||
top: 0; bottom: 0; left: 0; right: 0;
|
||||
width: 100vw;
|
||||
height: 100vh;
|
||||
}}
|
||||
</style>
|
||||
"""))
|
||||
m.save(html_path)
|
||||
|
||||
html_to_image(html_path, img_path, width=width, height=height)
|
||||
|
||||
set_preview_image_path(img_path)
|
||||
preview_image_widget.reload()
|
||||
label.text = "Preview ready!"
|
||||
progress.value = 100
|
||||
|
||||
def close_popup(dt):
|
||||
popup.dismiss()
|
||||
Clock.schedule_once(close_popup, 1)
|
||||
|
||||
except Exception as e:
|
||||
label.text = f"Error: {e}"
|
||||
progress.value = 100
|
||||
def close_popup(dt):
|
||||
popup.dismiss()
|
||||
Clock.schedule_once(close_popup, 2)
|
||||
|
||||
|
||||
def haversine(lat1, lon1, lat2, lon2):
|
||||
# Returns distance in meters between two lat/lon points
|
||||
R = 6371000 # Earth radius in meters
|
||||
phi1 = math.radians(lat1)
|
||||
phi2 = math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlambda = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
|
||||
return 2 * R * math.asin(math.sqrt(a))
|
||||
|
||||
def optimize_route_entries_util(
|
||||
project_name,
|
||||
RESOURCES_FOLDER,
|
||||
label,
|
||||
progress,
|
||||
popup,
|
||||
Clock,
|
||||
on_save=None
|
||||
):
|
||||
def process_entries(dt):
|
||||
project_folder = os.path.join(RESOURCES_FOLDER, "projects", project_name)
|
||||
positions_path = os.path.join(project_folder, "positions.json")
|
||||
pauses_path = os.path.join(project_folder, "pauses.json")
|
||||
if not os.path.exists(positions_path):
|
||||
label.text = "positions.json not found!"
|
||||
progress.value = 100
|
||||
return
|
||||
|
||||
with open(positions_path, "r") as f:
|
||||
positions = json.load(f)
|
||||
|
||||
# Detect duplicate positions at the start
|
||||
start_remove = 0
|
||||
if positions:
|
||||
first = positions[0]
|
||||
for pos in positions:
|
||||
if pos['latitude'] == first['latitude'] and pos['longitude'] == first['longitude']:
|
||||
start_remove += 1
|
||||
else:
|
||||
break
|
||||
if start_remove > 0:
|
||||
start_remove -= 1
|
||||
|
||||
# Detect duplicate positions at the end
|
||||
end_remove = 0
|
||||
if positions:
|
||||
last = positions[-1]
|
||||
for pos in reversed(positions):
|
||||
if pos['latitude'] == last['latitude'] and pos['longitude'] == last['longitude']:
|
||||
end_remove += 1
|
||||
else:
|
||||
break
|
||||
if end_remove > 0:
|
||||
end_remove -= 1
|
||||
|
||||
# Shorten the positions list
|
||||
new_positions = positions[start_remove:len(positions)-end_remove if end_remove > 0 else None]
|
||||
|
||||
# --- PAUSE DETECTION ---
|
||||
pauses = []
|
||||
if new_positions:
|
||||
pause_start = None
|
||||
pause_end = None
|
||||
pause_location = None
|
||||
for i in range(1, len(new_positions)):
|
||||
prev = new_positions[i-1]
|
||||
curr = new_positions[i]
|
||||
# Check if stopped (same location)
|
||||
if curr['latitude'] == prev['latitude'] and curr['longitude'] == prev['longitude']:
|
||||
if pause_start is None:
|
||||
pause_start = prev['deviceTime']
|
||||
pause_location = (prev['latitude'], prev['longitude'])
|
||||
pause_end = curr['deviceTime']
|
||||
else:
|
||||
if pause_start and pause_end:
|
||||
# Calculate pause duration
|
||||
t1 = datetime.datetime.fromisoformat(pause_start.replace('Z', '+00:00'))
|
||||
t2 = datetime.datetime.fromisoformat(pause_end.replace('Z', '+00:00'))
|
||||
duration = (t2 - t1).total_seconds()
|
||||
if duration >= 120:
|
||||
pauses.append({
|
||||
"start_time": pause_start,
|
||||
"end_time": pause_end,
|
||||
"duration_seconds": int(duration),
|
||||
"location": {"latitude": pause_location[0], "longitude": pause_location[1]}
|
||||
})
|
||||
pause_start = None
|
||||
pause_end = None
|
||||
pause_location = None
|
||||
# Check for pause at the end
|
||||
if pause_start and pause_end:
|
||||
t1 = datetime.datetime.fromisoformat(pause_start.replace('Z', '+00:00'))
|
||||
t2 = datetime.datetime.fromisoformat(pause_end.replace('Z', '+00:00'))
|
||||
duration = (t2 - t1).total_seconds()
|
||||
if duration >= 120:
|
||||
pauses.append({
|
||||
"start_time": pause_start,
|
||||
"end_time": pause_end,
|
||||
"duration_seconds": int(duration),
|
||||
"location": {"latitude": pause_location[0], "longitude": pause_location[1]}
|
||||
})
|
||||
|
||||
# --- FILTER PAUSES ---
|
||||
# 1. Remove pauses near start/end
|
||||
filtered_pauses = []
|
||||
if new_positions and pauses:
|
||||
start_lat, start_lon = new_positions[0]['latitude'], new_positions[0]['longitude']
|
||||
end_lat, end_lon = new_positions[-1]['latitude'], new_positions[-1]['longitude']
|
||||
for pause in pauses:
|
||||
plat = pause["location"]["latitude"]
|
||||
plon = pause["location"]["longitude"]
|
||||
dist_start = haversine(start_lat, start_lon, plat, plon)
|
||||
dist_end = haversine(end_lat, end_lon, plat, plon)
|
||||
if dist_start < 50 or dist_end < 50:
|
||||
continue # Skip pauses near start or end
|
||||
filtered_pauses.append(pause)
|
||||
else:
|
||||
filtered_pauses = pauses
|
||||
|
||||
# 2. Merge pauses close in time and space
|
||||
merged_pauses = []
|
||||
filtered_pauses.sort(key=lambda p: p["start_time"])
|
||||
for pause in filtered_pauses:
|
||||
if not merged_pauses:
|
||||
merged_pauses.append(pause)
|
||||
else:
|
||||
last = merged_pauses[-1]
|
||||
# Time difference in seconds
|
||||
t1 = datetime.datetime.fromisoformat(last["end_time"].replace('Z', '+00:00'))
|
||||
t2 = datetime.datetime.fromisoformat(pause["start_time"].replace('Z', '+00:00'))
|
||||
time_diff = (t2 - t1).total_seconds()
|
||||
# Distance in meters
|
||||
last_lat = last["location"]["latitude"]
|
||||
last_lon = last["location"]["longitude"]
|
||||
plat = pause["location"]["latitude"]
|
||||
plon = pause["location"]["longitude"]
|
||||
dist = haversine(last_lat, last_lon, plat, plon)
|
||||
if time_diff < 300 and dist < 50:
|
||||
# Merge: extend last pause's end_time and duration
|
||||
last["end_time"] = pause["end_time"]
|
||||
last["duration_seconds"] += pause["duration_seconds"]
|
||||
else:
|
||||
merged_pauses.append(pause)
|
||||
pauses = merged_pauses
|
||||
|
||||
progress.value = 100
|
||||
label.text = (
|
||||
f"Entries removable at start: {start_remove}\n"
|
||||
f"Entries removable at end: {end_remove}\n"
|
||||
f"Detected pauses: {len(pauses)}"
|
||||
)
|
||||
|
||||
from kivy.uix.button import Button
|
||||
from kivy.uix.boxlayout import BoxLayout
|
||||
|
||||
btn_save = Button(text="Save optimized file", background_color=(0.008, 0.525, 0.290, 1))
|
||||
btn_cancel = Button(text="Cancel")
|
||||
btn_box = BoxLayout(orientation='horizontal', spacing=10, size_hint_y=None, height=44)
|
||||
btn_box.add_widget(btn_save)
|
||||
btn_box.add_widget(btn_cancel)
|
||||
popup.content.add_widget(btn_box)
|
||||
|
||||
def save_optimized(instance):
|
||||
with open(positions_path, "w") as f:
|
||||
json.dump(new_positions, f, indent=2)
|
||||
with open(pauses_path, "w") as f:
|
||||
json.dump(pauses, f, indent=2)
|
||||
label.text = "File optimized and pauses saved!"
|
||||
btn_save.disabled = True
|
||||
btn_cancel.disabled = True
|
||||
def close_and_refresh(dt):
|
||||
popup.dismiss()
|
||||
if on_save:
|
||||
on_save()
|
||||
Clock.schedule_once(close_and_refresh, 1)
|
||||
|
||||
btn_save.bind(on_press=save_optimized)
|
||||
btn_cancel.bind(on_press=lambda x: popup.dismiss())
|
||||
|
||||
Clock.schedule_once(process_entries, 0.5)
|
||||
Reference in New Issue
Block a user