Files
prezenta_work/app.py
ske087 6975e18ed2 v2.7: Fixed auto-update path detection for case-sensitive file systems
- Replaced hardcoded paths with dynamic path detection using __file__
- Auto-update now works regardless of folder case (prezenta vs Prezenta)
- Added comprehensive dependency management documentation
- Enhanced port 80 capability setup script
- Updated system packages repository structure
- Fixed path resolution for Files/reposytory and system_packages directories
2025-08-14 15:51:29 +03:00

1334 lines
55 KiB
Python

#App version 2.7 - Fixed auto-update path detection for case-sensitive file systems
import os
import sys
import subprocess
import importlib
import importlib.util
import stat
import pwd
import grp
def install_package_from_wheel(wheel_path, package_name):
"""
Install a Python package from a wheel file
"""
try:
print(f"Installing {package_name} from {wheel_path}...")
result = subprocess.run([
sys.executable, "-m", "pip", "install", wheel_path,
"--no-index", "--no-deps", "--break-system-packages",
"--no-warn-script-location", "--force-reinstall"
], capture_output=True, text=True, timeout=60)
if result.returncode == 0:
print(f"{package_name} installed successfully")
return True
else:
print(f"✗ Failed to install {package_name}: {result.stderr}")
return False
except Exception as e:
print(f"✗ Error installing {package_name}: {e}")
return False
def check_and_install_dependencies():
"""
Check if required packages are installed and install them from local repository if needed
"""
print("Checking and installing dependencies...")
# Define required packages and their corresponding wheel files
required_packages = {
'rdm6300': 'rdm6300-0.1.1-py3-none-any.whl',
'gpiozero': None, # System package, should be pre-installed
'requests': 'requests-2.32.3-py3-none-any.whl',
'aiohttp': 'aiohttp-3.11.18-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'flask': None, # Will try to install if needed
'urllib3': 'urllib3-2.3.0-py3-none-any.whl',
'certifi': 'certifi-2025.1.31-py3-none-any.whl',
'charset_normalizer': 'charset_normalizer-3.4.1-py3-none-any.whl',
'idna': 'idna-3.10-py3-none-any.whl',
'multidict': 'multidict-6.4.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'aiosignal': 'aiosignal-1.3.2-py2.py3-none-any.whl',
'frozenlist': 'frozenlist-1.6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'attrs': 'attrs-25.3.0-py3-none-any.whl',
'yarl': 'yarl-1.20.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'aiohappyeyeballs': 'aiohappyeyeballs-2.6.1-py3-none-any.whl',
'propcache': 'propcache-0.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl'
}
repository_path = "./Files/reposytory"
missing_packages = []
# Check each required package
for package_name, wheel_file in required_packages.items():
try:
# Use importlib to check if package exists
spec = importlib.util.find_spec(package_name)
if spec is not None:
print(f"{package_name} is already installed")
else:
raise ImportError(f"Package {package_name} not found")
except ImportError:
print(f"{package_name} is not installed")
missing_packages.append((package_name, wheel_file))
except Exception as e:
print(f"✗ Error checking {package_name}: {e}")
missing_packages.append((package_name, wheel_file))
# Install missing packages
if missing_packages:
print(f"\nInstalling {len(missing_packages)} missing packages...")
for package_name, wheel_file in missing_packages:
if wheel_file is None:
# Try to install via pip from internet or system packages
try:
print(f"Attempting to install {package_name} via pip...")
result = subprocess.run([
sys.executable, "-m", "pip", "install", package_name,
"--break-system-packages", "--no-warn-script-location"
], capture_output=True, text=True, timeout=120)
if result.returncode == 0:
print(f"{package_name} installed via pip")
else:
print(f"✗ Could not install {package_name} via pip: {result.stderr}")
# Try system package manager for common packages
if package_name in ['flask', 'gpiozero']:
try:
print(f"Trying to install {package_name} via apt...")
apt_name = f"python3-{package_name.replace('_', '-')}"
apt_result = subprocess.run([
"sudo", "apt", "install", "-y", apt_name
], capture_output=True, text=True, timeout=300)
if apt_result.returncode == 0:
print(f"{package_name} installed via apt")
else:
print(f"✗ Could not install {package_name} via apt")
except Exception as apt_e:
print(f"✗ Error installing {package_name} via apt: {apt_e}")
except Exception as e:
print(f"✗ Error installing {package_name} via pip: {e}")
continue
wheel_path = os.path.join(repository_path, wheel_file)
if not os.path.exists(wheel_path):
print(f"✗ Wheel file not found: {wheel_path}")
continue
# Install the package
install_package_from_wheel(wheel_path, package_name)
print("Dependency check completed.\n")
# Run dependency check before importing anything else
try:
check_and_install_dependencies()
except Exception as e:
print(f"Warning: Dependency check failed: {e}")
print("Continuing with existing packages...")
def safe_import(module_name, package_name=None):
"""
Safely import a module with error handling
"""
try:
if package_name:
module = __import__(package_name)
return getattr(module, module_name)
else:
return __import__(module_name)
except ImportError as e:
print(f"Warning: Could not import {module_name}: {e}")
return None
# Now import required modules with fallbacks
rdm6300 = safe_import('rdm6300')
if rdm6300 is None:
print("ERROR: rdm6300 is required for this application to work!")
print("Please ensure rdm6300 is installed from the repository.")
sys.exit(1)
# Import other required modules
import time
import logging
# Try to import GPIO-related modules
try:
from gpiozero import OutputDevice
print("✓ gpiozero imported successfully")
except ImportError as e:
print(f"Warning: Could not import gpiozero: {e}")
print("LED functionality will be disabled")
# Create a dummy OutputDevice class
class OutputDevice:
def __init__(self, pin):
self.pin = pin
def on(self):
print(f"LED {self.pin} would turn ON")
def off(self):
print(f"LED {self.pin} would turn OFF")
from multiprocessing import Process
# Import network-related modules
try:
import requests
print("✓ requests imported successfully")
except ImportError as e:
print(f"ERROR: requests is required: {e}")
sys.exit(1)
import threading
import urllib.parse
from datetime import datetime, timedelta
import socket
import signal
# Import async modules
try:
import aiohttp
import asyncio
print("✓ aiohttp and asyncio imported successfully")
except ImportError as e:
print(f"Warning: Could not import aiohttp: {e}")
print("Async functionality may be limited")
import asyncio
# Import Flask for command server
try:
from flask import Flask, request, jsonify
print("✓ Flask imported successfully")
FLASK_AVAILABLE = True
except ImportError as e:
print(f"Warning: Could not import Flask: {e}")
print("Command server functionality will be disabled")
FLASK_AVAILABLE = False
# Create dummy Flask classes
class Flask:
def __init__(self, name):
pass
def route(self, *args, **kwargs):
def decorator(f):
return f
return decorator
def run(self, *args, **kwargs):
pass
def request():
pass
def jsonify(data):
return data
import json
def check_system_requirements():
"""
Check and set up system requirements for the application
"""
print("Checking system requirements...")
# 1. Check and install required system packages
system_packages = {
'sshpass': 'sshpass_1.09-1_armhf.deb' # Required for auto-update functionality
}
for package, deb_file in system_packages.items():
try:
result = subprocess.run(['which', package], capture_output=True, text=True)
if result.returncode == 0:
print(f"{package} is installed")
else:
print(f"Installing {package}...")
# Try online installation first
try:
install_result = subprocess.run(['sudo', 'apt', 'update'], capture_output=True, text=True, timeout=120)
install_result = subprocess.run(['sudo', 'apt', 'install', '-y', package],
capture_output=True, text=True, timeout=300)
if install_result.returncode == 0:
print(f"{package} installed successfully (online)")
continue
else:
print(f"Online installation failed, trying offline...")
except Exception as online_error:
print(f"Online installation failed: {online_error}, trying offline...")
# Try offline installation from local .deb file
deb_path = f"./Files/system_packages/{deb_file}"
if os.path.exists(deb_path):
try:
print(f"Installing {package} from local package: {deb_path}")
offline_result = subprocess.run(['sudo', 'dpkg', '-i', deb_path],
capture_output=True, text=True, timeout=120)
if offline_result.returncode == 0:
print(f"{package} installed successfully (offline)")
else:
print(f"✗ Offline installation failed: {offline_result.stderr}")
# Try to fix dependencies
print("Attempting to fix dependencies...")
subprocess.run(['sudo', 'apt', '--fix-broken', 'install', '-y'],
capture_output=True, text=True, timeout=300)
except Exception as offline_error:
print(f"✗ Offline installation error: {offline_error}")
else:
print(f"✗ Local package not found: {deb_path}")
print(f" To add offline support, download with: apt download {package}")
except Exception as e:
print(f"Warning: Could not check/install {package}: {e}")
# 2. Check and create required directories
required_dirs = ['./data', './Files', './Files/reposytory', './Files/system_packages']
for dir_path in required_dirs:
try:
os.makedirs(dir_path, exist_ok=True)
print(f"✓ Directory ensured: {dir_path}")
except Exception as e:
print(f"✗ Failed to create directory {dir_path}: {e}")
return False
# 2. Check required files and create defaults if missing
required_files = {
'./data/idmasa.txt': 'noconfig',
'./data/log.txt': '',
'./data/tag.txt': '',
'./data/device_info.txt': 'unknown-device\n127.0.0.1\n'
}
for file_path, default_content in required_files.items():
try:
if not os.path.exists(file_path):
with open(file_path, 'w') as f:
f.write(default_content)
print(f"✓ Created default file: {file_path}")
else:
print(f"✓ File exists: {file_path}")
except Exception as e:
print(f"✗ Failed to create file {file_path}: {e}")
# 3. Check file permissions
try:
for file_path in required_files.keys():
if os.path.exists(file_path):
# Ensure read/write permissions for the application
os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
print("✓ File permissions set correctly")
except Exception as e:
print(f"Warning: Could not set file permissions: {e}")
return True
def check_port_capabilities():
"""
Check if the application can bind to port 80 and set up capabilities if needed
"""
print("Checking port 80 capabilities...")
try:
# Check if we're running as root
if os.geteuid() == 0:
print("✓ Running as root - port 80 access available")
return True
# Check if capabilities are set
python_path = sys.executable
result = subprocess.run(['getcap', python_path], capture_output=True, text=True)
if 'cap_net_bind_service=ep' in result.stdout:
print("✓ Port binding capabilities already set")
return True
# Try to set capabilities
print("Setting up port 80 binding capabilities...")
setup_script = './setup_port_capability.sh'
if os.path.exists(setup_script):
result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
if result.returncode == 0:
print("✓ Port capabilities set successfully")
return True
else:
print(f"✗ Failed to set capabilities: {result.stderr}")
else:
# Create the setup script if it doesn't exist
script_content = f'''#!/bin/bash
# Set port binding capability for Python to allow port 80 access
echo "Setting port binding capability for Python..."
sudo setcap cap_net_bind_service=ep {python_path}
echo "Capability set successfully"
'''
try:
with open(setup_script, 'w') as f:
f.write(script_content)
os.chmod(setup_script, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
if result.returncode == 0:
print("✓ Port capabilities set successfully")
return True
else:
print(f"✗ Failed to set capabilities: {result.stderr}")
except Exception as e:
print(f"✗ Failed to create setup script: {e}")
except Exception as e:
print(f"Warning: Could not check port capabilities: {e}")
print("Warning: Port 80 may not be accessible. App will try to run on default port.")
return False
def check_hardware_interfaces():
"""
Check hardware interfaces (UART/Serial) required for RFID reader
"""
print("Checking hardware interfaces...")
# Check for serial devices
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0']
available_devices = []
for device in serial_devices:
if os.path.exists(device):
try:
# Check if we can access the device
with open(device, 'r'):
pass
available_devices.append(device)
print(f"✓ Serial device available: {device}")
except PermissionError:
print(f"✗ Permission denied for {device}. Adding user to dialout group...")
try:
# Add current user to dialout group for serial access
username = pwd.getpwuid(os.getuid()).pw_name
subprocess.run(['sudo', 'usermod', '-a', '-G', 'dialout', username],
capture_output=True, text=True)
print(f"✓ User {username} added to dialout group (reboot may be required)")
available_devices.append(device)
except Exception as e:
print(f"✗ Failed to add user to dialout group: {e}")
except Exception as e:
print(f"Warning: Could not test {device}: {e}")
if not available_devices:
print("✗ No serial devices found. RFID reader may not work.")
# Enable UART if we're on Raspberry Pi
try:
config_file = '/boot/config.txt'
if os.path.exists(config_file):
print("Attempting to enable UART in Raspberry Pi config...")
result = subprocess.run(['sudo', 'raspi-config', 'nonint', 'do_serial', '0'],
capture_output=True, text=True)
if result.returncode == 0:
print("✓ UART enabled in config (reboot required)")
else:
print("Warning: Could not enable UART automatically")
except Exception as e:
print(f"Warning: Could not configure UART: {e}")
return False
return True
def check_network_connectivity():
"""
Check network connectivity and DNS resolution
"""
print("Checking network connectivity...")
try:
# Test basic connectivity
result = subprocess.run(['ping', '-c', '1', '8.8.8.8'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print("✓ Internet connectivity available")
# Test DNS resolution
try:
import socket
socket.gethostbyname('google.com')
print("✓ DNS resolution working")
return True
except socket.gaierror:
print("✗ DNS resolution failed")
return False
else:
print("✗ No internet connectivity")
return False
except subprocess.TimeoutExpired:
print("✗ Network timeout")
return False
except Exception as e:
print(f"Warning: Could not test network: {e}")
return False
def initialize_gpio_permissions():
"""
Set up GPIO permissions for LED control
"""
print("Setting up GPIO permissions...")
try:
# Add user to gpio group if it exists
username = pwd.getpwuid(os.getuid()).pw_name
# Check if gpio group exists
try:
grp.getgrnam('gpio')
subprocess.run(['sudo', 'usermod', '-a', '-G', 'gpio', username],
capture_output=True, text=True)
print(f"✓ User {username} added to gpio group")
except KeyError:
print("Warning: gpio group not found - GPIO access may be limited")
# Set up GPIO access via /dev/gpiomem if available
gpio_devices = ['/dev/gpiomem', '/dev/mem']
for device in gpio_devices:
if os.path.exists(device):
print(f"✓ GPIO device available: {device}")
break
else:
print("Warning: No GPIO devices found")
except Exception as e:
print(f"Warning: Could not set up GPIO permissions: {e}")
def perform_system_initialization():
"""
Perform complete system initialization for first run
"""
print("=" * 60)
print("SYSTEM INITIALIZATION - Preparing for first run")
print("=" * 60)
initialization_steps = [
("System Requirements", check_system_requirements),
("Port Capabilities", check_port_capabilities),
("Hardware Interfaces", check_hardware_interfaces),
("GPIO Permissions", initialize_gpio_permissions),
("Network Connectivity", check_network_connectivity)
]
success_count = 0
total_steps = len(initialization_steps)
for step_name, step_function in initialization_steps:
print(f"\n--- {step_name} ---")
try:
if step_function():
success_count += 1
print(f"{step_name} completed successfully")
else:
print(f"{step_name} completed with warnings")
except Exception as e:
print(f"{step_name} failed: {e}")
print("\n" + "=" * 60)
print(f"INITIALIZATION COMPLETE: {success_count}/{total_steps} steps successful")
print("=" * 60)
if success_count < total_steps:
print("Warning: Some initialization steps failed. Application may have limited functionality.")
print("Check the messages above for details.")
return success_count >= (total_steps - 1) # Allow one failure
#configurare variabile
def get_device_info():
"""
Get hostname and device IP with file-based fallback to avoid socket errors
"""
config_file = "./data/device_info.txt"
hostname = None
device_ip = None
# Try to get current hostname and IP
try:
hostname = socket.gethostname()
device_ip = socket.gethostbyname(hostname)
print(f"Successfully resolved - Hostname: {hostname}, IP: {device_ip}")
# Save the working values to file for future fallback
try:
os.makedirs("./data", exist_ok=True) # Create data directory if it doesn't exist
with open(config_file, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
print(f"Saved device info to {config_file}")
except Exception as e:
print(f"Warning: Could not save device info to file: {e}")
return hostname, device_ip
except socket.gaierror as e:
print(f"Socket error occurred: {e}")
print("Attempting to load device info from file...")
# Try to load from file
try:
with open(config_file, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
print(f"Loaded from file - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
else:
print("File exists but doesn't contain valid data")
except FileNotFoundError:
print(f"No fallback file found at {config_file}")
except Exception as e:
print(f"Error reading fallback file: {e}")
except Exception as e:
print(f"Unexpected error getting device info: {e}")
# Try to load from file as fallback
try:
with open(config_file, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
print(f"Loaded from file after error - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
except Exception as file_error:
print(f"Could not load from file: {file_error}")
# Final fallback if everything fails
print("All methods failed - Using default values")
hostname = hostname or "unknown-device"
device_ip = "127.0.0.1"
# Try to save these default values for next time
try:
os.makedirs("./data", exist_ok=True)
with open(config_file, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
print(f"Saved fallback values to {config_file}")
except Exception as e:
print(f"Could not save fallback values: {e}")
return hostname, device_ip
# Perform system initialization (first run setup)
if not perform_system_initialization():
print("Warning: System initialization completed with errors.")
print("The application will continue but may have limited functionality.")
# Get device information with error handling
hostname, device_ip = get_device_info()
print(f"Final result - Hostname: {hostname}, Device IP: {device_ip}")
# Configure logging
logging.basicConfig(filename='./data/log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# function to delete old logs
def delete_old_logs():
log_dir = './data/'
log_file = 'log.txt'
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
file_mod_time = datetime.fromtimestamp(os.path.getmtime(log_path))
if datetime.now() - file_mod_time > timedelta(days=10):
os.remove(log_path)
log_info_with_server(f"Deleted old log file: {log_file}")
else:
log_info_with_server(f"Log file is not older than 10 days: {log_file}")
else:
log_info_with_server(f"Log file does not exist: {log_file}")
# Function to read the name (idmasa) from the file
def read_name_from_file():
try:
with open("./data/idmasa.txt", "r") as file:
n_masa = file.readline().strip()
return n_masa
except FileNotFoundError:
logging.error("File ./data/idmasa.txt not found.")
return "unknown"
# Function to send logs to a remote server for the Server_monitorizare APP
def send_log_to_server(log_message, n_masa, hostname, device_ip):
host = hostname
device = device_ip
try:
log_data = {
"hostname": str(host),
"device_ip": str(device),
"nume_masa": str(n_masa),
"log_message": str(log_message)
}
server_url = "http://rpi-ansible:80/logs" # Replace with your server's URL
print(log_data) # Debugging: Print log_data to verify its contents
response = requests.post(server_url, json=log_data, timeout=5)
response.raise_for_status()
logging.info("Log successfully sent to server: %s", log_message)
except requests.exceptions.RequestException as e:
logging.error("Failed to send log to server: %s", e)
# Wrapper for logging.info to also send logs to the server Monitorizare APP
def log_info_with_server(message):
n_masa = read_name_from_file() # Read name (idmasa) from the file
formatted_message = f"{message} (n_masa: {n_masa})" # Format the message
logging.info(formatted_message) # Log the formatted message
send_log_to_server(message, n_masa, hostname, device_ip) # Send the original message to the server
# Function to execute system commands with proper security
def execute_system_command(command):
"""
Execute system commands with proper logging and security checks
"""
# Define allowed commands for security
allowed_commands = [
"sudo apt update",
"sudo apt upgrade -y",
"sudo apt autoremove -y",
"sudo apt autoclean",
"sudo reboot",
"sudo shutdown -h now",
"df -h",
"free -m",
"uptime",
"systemctl status",
"sudo systemctl restart networking",
"sudo systemctl restart ssh"
]
try:
# Check if command is allowed
if command not in allowed_commands:
log_info_with_server(f"Command '{command}' is not allowed for security reasons")
return {"status": "error", "message": f"Command '{command}' is not allowed", "output": ""}
log_info_with_server(f"Executing command: {command}")
# Execute the command
result = subprocess.run(
command.split(),
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
output = result.stdout + result.stderr
if result.returncode == 0:
log_info_with_server(f"Command '{command}' executed successfully")
return {"status": "success", "message": "Command executed successfully", "output": output}
else:
log_info_with_server(f"Command '{command}' failed with return code {result.returncode}")
return {"status": "error", "message": f"Command failed with return code {result.returncode}", "output": output}
except subprocess.TimeoutExpired:
log_info_with_server(f"Command '{command}' timed out")
return {"status": "error", "message": "Command timed out", "output": ""}
except Exception as e:
log_info_with_server(f"Error executing command '{command}': {str(e)}")
return {"status": "error", "message": f"Error: {str(e)}", "output": ""}
# Flask app for receiving commands (only if Flask is available)
if FLASK_AVAILABLE:
command_app = Flask(__name__)
@command_app.route('/execute_command', methods=['POST'])
def handle_command_execution():
"""
Endpoint to receive and execute system commands
"""
try:
data = request.json
if not data or 'command' not in data:
return jsonify({"error": "Invalid request. 'command' field is required"}), 400
command = data.get('command')
# Execute the command
result = execute_system_command(command)
return jsonify(result), 200 if result['status'] == 'success' else 400
except Exception as e:
log_info_with_server(f"Error handling command execution request: {str(e)}")
return jsonify({"error": f"Server error: {str(e)}"}), 500
@command_app.route('/status', methods=['GET'])
def get_device_status():
"""
Endpoint to get device status information
"""
try:
n_masa = read_name_from_file()
# Get system information
uptime_result = subprocess.run(['uptime'], capture_output=True, text=True)
df_result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True)
free_result = subprocess.run(['free', '-m'], capture_output=True, text=True)
status_info = {
"hostname": hostname,
"device_ip": device_ip,
"nume_masa": n_masa,
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"uptime": uptime_result.stdout.strip() if uptime_result.returncode == 0 else "N/A",
"disk_usage": df_result.stdout.strip() if df_result.returncode == 0 else "N/A",
"memory_usage": free_result.stdout.strip() if free_result.returncode == 0 else "N/A"
}
return jsonify(status_info), 200
except Exception as e:
log_info_with_server(f"Error getting device status: {str(e)}")
return jsonify({"error": f"Error getting status: {str(e)}"}), 500
@command_app.route('/auto_update', methods=['POST'])
def auto_update_app():
"""
Auto-update the application from the central server
Checks version, downloads newer files if available, and restarts the device
"""
try:
# Configuration
SERVER_HOST = "rpi-ansible"
SERVER_USER = "pi"
SERVER_PASSWORD = "Initial01!"
SERVER_APP_PATH = "/home/pi/Desktop/prezenta/app.py"
SERVER_REPO_PATH = "/home/pi/Desktop/prezenta/Files/reposytory"
# Dynamically determine local paths based on current script location
current_script_path = os.path.abspath(__file__)
local_base_dir = os.path.dirname(current_script_path)
LOCAL_APP_PATH = current_script_path
LOCAL_REPO_PATH = os.path.join(local_base_dir, "Files", "reposytory")
log_info_with_server(f"Auto-update process initiated from: {LOCAL_APP_PATH}")
# Step 1: Get current local version
current_version = None
try:
with open(LOCAL_APP_PATH, 'r') as f:
first_line = f.readline()
if 'version' in first_line.lower():
# Extract version number (e.g., "2.5" from "#App version 2.5")
import re
version_match = re.search(r'version\s+(\d+\.?\d*)', first_line, re.IGNORECASE)
if version_match:
current_version = float(version_match.group(1))
log_info_with_server(f"Current local version: {current_version}")
except Exception as e:
log_info_with_server(f"Could not determine local version: {e}")
return jsonify({"error": f"Could not determine local version: {str(e)}"}), 500
# Step 2: Get remote version via SCP
temp_dir = "/tmp/app_update"
try:
# Create temporary directory
subprocess.run(['mkdir', '-p', temp_dir], check=True)
# Download remote app.py to check version
scp_command = [
'sshpass', '-p', SERVER_PASSWORD,
'scp', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
f'{SERVER_USER}@{SERVER_HOST}:{SERVER_APP_PATH}',
f'{temp_dir}/app.py'
]
result = subprocess.run(scp_command, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
log_info_with_server(f"Failed to download remote app.py: {result.stderr}")
return jsonify({"error": f"Failed to connect to server: {result.stderr}"}), 500
# Check remote version
remote_version = None
with open(f'{temp_dir}/app.py', 'r') as f:
first_line = f.readline()
if 'version' in first_line.lower():
import re
version_match = re.search(r'version\s+(\d+\.?\d*)', first_line, re.IGNORECASE)
if version_match:
remote_version = float(version_match.group(1))
log_info_with_server(f"Remote version: {remote_version}")
except subprocess.TimeoutExpired:
return jsonify({"error": "Connection to server timed out"}), 500
except Exception as e:
log_info_with_server(f"Error checking remote version: {e}")
return jsonify({"error": f"Error checking remote version: {str(e)}"}), 500
# Step 3: Compare versions
if remote_version is None:
return jsonify({"error": "Could not determine remote version"}), 500
if current_version is None or remote_version <= current_version:
log_info_with_server(f"No update needed. Current: {current_version}, Remote: {remote_version}")
return jsonify({
"status": "no_update_needed",
"current_version": current_version,
"remote_version": remote_version,
"message": "Application is already up to date"
}), 200
# Step 4: Download updated files
log_info_with_server(f"Update available! Downloading version {remote_version}")
try:
# Create backup of current app
backup_path = f"{LOCAL_APP_PATH}.backup.{current_version}"
subprocess.run(['cp', LOCAL_APP_PATH, backup_path], check=True)
log_info_with_server(f"Backup created: {backup_path}")
# Download new app.py
subprocess.run(['cp', f'{temp_dir}/app.py', LOCAL_APP_PATH], check=True)
log_info_with_server("New app.py downloaded successfully")
# Download repository folder
repo_scp_command = [
'sshpass', '-p', SERVER_PASSWORD,
'scp', '-r', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
f'{SERVER_USER}@{SERVER_HOST}:{SERVER_REPO_PATH}',
f'{LOCAL_REPO_PATH}_new'
]
result = subprocess.run(repo_scp_command, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Replace old repository with new one
subprocess.run(['rm', '-rf', LOCAL_REPO_PATH], check=True)
subprocess.run(['mv', f'{LOCAL_REPO_PATH}_new', LOCAL_REPO_PATH], check=True)
log_info_with_server("Repository updated successfully")
else:
log_info_with_server(f"Repository update failed: {result.stderr}")
# Download system packages folder
local_system_packages_path = os.path.join(local_base_dir, 'Files', 'system_packages')
server_system_packages = f'{SERVER_USER}@{SERVER_HOST}:/home/pi/Desktop/prezenta/Files/system_packages'
system_scp_command = [
'sshpass', '-p', SERVER_PASSWORD,
'scp', '-r', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
server_system_packages,
f'{local_system_packages_path}_new'
]
try:
result = subprocess.run(system_scp_command, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Replace old system packages with new ones
if os.path.exists(local_system_packages_path):
subprocess.run(['rm', '-rf', local_system_packages_path], check=True)
subprocess.run(['mv', f'{local_system_packages_path}_new', local_system_packages_path], check=True)
log_info_with_server("System packages updated successfully")
else:
log_info_with_server(f"System packages update failed: {result.stderr}")
except Exception as sys_e:
log_info_with_server(f"System packages update error: {sys_e}")
except Exception as e:
# Restore backup if something went wrong
try:
subprocess.run(['cp', backup_path, LOCAL_APP_PATH], check=True)
log_info_with_server("Backup restored due to error")
except:
pass
return jsonify({"error": f"Update failed: {str(e)}"}), 500
# Step 5: Schedule device restart
log_info_with_server("Update completed successfully. Scheduling restart...")
# Create a restart script that will run after this response
restart_script = '''#!/bin/bash
sleep 3
sudo reboot
'''
with open('/tmp/restart_device.sh', 'w') as f:
f.write(restart_script)
subprocess.run(['chmod', '+x', '/tmp/restart_device.sh'], check=True)
# Schedule the restart in background
subprocess.Popen(['/tmp/restart_device.sh'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return jsonify({
"status": "success",
"message": f"Updated from version {current_version} to {remote_version}. Device restarting...",
"old_version": current_version,
"new_version": remote_version,
"restart_scheduled": True
}), 200
except Exception as e:
log_info_with_server(f"Auto-update error: {str(e)}")
return jsonify({"error": f"Auto-update failed: {str(e)}"}), 500
finally:
# Cleanup temp directory
try:
subprocess.run(['rm', '-rf', temp_dir], check=True)
except:
pass
def start_command_server():
"""
Start the Flask server with enhanced port handling and fallback
"""
# Try different ports in order of preference
preferred_ports = [
int(os.environ.get('FLASK_PORT', 80)), # Use environment variable or default to 80
80, # Standard HTTP port
5000, # Flask default
8080, # Alternative HTTP port
3000 # Development port
]
for port in preferred_ports:
try:
print(f"Attempting to start command server on port {port}...")
# Test if port is available
import socket
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind(('0.0.0.0', port))
test_socket.close()
# Port is available, start Flask server
print(f"Port {port} is available. Starting command server...")
command_app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)
return # Success, exit function
except PermissionError:
print(f"✗ Permission denied for port {port}")
if port == 80:
print(" Hint: Port 80 requires root privileges or capabilities")
print(" Try running: sudo setcap cap_net_bind_service=ep $(which python3)")
continue
except OSError as e:
if "Address already in use" in str(e):
print(f"✗ Port {port} is already in use")
else:
print(f"✗ Port {port} error: {e}")
continue
except Exception as e:
print(f"✗ Failed to start on port {port}: {e}")
continue
# If we get here, all ports failed
log_info_with_server("Error: Could not start command server on any port")
print("✗ Could not start command server on any available port")
# Start command server in a separate process with enhanced error handling
try:
print("Initializing command server...")
command_server_process = Process(target=start_command_server)
command_server_process.daemon = True # Ensure it dies with main process
command_server_process.start()
# Give the server a moment to start and check if it's running
import time
time.sleep(2)
if command_server_process.is_alive():
port = int(os.environ.get('FLASK_PORT', 80))
print(f"✓ Command server started successfully on port {port}")
else:
print("Warning: Command server process stopped unexpectedly")
except Exception as e:
print(f"Warning: Could not start command server: {e}")
log_info_with_server(f"Command server startup error: {str(e)}")
else:
print("Warning: Flask not available - Command server disabled")
# Call the function to delete old logs
delete_old_logs()
def config():
import config
# function for posting data to the harting server
def post_backup_data():
try:
with open("./data/tag.txt", "r") as file:
lines = file.readlines()
remaining_lines = lines[:]
for line in lines:
line = line.strip()
if line:
try:
response = requests.post(line, verify=False, timeout=3)
#
response.raise_for_status() # Raise an error for bad status codes
log_info_with_server(f"Data posted successfully:")
remaining_lines.remove(line + "\n")
except requests.exceptions.Timeout:
log_info_with_server("Request timed out.")
break
except requests.exceptions.RequestException as e:
log_info_with_server(f"An error occurred: ")
break
with open("./data/tag.txt", "w") as file:
file.writelines(remaining_lines)
#log_info_with_server("Backup data updated.")
except FileNotFoundError:
log_info_with_server("No backup file found.")
# Function to check internet connection
def check_internet_connection():
hostname = "10.76.140.17"
cmd_block_wifi = 'sudo rfkill block wifi'
cmd_unblock_wifi = 'sudo rfkill unblock wifi'
log_info_with_server('Internet connection check loaded')
delete_old_logs()
chromium_process_name = "chromium"
while True:
try:
# Use subprocess to execute the ping command
response = subprocess.run(
["ping", "-c", "1", hostname],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
if response.returncode == 0:
log_info_with_server("Internet is up! Waiting 45 minutes.")
post_backup_data()
time.sleep(2700) # 45 minutes
else:
log_info_with_server("Internet is down. Rebooting WiFi.")
os.system(cmd_block_wifi)
time.sleep(1200) # 20 minutes
os.system(cmd_unblock_wifi)
# Refresh Chromium process
log_info_with_server("Refreshing Chromium process.")
try:
# Find and terminate Chromium processes
subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5) # Wait for processes to terminate
# Relaunch Chromium
url = "10.76.140.17/iweb_v2/index.php/traceability/production"
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
"--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
)
log_info_with_server("Chromium process restarted successfully.")
except Exception as e:
log_info_with_server(f"Failed to refresh Chromium process: {e}")
except Exception as e:
log_info_with_server(f"An error occurred during internet check: {e}")
time.sleep(60) # Retry after 1 minute in case of an error
# Start the internet connection check in a separate process
internet_check_process = Process(target=check_internet_connection)
internet_check_process.start()
url = "10.76.140.17/iweb_v2/index.php/traceability/production" # pentru cazul in care raspberiul nu are sistem de prezenta
# Launch Chromium with the specified URLs
subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", "--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True)
info = "0"
#function to post info
def post_info(info):
#log_info_with_server("Starting to post data...")
info1 = info.strip() # Remove any leading/trailing whitespace, including newlines
try:
response = requests.post(info1, verify=False, timeout=3)
response.raise_for_status() # Raise an error for bad status codes
#log_info_with_server("Data posted successfully")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server(f"Value was saved to tag.txt")
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server("Value was saved to tag.txt")
async def post_info_async(info):
try:
# Try to use aiohttp if available
if 'aiohttp' in globals():
async with aiohttp.ClientSession() as session:
try:
async with session.post(info, ssl=False, timeout=3) as response:
response_text = await response.text()
log_info_with_server(f"Data posted successfully")
except asyncio.TimeoutError:
with open("./data/tag.txt", "a") as file:
file.write(info)
except Exception as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
else:
# Fallback to synchronous requests if aiohttp not available
try:
response = requests.post(info.strip(), verify=False, timeout=3)
response.raise_for_status()
log_info_with_server(f"Data posted successfully (fallback)")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file:
file.write(info)
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
except Exception as e:
# Final fallback - save to file
with open("./data/tag.txt", "a") as file:
file.write(info)
def post_info_thread(info):
try:
thread = threading.Thread(target=asyncio.run, args=(post_info_async(info),), daemon=True)
thread.start()
except Exception as e:
# Fallback to synchronous posting if async fails
print(f"Async posting failed, using sync fallback: {e}")
post_info(info)
# Initialize LEDs with error handling
try:
print("Initializing LED controls...")
led1 = OutputDevice(23)
led2 = OutputDevice(24)
print("✓ LED controls initialized successfully")
log_info_with_server("LED controls initialized")
except Exception as e:
print(f"Warning: Could not initialize LED controls: {e}")
print("Creating dummy LED objects - visual feedback will be disabled")
# Create dummy LED objects that don't crash the app
class DummyLED:
def __init__(self, pin):
self.pin = pin
def on(self):
print(f"LED {self.pin} would turn ON")
def off(self):
print(f"LED {self.pin} would turn OFF")
led1 = DummyLED(23)
led2 = DummyLED(24)
log_info_with_server("LED controls using dummy mode")
# Initialize table name/ID
print("Initializing device configuration...")
name = "idmasa"
logging.info("Variabila Id Masa A fost initializata ")
try:
with open("./data/idmasa.txt", "r") as f:
name = f.readline().strip() or "noconfig"
print(f"✓ Device name loaded: {name}")
log_info_with_server(f"Device name initialized: {name}")
except FileNotFoundError:
print("Warning: idmasa.txt not found, using default 'noconfig'")
name = "noconfig"
# Create the file with default value
try:
with open("./data/idmasa.txt", "w") as f:
f.write("noconfig")
print("✓ Created default idmasa.txt file")
except Exception as e:
print(f"Could not create idmasa.txt: {e}")
except Exception as e:
print(f"Error reading idmasa.txt: {e}")
name = "noconfig"
logging.info(name)
#clasa reader
class Reader(rdm6300.BaseReader):
global info
def card_inserted(self, card):
if card.value == 12886709:
config()
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = f'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/{name}/{card.value}/1/{afisare}\n'
info = date
if name == "noconfig":
led1.on()
time.sleep(5)
led1.off()
log_info_with_server(f"card inserted {card} but no")
else:
post_info_thread(info)
led1.on()
log_info_with_server(f"card inserted {card}")
def card_removed(self, card):
if card.value == 12886709:
log_info_with_server("Removing Config card")
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = f'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/{name}/{card.value}/0/{afisare}\n'
info = date
if name == "noconfig":
led1.off()
log_info_with_server(f"card removed {card}")
else:
post_info_thread(info)
led1.off()
log_info_with_server(f"card removed {card}")
# Initialize RFID Reader with comprehensive error handling
def initialize_rfid_reader():
"""
Initialize RFID reader with multiple device attempts and error handling
"""
print("Initializing RFID reader...")
# List of possible serial devices in order of preference
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0', '/dev/ttyACM0']
for device in serial_devices:
try:
print(f"Attempting to initialize RFID reader on {device}...")
r = Reader(device)
r.start()
print(f"✓ RFID reader successfully initialized on {device}")
log_info_with_server(f"RFID reader started on {device}")
return r
except FileNotFoundError:
print(f"✗ Device {device} not found")
continue
except PermissionError:
print(f"✗ Permission denied for {device}")
print(f" Hint: Try adding user to dialout group: sudo usermod -a -G dialout $USER")
continue
except Exception as e:
print(f"✗ Failed to initialize on {device}: {e}")
continue
# If we get here, all devices failed
print("✗ Could not initialize RFID reader on any device")
print("Available solutions:")
print(" 1. Check hardware connections")
print(" 2. Enable UART: sudo raspi-config -> Interface Options -> Serial")
print(" 3. Add user to dialout group: sudo usermod -a -G dialout $USER")
print(" 4. Reboot the system after making changes")
log_info_with_server("ERROR: RFID reader initialization failed")
return None
# Start RFID reader
try:
rfid_reader = initialize_rfid_reader()
if rfid_reader is None:
print("WARNING: Application starting without RFID functionality")
print("Card reading will not work until RFID reader is properly configured")
except Exception as e:
print(f"Critical error initializing RFID reader: {e}")
log_info_with_server(f"Critical RFID error: {str(e)}")
print("Application will start but RFID functionality will be disabled")