278 lines
9.7 KiB
Python
278 lines
9.7 KiB
Python
import os
|
|
import json
|
|
import requests
|
|
from cryptography.fernet import Fernet
|
|
|
|
# Relative folder under which every file used by this module is stored.
RESOURCES_FOLDER = "resources"

# Individual files kept inside RESOURCES_FOLDER.
KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key")
CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc")
SERVER_SETTINGS_FILE = os.path.join(RESOURCES_FOLDER, "server_settings.enc")
|
|
|
|
# --- Encryption Utilities ---
|
|
|
|
def generate_key():
    """Generate a Fernet key and save it to ``KEY_FILE`` if none exists yet.

    An existing key file is left untouched, so data already encrypted with
    it stays readable.

    Raises:
        OSError: If the parent folder or the key file cannot be created.
    """
    if os.path.exists(KEY_FILE):
        return

    # The folder that holds the key may not exist yet; without it, open()
    # below fails with FileNotFoundError.
    key_dir = os.path.dirname(KEY_FILE)
    if key_dir:
        os.makedirs(key_dir, exist_ok=True)

    key = Fernet.generate_key()
    with open(KEY_FILE, "wb") as key_file:
        key_file.write(key)
|
|
|
|
def load_key():
    """Return the raw bytes stored in the key file at ``KEY_FILE``."""
    with open(KEY_FILE, "rb") as handle:
        raw_key = handle.read()
    return raw_key
|
|
|
|
def encrypt_data(data):
    """Encode ``data`` to bytes and return it encrypted with the stored key."""
    cipher = Fernet(load_key())
    plaintext = data.encode()
    return cipher.encrypt(plaintext)
|
|
|
|
def decrypt_data(data):
    """Decrypt ``data`` with the stored key and return the result decoded to str."""
    cipher = Fernet(load_key())
    plaintext = cipher.decrypt(data)
    return plaintext.decode()
|
|
|
|
# --- Server Settings ---
|
|
def check_server_settings():
    """Read, decrypt and JSON-decode the server settings file.

    Returns:
        The decoded settings, or None when the file does not exist or when
        reading, decrypting or parsing it fails (the error is printed).
    """
    if os.path.exists(SERVER_SETTINGS_FILE):
        try:
            with open(SERVER_SETTINGS_FILE, "rb") as handle:
                ciphertext = handle.read()
            return json.loads(decrypt_data(ciphertext))
        except Exception as e:
            # Best effort: any failure is reported and treated as "no settings".
            print(f"Failed to load server settings: {e}")
    return None
|
|
|
|
def save_server_settings(settings_data):
    """Serialize ``settings_data`` to JSON, encrypt it and write it to disk.

    The encrypted bytes replace the content of ``SERVER_SETTINGS_FILE``.
    """
    payload = json.dumps(settings_data)
    ciphertext = encrypt_data(payload)
    with open(SERVER_SETTINGS_FILE, "wb") as handle:
        handle.write(ciphertext)
|
|
|
|
# --- Traccar Server Connection ---
|
|
def test_connection(server_url, username=None, password=None, token=None):
|
|
"""
|
|
Test the connection with the Traccar server.
|
|
Returns: dict with 'status' (bool) and 'message' (str)
|
|
"""
|
|
if not server_url:
|
|
return {"status": False, "message": "Please provide the server URL."}
|
|
if not token and (not username or not password):
|
|
return {"status": False, "message": "Please provide either a token or username and password."}
|
|
try:
|
|
headers = {"Authorization": f"Bearer {token}"} if token else None
|
|
auth = None if token else (username, password)
|
|
response = requests.get(f"{server_url}/api/server", headers=headers, auth=auth, timeout=10)
|
|
if response.status_code == 200:
|
|
return {"status": True, "message": "Connection successful! Server is reachable."}
|
|
else:
|
|
return {"status": False, "message": f"Error: {response.status_code} - {response.reason}"}
|
|
except requests.exceptions.Timeout:
|
|
return {"status": False, "message": "Connection timed out. Please try again."}
|
|
except requests.exceptions.RequestException as e:
|
|
return {"status": False, "message": f"Connection failed: {str(e)}"}
|
|
|
|
# --- Device Fetching ---
|
|
def get_devices_from_server():
    """Retrieve a mapping of device names to IDs from the Traccar server.

    The server URL and bearer token are read from ``check_server_settings()``
    and the devices are requested from ``<server_url>/api/devices``.

    Returns:
        dict | None: ``{name: id}`` for every device in the response. Devices
        without a name are keyed as "Unnamed Device" and devices without an
        id get "Unknown ID"; devices sharing a name collapse into one entry
        because names are the keys. None is returned when the settings are
        missing or incomplete, when the server does not answer with 200, or
        when any error occurs (the error is printed).
    """
    settings = check_server_settings()
    if not settings:
        return None

    server_url = settings.get("server_url")
    token = settings.get("token")
    if not server_url or not token:
        return None

    headers = {"Authorization": f"Bearer {token}"}
    try:
        # requests waits indefinitely unless a timeout is given; bound the
        # call so an unresponsive server cannot hang the caller.
        response = requests.get(f"{server_url}/api/devices", headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.reason}")
            return None

        devices = response.json()
        return {device.get("name", "Unnamed Device"): device.get("id", "Unknown ID") for device in devices}
    except Exception as e:
        # Best effort: network errors, timeouts and malformed payloads all
        # end up here and are reported as "no devices".
        print(f"Error retrieving devices: {str(e)}")
        return None
|
|
|
|
# --- Route Saving ---
|
|
def save_route_to_file(route_name, positions, base_folder="resources/projects"):
    """
    Save the given positions as a route in <base_folder>/<route_name>/positions.json.

    The route folder is created when missing and the positions are written
    as indented JSON.

    Args:
        route_name: Name of the route; used as the folder name.
        positions: JSON-serializable positions to store.
        base_folder: Folder that contains one sub-folder per route.

    Returns (success, message, file_path); file_path is None on failure.
    """
    if not route_name:
        return False, "Please enter a route name.", None
    if not positions:
        return False, "No positions to save.", None

    folder_path = os.path.join(base_folder, route_name)
    file_path = os.path.join(folder_path, "positions.json")
    try:
        # Folder creation is inside the try so that a failure here is
        # reported through the return tuple like any other save error,
        # instead of escaping as an exception.
        os.makedirs(folder_path, exist_ok=True)
        with open(file_path, "w") as f:
            json.dump(positions, f, indent=2)
    except Exception as e:
        return False, f"Failed to save route: {str(e)}", None
    return True, f"Route '{route_name}' saved!", file_path
|
|
|
|
def fetch_positions(server_url, token, device_id, from_time, to_time):
    """
    Request the route report for one device from the Traccar API.

    Sends a GET to ``<server_url>/api/reports/route`` with the bearer token
    and the ``deviceId`` / ``from`` / ``to`` query parameters.

    Returns (positions, error_message); exactly one of the two is None.
    """
    endpoint = f"{server_url}/api/reports/route"
    query = {"deviceId": device_id, "from": from_time, "to": to_time}
    request_headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}

    try:
        response = requests.get(endpoint, params=query, headers=request_headers, timeout=15)
        status = response.status_code
        if status == 200:
            return response.json(), None
        if status == 400:
            return None, "Bad Request: Please check the request payload and token."
        return None, f"Failed: {response.status_code} - {response.reason}"
    except requests.exceptions.RequestException as e:
        return None, f"Error fetching positions: {str(e)}"
|
|
|
|
def fetch_positions_for_selected_day(settings, device_mapping, device_name, start_date, end_date, start_hour, end_hour):
    """
    Fetch positions for the selected day/device using Traccar API.

    The time window runs from ``<start_date>T<start_hour>:00:00Z`` to
    ``<end_date>T<end_hour>:59:59Z``; hours are zero-padded to two digits.

    Args:
        settings: Mapping providing "server_url" and "token".
        device_mapping: Mapping of device name to device id; may be None.
        device_name: Key looked up in ``device_mapping``.
        start_date, end_date: Date parts of the window.
        start_hour, end_hour: Hour parts of the window.

    Returns (positions, error_message); positions is [] whenever
    error_message is set.
    """
    if not settings:
        return [], "Server settings not found."

    # get_devices_from_server() returns None on failure; a caller passing
    # that value through should get the regular "not found" result rather
    # than an AttributeError.
    device_id = (device_mapping or {}).get(device_name)
    if not device_id:
        return [], "Device ID not found."

    server_url = settings.get("server_url")
    token = settings.get("token")

    # Zero-pad so that an hour given as 5 or "5" still yields a two-digit
    # ISO 8601 hour; already padded values are unchanged.
    from_time = f"{start_date}T{str(start_hour).zfill(2)}:00:00Z"
    to_time = f"{end_date}T{str(end_hour).zfill(2)}:59:59Z"

    positions, error = fetch_positions(server_url, token, device_id, from_time, to_time)
    if error:
        return [], error
    return positions, None
|
|
|
|
def html_to_image(html_path, img_path, width=800, height=600, delay=2, driver_path='/usr/bin/chromedriver'):
    """
    Convert an HTML file to an image using Selenium and Pillow.

    The page is opened in headless Chrome, a screenshot is written to a
    temporary file next to ``img_path``, cropped to ``width`` x ``height``
    and saved as ``img_path``. Errors during conversion are printed, not
    raised.

    Args:
        html_path (str): Path to the HTML file.
        img_path (str): Path to save the output image (PNG).
        width (int): Width of the browser window.
        height (int): Height of the browser window.
        delay (int): Seconds to wait for the page to render.
        driver_path (str): Path to chromedriver binary.
    """
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.chrome.options import Options
    from PIL import Image
    from pathlib import Path
    import time
    import os

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument(f"--window-size={width},{height}")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    service = Service(driver_path)
    driver = webdriver.Chrome(service=service, options=chrome_options)

    tmp_img = img_path + ".tmp.png"
    try:
        try:
            # as_uri() builds a well-formed file:// URL (percent-encoding,
            # drive letters) instead of plain string concatenation.
            driver.get(Path(os.path.abspath(html_path)).as_uri())
            time.sleep(delay)  # Wait for the page to render
            driver.save_screenshot(tmp_img)
        finally:
            # Release the browser exactly once, whether or not the
            # screenshot succeeded.
            driver.quit()

        # The context manager closes the file handle before the temporary
        # file is removed below.
        with Image.open(tmp_img) as img:
            img.crop((0, 0, width, height)).save(img_path)
        print(f"Image saved to: {img_path}")
    except Exception as e:
        print(f"Error converting HTML to image: {e}")
    finally:
        # Never leave the temporary screenshot behind, even on failure.
        if os.path.exists(tmp_img):
            try:
                os.remove(tmp_img)
            except OSError as e:
                print(f"Could not remove temporary file {tmp_img}: {e}")
|
|
|
|
def process_preview_util(
    project_name,
    RESOURCES_FOLDER,
    label,
    progress,
    popup,
    preview_image_widget,
    set_preview_image_path,
    Clock,
    width=800,
    height=600
):
    """Render a project's saved positions as a map preview image.

    Reads ``<RESOURCES_FOLDER>/projects/<project_name>/positions.json``,
    draws the route with start/end markers on a folium map, saves it as
    ``preview.html`` and converts that to ``preview.png`` in the same folder.

    Progress is reported through the passed-in objects: ``label.text`` gets
    the status message and ``progress.value`` is set to 100 when the function
    is done. On success the image path is handed to
    ``set_preview_image_path``, ``preview_image_widget.reload()`` is called
    and ``popup.dismiss()`` is scheduled via ``Clock.schedule_once`` after
    1 second; on an exception the error is shown and the dismissal is
    scheduled after 2 seconds.

    NOTE(review): when positions.json is missing or empty the function
    returns without scheduling ``popup.dismiss()`` - confirm that is intended.
    """
    import folium
    import os
    import json
    from utils import html_to_image

    def _finish(message):
        # Show the final status and mark the progress indicator as complete.
        label.text = message
        progress.value = 100

    def _dismiss(dt):
        popup.dismiss()

    try:
        project_dir = os.path.join(RESOURCES_FOLDER, "projects", project_name)
        positions_file = os.path.join(project_dir, "positions.json")
        map_html = os.path.join(project_dir, "preview.html")
        map_png = os.path.join(project_dir, "preview.png")

        if not os.path.exists(positions_file):
            _finish("positions.json not found!")
            return

        with open(positions_file, "r") as f:
            positions = json.load(f)

        if not positions:
            _finish("No positions to preview.")
            return

        coords = [(pos['latitude'], pos['longitude']) for pos in positions]

        route_map = folium.Map(location=coords[0], width=width, height=height)
        folium.PolyLine(coords, color="blue", weight=4.5, opacity=1).add_to(route_map)
        for point, tip, colour in ((coords[0], "Start", "green"), (coords[-1], "End", "red")):
            folium.Marker(point, tooltip=tip, icon=folium.Icon(color=colour)).add_to(route_map)
        route_map.fit_bounds(coords)
        route_map.save(map_html)

        html_to_image(map_html, map_png, width=width, height=height)

        set_preview_image_path(map_png)
        preview_image_widget.reload()
        _finish("Preview ready!")
        Clock.schedule_once(_dismiss, 1)

    except Exception as e:
        _finish(f"Error: {e}")
        Clock.schedule_once(_dismiss, 2)
|