Files
traccar_animation/main.py
2025-06-03 16:34:15 +03:00

463 lines
19 KiB
Python

import kivy
from kivy.app import App
from kivy.uix.screenmanager import ScreenManager, Screen
import requests
import os
import json
from cryptography.fernet import Fernet
from kivy.clock import Clock
from kivy.properties import StringProperty, ListProperty
from utils import get_devices_from_server
from utils import check_server_settings # Import the refactored function
from utils import test_connection # Import the refactored function
from datetime import date
from kivy.uix.popup import Popup
from kivy.uix.gridlayout import GridLayout
from kivy.uix.button import Button
from kivy.uix.label import Label
kivy.require("2.0.0") # Refuse to start unless the installed Kivy is at least version 2.0.0
# Paths
# All paths are relative, so they resolve against the process's current working directory.
RESOURCES_FOLDER = "resources"
# Encrypted credentials; written by LoginScreen.login and appended to by RegisterScreen.create_user.
CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc")
# Fernet key created by generate_key() and read by load_key().
KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key")
# Utility functions for encryption
def generate_key():
    """Create the Fernet key file on first use; an existing key is left untouched."""
    if os.path.exists(KEY_FILE):
        return
    fresh_key = Fernet.generate_key()
    with open(KEY_FILE, "wb") as handle:
        handle.write(fresh_key)
def load_key():
    """Return the raw bytes of the encryption key stored in ``KEY_FILE``."""
    with open(KEY_FILE, "rb") as handle:
        key_bytes = handle.read()
    return key_bytes
def encrypt_data(data):
    """Encrypt the text ``data`` with the stored key and return the Fernet token."""
    cipher = Fernet(load_key())
    return cipher.encrypt(data.encode())
def decrypt_data(data):
    """Decrypt the Fernet token ``data`` with the stored key and return it as text."""
    cipher = Fernet(load_key())
    plaintext_bytes = cipher.decrypt(data)
    return plaintext_bytes.decode()
# Login Screen
class LoginScreen(Screen):
    """Screen that collects a username/password pair and stores it encrypted."""

    def login(self):
        """Persist the entered credentials, then switch to the home screen.

        Both inputs are stripped of surrounding whitespace. If either one is
        empty, a message is written to the home screen's ``result_label`` and
        nothing is saved.
        """
        fields = self.ids
        username = fields.username_input.text.strip()
        password = fields.password_input.text.strip()

        if not (username and password):
            home_screen = self.manager.get_screen("home")
            home_screen.ids.result_label.text = "Please fill in all fields."
            return

        # NOTE(review): the file is opened with "wb", so any records that
        # RegisterScreen.create_user appended to CREDENTIALS_FILE are replaced
        # by this single record -- confirm this is intended.
        payload = json.dumps({"username": username, "password": password})
        token = encrypt_data(payload)
        with open(CREDENTIALS_FILE, "wb") as out:
            out.write(token)

        # Credentials stored; continue to the home screen.
        self.manager.current = "home"
# Settings Screen (renamed from MainScreen)
class SettingsScreen(Screen):
    """Screen for editing, testing and saving the Traccar server settings.

    Settings are stored as an encrypted JSON object in
    ``resources/server_settings.enc`` with the keys ``server_url``,
    ``username``, ``password`` and ``token``.
    """

    # Message from the most recent connection test; mirrored into result_label.
    server_response = "Waiting to test connection..."

    @staticmethod
    def _settings_path():
        """Return the path of the encrypted server-settings file."""
        return os.path.join(RESOURCES_FOLDER, "server_settings.enc")

    def _read_inputs(self):
        """Return the stripped (server_url, username, password, token) inputs."""
        ids = self.ids
        return (
            ids.server_url_input.text.strip(),
            ids.username_input.text.strip(),
            ids.password_input.text.strip(),
            ids.token_input.text.strip(),
        )

    def on_pre_enter(self):
        """Load existing settings into the input fields when the screen is entered."""
        settings_file = self._settings_path()
        if os.path.exists(settings_file):
            try:
                with open(settings_file, "rb") as file:
                    encrypted_data = file.read()
                settings_data = json.loads(decrypt_data(encrypted_data))
                # Populate the input fields with the existing settings
                self.ids.server_url_input.text = settings_data.get("server_url", "")
                self.ids.username_input.text = settings_data.get("username", "")
                self.ids.password_input.text = settings_data.get("password", "")
                self.ids.token_input.text = settings_data.get("token", "")
            except Exception as e:
                # UI boundary: report any read/decrypt/parse failure instead of crashing.
                self.ids.result_label.text = f"Failed to load settings: {str(e)}"
        else:
            # Clear the input fields if no settings exist
            self.ids.server_url_input.text = ""
            self.ids.username_input.text = ""
            self.ids.password_input.text = ""
            self.ids.token_input.text = ""

    def test_connection(self):
        """Test the connection with the Traccar server and show the resulting message."""
        server_url, username, password, token = self._read_inputs()
        # Inside a method body this name resolves to the module-level helper
        # imported from utils, not to this method.
        result = test_connection(server_url, username, password, token)
        self.server_response = result["message"]
        self.ids.result_label.text = self.server_response

    def save_settings(self):
        """Save the server settings to an encrypted file and navigate to the home screen.

        All four fields are required. Any failure while encrypting or writing
        is reported in ``result_label`` and the screen stays open.
        """
        server_url, username, password, token = self._read_inputs()
        if not server_url or not username or not password or not token:
            self.ids.result_label.text = "Please fill in all fields."
            return
        settings_data = {
            "server_url": server_url,
            "username": username,
            "password": password,
            "token": token,
        }
        try:
            # Encryption sits inside the try: load_key() raises when the key
            # file is missing, and that must be reported rather than crash.
            encrypted_data = encrypt_data(json.dumps(settings_data))
            with open(self._settings_path(), "wb") as file:
                file.write(encrypted_data)
        except Exception as e:
            self.ids.result_label.text = f"Failed to save settings: {str(e)}"
            return
        self.ids.result_label.text = "Settings saved successfully!"
        # Navigate back to the home screen
        self.manager.current = "home"
# Get Trip From Server Screen
class GetTripFromServer(Screen):  # Renamed from HomeScreen
    """Check the Traccar connection, list its devices and fetch a route report.

    On entry the screen loads the saved server settings, probes
    ``<server_url>/api/server`` and fills ``devices_spinner`` with the device
    names returned by ``get_devices_from_server``.
    """

    # Status-box colours (RGBA components in the 0-1 range).
    _COLOR_PENDING = [0.984, 0.553, 0.078, 1]  # Yellow (#FB8D14)
    _COLOR_ERROR = [0.909, 0.031, 0.243, 1]  # Red (#E8083E)
    _COLOR_OK = [0.008, 0.525, 0.290, 1]  # Green (#02864A)

    server_info_text = StringProperty("LOADING DATA...")  # Text of the status label
    server_box_color = ListProperty(list(_COLOR_PENDING))  # Colour of the status box
    # Device name -> device ID. Only ever rebound, never mutated in place, so
    # the shared class-level default is not modified.
    device_mapping = {}

    def _set_status(self, text, color):
        """Update the status label text and the status box colour together."""
        self.server_info_text = text
        # Copy so the class-level colour constants can never be mutated.
        self.server_box_color = list(color)

    def on_pre_enter(self):
        """Start the flow for checking server information and connection."""
        self._set_status("LOADING DATA...", self._COLOR_PENDING)
        # Wait 1 second before checking settings
        Clock.schedule_once(self.check_server_settings, 1)

    def check_server_settings(self, dt):
        """Load the saved server settings and schedule the connection test.

        ``dt`` is the elapsed time passed by ``Clock``; it is not used.
        """
        # Resolves to the module-level helper imported from utils.
        settings = check_server_settings()
        if not settings:
            self._set_status("Go to settings and set Traccar Server", self._COLOR_ERROR)
            self.ids.devices_spinner.text = "No devices available"
            return

        server_url = settings["server_url"]
        username = settings["username"]
        password = settings["password"]
        token = settings["token"]
        self._set_status(f"CHECKING server: {server_url}", self._COLOR_PENDING)
        self.ids.devices_spinner.text = "Loading devices..."
        # Test the connection one second after the settings were loaded
        Clock.schedule_once(
            lambda dt: self.test_connection(server_url, username, password, token), 1
        )

    def test_connection(self, server_url, username, password, token):
        """Probe the Traccar server and, on success, populate the device spinner.

        Uses Bearer-token authentication when ``token`` is non-empty and HTTP
        basic authentication with ``username``/``password`` otherwise.
        """
        # Avoid "//api/..." when the configured URL ends with a slash.
        base_url = server_url.rstrip("/")
        headers = {"Authorization": f"Bearer {token}"} if token else None
        auth = None if token else (username, password)
        try:
            print(f"Testing connection to {server_url}")
            response = requests.get(
                f"{base_url}/api/server", headers=headers, auth=auth, timeout=10
            )
            if response.status_code == 200:
                self._set_status(f"Connected to {server_url}", self._COLOR_OK)
                print(f"Connection successful: {self.server_info_text}")
                # Fetch devices and populate the dropdown
                devices = get_devices_from_server()
                if devices:
                    self.device_mapping = devices
                    self.ids.devices_spinner.values = list(devices.keys())
                    self.ids.devices_spinner.text = "Select a device"
                else:
                    # Drop any mapping left over from an earlier successful load.
                    self.device_mapping = {}
                    self.ids.devices_spinner.text = "No devices found"
                    self.ids.devices_spinner.values = []
            else:
                self._set_status(
                    f"Error: {response.status_code} - {response.reason}",
                    self._COLOR_ERROR,
                )
                print(f"Connection failed: {self.server_info_text}")
        except requests.exceptions.RequestException as e:
            self._set_status(f"Connection failed: {str(e)}", self._COLOR_ERROR)
            print(f"Connection error: {str(e)}")

    def on_device_selected(self, device_name):
        """Colour the spinner green when a real device is selected, white otherwise.

        A name counts as a real device only if it is a key of
        ``device_mapping``; placeholder texts such as "Select a device" or
        "No devices available" therefore reset the colour.
        """
        spinner = self.ids.devices_spinner
        if device_name in self.device_mapping:
            spinner.background_color = (0.008, 0.525, 0.290, 1)  # Green (#02864A)
            print(f"Device selected: {device_name}")
        else:
            spinner.background_color = (1, 1, 1, 1)  # Reset to white

    def open_date_picker(self, which):
        """Open a popup to pick a day of the current month.

        ``which`` selects the target button: ``'start'`` updates
        ``start_date_picker_button``, any other value updates
        ``end_date_picker_button``. The chosen date is written as YYYY-MM-DD.
        """
        today = date.today()

        def on_date_selected(instance):
            date_str = f"{today.year}-{today.month:02d}-{int(instance.text):02d}"
            if which == 'start':
                self.ids.start_date_picker_button.text = date_str
            else:
                self.ids.end_date_picker_button.text = date_str
            popup.dismiss()

        layout = GridLayout(cols=7, spacing=5, padding=10)
        for day in range(1, 32):
            try:
                # Used only as a validity check for this day number.
                date(today.year, today.month, day)
            except ValueError:
                # Past the last day of this month; later days are invalid too.
                break
            button = Button(text=str(day), size_hint=(None, None), size=(40, 40))
            button.bind(on_press=on_date_selected)
            layout.add_widget(button)

        # on_date_selected closes over this name; it is bound before any press.
        popup = Popup(title="Select a Date", content=layout, size_hint=(0.8, 0.8))
        popup.open()

    def get_trip_server_data(self):
        """Handle the "Get trip server data" button: validate, fetch and print positions."""
        selected_device = self.ids.devices_spinner.text
        start_date = self.ids.start_date_picker_button.text
        end_date = self.ids.end_date_picker_button.text

        # Membership in device_mapping rejects every placeholder text,
        # including "Select a device" and "No devices available".
        if selected_device not in self.device_mapping:
            print("No valid device selected.")
            self.ids.result_label.text = "Please select a valid device."
            return
        if start_date == "Select Date" or end_date == "Select Date":
            print("No valid date selected.")
            self.ids.result_label.text = "Please select valid start and end dates."
            return

        print(f"Fetching trip data for device: {selected_device} from {start_date} to {end_date}")
        self.ids.result_label.text = f"Fetching trip data for {selected_device} from {start_date} to {end_date}..."
        positions = self.fetch_positions_for_selected_day()
        if positions:
            print("Positions received:")
            for pos in positions:
                print(f"{pos['deviceTime']}: {pos['latitude']}, {pos['longitude']}")
        else:
            print("No positions found or error occurred.")

    def fetch_positions_for_selected_day(self):
        """Fetch the route positions for the selected device and time range.

        Returns the decoded JSON response on HTTP 200 and an empty list on any
        validation, HTTP or network failure. The reason for a failure is shown
        in ``result_label``.
        """
        settings = check_server_settings()
        if not settings:
            self.ids.result_label.text = "Server settings not found."
            return []

        selected_device = self.ids.devices_spinner.text
        if selected_device not in self.device_mapping:
            self.ids.result_label.text = "Please select a valid device."
            return []
        device_id = self.device_mapping[selected_device]

        # Get start/end date and hour from UI
        start_date = self.ids.start_date_picker_button.text
        start_hour = self.ids.start_hour_spinner.text
        end_date = self.ids.end_date_picker_button.text
        end_hour = self.ids.end_hour_spinner.text
        if "Select" in start_date or "Select" in end_date:
            self.ids.result_label.text = "Please select both start and end dates."
            return []

        # Build ISO 8601 time strings.
        # NOTE(review): assumes the hour spinners hold a two-digit hour ("00"-"23") -- confirm.
        from_time = f"{start_date}T{start_hour}:00:00Z"
        to_time = f"{end_date}T{end_hour}:59:59Z"

        # Same "/api" prefix and auth selection as test_connection.
        base_url = settings["server_url"].rstrip("/")
        url = f"{base_url}/api/reports/route"
        token = settings.get("token")
        headers = {"Accept": "application/json"}
        auth = None
        if token:
            headers["Authorization"] = f"Bearer {token}"
        else:
            auth = (settings.get("username"), settings.get("password"))
        params = {
            "deviceId": device_id,
            "from": from_time,
            "to": to_time,
        }
        try:
            print(f"Request Payload: {params}")
            response = requests.get(url, params=params, headers=headers, auth=auth, timeout=15)
            print(f"Response Status Code: {response.status_code}")
            print(f"Response Content: {response.text}")
            if response.status_code == 200:
                positions = response.json()
                print(f"Retrieved {len(positions)} positions.")
                self.ids.result_label.text = f"Retrieved {len(positions)} positions."
                return positions
            if response.status_code == 400:
                self.ids.result_label.text = "Bad Request: Please check the request payload and token."
                return []
            self.ids.result_label.text = f"Failed: {response.status_code} - {response.reason}"
            return []
        except requests.exceptions.RequestException as e:
            self.ids.result_label.text = f"Error fetching positions: {str(e)}"
            return []
        except ValueError as e:
            # Older requests versions raise a plain ValueError for a non-JSON body.
            self.ids.result_label.text = f"Error fetching positions: invalid JSON response ({str(e)})"
            return []
class RegisterScreen(Screen):
    """Screen that registers a new local user in the encrypted credentials file.

    Each user is stored as one Fernet-encrypted JSON record per line of
    ``CREDENTIALS_FILE``.
    """

    @staticmethod
    def _file_lacks_trailing_newline(path):
        """Return True if ``path`` exists, is non-empty and does not end with a newline."""
        try:
            with open(path, "rb") as existing:
                existing.seek(0, os.SEEK_END)
                if existing.tell() == 0:
                    return False
                existing.seek(-1, os.SEEK_END)
                return existing.read(1) != b"\n"
        except FileNotFoundError:
            return False

    def create_user(self):
        """Validate the form, store the new user and return to the login screen.

        Validation and storage failures are shown in ``result_label`` and the
        screen stays open.
        """
        username = self.ids.set_username_input.text.strip()
        password = self.ids.set_password_input.text.strip()
        confirm_password = self.ids.confirm_password_input.text.strip()
        email = self.ids.set_email_input.text.strip()

        if not username or not password or not confirm_password or not email:
            self.ids.result_label.text = "Please fill in all fields."
            return
        if password != confirm_password:
            self.ids.result_label.text = "Passwords do not match."
            return
        # Check if the username or email already exists
        if self.user_exists(username, email):
            self.ids.result_label.text = "User or email already exists."
            return

        user_data = {
            "username": username,
            "password": password,
            "email": email,
        }
        try:
            # Encrypting inside the try reports a missing key file instead of crashing.
            encrypted_data = encrypt_data(json.dumps(user_data))
            # LoginScreen.login writes a single record without a trailing
            # newline; add the separator first so two tokens never share a line.
            prefix = b"\n" if self._file_lacks_trailing_newline(CREDENTIALS_FILE) else b""
            with open(CREDENTIALS_FILE, "ab") as file:
                file.write(prefix + encrypted_data + b"\n")
        except Exception as e:
            self.ids.result_label.text = f"Failed to save user: {str(e)}"
            return

        self.ids.result_label.text = "User created successfully!"
        # Navigate back to the login screen
        self.manager.current = "login"

    def user_exists(self, username, email):
        """Return True if a stored record has the given username or email.

        Blank lines are skipped. A record that cannot be decrypted or parsed
        is reported in ``result_label`` and skipped, so it cannot hide a
        duplicate stored on a later line. Records without an ``email`` key
        (as written by ``LoginScreen.login``) are compared by username only.
        """
        try:
            with open(CREDENTIALS_FILE, "rb") as file:
                for line in file:
                    record = line.strip()
                    if not record:
                        continue
                    try:
                        user = json.loads(decrypt_data(record))
                        if user.get("username") == username or user.get("email") == email:
                            return True
                    except Exception as e:
                        # UI boundary: one unreadable record must not abort the scan.
                        self.ids.result_label.text = f"Error checking user: {str(e)}"
        except FileNotFoundError:
            # No credentials file yet means no users are registered.
            pass
        except Exception as e:
            self.ids.result_label.text = f"Error checking user: {str(e)}"
        return False
# Home Screen
class HomeScreen(Screen):
    """Landing screen that lists saved projects/trips and starts new ones."""

    def on_pre_enter(self):
        """Refresh the project list each time the screen is about to be shown."""
        self.load_existing_projects()

    def load_existing_projects(self):
        """Rebuild ``projects_list`` with one button per entry in the projects folder.

        The folder ``resources/projects`` is created if it does not exist yet.
        """
        projects_dir = os.path.join(RESOURCES_FOLDER, "projects")
        if not os.path.exists(projects_dir):
            os.makedirs(projects_dir)

        container = self.ids.projects_list
        # Start from an empty list so entries are not duplicated on re-entry.
        container.clear_widgets()

        for entry in os.listdir(projects_dir):
            entry_button = Button(text=entry, size_hint_y=None, height=40)
            # The handler reads the name from the pressed button itself.
            entry_button.bind(on_press=lambda pressed: self.open_project(pressed.text))
            container.add_widget(entry_button)

    def open_project(self, project_name):
        """Log the chosen project/trip and show its name in ``result_label``."""
        print(f"Opening project: {project_name}")
        self.ids.result_label.text = f"Opened project: {project_name}"

    def create_new_project(self):
        """Switch to the GetTripFromServer screen to create a new project/trip."""
        self.manager.current = "get_trip_from_server"
# Main App
class TraccarApp(App):
    """Kivy application that wires all screens into a single ScreenManager."""

    def build(self):
        """Prepare the resources folder and key, then return the screen manager."""
        # The resources folder must exist before the key file can be written.
        if not os.path.exists(RESOURCES_FOLDER):
            os.makedirs(RESOURCES_FOLDER)
        generate_key()

        manager = ScreenManager()
        # Each screen is created and registered in this order.
        screen_table = (
            (LoginScreen, "login"),
            (HomeScreen, "home"),
            (GetTripFromServer, "get_trip_from_server"),
            (SettingsScreen, "settings"),
            (RegisterScreen, "register"),
        )
        for screen_cls, screen_name in screen_table:
            manager.add_widget(screen_cls(name=screen_name))

        # Debugging aid: list every registered screen name.
        registered = [item.name for item in manager.screens]
        print("Screens added to ScreenManager:", registered)
        return manager
if __name__ == "__main__":
    # Start the application only when this file is executed as a script.
    app = TraccarApp()
    app.run()