Files
WMT/app.py
2025-09-25 08:32:39 +03:00

1360 lines
56 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#App version 2.9 - Added configuration mode for "notconfig" devices
import os
import sys
import subprocess
import stat
import pwd
import grp
# Import dependency check/install functions from dependency_utils
from dependency_utils import install_package_from_wheel, check_and_install_dependencies
# Global configuration mode flag
CONFIGURATION_MODE = False
# Run dependency check before importing anything else
try:
check_and_install_dependencies({
'rdm6300': 'rdm6300-0.1.1-py3-none-any.whl',
'gpiozero': None,
'requests': 'requests-2.32.3-py3-none-any.whl',
'aiohttp': 'aiohttp-3.11.18-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'flask': None,
'urllib3': 'urllib3-2.3.0-py3-none-any.whl',
'certifi': 'certifi-2025.1.31-py3-none-any.whl',
'charset_normalizer': 'charset_normalizer-3.4.1-py3-none-any.whl',
'idna': 'idna-3.10-py3-none-any.whl',
'multidict': 'multidict-6.4.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'aiosignal': 'aiosignal-1.3.2-py2.py3-none-any.whl',
'frozenlist': 'frozenlist-1.6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'attrs': 'attrs-25.3.0-py3-none-any.whl',
'yarl': 'yarl-1.20.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'aiohappyeyeballs': 'aiohappyeyeballs-2.6.1-py3-none-any.whl',
'propcache': 'propcache-0.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl'
}, "./Files/reposytory")
except Exception as e:
print(f"Warning: Dependency check failed: {e}")
print("Continuing with existing packages...")
def safe_import(module_name, package_name=None):
"""
Safely import a module with error handling
"""
try:
if package_name:
module = __import__(package_name)
return getattr(module, module_name)
else:
return __import__(module_name)
except ImportError as e:
print(f"Warning: Could not import {module_name}: {e}")
return None
# Now import required modules with fallbacks
rdm6300 = safe_import('rdm6300')
if rdm6300 is None:
print("ERROR: rdm6300 is required for this application to work!")
print("Please ensure rdm6300 is installed from the repository.")
sys.exit(1)
# Import other required modules
import time
import logging
# Try to import GPIO-related modules
try:
from gpiozero import OutputDevice
print("✓ gpiozero imported successfully")
except ImportError as e:
print(f"Warning: Could not import gpiozero: {e}")
print("LED functionality will be disabled")
# Create a dummy OutputDevice class
class OutputDevice:
def __init__(self, pin):
self.pin = pin
def on(self):
print(f"LED {self.pin} would turn ON")
def off(self):
print(f"LED {self.pin} would turn OFF")
from multiprocessing import Process
# Import network-related modules
try:
import requests
print("✓ requests imported successfully")
except ImportError as e:
print(f"ERROR: requests is required: {e}")
sys.exit(1)
import threading
import urllib.parse
from datetime import datetime, timedelta
import socket
import signal
# Import async modules
try:
import aiohttp
import asyncio
print("✓ aiohttp and asyncio imported successfully")
except ImportError as e:
print(f"Warning: Could not import aiohttp: {e}")
print("Async functionality may be limited")
import asyncio
# Import Flask for command server
try:
from flask import Flask, request, jsonify
print("✓ Flask imported successfully")
FLASK_AVAILABLE = True
except ImportError as e:
print(f"Warning: Could not import Flask: {e}")
print("Command server functionality will be disabled")
FLASK_AVAILABLE = False
# Create dummy Flask classes
class Flask:
def __init__(self, name):
pass
def route(self, *args, **kwargs):
def decorator(f):
return f
return decorator
def run(self, *args, **kwargs):
pass
def request():
pass
def jsonify(data):
return data
import json
# Early configuration mode detection (before heavy initialization)
def early_launch_configuration_mode():
"""
Early launch of configuration mode with chromium displaying Screen.html
"""
try:
print("🔧 Configuration mode detected - launching Screen.html in Chromium")
# Get absolute path to Screen.html
current_dir = os.path.dirname(os.path.abspath(__file__))
screen_html_path = os.path.join(current_dir, "Files", "Screen.html")
if not os.path.exists(screen_html_path):
print(f"❌ Screen.html not found at: {screen_html_path}")
return False
print(f"📄 Loading Screen.html from: {screen_html_path}")
# Terminate any existing Chromium processes
chromium_process_name = "chromium"
# Local log only in configuration mode
logging.info("Refreshing Chromium process (configuration mode)")
try:
subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5) # Wait for processes to terminate
# Launch Chromium with Screen.html
url = f"file://{screen_html_path}"
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
)
logging.info("Chromium process restarted successfully (configuration mode)")
except Exception as e:
logging.error(f"Failed to refresh Chromium process (configuration mode): {e}")
return False
print("✅ Configuration mode launched successfully")
print(" Network connectivity checks disabled")
print(" Server status messages disabled")
print("🔧 Device running in configuration mode")
print("🔧 To exit configuration mode, change idmasa.txt content to device name")
return True
except FileNotFoundError:
print("❌ Chromium browser not found. Please install chromium-browser:")
print(" sudo apt update && sudo apt install chromium-browser")
return False
except Exception as e:
print(f"❌ Error launching configuration mode: {e}")
return False
# EARLY CONFIGURATION MODE DETECTION
print("=" * 60)
print("CONFIGURATION MODE DETECTION")
print("=" * 60)
try:
with open("./data/idmasa.txt", "r") as f:
name = f.readline().strip() or "noconfig"
print(f"✓ Device name loaded: {name}")
# Check if device is in configuration mode
if name.lower() == "notconfig":
print("🔧 Device configured for setup mode (notconfig)")
# Launch configuration mode
if early_launch_configuration_mode():
print("🚀 Configuration mode active - application will run in setup mode")
print("⚠️ Network connectivity checks are DISABLED")
print("⚠️ Server status messages are DISABLED")
print("🔧 Change idmasa.txt to device name to exit configuration mode")
# Set global flag for configuration mode
CONFIGURATION_MODE = True
print("🔧 Configuration mode activated - continuing with RFID reader initialization")
print("🔧 Network and server monitoring will remain disabled")
else:
print("❌ Failed to launch configuration mode, continuing with normal operation")
CONFIGURATION_MODE = False
else:
print("✅ Device in normal operation mode")
CONFIGURATION_MODE = False
except FileNotFoundError:
print("Warning: idmasa.txt not found, using default 'noconfig'")
name = "noconfig"
CONFIGURATION_MODE = False
# Create the file with default value
try:
os.makedirs("./data", exist_ok=True)
with open("./data/idmasa.txt", "w") as f:
f.write("noconfig")
print("✓ Created default idmasa.txt file")
except Exception as e:
print(f"Could not create idmasa.txt: {e}")
except Exception as e:
print(f"Error reading idmasa.txt: {e}")
name = "noconfig"
CONFIGURATION_MODE = False
print("=" * 60)
print("CONTINUING WITH NORMAL INITIALIZATION" if not CONFIGURATION_MODE else "CONFIGURATION MODE ACTIVE")
print("=" * 60)
def check_system_requirements():
"""
Check and set up system requirements for the application
"""
print("Checking system requirements...")
# 1. Check and install required system packages
system_packages = {
'sshpass': 'sshpass_1.09-1_armhf.deb' # Required for auto-update functionality
}
for package, deb_file in system_packages.items():
try:
result = subprocess.run(['which', package], capture_output=True, text=True)
if result.returncode == 0:
print(f"{package} is installed")
else:
print(f"Installing {package}...")
# Try online installation first
try:
install_result = subprocess.run(['sudo', 'apt', 'update'], capture_output=True, text=True, timeout=120)
install_result = subprocess.run(['sudo', 'apt', 'install', '-y', package],
capture_output=True, text=True, timeout=300)
if install_result.returncode == 0:
print(f"{package} installed successfully (online)")
continue
else:
print(f"Online installation failed, trying offline...")
except Exception as online_error:
print(f"Online installation failed: {online_error}, trying offline...")
# Try offline installation from local .deb file
deb_path = f"./Files/system_packages/{deb_file}"
if os.path.exists(deb_path):
try:
print(f"Installing {package} from local package: {deb_path}")
offline_result = subprocess.run(['sudo', 'dpkg', '-i', deb_path],
capture_output=True, text=True, timeout=120)
if offline_result.returncode == 0:
print(f"{package} installed successfully (offline)")
else:
print(f"✗ Offline installation failed: {offline_result.stderr}")
# Try to fix dependencies
print("Attempting to fix dependencies...")
subprocess.run(['sudo', 'apt', '--fix-broken', 'install', '-y'],
capture_output=True, text=True, timeout=300)
except Exception as offline_error:
print(f"✗ Offline installation error: {offline_error}")
else:
print(f"✗ Local package not found: {deb_path}")
print(f" To add offline support, download with: apt download {package}")
except Exception as e:
print(f"Warning: Could not check/install {package}: {e}")
# 2. Check and create required directories
required_dirs = ['./data', './Files', './Files/reposytory', './Files/system_packages']
for dir_path in required_dirs:
try:
os.makedirs(dir_path, exist_ok=True)
print(f"✓ Directory ensured: {dir_path}")
except Exception as e:
print(f"✗ Failed to create directory {dir_path}: {e}")
return False
# 2. Check required files and create defaults if missing
required_files = {
'./data/idmasa.txt': 'noconfig',
'./data/log.txt': '',
'./data/tag.txt': '',
'./data/device_info.txt': 'unknown-device\n127.0.0.1\n'
}
for file_path, default_content in required_files.items():
try:
if not os.path.exists(file_path):
with open(file_path, 'w') as f:
f.write(default_content)
print(f"✓ Created default file: {file_path}")
else:
print(f"✓ File exists: {file_path}")
except Exception as e:
print(f"✗ Failed to create file {file_path}: {e}")
# 3. Check file permissions
try:
for file_path in required_files.keys():
if os.path.exists(file_path):
# Ensure read/write permissions for the application
os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
print("✓ File permissions set correctly")
except Exception as e:
print(f"Warning: Could not set file permissions: {e}")
return True
def check_port_capabilities():
"""
Check if the application can bind to port 80 and set up capabilities if needed
"""
print("Checking port 80 capabilities...")
try:
# Check if we're running as root
if os.geteuid() == 0:
print("✓ Running as root - port 80 access available")
return True
# Check if capabilities are set
python_path = sys.executable
result = subprocess.run(['getcap', python_path], capture_output=True, text=True)
if 'cap_net_bind_service=ep' in result.stdout:
print("✓ Port binding capabilities already set")
return True
# Try to set capabilities
print("Setting up port 80 binding capabilities...")
setup_script = './setup_port_capability.sh'
if os.path.exists(setup_script):
result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
if result.returncode == 0:
print("✓ Port capabilities set successfully")
return True
else:
print(f"✗ Failed to set capabilities: {result.stderr}")
else:
# Create the setup script if it doesn't exist
script_content = f'''#!/bin/bash
# Set port binding capability for Python to allow port 80 access
echo "Setting port binding capability for Python..."
sudo setcap cap_net_bind_service=ep {python_path}
echo "Capability set successfully"
'''
try:
with open(setup_script, 'w') as f:
f.write(script_content)
os.chmod(setup_script, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
if result.returncode == 0:
print("✓ Port capabilities set successfully")
return True
else:
print(f"✗ Failed to set capabilities: {result.stderr}")
except Exception as e:
print(f"✗ Failed to create setup script: {e}")
except Exception as e:
print(f"Warning: Could not check port capabilities: {e}")
print("Warning: Port 80 may not be accessible. App will try to run on default port.")
return False
def check_hardware_interfaces():
"""
Check hardware interfaces (UART/Serial) required for RFID reader
"""
print("Checking hardware interfaces...")
# Check for serial devices
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0']
available_devices = []
for device in serial_devices:
if os.path.exists(device):
try:
# Check if we can access the device
with open(device, 'r'):
pass
available_devices.append(device)
print(f"✓ Serial device available: {device}")
except PermissionError:
print(f"✗ Permission denied for {device}. Adding user to dialout group...")
try:
# Add current user to dialout group for serial access
username = pwd.getpwuid(os.getuid()).pw_name
subprocess.run(['sudo', 'usermod', '-a', '-G', 'dialout', username],
capture_output=True, text=True)
print(f"✓ User {username} added to dialout group (reboot may be required)")
available_devices.append(device)
except Exception as e:
print(f"✗ Failed to add user to dialout group: {e}")
except Exception as e:
print(f"Warning: Could not test {device}: {e}")
if not available_devices:
print("✗ No serial devices found. RFID reader may not work.")
# Enable UART if we're on Raspberry Pi
try:
config_file = '/boot/config.txt'
if os.path.exists(config_file):
print("Attempting to enable UART in Raspberry Pi config...")
result = subprocess.run(['sudo', 'raspi-config', 'nonint', 'do_serial', '0'],
capture_output=True, text=True)
if result.returncode == 0:
print("✓ UART enabled in config (reboot required)")
else:
print("Warning: Could not enable UART automatically")
except Exception as e:
print(f"Warning: Could not configure UART: {e}")
return False
return True
def check_network_connectivity():
"""
Check network connectivity and DNS resolution
"""
print("Checking network connectivity...")
try:
# Test basic connectivity
result = subprocess.run(['ping', '-c', '1', '8.8.8.8'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print("✓ Internet connectivity available")
# Test DNS resolution
try:
import socket
socket.gethostbyname('google.com')
print("✓ DNS resolution working")
return True
except socket.gaierror:
print("✗ DNS resolution failed")
return False
else:
print("✗ No internet connectivity")
return False
except subprocess.TimeoutExpired:
print("✗ Network timeout")
return False
except Exception as e:
print(f"Warning: Could not test network: {e}")
return False
def initialize_gpio_permissions():
"""
Set up GPIO permissions for LED control
"""
print("Setting up GPIO permissions...")
try:
# Add user to gpio group if it exists
username = pwd.getpwuid(os.getuid()).pw_name
# Check if gpio group exists
try:
grp.getgrnam('gpio')
subprocess.run(['sudo', 'usermod', '-a', '-G', 'gpio', username],
capture_output=True, text=True)
print(f"✓ User {username} added to gpio group")
except KeyError:
print("Warning: gpio group not found - GPIO access may be limited")
# Set up GPIO access via /dev/gpiomem if available
gpio_devices = ['/dev/gpiomem', '/dev/mem']
for device in gpio_devices:
if os.path.exists(device):
print(f"✓ GPIO device available: {device}")
break
else:
print("Warning: No GPIO devices found")
except Exception as e:
print(f"Warning: Could not set up GPIO permissions: {e}")
def perform_system_initialization():
"""
Perform complete system initialization for first run
"""
print("=" * 60)
print("SYSTEM INITIALIZATION - Preparing for first run")
print("=" * 60)
initialization_steps = [
("System Requirements", check_system_requirements),
("Port Capabilities", check_port_capabilities),
("Hardware Interfaces", check_hardware_interfaces),
("GPIO Permissions", initialize_gpio_permissions),
("Network Connectivity", check_network_connectivity)
]
success_count = 0
total_steps = len(initialization_steps)
for step_name, step_function in initialization_steps:
print(f"\n--- {step_name} ---")
try:
if step_function():
success_count += 1
print(f"{step_name} completed successfully")
else:
print(f"{step_name} completed with warnings")
except Exception as e:
print(f"{step_name} failed: {e}")
print("\n" + "=" * 60)
print(f"INITIALIZATION COMPLETE: {success_count}/{total_steps} steps successful")
print("=" * 60)
if success_count < total_steps:
print("Warning: Some initialization steps failed. Application may have limited functionality.")
print("Check the messages above for details.")
return success_count >= (total_steps - 1) # Allow one failure
#configurare variabile
def get_device_info():
"""
Get hostname and device IP with file-based fallback to avoid socket errors
"""
config_file = "./data/device_info.txt"
hostname = None
device_ip = None
# Try to get current hostname and IP
try:
hostname = socket.gethostname()
device_ip = socket.gethostbyname(hostname)
print(f"Successfully resolved - Hostname: {hostname}, IP: {device_ip}")
# Save the working values to file for future fallback
try:
os.makedirs("./data", exist_ok=True) # Create data directory if it doesn't exist
with open(config_file, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
print(f"Saved device info to {config_file}")
except Exception as e:
print(f"Warning: Could not save device info to file: {e}")
return hostname, device_ip
except socket.gaierror as e:
print(f"Socket error occurred: {e}")
print("Attempting to load device info from file...")
# Try to load from file
try:
with open(config_file, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
print(f"Loaded from file - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
else:
print("File exists but doesn't contain valid data")
except FileNotFoundError:
print(f"No fallback file found at {config_file}")
except Exception as e:
print(f"Error reading fallback file: {e}")
except Exception as e:
print(f"Unexpected error getting device info: {e}")
# Try to load from file as fallback
try:
with open(config_file, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
print(f"Loaded from file after error - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
except Exception as file_error:
print(f"Could not load from file: {file_error}")
# Final fallback if everything fails
print("All methods failed - Using default values")
hostname = hostname or "unknown-device"
device_ip = "127.0.0.1"
# Try to save these default values for next time
try:
os.makedirs("./data", exist_ok=True)
with open(config_file, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
print(f"Saved fallback values to {config_file}")
except Exception as e:
print(f"Could not save fallback values: {e}")
return hostname, device_ip
# Perform system initialization (first run setup)
if not perform_system_initialization():
print("Warning: System initialization completed with errors.")
print("The application will continue but may have limited functionality.")
# Get device information with error handling
hostname, device_ip = get_device_info()
print(f"Final result - Hostname: {hostname}, Device IP: {device_ip}")
# Configure logging
logging.basicConfig(filename='./data/log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# function to delete old logs
def delete_old_logs():
log_dir = './data/'
log_file = 'log.txt'
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
file_mod_time = datetime.fromtimestamp(os.path.getmtime(log_path))
if datetime.now() - file_mod_time > timedelta(days=10):
os.remove(log_path)
log_info_with_server(f"Deleted old log file: {log_file}")
else:
log_info_with_server(f"Log file is not older than 10 days: {log_file}")
else:
log_info_with_server(f"Log file does not exist: {log_file}")
# Function to read the name (idmasa) from the file
def read_name_from_file():
try:
with open("./data/idmasa.txt", "r") as file:
n_masa = file.readline().strip()
return n_masa
except FileNotFoundError:
logging.error("File ./data/idmasa.txt not found.")
return "unknown"
# Function to send logs to a remote server for the Server_monitorizare APP
def send_log_to_server(log_message, n_masa, hostname, device_ip):
host = hostname
device = device_ip
try:
log_data = {
"hostname": str(host),
"device_ip": str(device),
"nume_masa": str(n_masa),
"log_message": str(log_message)
}
server_url = "http://rpi-ansible:80/logs" # Replace with your server's URL
print(log_data) # Debugging: Print log_data to verify its contents
response = requests.post(server_url, json=log_data, timeout=5)
response.raise_for_status()
logging.info("Log successfully sent to server: %s", log_message)
except requests.exceptions.RequestException as e:
logging.error("Failed to send log to server: %s", e)
# Wrapper for logging.info to also send logs to the server Monitorizare APP
def log_info_with_server(message):
n_masa = read_name_from_file() # Read name (idmasa) from the file
formatted_message = f"{message} (n_masa: {n_masa})" # Format the message
logging.info(formatted_message) # Log the formatted message
# Only send to server if not in configuration mode
try:
if not CONFIGURATION_MODE:
send_log_to_server(message, n_masa, hostname, device_ip) # Send the original message to the server
else:
logging.info("Configuration mode: Server logging disabled")
except NameError:
# CONFIGURATION_MODE not defined yet (during initialization)
send_log_to_server(message, n_masa, hostname, device_ip) # Send the original message to the server
# Function to execute system commands with proper security
def execute_system_command(command):
"""
Execute system commands with proper logging and security checks
"""
# Define allowed commands for security
allowed_commands = [
"sudo apt update",
"sudo apt upgrade -y",
"sudo apt autoremove -y",
"sudo apt autoclean",
"sudo reboot",
"sudo shutdown -h now",
"df -h",
"free -m",
"uptime",
"systemctl status",
"sudo systemctl restart networking",
"sudo systemctl restart ssh"
]
try:
# Check if command is allowed
if command not in allowed_commands:
log_info_with_server(f"Command '{command}' is not allowed for security reasons")
return {"status": "error", "message": f"Command '{command}' is not allowed", "output": ""}
log_info_with_server(f"Executing command: {command}")
# Execute the command
result = subprocess.run(
command.split(),
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
output = result.stdout + result.stderr
if result.returncode == 0:
log_info_with_server(f"Command '{command}' executed successfully")
return {"status": "success", "message": "Command executed successfully", "output": output}
else:
log_info_with_server(f"Command '{command}' failed with return code {result.returncode}")
return {"status": "error", "message": f"Command failed with return code {result.returncode}", "output": output}
except subprocess.TimeoutExpired:
log_info_with_server(f"Command '{command}' timed out")
return {"status": "error", "message": "Command timed out", "output": ""}
except Exception as e:
log_info_with_server(f"Error executing command '{command}': {str(e)}")
return {"status": "error", "message": f"Error: {str(e)}", "output": ""}
# Flask app for receiving commands (only if Flask is available)
if FLASK_AVAILABLE:
command_app = Flask(__name__)
@command_app.route('/execute_command', methods=['POST'])
def handle_command_execution():
"""
Endpoint to receive and execute system commands
"""
try:
data = request.json
if not data or 'command' not in data:
return jsonify({"error": "Invalid request. 'command' field is required"}), 400
command = data.get('command')
# Execute the command
result = execute_system_command(command)
return jsonify(result), 200 if result['status'] == 'success' else 400
except Exception as e:
log_info_with_server(f"Error handling command execution request: {str(e)}")
return jsonify({"error": f"Server error: {str(e)}"}), 500
@command_app.route('/status', methods=['GET'])
def get_device_status():
"""
Endpoint to get device status information
"""
try:
n_masa = read_name_from_file()
# Get system information
uptime_result = subprocess.run(['uptime'], capture_output=True, text=True)
df_result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True)
free_result = subprocess.run(['free', '-m'], capture_output=True, text=True)
status_info = {
"hostname": hostname,
"device_ip": device_ip,
"nume_masa": n_masa,
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"uptime": uptime_result.stdout.strip() if uptime_result.returncode == 0 else "N/A",
"disk_usage": df_result.stdout.strip() if df_result.returncode == 0 else "N/A",
"memory_usage": free_result.stdout.strip() if free_result.returncode == 0 else "N/A"
}
return jsonify(status_info), 200
except Exception as e:
log_info_with_server(f"Error getting device status: {str(e)}")
return jsonify({"error": f"Error getting status: {str(e)}"}), 500
@command_app.route('/auto_update', methods=['POST'])
def auto_update_app():
"""
Auto-update the application from the central server
Checks version, downloads newer files if available, and restarts the device
"""
try:
# Configuration
SERVER_HOST = "rpi-ansible"
SERVER_USER = "pi"
SERVER_PASSWORD = "Initial01!"
SERVER_APP_PATH = "/home/pi/Desktop/prezenta/app.py"
SERVER_REPO_PATH = "/home/pi/Desktop/prezenta/Files/reposytory"
# Dynamically determine local paths based on current script location
current_script_path = os.path.abspath(__file__)
local_base_dir = os.path.dirname(current_script_path)
LOCAL_APP_PATH = current_script_path
LOCAL_REPO_PATH = os.path.join(local_base_dir, "Files", "reposytory")
log_info_with_server(f"Auto-update process initiated from: {LOCAL_APP_PATH}")
# Step 1: Get current local version
current_version = None
try:
with open(LOCAL_APP_PATH, 'r') as f:
first_line = f.readline()
if 'version' in first_line.lower():
# Extract version number (e.g., "2.5" from "#App version 2.5")
import re
version_match = re.search(r'version\s+(\d+\.?\d*)', first_line, re.IGNORECASE)
if version_match:
current_version = float(version_match.group(1))
log_info_with_server(f"Current local version: {current_version}")
except Exception as e:
log_info_with_server(f"Could not determine local version: {e}")
return jsonify({"error": f"Could not determine local version: {str(e)}"}), 500
# Step 2: Get remote version via SCP
temp_dir = "/tmp/app_update"
try:
# Create temporary directory
subprocess.run(['mkdir', '-p', temp_dir], check=True)
# Download remote app.py to check version
scp_command = [
'sshpass', '-p', SERVER_PASSWORD,
'scp', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
f'{SERVER_USER}@{SERVER_HOST}:{SERVER_APP_PATH}',
f'{temp_dir}/app.py'
]
result = subprocess.run(scp_command, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
log_info_with_server(f"Failed to download remote app.py: {result.stderr}")
return jsonify({"error": f"Failed to connect to server: {result.stderr}"}), 500
# Check remote version
remote_version = None
with open(f'{temp_dir}/app.py', 'r') as f:
first_line = f.readline()
if 'version' in first_line.lower():
import re
version_match = re.search(r'version\s+(\d+\.?\d*)', first_line, re.IGNORECASE)
if version_match:
remote_version = float(version_match.group(1))
log_info_with_server(f"Remote version: {remote_version}")
except subprocess.TimeoutExpired:
return jsonify({"error": "Connection to server timed out"}), 500
except Exception as e:
log_info_with_server(f"Error checking remote version: {e}")
return jsonify({"error": f"Error checking remote version: {str(e)}"}), 500
# Step 3: Compare versions
if remote_version is None:
return jsonify({"error": "Could not determine remote version"}), 500
if current_version is None or remote_version <= current_version:
log_info_with_server(f"No update needed. Current: {current_version}, Remote: {remote_version}")
return jsonify({
"status": "no_update_needed",
"current_version": current_version,
"remote_version": remote_version,
"message": "Application is already up to date"
}), 200
# Step 4: Download updated files
log_info_with_server(f"Update available! Downloading version {remote_version}")
try:
# Create backup of current app
backup_path = f"{LOCAL_APP_PATH}.backup.{current_version}"
subprocess.run(['cp', LOCAL_APP_PATH, backup_path], check=True)
log_info_with_server(f"Backup created: {backup_path}")
# Download new app.py
subprocess.run(['cp', f'{temp_dir}/app.py', LOCAL_APP_PATH], check=True)
log_info_with_server("New app.py downloaded successfully")
# Download repository folder
repo_scp_command = [
'sshpass', '-p', SERVER_PASSWORD,
'scp', '-r', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
f'{SERVER_USER}@{SERVER_HOST}:{SERVER_REPO_PATH}',
f'{LOCAL_REPO_PATH}_new'
]
result = subprocess.run(repo_scp_command, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Replace old repository with new one
subprocess.run(['rm', '-rf', LOCAL_REPO_PATH], check=True)
subprocess.run(['mv', f'{LOCAL_REPO_PATH}_new', LOCAL_REPO_PATH], check=True)
log_info_with_server("Repository updated successfully")
else:
log_info_with_server(f"Repository update failed: {result.stderr}")
# Download system packages folder
local_system_packages_path = os.path.join(local_base_dir, 'Files', 'system_packages')
server_system_packages = f'{SERVER_USER}@{SERVER_HOST}:/home/pi/Desktop/prezenta/Files/system_packages'
system_scp_command = [
'sshpass', '-p', SERVER_PASSWORD,
'scp', '-r', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
server_system_packages,
f'{local_system_packages_path}_new'
]
try:
result = subprocess.run(system_scp_command, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Replace old system packages with new ones
if os.path.exists(local_system_packages_path):
subprocess.run(['rm', '-rf', local_system_packages_path], check=True)
subprocess.run(['mv', f'{local_system_packages_path}_new', local_system_packages_path], check=True)
log_info_with_server("System packages updated successfully")
else:
log_info_with_server(f"System packages update failed: {result.stderr}")
except Exception as sys_e:
log_info_with_server(f"System packages update error: {sys_e}")
except Exception as e:
# Restore backup if something went wrong
try:
subprocess.run(['cp', backup_path, LOCAL_APP_PATH], check=True)
log_info_with_server("Backup restored due to error")
except:
pass
return jsonify({"error": f"Update failed: {str(e)}"}), 500
# Step 5: Schedule device restart
log_info_with_server("Update completed successfully. Scheduling restart...")
# Create a restart script that will run after this response
restart_script = '''#!/bin/bash
sleep 3
sudo reboot
'''
with open('/tmp/restart_device.sh', 'w') as f:
f.write(restart_script)
subprocess.run(['chmod', '+x', '/tmp/restart_device.sh'], check=True)
# Schedule the restart in background
subprocess.Popen(['/tmp/restart_device.sh'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return jsonify({
"status": "success",
"message": f"Updated from version {current_version} to {remote_version}. Device restarting...",
"old_version": current_version,
"new_version": remote_version,
"restart_scheduled": True
}), 200
except Exception as e:
log_info_with_server(f"Auto-update error: {str(e)}")
return jsonify({"error": f"Auto-update failed: {str(e)}"}), 500
finally:
# Cleanup temp directory
try:
subprocess.run(['rm', '-rf', temp_dir], check=True)
except:
pass
def start_command_server():
"""
Start the Flask server with enhanced port handling and fallback
"""
# Try different ports in order of preference
preferred_ports = [
int(os.environ.get('FLASK_PORT', 80)), # Use environment variable or default to 80
80, # Standard HTTP port
5000, # Flask default
8080, # Alternative HTTP port
3000 # Development port
]
for port in preferred_ports:
try:
print(f"Attempting to start command server on port {port}...")
# Test if port is available
import socket
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind(('0.0.0.0', port))
test_socket.close()
# Port is available, start Flask server
print(f"Port {port} is available. Starting command server...")
command_app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)
return # Success, exit function
except PermissionError:
print(f"✗ Permission denied for port {port}")
if port == 80:
print(" Hint: Port 80 requires root privileges or capabilities")
print(" Try running: sudo setcap cap_net_bind_service=ep $(which python3)")
continue
except OSError as e:
if "Address already in use" in str(e):
print(f"✗ Port {port} is already in use")
else:
print(f"✗ Port {port} error: {e}")
continue
except Exception as e:
print(f"✗ Failed to start on port {port}: {e}")
continue
# If we get here, all ports failed
log_info_with_server("Error: Could not start command server on any port")
print("✗ Could not start command server on any available port")
# Start command server in a separate process with enhanced error handling
try:
print("Initializing command server...")
command_server_process = Process(target=start_command_server)
command_server_process.daemon = True # Ensure it dies with main process
command_server_process.start()
# Give the server a moment to start and check if it's running
import time
time.sleep(2)
if command_server_process.is_alive():
port = int(os.environ.get('FLASK_PORT', 80))
print(f"✓ Command server started successfully on port {port}")
else:
print("Warning: Command server process stopped unexpectedly")
except Exception as e:
print(f"Warning: Could not start command server: {e}")
log_info_with_server(f"Command server startup error: {str(e)}")
else:
print("Warning: Flask not available - Command server disabled")
# Call the function to delete old logs
delete_old_logs()
def config():
import config
# function for posting data to the harting server
def post_backup_data():
try:
with open("./data/tag.txt", "r") as file:
lines = file.readlines()
remaining_lines = lines[:]
for line in lines:
line = line.strip()
if line:
try:
response = requests.post(line, verify=False, timeout=3)
#
response.raise_for_status() # Raise an error for bad status codes
log_info_with_server(f"Data posted successfully:")
remaining_lines.remove(line + "\n")
except requests.exceptions.Timeout:
log_info_with_server("Request timed out.")
break
except requests.exceptions.RequestException as e:
log_info_with_server(f"An error occurred: ")
break
with open("./data/tag.txt", "w") as file:
file.writelines(remaining_lines)
#log_info_with_server("Backup data updated.")
except FileNotFoundError:
log_info_with_server("No backup file found.")
# Function to check internet connection
def check_internet_connection():
hostname = "10.76.140.17"
cmd_block_wifi = 'sudo rfkill block wifi'
cmd_unblock_wifi = 'sudo rfkill unblock wifi'
log_info_with_server('Internet connection check loaded')
delete_old_logs()
chromium_process_name = "chromium"
while True:
try:
# Use subprocess to execute the ping command
response = subprocess.run(
["ping", "-c", "1", hostname],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
if response.returncode == 0:
log_info_with_server("Internet is up! Waiting 45 minutes.")
post_backup_data()
time.sleep(2700) # 45 minutes
else:
log_info_with_server("Internet is down. Rebooting WiFi.")
os.system(cmd_block_wifi)
time.sleep(1200) # 20 minutes
os.system(cmd_unblock_wifi)
# Refresh Chromium process
log_info_with_server("Refreshing Chromium process.")
try:
# Find and terminate Chromium processes
subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5) # Wait for processes to terminate
# Relaunch Chromium
url = "10.76.140.17/iweb_v2/index.php/traceability/production"
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
"--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
)
log_info_with_server("Chromium process restarted successfully.")
except Exception as e:
log_info_with_server(f"Failed to refresh Chromium process: {e}")
except Exception as e:
log_info_with_server(f"An error occurred during internet check: {e}")
time.sleep(60) # Retry after 1 minute in case of an error
# Start the internet connection check in a separate process (only if not in configuration mode)
if not CONFIGURATION_MODE:
internet_check_process = Process(target=check_internet_connection)
internet_check_process.start()
print("✅ Internet connectivity monitoring started")
else:
print("🔧 Configuration mode: Internet connectivity monitoring DISABLED")
# Launch Chromium with the specified URLs (only if not in configuration mode)
if not CONFIGURATION_MODE:
url = "10.76.140.17/iweb_v2/index.php/traceability/production" # pentru cazul in care raspberiul nu are sistem de prezenta
subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", "--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True)
print("✅ Main application browser launched")
else:
print("🔧 Configuration mode: Main application browser launch DISABLED")
info = "0"
#function to post info
def post_info(info):
#log_info_with_server("Starting to post data...")
info1 = info.strip() # Remove any leading/trailing whitespace, including newlines
try:
response = requests.post(info1, verify=False, timeout=3)
response.raise_for_status() # Raise an error for bad status codes
#log_info_with_server("Data posted successfully")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server(f"Value was saved to tag.txt")
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server("Value was saved to tag.txt")
async def post_info_async(info):
try:
# Try to use aiohttp if available
if 'aiohttp' in globals():
async with aiohttp.ClientSession() as session:
try:
async with session.post(info, ssl=False, timeout=3) as response:
response_text = await response.text()
log_info_with_server(f"Data posted successfully")
except asyncio.TimeoutError:
with open("./data/tag.txt", "a") as file:
file.write(info)
except Exception as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
else:
# Fallback to synchronous requests if aiohttp not available
try:
response = requests.post(info.strip(), verify=False, timeout=3)
response.raise_for_status()
log_info_with_server(f"Data posted successfully (fallback)")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file:
file.write(info)
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
except Exception as e:
# Final fallback - save to file
with open("./data/tag.txt", "a") as file:
file.write(info)
def post_info_thread(info):
try:
thread = threading.Thread(target=asyncio.run, args=(post_info_async(info),), daemon=True)
thread.start()
except Exception as e:
# Fallback to synchronous posting if async fails
print(f"Async posting failed, using sync fallback: {e}")
post_info(info)
# Initialize LEDs with error handling
try:
print("Initializing LED controls...")
led1 = OutputDevice(23)
led2 = OutputDevice(24)
print("✓ LED controls initialized successfully")
log_info_with_server("LED controls initialized")
except Exception as e:
print(f"Warning: Could not initialize LED controls: {e}")
print("Creating dummy LED objects - visual feedback will be disabled")
# Create dummy LED objects that don't crash the app
class DummyLED:
def __init__(self, pin):
self.pin = pin
def on(self):
print(f"LED {self.pin} would turn ON")
def off(self):
print(f"LED {self.pin} would turn OFF")
led1 = DummyLED(23)
led2 = DummyLED(24)
log_info_with_server("LED controls using dummy mode")
# Initialize table name/ID
print("Initializing device configuration...")
name = "idmasa"
logging.info("LED controls initialized")
logging.info("Variabila Id Masa A fost initializata ")
# Device name is already loaded during early configuration detection
# Use the existing name variable or reload if needed
if 'name' not in globals():
try:
with open("./data/idmasa.txt", "r") as f:
name = f.readline().strip() or "noconfig"
print(f"✓ Device name reloaded: {name}")
if not CONFIGURATION_MODE:
log_info_with_server(f"Device name initialized: {name}")
except Exception as e:
print(f"Error reloading device name: {e}")
name = "noconfig"
logging.info(name)
#clasa reader
class Reader(rdm6300.BaseReader):
global info
def card_inserted(self, card):
if card.value == 12886709:
config()
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = f'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/{name}/{card.value}/1/{afisare}\n'
info = date
if name == "noconfig":
led1.on()
time.sleep(5)
led1.off()
log_info_with_server(f"card inserted {card} but no")
else:
post_info_thread(info)
led1.on()
log_info_with_server(f"card inserted {card}")
def card_removed(self, card):
if card.value == 12886709:
log_info_with_server("Removing Config card")
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = f'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/{name}/{card.value}/0/{afisare}\n'
info = date
if name == "noconfig":
led1.off()
log_info_with_server(f"card removed {card}")
else:
post_info_thread(info)
led1.off()
log_info_with_server(f"card removed {card}")
# Initialize RFID Reader with comprehensive error handling
def initialize_rfid_reader():
"""
Initialize RFID reader with multiple device attempts and error handling
"""
print("Initializing RFID reader...")
# List of possible serial devices in order of preference
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0', '/dev/ttyACM0']
for device in serial_devices:
try:
print(f"Attempting to initialize RFID reader on {device}...")
r = Reader(device)
r.start()
print(f"✓ RFID reader successfully initialized on {device}")
log_info_with_server(f"RFID reader started on {device}")
return r
except FileNotFoundError:
print(f"✗ Device {device} not found")
continue
except PermissionError:
print(f"✗ Permission denied for {device}")
print(f" Hint: Try adding user to dialout group: sudo usermod -a -G dialout $USER")
continue
except Exception as e:
print(f"✗ Failed to initialize on {device}: {e}")
continue
# If we get here, all devices failed
print("✗ Could not initialize RFID reader on any device")
print("Available solutions:")
print(" 1. Check hardware connections")
print(" 2. Enable UART: sudo raspi-config -> Interface Options -> Serial")
print(" 3. Add user to dialout group: sudo usermod -a -G dialout $USER")
print(" 4. Reboot the system after making changes")
log_info_with_server("ERROR: RFID reader initialization failed")
return None
# Start RFID reader
try:
rfid_reader = initialize_rfid_reader()
if rfid_reader is None:
print("WARNING: Application starting without RFID functionality")
print("Card reading will not work until RFID reader is properly configured")
except Exception as e:
print(f"Critical error initializing RFID reader: {e}")
log_info_with_server(f"Critical RFID error: {str(e)}")
print("Application will start but RFID functionality will be disabled")