updated view

This commit is contained in:
2025-06-05 13:25:19 +03:00
parent 26640678cb
commit 418ebc6f49
19 changed files with 124737 additions and 8263 deletions

Binary file not shown.

613
main.py
View File

@@ -1,257 +1,312 @@
import kivy
from kivy.app import App
from kivy.uix.screenmanager import ScreenManager, Screen
import requests
import os
import json
from cryptography.fernet import Fernet
from kivy.clock import Clock
from kivy.properties import StringProperty, ListProperty
from utils import get_devices_from_server
from utils import check_server_settings # Import the refactored function
from utils import test_connection # Import the refactored function
from utils import (
generate_key, load_key, encrypt_data, decrypt_data,
check_server_settings, save_server_settings,
test_connection, get_devices_from_server, save_route_to_file, fetch_positions_for_selected_day
)
from datetime import date
from kivy.uix.popup import Popup
from kivy.uix.gridlayout import GridLayout
from kivy.uix.button import Button
from kivy.uix.label import Label
from kivy.uix.boxlayout import BoxLayout
from threading import Thread
from kivy.clock import mainthread
kivy.require("2.0.0") # Ensure the correct Kivy version is used
kivy.require("2.0.0")
from kivy.core.window import Window
Window.size = (360, 780)
# Paths
RESOURCES_FOLDER = "resources"
CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc")
KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key")
# Utility functions for encryption
def generate_key():
"""Generate and save a key for encryption."""
if not os.path.exists(KEY_FILE):
key = Fernet.generate_key()
with open(KEY_FILE, "wb") as key_file:
key_file.write(key)
def load_key():
"""Load the encryption key."""
with open(KEY_FILE, "rb") as key_file:
return key_file.read()
def encrypt_data(data):
"""Encrypt data using the encryption key."""
key = load_key()
fernet = Fernet(key)
return fernet.encrypt(data.encode())
def decrypt_data(data):
"""Decrypt data using the encryption key."""
key = load_key()
fernet = Fernet(key)
return fernet.decrypt(data).decode()
# Login Screen
class LoginScreen(Screen):
def login(self):
"""Handle login and save credentials."""
username = self.ids.username_input.text.strip()
password = self.ids.password_input.text.strip()
password = self.ids.username_input.text.strip()
if not username or not password:
self.manager.get_screen("home").ids.result_label.text = "Please fill in all fields."
return
# Encrypt and save credentials
credentials = {"username": username, "password": password}
encrypted_data = encrypt_data(json.dumps(credentials))
with open(CREDENTIALS_FILE, "wb") as file:
file.write(encrypted_data)
# Navigate to the home screen
self.manager.current = "home"
# Settings Screen (renamed from MainScreen)
class SettingsScreen(Screen):
server_response = "Waiting to test connection..."
def on_pre_enter(self):
"""Load existing settings into the input fields when the screen is entered."""
settings_file = os.path.join(RESOURCES_FOLDER, "server_settings.enc")
if os.path.exists(settings_file):
try:
with open(settings_file, "rb") as file:
encrypted_data = file.read()
decrypted_data = decrypt_data(encrypted_data)
settings_data = json.loads(decrypted_data)
# Populate the input fields with the existing settings
self.ids.server_url_input.text = settings_data.get("server_url", "")
self.ids.username_input.text = settings_data.get("username", "")
self.ids.password_input.text = settings_data.get("password", "")
self.ids.token_input.text = settings_data.get("token", "") # Populate the token field
except Exception as e:
self.ids.result_label.text = f"Failed to load settings: {str(e)}"
settings = check_server_settings()
if settings:
self.ids.server_url_input.text = settings.get("server_url", "")
self.ids.username_input.text = settings.get("username", "")
self.ids.password_input.text = settings.get("password", "")
self.ids.token_input.text = settings.get("token", "")
else:
# Clear the input fields if no settings exist
self.ids.server_url_input.text = ""
self.ids.username_input.text = ""
self.ids.password_input.text = ""
self.ids.token_input.text = ""
def test_connection(self):
"""Test the connection with the Traccar server."""
# Get input values from the screen
server_url = self.ids.server_url_input.text.strip()
username = self.ids.username_input.text.strip()
password = self.ids.password_input.text.strip()
token = self.ids.token_input.text.strip() # Get the token input
# Call the refactored function
token = self.ids.token_input.text.strip()
result = test_connection(server_url, username, password, token)
# Update the UI based on the result
self.server_response = result["message"]
self.ids.result_label.text = self.server_response
def save_settings(self):
"""Save the server settings to an encrypted file and navigate to the HomeScreen."""
server_url = self.ids.server_url_input.text.strip()
username = self.ids.username_input.text.strip()
password = self.ids.password_input.text.strip()
token = self.ids.token_input.text.strip() # Get the token input
token = self.ids.token_input.text.strip()
if not server_url or not username or not password or not token:
self.ids.result_label.text = "Please fill in all fields."
return
settings_data = {
"server_url": server_url,
"username": username,
"password": password,
"token": token, # Include the token in the data
"token": token,
}
encrypted_data = encrypt_data(json.dumps(settings_data))
try:
# Save the encrypted data to the server_settings.enc file
settings_file = os.path.join(RESOURCES_FOLDER, "server_settings.enc")
with open(settings_file, "wb") as file:
file.write(encrypted_data)
save_server_settings(settings_data)
self.ids.result_label.text = "Settings saved successfully!"
# Navigate back to the HomeScreen
self.manager.current = "home"
except Exception as e:
self.ids.result_label.text = f"Failed to save settings: {str(e)}"
# Get Trip From Server Screen
class GetTripFromServer(Screen): # Renamed from HomeScreen
server_info_text = StringProperty("LOADING DATA...") # Default text for the label
server_box_color = ListProperty([0.984, 0.553, 0.078, 1]) # Default yellow color (#FB8D14)
device_mapping = {} # Store the mapping of device names to IDs
class GetTripFromServer(Screen):
server_info_text = StringProperty("LOADING DATA...")
server_box_color = ListProperty([0.984, 0.553, 0.078, 1])
device_mapping = {}
def on_pre_enter(self):
"""Start the flow for checking server information and connection."""
self.server_box_color = [0.984, 0.553, 0.078, 1] # Yellow color (#FB8D14)
self.server_box_color = [0.984, 0.553, 0.078, 1]
self.server_info_text = "LOADING DATA..."
Clock.schedule_once(self.check_server_settings, 1) # Wait 1 second before checking settings
Clock.schedule_once(self.check_server_settings, 1)
def check_server_settings(self, dt):
"""Check server settings and update the UI."""
settings = check_server_settings() # Call the refactored function
settings = check_server_settings()
if settings:
server_url = settings["server_url"]
username = settings["username"]
password = settings["password"]
token = settings["token"]
# Update the label to indicate checking connection
self.server_info_text = f"CHECKING server: {server_url}"
self.server_box_color = [0.984, 0.553, 0.078, 1] # Keep yellow color (#FB8D14)
self.ids.devices_spinner.text = "Loading devices..." # Show loading text
# Test the connection after loading settings
self.server_box_color = [0.984, 0.553, 0.078, 1]
self.ids.devices_spinner.text = "Loading devices..."
Clock.schedule_once(lambda dt: self.test_connection(server_url, username, password, token), 1)
else:
self.server_info_text = "Go to settings and set Traccar Server"
self.server_box_color = [0.909, 0.031, 0.243, 1] # Red color (#E8083E)
self.server_box_color = [0.909, 0.031, 0.243, 1]
self.ids.devices_spinner.text = "No devices available"
def set_result_message(self, message, color=(1, 1, 1, 1)):
self.ids.result_label.text = message
self.ids.result_label.color = color
def update_devices_spinner(self, devices):
if devices:
self.device_mapping = devices
device_names = list(devices.keys())
self.ids.devices_spinner.values = device_names
self.ids.devices_spinner.text = "Select a device"
else:
self.ids.devices_spinner.text = "No devices found"
self.ids.devices_spinner.values = []
def test_connection(self, server_url, username, password, token):
"""Test the connection with the Traccar server."""
try:
# Test the connection with the server
headers = {"Authorization": f"Bearer {token}"} if token else None
auth = None if token else (username, password)
print(f"Testing connection to {server_url}")
response = requests.get(f"{server_url}/api/server", headers=headers, auth=auth, timeout=10)
if response.status_code == 200:
# Update the label and box color for a successful connection
self.server_info_text = f"Connected to {server_url}"
self.server_box_color = [0.008, 0.525, 0.290, 1] # Green color (#02864A)
print(f"Connection successful: {self.server_info_text}")
# Fetch devices and populate the dropdown
devices = get_devices_from_server()
if devices:
self.device_mapping = devices # Store the mapping of device names to IDs
device_names = list(devices.keys()) # Get the list of device names
self.ids.devices_spinner.values = device_names # Populate the dropdown
self.ids.devices_spinner.text = "Select a device" # Default text after loading
else:
self.ids.devices_spinner.text = "No devices found"
self.ids.devices_spinner.values = []
else:
# Update the label and box color for a failed connection
self.server_info_text = f"Error: {response.status_code} - {response.reason}"
self.server_box_color = [0.909, 0.031, 0.243, 1] # Red color (#E8083E)
print(f"Connection failed: {self.server_info_text}")
except requests.exceptions.RequestException as e:
# Update the label and box color for a connection error
self.server_info_text = f"Connection failed: {str(e)}"
self.server_box_color = [0.909, 0.031, 0.243, 1] # Red color (#E8083E)
print(f"Connection error: {str(e)}")
result = test_connection(server_url, username, password, token)
if result["status"]:
self.server_info_text = f"Connected to {server_url}"
self.server_box_color = [0.008, 0.525, 0.290, 1]
devices = get_devices_from_server()
self.update_devices_spinner(devices)
else:
self.server_info_text = result["message"]
self.server_box_color = [0.909, 0.031, 0.243, 1]
def on_device_selected(self, device_name):
"""Change the background color of the Spinner when a device is selected."""
if device_name != "Loading devices..." and device_name != "No devices found":
self.ids.devices_spinner.background_color = (0.008, 0.525, 0.290, 1) # Green color (#02864A)
print(f"Device selected: {device_name}")
self.ids.devices_spinner.background_color = (0.008, 0.525, 0.290, 1)
else:
self.ids.devices_spinner.background_color = (1, 1, 1, 1) # Reset to white if no valid device is selected
self.ids.devices_spinner.background_color = (1, 1, 1, 1)
def open_date_picker(self, which):
"""Open a popup to select a date for start or end."""
today = date.today()
selected_date = [None]
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.gridlayout import GridLayout
from kivy.uix.button import Button
from kivy.uix.label import Label
from kivy.uix.popup import Popup
def on_date_selected(instance):
selected_date[0] = instance.text
date_str = f"{today.year}-{today.month:02d}-{int(selected_date[0]):02d}"
if which == 'start':
self.ids.start_date_picker_button.text = date_str
import calendar
today = date.today()
selected = {"year": today.year, "month": today.month}
def update_grid():
grid.clear_widgets()
days_in_month = calendar.monthrange(selected["year"], selected["month"])[1]
for day in range(1, days_in_month + 1):
btn = Button(
text=str(day),
size_hint=(None, None),
size=(38, 38),
background_color=(0.341, 0.235, 0.980, 1),
color=(1, 1, 1, 1),
font_size=15
)
def on_date_selected(instance, day=day):
date_str = f"{selected['year']}-{selected['month']:02d}-{day:02d}"
if which == 'start':
self.ids.start_date_picker_button.text = date_str
else:
self.ids.end_date_picker_button.text = date_str
popup.dismiss()
btn.bind(on_press=on_date_selected)
grid.add_widget(btn)
def prev_month(instance):
if selected["month"] > 1:
selected["month"] -= 1
else:
self.ids.end_date_picker_button.text = date_str
selected["year"] -= 1
selected["month"] = 12
# Don't allow future months
if (selected["year"], selected["month"]) > (today.year, today.month):
selected["year"], selected["month"] = today.year, today.month
title_label.text = f"Select a Date ({selected['year']}-{selected['month']:02d})"
update_grid()
def next_month(instance):
# Only allow up to current month
if (selected["year"], selected["month"]) < (today.year, today.month):
if selected["month"] < 12:
selected["month"] += 1
else:
selected["year"] += 1
selected["month"] = 1
title_label.text = f"Select a Date ({selected['year']}-{selected['month']:02d})"
update_grid()
def on_cancel(instance):
popup.dismiss()
layout = GridLayout(cols=7, spacing=5, padding=10)
for day in range(1, 32):
try:
current_date = date(today.year, today.month, day)
button = Button(text=str(day), size_hint=(None, None), size=(40, 40))
button.bind(on_press=on_date_selected)
layout.add_widget(button)
except ValueError:
pass
# Main vertical layout
main_layout = BoxLayout(orientation='vertical', spacing=10, padding=10)
popup = Popup(title="Select a Date", content=layout, size_hint=(0.8, 0.8))
# Month navigation row
nav_layout = BoxLayout(orientation='horizontal', size_hint_y=None, height=40, spacing=10)
prev_btn = Button(text="<", size_hint_x=None, width=40, font_size=18, background_color=(0.341, 0.235, 0.980, 1), color=(1,1,1,1))
next_btn = Button(text=">", size_hint_x=None, width=40, font_size=18, background_color=(0.341, 0.235, 0.980, 1), color=(1,1,1,1))
title_label = Label(
text=f"Select a Date ({selected['year']}-{selected['month']:02d})",
size_hint_x=1,
font_size=18,
color=(1, 1, 1, 1)
)
prev_btn.bind(on_press=prev_month)
next_btn.bind(on_press=next_month)
nav_layout.add_widget(prev_btn)
nav_layout.add_widget(title_label)
nav_layout.add_widget(next_btn)
main_layout.add_widget(nav_layout)
# Grid of days: 6 columns
grid = GridLayout(cols=6, spacing=6, size_hint_y=None)
grid.bind(minimum_height=grid.setter('height'))
main_layout.add_widget(grid)
# Cancel button
cancel_btn = Button(
text="Cancel",
size_hint_y=None,
height=44,
background_color=(0.909, 0.031, 0.243, 1),
color=(1, 1, 1, 1),
font_size=16
)
cancel_btn.bind(on_press=on_cancel)
main_layout.add_widget(cancel_btn)
popup = Popup(
title="",
content=main_layout,
size_hint=(0.95, 0.8),
background_color=(0.11, 0.10, 0.15, 1),
separator_height=0,
auto_dismiss=False
)
update_grid()
popup.open()
def open_hour_picker(self, which):
from kivy.uix.popup import Popup
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.button import Button
from kivy.uix.scrollview import ScrollView
from kivy.uix.gridlayout import GridLayout
popup_layout = BoxLayout(orientation='vertical', spacing=8, padding=8)
scroll = ScrollView(size_hint=(1, 1))
grid = GridLayout(cols=1, size_hint_y=None, spacing=4)
grid.bind(minimum_height=grid.setter('height'))
# Add hour buttons
for h in range(24):
hour_str = f"{h:02d}"
btn = Button(
text=hour_str,
size_hint_y=None,
height=44,
font_size=18,
background_color=(0.341, 0.235, 0.980, 1),
color=(1, 1, 1, 1)
)
def set_hour(instance, hour=hour_str):
if which == 'start':
self.ids.start_hour_button.text = hour
else:
self.ids.end_hour_button.text = hour
popup.dismiss()
btn.bind(on_press=set_hour)
grid.add_widget(btn)
scroll.add_widget(grid)
popup_layout.add_widget(scroll)
# Cancel button
cancel_btn = Button(
text="Cancel",
size_hint_y=None,
height=44,
background_color=(0.909, 0.031, 0.243, 1),
color=(1, 1, 1, 1),
font_size=16
)
cancel_btn.bind(on_press=lambda x: popup.dismiss())
popup_layout.add_widget(cancel_btn)
popup = Popup(
title="Select Hour",
content=popup_layout,
size_hint=(0.6, 0.7),
auto_dismiss=False
)
popup.open()
def update_points_count(self, count):
@@ -261,30 +316,18 @@ class GetTripFromServer(Screen): # Renamed from HomeScreen
def save_route(self):
"""Save the current list of positions as a route in resources/projects/<route_name>/positions.json."""
route_name = self.ids.route_name_input.text.strip()
if not route_name:
self.ids.result_label.text = "Please enter a route name."
return
positions = getattr(self, "last_positions", None)
if not positions:
self.ids.result_label.text = "No positions to save."
return
folder_path = os.path.join("resources", "projects", route_name)
os.makedirs(folder_path, exist_ok=True)
file_path = os.path.join(folder_path, "positions.json")
try:
with open(file_path, "w") as f:
json.dump(positions, f, indent=2)
self.ids.result_label.text = f"Route '{route_name}' saved!"
success, message, file_path = save_route_to_file(route_name, positions)
self.ids.result_label.text = message
if success:
# Reset UI fields
self.ids.devices_spinner.text = "Select a device"
self.ids.start_date_picker_button.text = "Select Start Date"
self.ids.end_date_picker_button.text = "Select End Date"
self.ids.start_hour_spinner.text = "00"
self.ids.end_hour_spinner.text = "23"
self.ids.start_hour_button.text = "00"
self.ids.end_hour_button.text = "23"
self.ids.points_count_label.text = "Points: 0"
self.ids.route_name_input.text = ""
@@ -299,9 +342,6 @@ class GetTripFromServer(Screen): # Renamed from HomeScreen
self.manager.current = "home"
Clock.schedule_once(close_and_go_home, 3)
except Exception as e:
self.ids.result_label.text = f"Failed to save route: {str(e)}"
def get_trip_server_data(self):
"""Handle the Get trip server data button press."""
selected_device = self.ids.devices_spinner.text
@@ -333,68 +373,38 @@ class GetTripFromServer(Screen): # Renamed from HomeScreen
print("No positions found or error occurred.")
def fetch_positions_for_selected_day(self):
"""Fetch all positions for the selected device and date/time range from the Traccar server."""
settings = check_server_settings()
if not settings:
self.ids.result_label.text = "Server settings not found."
return []
server_url = settings["server_url"]
token = settings["token"]
selected_device = self.ids.devices_spinner.text
if selected_device not in self.device_mapping:
self.ids.result_label.text = "Please select a valid device."
return []
device_id = self.device_mapping[selected_device]
# Get start/end date and hour from UI
device_name = self.ids.devices_spinner.text
start_date = self.ids.start_date_picker_button.text
start_hour = self.ids.start_hour_spinner.text
end_date = self.ids.end_date_picker_button.text
end_hour = self.ids.end_hour_spinner.text
end_date = self.ids.end_date_picker.text
start_hour = self.ids.start_hour_button.text
end_hour = self.ids.end_hour_button.text
# Validate
if "Select" in start_date or "Select" in end_date:
self.ids.result_label.text = "Please select both start and end dates."
positions, error = fetch_positions_for_selected_day(
settings,
self.device_mapping,
device_name,
start_date,
end_date,
start_hour,
end_hour
)
if error:
self.ids.result_label.text = error
return []
self.ids.result_label.text = f"Retrieved {len(positions)} positions."
return positions
# Build ISO 8601 time strings
from_time = f"{start_date}T{start_hour}:00:00Z"
to_time = f"{end_date}T{end_hour}:59:59Z"
def fetch_devices_async(self):
Thread(target=self._fetch_devices_worker).start()
# Prepare request for /reports/route
url = f"{server_url}/reports/route" # If server_url ends with /api
# OR
url = f"{server_url}/api/reports/route" # If server_url does NOT end with /api
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
params = {
"deviceId": device_id,
"from": from_time,
"to": to_time
}
def _fetch_devices_worker(self):
devices = get_devices_from_server()
self.update_devices_spinner_mainthread(devices)
try:
print(f"Request Payload: {params}")
response = requests.get(url, params=params, headers=headers, timeout=15)
print(f"Response Status Code: {response.status_code}")
print(f"Response Content: {response.text}")
if response.status_code == 200:
positions = response.json()
print(f"Retrieved {len(positions)} positions.")
self.ids.result_label.text = f"Retrieved {len(positions)} positions."
return positions
elif response.status_code == 400:
self.ids.result_label.text = "Bad Request: Please check the request payload and token."
return []
else:
self.ids.result_label.text = f"Failed: {response.status_code} - {response.reason}"
return []
except requests.exceptions.RequestException as e:
self.ids.result_label.text = f"Error fetching positions: {str(e)}"
return []
@mainthread
def update_devices_spinner_mainthread(self, devices):
self.update_devices_spinner(devices)
class RegisterScreen(Screen):
@@ -452,31 +462,78 @@ class RegisterScreen(Screen):
self.ids.result_label.text = f"Error checking user: {str(e)}"
return False
class CreateAnimationScreen(Screen):
pass
# Home Screen
class HomeScreen(Screen):
def on_pre_enter(self):
"""Load existing projects/trips when the screen is entered."""
self.load_existing_projects()
def load_existing_projects(self):
"""Load the list of existing projects/trips."""
projects_folder = os.path.join(RESOURCES_FOLDER, "projects")
archive_folder = os.path.join(RESOURCES_FOLDER, "trip_archive")
if not os.path.exists(projects_folder):
os.makedirs(projects_folder)
if not os.path.exists(archive_folder):
os.makedirs(archive_folder)
# Clear the list area
self.ids.projects_list.clear_widgets()
# Populate the list with existing projects/trips
for project in os.listdir(projects_folder):
project_button = Button(
text=project,
row = BoxLayout(
orientation="horizontal",
size_hint_y=None,
height=40,
on_press=lambda instance: self.open_project(instance.text)
height=38,
spacing=6,
padding=(8, 4)
)
self.ids.projects_list.add_widget(project_button)
from kivy.graphics import Color, Rectangle
with row.canvas.before:
Color(0.11, 0.10, 0.15, 1) # Match app background
row.bg_rect = Rectangle(pos=row.pos, size=row.size)
def update_bg_rect(instance, value):
row.bg_rect.pos = row.pos
row.bg_rect.size = row.size
row.bind(pos=update_bg_rect, size=update_bg_rect)
project_label = Label(
text=project,
size_hint_x=0.64,
color=(1, 1, 1, 1),
font_size=15,
shorten=True,
shorten_from='right'
)
# Edit icon button
from kivy.uix.image import Image
from kivy.uix.behaviors import ButtonBehavior
class IconButton(ButtonBehavior, Image):
pass
edit_button = IconButton(
source="resources/images/edit.png",
size_hint_x=0.18,
allow_stretch=True,
keep_ratio=True
)
edit_button.bind(on_press=lambda instance, p=project: self.edit_project(p))
# Delete icon button
delete_button = IconButton(
source="resources/images/delete.png",
size_hint_x=0.18,
allow_stretch=True,
keep_ratio=True
)
delete_button.bind(on_press=lambda instance, p=project: self.confirm_delete_project(p))
row.add_widget(project_label)
row.add_widget(edit_button)
row.add_widget(delete_button)
self.ids.projects_list.add_widget(row)
def open_project(self, project_name):
"""Handle opening an existing project/trip."""
@@ -487,30 +544,76 @@ class HomeScreen(Screen):
"""Navigate to the GetTripFromServer screen to create a new project/trip."""
self.manager.current = "get_trip_from_server"
def edit_project(self, project_name):
# Navigate to the create_animation screen and pass the project name if needed
self.manager.current = "create_animation"
# Optionally, set a property or method to load the project in create_animation
def confirm_delete_project(self, project_name):
from kivy.uix.popup import Popup
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.button import Button
from kivy.uix.label import Label
layout = BoxLayout(orientation='vertical', spacing=10, padding=10)
label = Label(text=f"Delete project '{project_name}'?\nChoose an option:")
btn_delete = Button(text="Delete Completely", background_color=(1, 0, 0, 1))
btn_archive = Button(text="Archive Trip", background_color=(0.341, 0.235, 0.980, 1))
btn_cancel = Button(text="Cancel")
popup = Popup(title="Delete Project", content=layout, size_hint=(None, None), size=(400, 250), auto_dismiss=False)
layout.add_widget(label)
layout.add_widget(btn_delete)
layout.add_widget(btn_archive)
layout.add_widget(btn_cancel)
def do_delete(instance):
self.delete_project(project_name)
popup.dismiss()
self.load_existing_projects()
def do_archive(instance):
self.archive_project(project_name)
popup.dismiss()
self.load_existing_projects()
btn_delete.bind(on_press=do_delete)
btn_archive.bind(on_press=do_archive)
btn_cancel.bind(on_press=lambda x: popup.dismiss())
popup.open()
def delete_project(self, project_name):
import shutil
folder_path = os.path.join(RESOURCES_FOLDER, "projects", project_name)
if os.path.exists(folder_path):
shutil.rmtree(folder_path)
def archive_project(self, project_name):
import shutil
src_file = os.path.join(RESOURCES_FOLDER, "projects", project_name, "positions.json")
archive_folder = os.path.join(RESOURCES_FOLDER, "trip_archive")
if not os.path.exists(archive_folder):
os.makedirs(archive_folder)
dst_file = os.path.join(archive_folder, f"{project_name}.json")
if os.path.exists(src_file):
shutil.copy2(src_file, dst_file)
# Optionally, delete the project folder after archiving
self.delete_project(project_name)
# Main App
class TraccarApp(App):
def build(self):
# Ensure resources folder exists
if not os.path.exists(RESOURCES_FOLDER):
os.makedirs(RESOURCES_FOLDER)
# Generate encryption key if it doesn't exist
generate_key()
# Screen manager
sm = ScreenManager()
sm.add_widget(LoginScreen(name="login"))
sm.add_widget(HomeScreen(name="home")) # Add the HomeScreen
sm.add_widget(GetTripFromServer(name="get_trip_from_server")) # Updated reference
sm.add_widget(SettingsScreen(name="settings")) # Add the renamed SettingsScreen
sm.add_widget(RegisterScreen(name="register")) # Add the RegisterScreen
# Debugging: Print all screen names
sm.add_widget(HomeScreen(name="home"))
sm.add_widget(GetTripFromServer(name="get_trip_from_server"))
sm.add_widget(SettingsScreen(name="settings"))
sm.add_widget(RegisterScreen(name="register"))
sm.add_widget(CreateAnimationScreen(name="create_animation"))
print("Screens added to ScreenManager:", [screen.name for screen in sm.screens])
return sm
if __name__ == "__main__":
TraccarApp().run()

View File

@@ -1 +1 @@
gAAAAABoQFHNaG_pnMnzlvJJFMIV7sbnJPMG1qcbfMo7l9FqOUdbOXmP4KtjW0JfrRwFN2daFSD92zjWvarQCDgSBcCN3EDOD_T4IXgscWBscygqIJ_UY5QnkGAeczT0SgbcUyyRgn-v
gAAAAABoQW_vdU_6k9K8_uHQSqyH5Ym29SpcwEe8z092nCaELg2i2aRJkhcPfDzPwuPLqo8vzHHKhIQpTXTMpm7s3PPqCy1Xok7lGAm5UJIRF2y9BNe77jH2fNENMN_ipmAT4QgNIAFs

BIN
resources/images/delete.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.6 KiB

BIN
resources/images/edit.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 77 KiB

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -146,191 +146,179 @@
<HomeScreen>:
BoxLayout:
orientation: "vertical"
padding: 10
spacing: 10
orientation: 'vertical'
padding: [8, 8, 8, 8]
spacing: 8
canvas.before:
Color:
rgba: 0.11, 0.10, 0.15, 1 # Background color: #1C1A27
rgba: 0.11, 0.10, 0.15, 1 # Match app background
Rectangle:
pos: self.pos
size: self.size
Label:
text: "Welcome to Home Screen"
font_size: 24
text: "Your Trips"
font_size: 20
size_hint_y: None
height: 50
color: 1, 1, 1, 1 # White text color
height: 40
color: 1, 1, 1, 1
ScrollView:
size_hint: (1, 0.6)
do_scroll_x: False
GridLayout:
id: projects_list
cols: 1
size_hint_y: None
height: self.minimum_height
spacing: 4
Button:
text: "Create New Project / Trip"
text: "Create New Trip"
size_hint_y: None
height: 50
background_color: 0.341, 0.235, 0.980, 1 # Purple color (#573CFA)
height: 48
background_color: 0.341, 0.235, 0.980, 1
color: 1, 1, 1, 1
font_size: 16
on_press: root.create_new_project()
Label:
id: result_label
text: ""
size_hint_y: None
height: 30
color: 1, 1, 1, 1 # White text color
<GetTripFromServer>:
BoxLayout:
orientation: "vertical"
padding: 20
spacing: 20
padding: [12, 0, 12, 12]
spacing: 18
canvas.before:
Color:
rgba: 0.11, 0.10, 0.15, 1 # Background color: #1C1A27
rgba: 0.11, 0.10, 0.15, 1
Rectangle:
pos: self.pos
size: self.size
# First row: Server settings
# Responsive Server info row
BoxLayout:
id: server_info_settings
orientation: "horizontal"
id: server_info_box
orientation: 'horizontal' if self.width > 400 else 'vertical'
size_hint_y: None
height: 30
height: 40 if self.width > 400 else 80
spacing: 10
padding: [10, 10, 10, 10]
canvas.before:
Color:
rgba: root.server_box_color # Dynamic color for the box
rgba: root.server_box_color
Rectangle:
pos: self.pos
size: self.size
Label:
id: server_info_label
text: root.server_info_text # Dynamic text for the label
font_size: 14 # Reduced font size
size_hint_x: 0.8
text: root.server_info_text
font_size: 15
size_hint_x: 0.7 if server_info_box.orientation == 'horizontal' else 1
size_hint_y: 1
halign: 'left'
valign: 'middle'
text_size: self.size
Button:
text: "Settings"
size_hint_x: 0.2
background_color: 0.341, 0.235, 0.980, 1 # Purple color (#573CFA)
size_hint_x: 0.3 if server_info_box.orientation == 'horizontal' else 1
size_hint_y: 1
font_size: 15
background_color: 0.341, 0.235, 0.980, 1
on_press: app.root.current = "settings"
# Second row: Frame for device and date selection
# Device and date selection
BoxLayout:
orientation: "vertical"
size_hint_y: None
height: 200 # Adjusted height for the frame
spacing: 10
padding: 10
height: 300
spacing: 14
padding: 14
canvas.before:
Color:
rgba: 0.2, 0.2, 0.2, 1 # Frame background color
rgba: 0.2, 0.2, 0.2, 1
Rectangle:
pos: self.pos
size: self.size
Label:
text: "Select device and the day of the data" # Main label
font_size: 14 # Reduced font size
text: "Select device and date"
font_size: 15
size_hint_y: None
height: 20
height: 22
Spinner:
id: devices_spinner
text: "Loading devices..."
values: []
size_hint_y: None
height: 38
font_size: 15
on_text: root.on_device_selected(self.text)
BoxLayout:
orientation: "horizontal"
size_hint_y: None
height: 30
spacing: 10
Spinner:
id: devices_spinner
text: "Loading devices..." # Default text
values: [] # Initially empty
size_hint: (0.5, None)
height: 25
font_size: 14
on_text: root.on_device_selected(self.text)
# New row: Starting date and hour
BoxLayout:
orientation: "horizontal"
size_hint_y: None
height: 30
spacing: 10
height: 38
spacing: 8
Label:
text: "Starting Date"
size_hint_x: 0.3
font_size: 14
text: "Start"
font_size: 13
size_hint_x: 0.18
Button:
id: start_date_picker_button
text: "Select Start Date"
size_hint_x: 0.4
height: 25
font_size: 14
text: "Start Date"
size_hint_x: 0.42
font_size: 13
on_press: root.open_date_picker('start')
Spinner:
id: start_hour_spinner
Button:
id: start_hour_button
text: "00"
values: [f"{i:02d}" for i in range(24)]
size_hint_x: 0.2
height: 25
font_size: 14
size_hint_x: 0.18
font_size: 13
on_press: root.open_hour_picker('start')
# New row: End date and hour
BoxLayout:
orientation: "horizontal"
size_hint_y: None
height: 30
spacing: 10
height: 38
spacing: 8
Label:
text: "End Date"
size_hint_x: 0.3
font_size: 14
text: "End"
font_size: 13
size_hint_x: 0.18
Button:
id: end_date_picker_button
text: "Select End Date"
size_hint_x: 0.4
height: 25
font_size: 14
text: "End Date"
size_hint_x: 0.42
font_size: 13
on_press: root.open_date_picker('end')
Spinner:
id: end_hour_spinner
Button:
id: end_hour_button
text: "23"
values: [f"{i:02d}" for i in range(24)]
size_hint_x: 0.2
height: 25
font_size: 14
size_hint_x: 0.18
font_size: 13
on_press: root.open_hour_picker('end')
# New row: Get trip server data button
Button:
id: get_trip_data_button
text: "Get trip server data"
size_hint: (1, None)
height: 30
font_size: 14
background_color: 0.341, 0.235, 0.980, 1 # Purple color (#573CFA)
on_press: root.get_trip_server_data()
# Responsive button row
BoxLayout:
id: trip_button_box
orientation: 'horizontal' if self.width > 400 else 'vertical'
size_hint_y: None
height: 38 if self.width > 400 else 90
spacing: 10
Button:
id: get_trip_data_button
text: "Get trip data"
font_size: 15
background_color: 0.341, 0.235, 0.980, 1
on_press: root.get_trip_server_data()
# Third row: Route info and save
# Route info and save
BoxLayout:
orientation: "horizontal"
orientation: "vertical"
size_hint_y: None
height: 50
spacing: 10
padding: 10
height: 120
spacing: 14
padding: 14
canvas.before:
Color:
rgba: 0.15, 0.15, 0.15, 1
@@ -341,41 +329,59 @@
Label:
id: points_count_label
text: "Points: 0"
font_size: 16
size_hint_x: 0.2
font_size: 15
size_hint_y: None
height: 24
Label:
text: "Create Route"
font_size: 16
size_hint_x: 0.2
BoxLayout:
orientation: "horizontal"
size_hint_y: None
height: 38
spacing: 8
Label:
text: "Route:"
font_size: 15
size_hint_x: 0.25
TextInput:
id: route_name_input
hint_text: "Route name"
multiline: False
font_size: 15
size_hint_x: 0.5
# Responsive save button row
BoxLayout:
id: save_button_box
orientation: 'horizontal' if self.width > 400 else 'vertical'
size_hint_x: 0.25
size_hint_y: None
height: 38 if self.width > 400 else 90
spacing: 10
Button:
text: "Save"
font_size: 15
background_color: 0.008, 0.525, 0.290, 1
on_press: root.save_route()
TextInput:
id: route_name_input
hint_text: "Enter route name"
multiline: False
size_hint_x: 0.4
Button:
text: "Save"
size_hint_x: 0.2
background_color: 0.008, 0.525, 0.290, 1
on_press: root.save_route()
# Fourth row: Result label
# Result label
Label:
id: result_label
text: "Welcome to the Home Screen!"
font_size: 14 # Reduced font size
size_hint: (1, 0.8)
text: ""
font_size: 15
size_hint_y: None
height: 28
# Fifth row: Back to Home button
# Add empty space before the back button
Widget:
size_hint_y: None
height: 40
# Back button
Button:
text: "Back to Home"
size_hint_y: None
height: 50
size_hint_x: 0.8
pos_hint: {"center_x": 0.5}
background_color: 0.341, 0.235, 0.980, 1 # Purple color (#573CFA)
height: 48
font_size: 15
background_color: 0.341, 0.235, 0.980, 1
on_press: app.root.current = "home"
<SettingsScreen>:
@@ -481,3 +487,27 @@
pos_hint: {"center_x": 0.5}
background_color: 0.341, 0.235, 0.980, 1 # Purple color (#573CFA)
on_press: app.root.current = "home"
<CreateAnimationScreen>:
BoxLayout:
orientation: "vertical"
padding: 20
spacing: 20
canvas.before:
Color:
rgba: 0.11, 0.10, 0.15, 1 # Same background as other screens
Rectangle:
pos: self.pos
size: self.size
Label:
text: "Create Animation Screen"
font_size: 24
color: 1, 1, 1, 1
Button:
text: "Back to Home"
size_hint_y: None
height: 50
background_color: 0.341, 0.235, 0.980, 1
on_press: app.root.current = "home"

300
utils.py
View File

@@ -1,210 +1,74 @@
import os # Import the os module
import os
import json
import requests
from cryptography.fernet import Fernet
# Define the path to the server settings file
RESOURCES_FOLDER = "resources"
CREDENTIALS_FILE = os.path.join(RESOURCES_FOLDER, "credentials.enc")
KEY_FILE = os.path.join(RESOURCES_FOLDER, "key.key")
SERVER_SETTINGS_FILE = os.path.join(RESOURCES_FOLDER, "server_settings.enc")
# --- Encryption Utilities ---
def generate_key():
"""Generate and save a key for encryption."""
if not os.path.exists(KEY_FILE):
key = Fernet.generate_key()
with open(KEY_FILE, "wb") as key_file:
key_file.write(key)
def load_key():
"""Load the encryption key from the key file."""
key_file_path = os.path.join(RESOURCES_FOLDER, "key.key")
if not os.path.exists(key_file_path):
raise FileNotFoundError("Encryption key file not found.")
with open(key_file_path, "rb") as key_file:
"""Load the encryption key."""
with open(KEY_FILE, "rb") as key_file:
return key_file.read()
def decrypt_data(encrypted_data):
"""Decrypt the encrypted data using the loaded key."""
key = load_key() # Load the key from the file
def encrypt_data(data):
"""Encrypt data using the encryption key."""
key = load_key()
fernet = Fernet(key)
return fernet.decrypt(encrypted_data).decode()
return fernet.encrypt(data.encode())
def decrypt_data(data):
"""Decrypt data using the encryption key."""
key = load_key()
fernet = Fernet(key)
return fernet.decrypt(data).decode()
# --- Server Settings ---
def check_server_settings():
"""
Load server settings from a configuration file or encrypted storage.
Returns:
dict: A dictionary containing server settings (e.g., server_url, username, password, token).
"""
settings_file = os.path.join("resources", "server_settings.enc")
if not os.path.exists(settings_file):
print("Settings file not found.")
"""Load and decrypt server settings from file."""
if not os.path.exists(SERVER_SETTINGS_FILE):
return None
try:
with open(settings_file, "rb") as file:
with open(SERVER_SETTINGS_FILE, "rb") as file:
encrypted_data = file.read()
decrypted_data = decrypt_data(encrypted_data)
settings = json.loads(decrypted_data)
return settings
except Exception as e:
print(f"Error loading settings: {str(e)}")
print(f"Failed to load server settings: {e}")
return None
def get_devices_from_server():
"""
Retrieve a list of devices from the Traccar server and create a mapping of device names to IDs.
Returns:
dict: A dictionary mapping device names to their IDs if the request is successful.
None: If the request fails.
"""
# Check if the server settings file exists
settings = check_server_settings()
if not settings:
return None
try:
# Extract server details
server_url = settings.get("server_url")
token = settings.get("token") # Optional, if token is used for authentication
if not server_url:
print("Error: Missing server URL in server_settings.enc.")
return None
# Ensure the server_url has a valid scheme
if not server_url.startswith("http://") and not server_url.startswith("https://"):
server_url = f"https://{server_url}" # Default to https:// if no scheme is provided
# Determine authentication method
headers = {"Authorization": f"Bearer {token}"} if token else None
if not token:
print("Error: Missing authentication details (token).")
return None
# Make a GET request to the /devices endpoint
response = requests.get(f"{server_url}/api/devices", headers=headers)
# Check the response status
if response.status_code == 200:
print("Devices retrieved successfully!")
devices = response.json() # Get the list of devices
# Create a mapping of device names to IDs
device_mapping = {device.get("name", "Unnamed Device"): device.get("id", "Unknown ID") for device in devices}
# Debugging: Print the mapping
for name, device_id in device_mapping.items():
print(f"Device Name: {name}, Device ID: {device_id}")
return device_mapping # Return the mapping of device names to IDs
else:
print(f"Error: {response.status_code} - {response.reason}")
return None
except Exception as e:
print(f"Error retrieving devices: {str(e)}")
return None
def get_route_info(device_id, selected_date):
"""
Fetch route information for a specific device and date from the Traccar server.
Args:
device_id (int): The ID of the device.
selected_date (str): The selected date in the format 'YYYY-MM-DD'.
Returns:
list: The route information for the device on the selected date.
"""
# Load server settings
settings = check_server_settings()
if not settings:
print("Error: Unable to load server settings.")
return None
# Extract server details
server_url = settings.get("server_url")
token = settings.get("token")
if not server_url:
print("Error: Missing server URL in settings.")
return None
if not token:
print("Error: Missing token in settings.")
return None
# Ensure the server_url has a valid scheme
if not server_url.startswith("http://") and not server_url.startswith("https://"):
server_url = f"https://{server_url}" # Default to https:// if no scheme is provided
# Set the Authorization header with the token
headers = {"Authorization": f"Bearer {token}"}
# Convert the selected date to ISO 8601 format for the API
start_time = f"{selected_date}T00:00:00Z"
end_time = f"{selected_date}T23:59:59Z"
# API endpoint for fetching route reports
url = f"{server_url}/reports/route"
# Request payload
payload = {
"deviceId": device_id,
"from": start_time,
"to": end_time,
}
try:
# Log the payload for debugging
print(f"Request Payload: {payload}")
# Make the API request
response = requests.get(url, params=payload, headers=headers)
# Log the response status and content for debugging
print(f"Response Status Code: {response.status_code}")
print(f"Response Content: {response.text}")
# Check if the request was successful
if response.status_code == 200:
route = response.json()
print(f"Route for device {device_id} on {selected_date}:")
for position in route:
print(position)
return route
elif response.status_code == 400:
print("Bad Request: Please check the request payload and token.")
return None
else:
print(f"Failed to fetch route: {response.status_code} - {response.reason}")
return None
except requests.exceptions.RequestException as e:
print(f"Error fetching route: {str(e)}")
return None
def save_server_settings(settings_data):
"""Encrypt and save server settings."""
encrypted_data = encrypt_data(json.dumps(settings_data))
with open(SERVER_SETTINGS_FILE, "wb") as file:
file.write(encrypted_data)
# --- Traccar Server Connection ---
def test_connection(server_url, username=None, password=None, token=None):
"""
Test the connection with the Traccar server.
Args:
server_url (str): The URL of the Traccar server.
username (str, optional): The username for basic authentication.
password (str, optional): The password for basic authentication.
token (str, optional): The token for bearer authentication.
Returns:
dict: A dictionary containing the connection status and message.
Returns: dict with 'status' (bool) and 'message' (str)
"""
if not server_url:
return {"status": False, "message": "Please provide the server URL."}
if not token and (not username or not password):
return {"status": False, "message": "Please provide either a token or username and password."}
try:
# Determine authentication method
headers = {"Authorization": f"Bearer {token}"} if token else None
auth = None if token else (username, password)
# Make a GET request to the server
response = requests.get(f"{server_url}/api/server", headers=headers, auth=auth, timeout=10)
if response.status_code == 200:
return {"status": True, "message": "Connection successful! Server is reachable."}
else:
@@ -214,7 +78,91 @@ def test_connection(server_url, username=None, password=None, token=None):
except requests.exceptions.RequestException as e:
return {"status": False, "message": f"Connection failed: {str(e)}"}
# Call the function
test_device_id = 1 # Replace with the device ID from the spinner
test_date = "2025-06-01" # Replace with the selected date from the date picker
get_route_info(test_device_id, test_date)
# --- Device Fetching ---
def get_devices_from_server():
"""Retrieve a mapping of device names to IDs from the Traccar server."""
settings = check_server_settings()
if not settings:
return None
server_url = settings.get("server_url")
token = settings.get("token")
if not server_url or not token:
return None
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get(f"{server_url}/api/devices", headers=headers)
if response.status_code == 200:
devices = response.json()
return {device.get("name", "Unnamed Device"): device.get("id", "Unknown ID") for device in devices}
else:
print(f"Error: {response.status_code} - {response.reason}")
return None
except Exception as e:
print(f"Error retrieving devices: {str(e)}")
return None
# --- Route Saving ---
def save_route_to_file(route_name, positions, base_folder="resources/projects"):
"""
Save the given positions as a route in resources/projects/<route_name>/positions.json.
Returns (success, message, file_path)
"""
if not route_name:
return False, "Please enter a route name.", None
if not positions:
return False, "No positions to save.", None
folder_path = os.path.join(base_folder, route_name)
os.makedirs(folder_path, exist_ok=True)
file_path = os.path.join(folder_path, "positions.json")
try:
with open(file_path, "w") as f:
json.dump(positions, f, indent=2)
return True, f"Route '{route_name}' saved!", file_path
except Exception as e:
return False, f"Failed to save route: {str(e)}", None
def fetch_positions(server_url, token, device_id, from_time, to_time):
"""
Fetch positions from the Traccar API.
Returns (positions, error_message)
"""
url = f"{server_url}/api/reports/route"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
params = {
"deviceId": device_id,
"from": from_time,
"to": to_time
}
try:
response = requests.get(url, params=params, headers=headers, timeout=15)
if response.status_code == 200:
return response.json(), None
elif response.status_code == 400:
return None, "Bad Request: Please check the request payload and token."
else:
return None, f"Failed: {response.status_code} - {response.reason}"
except requests.exceptions.RequestException as e:
return None, f"Error fetching positions: {str(e)}"
def fetch_positions_for_selected_day(settings, device_mapping, device_name, start_date, end_date, start_hour, end_hour):
"""
Fetch positions for the selected day/device using Traccar API.
Returns (positions, error_message)
"""
if not settings:
return [], "Server settings not found."
server_url = settings.get("server_url")
token = settings.get("token")
device_id = device_mapping.get(device_name)
if not device_id:
return [], "Device ID not found."
from_time = f"{start_date}T{start_hour}:00:00Z"
to_time = f"{end_date}T{end_hour}:59:59Z"
positions, error = fetch_positions(server_url, token, device_id, from_time, to_time)
if error:
return [], error
return positions, None