Files
WMT/app.py

1760 lines
73 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#App version 2.9 - Added configuration mode for "notconfig" devices
import os
import sys
import subprocess
import stat
import pwd
import grp
# Import dependency check/install functions from dependency_utils
from dependency_utils import install_package_from_wheel, check_and_install_dependencies
# Module-wide flag; starts False and is reassigned later in this file by the
# configuration-mode detection step.
CONFIGURATION_MODE = False
# Verify/install third-party packages before any of them is imported.
# Each entry maps an importable package name to a wheel file name; entries
# set to None carry no wheel name (presumably handled by dependency_utils
# without a local wheel -- confirm there).
try:
    check_and_install_dependencies(
        dict(
            rdm6300='rdm6300-0.1.1-py3-none-any.whl',
            gpiozero=None,
            requests='requests-2.32.3-py3-none-any.whl',
            aiohttp='aiohttp-3.11.18-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
            flask=None,
            urllib3='urllib3-2.3.0-py3-none-any.whl',
            certifi='certifi-2025.1.31-py3-none-any.whl',
            charset_normalizer='charset_normalizer-3.4.1-py3-none-any.whl',
            idna='idna-3.10-py3-none-any.whl',
            multidict='multidict-6.4.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
            aiosignal='aiosignal-1.3.2-py2.py3-none-any.whl',
            frozenlist='frozenlist-1.6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
            attrs='attrs-25.3.0-py3-none-any.whl',
            yarl='yarl-1.20.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
            aiohappyeyeballs='aiohappyeyeballs-2.6.1-py3-none-any.whl',
            propcache='propcache-0.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
        ),
        "./Files/reposytory",
    )
except Exception as e:
    # Best effort: a failed check must not stop the application from starting.
    print(f"Warning: Dependency check failed: {e}")
    print("Continuing with existing packages...")
def safe_import(module_name, package_name=None):
    """
    Safely import a module, or fetch a name from a package, without raising.

    Args:
        module_name: Module to import when package_name is not given;
            otherwise the attribute or submodule to fetch from the package.
        package_name: Optional (possibly dotted) package to look inside.

    Returns:
        The imported module / resolved object, or None when it cannot be
        imported or found (a warning is printed in that case).
    """
    # Local import keeps this helper self-contained.
    import importlib

    try:
        if not package_name:
            # import_module returns the leaf module for dotted names,
            # whereas __import__("a.b") would return the top-level "a".
            return importlib.import_module(module_name)
        package = importlib.import_module(package_name)
        try:
            return getattr(package, module_name)
        except AttributeError:
            # The name may be a submodule that has not been loaded yet.
            return importlib.import_module(f"{package_name}.{module_name}")
    except ImportError as e:
        print(f"Warning: Could not import {module_name}: {e}")
        return None
# rdm6300 is mandatory: when it cannot be imported the process exits with
# status 1 after telling the operator what is missing.
rdm6300 = safe_import('rdm6300')
if rdm6300 is None:
    print(
        "ERROR: rdm6300 is required for this application to work!",
        "Please ensure rdm6300 is installed from the repository.",
        sep="\n",
    )
    sys.exit(1)
# Import other required modules
import time
import logging
# Try to import GPIO-related modules
try:
    from gpiozero import OutputDevice
    print("✓ gpiozero imported successfully")
except ImportError as e:
    print(f"Warning: Could not import gpiozero: {e}")
    print("LED functionality will be disabled")

    class OutputDevice:
        """
        Console-only stand-in used when gpiozero is not installed.

        on()/off() only print what would have happened, so the rest of the
        application can run without GPIO hardware support.
        """

        def __init__(self, pin=None, *args, **kwargs):
            # Extra positional/keyword arguments (e.g. active_high,
            # initial_value on the real gpiozero class) are accepted and
            # ignored so call sites work with either implementation.
            self.pin = pin

        def on(self):
            print(f"LED {self.pin} would turn ON")

        def off(self):
            print(f"LED {self.pin} would turn OFF")
from multiprocessing import Process
# Import network-related modules
# `requests` is mandatory: if the import fails the error is printed and the
# process exits with status 1.
try:
    import requests
    print("✓ requests imported successfully")
except ImportError as e:
    print(f"ERROR: requests is required: {e}")
    sys.exit(1)
import threading
import urllib.parse
from datetime import datetime, timedelta
import socket
import signal
# Import async modules
# asyncio is part of the standard library, so it is imported unconditionally.
import asyncio

# aiohttp is optional. When it is missing the name is bound to None and
# AIOHTTP_AVAILABLE is False, so later code can test for it instead of
# hitting a NameError.
try:
    import aiohttp
    print("✓ aiohttp and asyncio imported successfully")
    AIOHTTP_AVAILABLE = True
except ImportError as e:
    print(f"Warning: Could not import aiohttp: {e}")
    print("Async functionality may be limited")
    aiohttp = None
    AIOHTTP_AVAILABLE = False
# Import Flask for command server
try:
    from flask import Flask, request, jsonify
    print("✓ Flask imported successfully")
    FLASK_AVAILABLE = True
except ImportError as e:
    print(f"Warning: Could not import Flask: {e}")
    print("Command server functionality will be disabled")
    FLASK_AVAILABLE = False

    # Minimal stand-ins so that module-level `app = Flask(...)` and
    # `@app.route(...)` declarations still import cleanly without Flask.
    class Flask:
        """No-op replacement for flask.Flask."""

        def __init__(self, name, *args, **kwargs):
            # Extra constructor options accepted by the real class are ignored.
            pass

        def route(self, *args, **kwargs):
            """Return a decorator that leaves the view function unchanged."""
            def decorator(f):
                return f
            return decorator

        def run(self, *args, **kwargs):
            """Do nothing: there is no server to start."""
            pass

    def request():
        """Placeholder for flask.request; carries no request data."""
        pass

    def jsonify(*args, **kwargs):
        """
        Return the payload unchanged instead of building an HTTP response.

        Accepts the same call shapes as flask.jsonify: one positional value,
        several positional values (returned as a list) or keyword arguments
        (returned as a dict).
        """
        if args and kwargs:
            raise TypeError("jsonify() accepts either args or kwargs, not both")
        if len(args) == 1:
            return args[0]
        if args:
            return list(args)
        return dict(kwargs)
import configparser
import json
def load_config():
    """
    Load application configuration from data/config.txt (INI format).

    When config.txt exists, every recognised option overrides the matching
    hardcoded default. When it does not exist, the legacy file
    data/device_info.txt (line 1: hostname, line 2: IP) is consulted instead.

    A value that is not valid ConfigParser interpolation syntax (for example a
    URL containing "%20") is taken literally instead of aborting the whole load.

    Returns:
        dict: configuration keyed by the names used in `defaults` below.
    """
    config_path = "./data/config.txt"
    defaults = {
        "chrome_url": "http://10.76.140.17/iweb_v2/index.php/traceability/production",
        "chrome_local_url": "",
        "chrome_insecure_origin": "http://10.76.140.17",
        "card_post_base_url": "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record",
        "server_log_url": "http://rpi-ansible:80/logs",
        "update_host": "rpi-ansible",
        "update_user": "pi",
        "internet_check_host": "10.76.140.17",
        "device_name": "notconfig",
        "device_hostname": "unknown-device",
        "device_ip": "127.0.0.1",
        "device_location": "",
        "device_info_reviewed_at": "1970-01-01T00:00:00",
        "card_presence": "enable",
        "last_synced": "1970-01-01T00:00:00",
    }
    # (section, option, config key). [device] name is listed before
    # work_place so that work_place wins when both are present.
    option_map = (
        ("chrome", "chrome_url", "chrome_url"),
        ("chrome", "chrome_local_url", "chrome_local_url"),
        ("chrome", "chrome_insecure_origin", "chrome_insecure_origin"),
        ("card_api", "base_url", "card_post_base_url"),
        ("server", "log_url", "server_log_url"),
        ("server", "update_host", "update_host"),
        ("server", "update_user", "update_user"),
        ("server", "internet_check_host", "internet_check_host"),
        ("device", "name", "device_name"),
        ("device", "work_place", "device_name"),
        ("device", "hostname", "device_hostname"),
        ("device", "ip", "device_ip"),
        ("device", "info_reviewed_at", "device_info_reviewed_at"),
        ("device", "location", "device_location"),
        ("device", "card_presence", "card_presence"),
        ("meta", "last_synced", "last_synced"),
    )
    cfg = dict(defaults)
    if os.path.exists(config_path):
        parser = configparser.ConfigParser()
        try:
            parser.read(config_path, encoding="utf-8")
            for section, option, key in option_map:
                # has_option() is False when the section itself is missing.
                if not parser.has_option(section, option):
                    continue
                try:
                    cfg[key] = parser.get(section, option)
                except configparser.InterpolationError:
                    # A bare "%" (typical for URL-encoded values) is not
                    # valid interpolation syntax: keep the text as written.
                    cfg[key] = parser.get(section, option, raw=True)
            print(f"\u2713 Configuration loaded from {config_path}")
        except Exception as e:
            print(f"Warning: Could not parse {config_path}: {e}. Using defaults.")
    else:
        # Fall back to legacy individual files
        try:
            with open("./data/device_info.txt", "r") as f:
                lines = f.read().strip().split('\n')
            if len(lines) >= 1 and lines[0].strip():
                cfg["device_hostname"] = lines[0].strip()
            if len(lines) >= 2 and lines[1].strip():
                cfg["device_ip"] = lines[1].strip()
        except FileNotFoundError:
            # Expected on a fresh install: nothing to migrate.
            pass
        except (OSError, UnicodeDecodeError) as e:
            print(f"Warning: Could not read legacy device_info.txt: {e}")
        print("config.txt not found - using legacy files and defaults")
    return cfg
# ---------------------------------------------------------------------------
# Server config sync (runs once at startup)
# ---------------------------------------------------------------------------
def _get_mac_address():
    """
    Return a MAC address string for this device.

    eth0, wlan0 and eth1 are probed in that order through
    /sys/class/net/<iface>/address; the first non-empty, non-zero address
    wins. If none is usable, the value of uuid.getnode() is formatted as six
    colon-separated hex octets.
    """
    for iface_name in ('eth0', 'wlan0', 'eth1'):
        sysfs_entry = f'/sys/class/net/{iface_name}/address'
        try:
            if not os.path.exists(sysfs_entry):
                continue
            with open(sysfs_entry) as handle:
                address = handle.read().strip()
        except Exception:
            # Unreadable entry: try the next interface.
            continue
        if address and address != '00:00:00:00:00:00':
            return address
    # Fallback: derive from UUID node
    import uuid as _uuid
    node = _uuid.getnode()
    octets = [(node >> shift) & 0xff for shift in range(40, -1, -8)]
    return ':'.join(f'{octet:02x}' for octet in octets)
def _write_config_from_server(new_cfg):
    """
    Overwrite data/config.txt with config received from the server.

    Args:
        new_cfg: dict decoded from the server response. Missing keys and
            None values fall back to the defaults used below; other values
            are converted to text.

    The file is written to a temporary sibling first and then moved into
    place, so an interrupted write cannot leave a truncated config.txt.
    """
    import configparser as _cp

    def _text(value, default=""):
        # ConfigParser only accepts strings and treats "%" as interpolation
        # syntax; "%%" is read back as a literal "%".
        if value is None:
            value = default
        return str(value).replace("%", "%%")

    p = _cp.ConfigParser()
    p.add_section("chrome")
    p.set("chrome", "chrome_url", _text(new_cfg.get("chrome_url")))
    p.set("chrome", "chrome_local_url", _text(new_cfg.get("chrome_local_url")))
    p.set("chrome", "chrome_insecure_origin", _text(new_cfg.get("chrome_insecure_origin")))
    p.add_section("card_api")
    p.set("card_api", "base_url", _text(new_cfg.get("card_api_base_url")))
    p.add_section("server")
    p.set("server", "log_url", _text(new_cfg.get("server_log_url")))
    p.set("server", "update_host", _text(new_cfg.get("update_host")))
    p.set("server", "update_user", _text(new_cfg.get("update_user")))
    p.set("server", "internet_check_host", _text(new_cfg.get("internet_check_host")))
    p.add_section("device")
    p.set("device", "work_place", _text(new_cfg.get("device_name"), "notconfig"))
    p.set("device", "hostname", _text(new_cfg.get("hostname")))
    p.set("device", "ip", _text(new_cfg.get("device_ip")))
    p.set("device", "location", _text(new_cfg.get("location") or new_cfg.get("device_location")))
    p.set("device", "info_reviewed_at", _text(new_cfg.get("info_reviewed_at") or "1970-01-01T00:00:00"))
    p.set("device", "card_presence", _text(new_cfg.get("card_presence"), "enable"))
    p.add_section("meta")
    # Prefer the server's own timestamp; otherwise stamp with local time.
    sync_ts = new_cfg.get("config_updated_at") or datetime.now().isoformat()
    p.set("meta", "last_synced", _text(sync_ts))

    os.makedirs("./data", exist_ok=True)
    target_path = "./data/config.txt"
    temp_path = target_path + ".tmp"
    with open(temp_path, "w") as f:
        f.write("# WMT Application Configuration\n")
        f.write(f"# Synced from server: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        p.write(f)
    os.replace(temp_path, target_path)
    print(f"\u2713 config.txt written from server (server ts: {sync_ts})")
def _parse_iso_timestamp(value):
    """
    Parse an ISO-8601 string into a naive datetime; epoch when unparsable.

    Timezone-aware values are converted to local time and stripped of their
    tzinfo so that they can be compared with the naive timestamps stored in
    config.txt (naive and aware datetimes cannot be ordered).
    """
    try:
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed
    except Exception:
        return datetime(1970, 1, 1)


def sync_config_with_server():
    """
    Called once at startup.
    - Derives base URL from server_log_url in APP_CONFIG.
    - If server config is newer than last_synced, or the server-side device
      info review is newer than the local one: pull, overwrite config.txt
      and reload APP_CONFIG.
    - Otherwise: send a device-info update request to the server.

    Any error (server unreachable, HTTP error status, bad payload) is printed
    and swallowed so that startup can continue.

    Returns:
        bool: True if config was pulled from server, False otherwise.
    """
    global APP_CONFIG
    try:
        import requests as _req
        import urllib.parse as _up
        parsed = _up.urlparse(APP_CONFIG.get("server_log_url", "http://rpi-ansible:80/logs"))
        server_base = f"{parsed.scheme}://{parsed.netloc}"
        mac = _get_mac_address()

        last_synced_str = APP_CONFIG.get("last_synced", "1970-01-01T00:00:00")
        last_synced = _parse_iso_timestamp(last_synced_str)
        local_info_reviewed_str = APP_CONFIG.get("device_info_reviewed_at", "1970-01-01T00:00:00")
        local_info_reviewed = _parse_iso_timestamp(local_info_reviewed_str)
        print(f"Checking server config (MAC={mac}, last_synced={last_synced_str}, info_reviewed_at={local_info_reviewed_str}) ...")

        # --- Step 1: get server timestamp ---
        ts_resp = _req.get(
            f"{server_base}/api/wmt/config/timestamp",
            params={"mac": mac}, timeout=5
        )
        ts_resp.raise_for_status()
        ts_data = ts_resp.json()
        server_ts_str = ts_data.get("latest_updated_at", "1970-01-01T00:00:00")
        server_ts = _parse_iso_timestamp(server_ts_str)
        server_info_reviewed_str = ts_data.get("device_info_reviewed_at") or "1970-01-01T00:00:00"
        server_info_reviewed = _parse_iso_timestamp(server_info_reviewed_str)

        # Pull if global settings are newer OR if admin has reviewed device info more recently
        needs_pull = server_ts > last_synced or server_info_reviewed > local_info_reviewed
        if needs_pull:
            # --- Step 2a: pull config ---
            print(f"Server config is newer ({server_ts_str}), pulling ...")
            cfg_resp = _req.get(f"{server_base}/api/wmt/config/{mac}", timeout=5)
            cfg_resp.raise_for_status()
            _write_config_from_server(cfg_resp.json())
            APP_CONFIG = load_config()
            print("\u2705 Config synced from server.")
            return True

        # --- Step 2b: push device info ---
        print("Local config is current, sending device info to server ...")
        try:
            _hostname = socket.gethostname()
            _ip = socket.gethostbyname(_hostname)
        except Exception:
            # Name resolution failed: report the values stored in the config.
            _hostname = APP_CONFIG.get("device_hostname", "")
            _ip = APP_CONFIG.get("device_ip", "")
        push_resp = _req.post(
            f"{server_base}/api/wmt/config/update_request",
            json={
                "mac_address": mac,
                "device_name": APP_CONFIG.get("device_name", ""),
                "hostname": _hostname,
                "device_ip": _ip,
                "client_config_mtime": last_synced_str,
                "client_info_reviewed_at": local_info_reviewed_str,
                "card_presence": APP_CONFIG.get("card_presence", "enable"),
            },
            timeout=5,
        )
        # Only claim success when the server actually accepted the request.
        push_resp.raise_for_status()
        print("\u2705 Device info update request sent.")
        return False
    except Exception as e:
        print(f"Config sync skipped (server unreachable or error): {e}")
        return False
# Load global application configuration
APP_CONFIG = load_config()
# Attempt to sync with server at startup (non-blocking failures are logged and ignored)
# NOTE: sync_config_with_server() rebinds the global APP_CONFIG when it pulls
# a newer config, so the flag below must be derived after this call.
sync_config_with_server()
# ---------------------------------------------------------------------------
# Card presence flag controls whether RFID functions are started
# ---------------------------------------------------------------------------
# True only when the configured value is "enable" (case/whitespace-insensitive).
CARD_PRESENCE_ENABLED = APP_CONFIG.get("card_presence", "enable").strip().lower() == "enable"
print(f"Card presence: {'ENABLED' if CARD_PRESENCE_ENABLED else 'DISABLED'}")
# Early configuration mode detection (before heavy initialization)
def early_launch_configuration_mode():
    """
    Early launch of configuration mode with Chromium displaying Screen.html.

    Screen.html is looked up as ./Files/Screen.html relative to the working
    directory first, then next to this file. Running Chromium processes are
    terminated and a new kiosk instance is started on the page.

    Returns:
        bool: True when Chromium was started, False when Screen.html or the
        Chromium executable is missing or the launch failed.
    """
    # A named logger is used on purpose: the module-level logging.info()
    # helpers implicitly configure the root logger on first use, which would
    # turn a later logging.basicConfig(filename=...) call into a no-op.
    log = logging.getLogger(__name__)
    try:
        print("🔧 Configuration mode detected - launching Screen.html in Chromium")
        # Path to Screen.html relative to the WMT working directory
        screen_html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Files", "Screen.html")
        screen_html_relative = "./Files/Screen.html"
        if os.path.exists(screen_html_relative):
            screen_html_path = os.path.abspath(screen_html_relative)
        if not os.path.exists(screen_html_path):
            print(f"❌ Screen.html not found at: {screen_html_path}")
            return False
        print(f"📄 Loading Screen.html from: {screen_html_path}")

        # Terminate any existing Chromium processes
        chromium_process_name = "chromium"
        log.info("Refreshing Chromium process (configuration mode)")
        try:
            subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            time.sleep(5)  # Wait for processes to terminate
        except FileNotFoundError:
            # pkill itself is missing: nothing could be terminated, carry on.
            log.warning("pkill not available - existing Chromium processes were not terminated")

        # Launch Chromium with Screen.html
        url = f"file://{screen_html_path}"
        try:
            subprocess.Popen(
                ["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
                 url],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
            )
            log.info("Chromium process restarted successfully (configuration mode)")
        except FileNotFoundError:
            # Let the handler below print the installation hint.
            raise
        except Exception as e:
            log.error(f"Failed to refresh Chromium process (configuration mode): {e}")
            return False

        print("✅ Configuration mode launched successfully")
        print(" Network connectivity checks disabled")
        print(" Server status messages disabled")
        print("🔧 Device running in configuration mode")
        print("🔧 To exit configuration mode, change work_place in config.txt")
        return True
    except FileNotFoundError:
        print("❌ Chromium browser not found. Please install chromium-browser:")
        print(" sudo apt update && sudo apt install chromium-browser")
        return False
    except Exception as e:
        print(f"❌ Error launching configuration mode: {e}")
        return False
# EARLY CONFIGURATION MODE DETECTION
# Decides once, at import time, whether the device runs in setup mode.
# Result: the module-level CONFIGURATION_MODE flag and the `name` variable.
print("=" * 60)
print("CONFIGURATION MODE DETECTION")
print("=" * 60)
try:
    # The default matches the "notconfig" sentinel used by load_config(), so a
    # missing device name is treated as an unconfigured device.
    name = APP_CONFIG.get("device_name", "notconfig")
    print(f"✓ Device name loaded: {name}")
    # Check if device is in configuration mode
    if name.lower() == "notconfig":
        print("🔧 Device configured for setup mode (notconfig)")
        # Launch configuration mode
        if early_launch_configuration_mode():
            print("🚀 Configuration mode active - application will run in setup mode")
            print("⚠️ Network connectivity checks are DISABLED")
            print("⚠️ Server status messages are DISABLED")
            print("🔧 Change config.txt [device] name to exit configuration mode")
            # Set global flag for configuration mode
            CONFIGURATION_MODE = True
            print("🔧 Configuration mode activated - continuing with RFID reader initialization")
            print("🔧 Network and server monitoring will remain disabled")
        else:
            print("❌ Failed to launch configuration mode, continuing with normal operation")
            CONFIGURATION_MODE = False
    else:
        print("✅ Device in normal operation mode")
        CONFIGURATION_MODE = False
except Exception as e:
    # Detection must never stop startup: fall back to normal operation.
    print(f"Error in configuration mode detection: {e}")
    name = "noconfig"
    CONFIGURATION_MODE = False
print("=" * 60)
print("CONTINUING WITH NORMAL INITIALIZATION" if not CONFIGURATION_MODE else "CONFIGURATION MODE ACTIVE")
print("=" * 60)
# ---------------------------------------------------------------------------
# Auto-printer setup
# ---------------------------------------------------------------------------
def _ensure_printer(device_name,
                    device_uri="usb://CITIZEN/CT-S310II?serial=00000000",
                    ppd_model="CTS310II.ppd"):
    """
    Check whether a CUPS printer named *device_name* is installed.
    If not found, install it automatically using lpadmin.
    Only called when work_place is set to a real name (not 'notconfig').

    Args:
        device_name: CUPS queue name to look for / create.
        device_uri: Device URI passed to `lpadmin -v`.
        ppd_model: Model/PPD name passed to `lpadmin -m`.

    Returns:
        None. Every failure is reported on stdout and otherwise ignored.
    """
    if not device_name or not str(device_name).strip():
        # lpadmin cannot create a queue without a name.
        print("🖨 Printer setup skipped (empty printer name).")
        return
    print(f"🖨 Checking printer for work_place='{device_name}' ...")
    try:
        import cups
        conn = cups.Connection()
        printers = conn.getPrinters()
        if device_name in printers:
            print(f"✓ Printer '{device_name}' already installed — no action needed.")
            return
        # Printer not found — install it
        print(f"🖨 Printer '{device_name}' not found. Starting installation ...")
        cmd = [
            "sudo", "/usr/sbin/lpadmin",
            "-p", device_name,
            "-E",
            "-v", device_uri,
            "-m", ppd_model,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            print(f"✓ Printer '{device_name}' installed successfully.")
        else:
            print(f"✗ lpadmin returned code {result.returncode}: {result.stderr.strip()}")
    except ImportError:
        print("⚠ python3-cups not available — printer check skipped. "
              "Install with: sudo apt install python3-cups")
    except Exception as e:
        print(f"⚠ Printer setup skipped: {e}")
# Install the printer only for a configured device that is not in setup mode;
# an unconfigured device (work_place=notconfig) just reports the skip.
_auto_printer_name = APP_CONFIG.get("device_name", "notconfig")
if _auto_printer_name.lower() == "notconfig":
    print("🖨 Printer setup skipped (work_place=notconfig).")
elif not CONFIGURATION_MODE:
    _ensure_printer(_auto_printer_name)
def check_system_requirements():
    """
    Verify and prepare what the application needs on disk.

    Steps: (1) make sure the listed command-line tools exist, installing them
    through apt or, failing that, from a bundled .deb; (2) create the working
    directories; (3) create missing data files with default content;
    (4) set permissions on those data files.

    Returns:
        bool: False when a required directory cannot be created, otherwise
        True (package, file and permission problems are only reported).
    """
    print("Checking system requirements...")

    # Step 1: tool name -> .deb file used for the offline installation.
    offline_debs = {
        'sshpass': 'sshpass_1.09-1_armhf.deb'  # Required for auto-update functionality
    }
    for tool, deb_name in offline_debs.items():
        try:
            probe = subprocess.run(['which', tool], capture_output=True, text=True)
            if probe.returncode == 0:
                print(f"{tool} is installed")
                continue
            print(f"Installing {tool}...")

            # Online attempt first.
            try:
                subprocess.run(['sudo', 'apt', 'update'], capture_output=True, text=True, timeout=120)
                online = subprocess.run(['sudo', 'apt', 'install', '-y', tool],
                                        capture_output=True, text=True, timeout=300)
                if online.returncode == 0:
                    print(f"{tool} installed successfully (online)")
                    continue
                print("Online installation failed, trying offline...")
            except Exception as online_error:
                print(f"Online installation failed: {online_error}, trying offline...")

            # Offline attempt from the bundled package.
            deb_path = f"./Files/system_packages/{deb_name}"
            if not os.path.exists(deb_path):
                print(f"✗ Local package not found: {deb_path}")
                print(f" To add offline support, download with: apt download {tool}")
                continue
            try:
                print(f"Installing {tool} from local package: {deb_path}")
                offline = subprocess.run(['sudo', 'dpkg', '-i', deb_path],
                                         capture_output=True, text=True, timeout=120)
                if offline.returncode == 0:
                    print(f"{tool} installed successfully (offline)")
                else:
                    print(f"✗ Offline installation failed: {offline.stderr}")
                    # Try to fix dependencies
                    print("Attempting to fix dependencies...")
                    subprocess.run(['sudo', 'apt', '--fix-broken', 'install', '-y'],
                                   capture_output=True, text=True, timeout=300)
            except Exception as offline_error:
                print(f"✗ Offline installation error: {offline_error}")
        except Exception as e:
            print(f"Warning: Could not check/install {tool}: {e}")

    # Step 2: working directories. A failure here aborts the whole check.
    for folder in ('./data', './Files', './Files/reposytory', './Files/system_packages'):
        try:
            os.makedirs(folder, exist_ok=True)
            print(f"✓ Directory ensured: {folder}")
        except Exception as e:
            print(f"✗ Failed to create directory {folder}: {e}")
            return False

    # Step 3: data files and the content they get when they are missing.
    default_files = {
        './data/log.txt': '',
        './data/tag.txt': '',
        './data/device_info.txt': 'unknown-device\n127.0.0.1\n'
    }
    for target, initial_text in default_files.items():
        try:
            if os.path.exists(target):
                print(f"✓ File exists: {target}")
            else:
                with open(target, 'w') as out:
                    out.write(initial_text)
                print(f"✓ Created default file: {target}")
        except Exception as e:
            print(f"✗ Failed to create file {target}: {e}")

    # Step 4: owner read/write, group and others read-only.
    data_file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
    try:
        for target in default_files:
            if os.path.exists(target):
                os.chmod(target, data_file_mode)
        print("✓ File permissions set correctly")
    except Exception as e:
        print(f"Warning: Could not set file permissions: {e}")
    return True
def check_port_capabilities():
    """
    Check if the application can bind to port 80 and set up capabilities if needed.

    Order of checks: running as root; interpreter already carries
    cap_net_bind_service; otherwise run ./setup_port_capability.sh through
    sudo (the script is generated when it does not exist yet).

    Returns:
        bool: True when port 80 binding is expected to work, False otherwise.
    """
    # Local import keeps the block self-contained.
    import shlex

    print("Checking port 80 capabilities...")
    try:
        # Check if we're running as root
        if os.geteuid() == 0:
            print("✓ Running as root - port 80 access available")
            return True

        # sys.executable is frequently a symlink; file capabilities belong to
        # the real binary, so resolve it before querying or setting them.
        python_path = os.path.realpath(sys.executable)
        result = subprocess.run(['getcap', python_path], capture_output=True, text=True)
        # getcap prints either "cap_net_bind_service=ep" or
        # "cap_net_bind_service+ep" depending on the libcap version.
        if 'cap_net_bind_service' in result.stdout:
            print("✓ Port binding capabilities already set")
            return True

        # Try to set capabilities
        print("Setting up port 80 binding capabilities...")
        setup_script = './setup_port_capability.sh'
        script_ready = os.path.exists(setup_script)
        if not script_ready:
            # `set -e` makes the script fail when setcap fails; without it
            # the trailing echo would always yield exit status 0.
            script_content = f'''#!/bin/bash
# Set port binding capability for Python to allow port 80 access
set -e
echo "Setting port binding capability for Python..."
sudo setcap cap_net_bind_service=ep {shlex.quote(python_path)}
echo "Capability set successfully"
'''
            try:
                with open(setup_script, 'w') as f:
                    f.write(script_content)
                os.chmod(setup_script, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
                script_ready = True
            except Exception as e:
                print(f"✗ Failed to create setup script: {e}")

        if script_ready:
            result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
            if result.returncode == 0:
                print("✓ Port capabilities set successfully")
                return True
            print(f"✗ Failed to set capabilities: {result.stderr}")
    except Exception as e:
        print(f"Warning: Could not check port capabilities: {e}")
    print("Warning: Port 80 may not be accessible. App will try to run on default port.")
    return False
def check_hardware_interfaces():
    """
    Check hardware interfaces (UART/Serial) required for RFID reader.

    Each candidate serial device is opened for reading. On a permission error
    the current user is added to the `dialout` group. When no device is
    usable and /boot/config.txt exists, enabling the UART through
    raspi-config is attempted.

    Returns:
        bool: True when at least one serial device is (or should become)
        accessible, False otherwise.
    """
    print("Checking hardware interfaces...")
    # Check for serial devices
    serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0']
    available_devices = []
    for device in serial_devices:
        if not os.path.exists(device):
            continue
        try:
            # Check if we can access the device
            with open(device, 'r'):
                pass
            available_devices.append(device)
            print(f"✓ Serial device available: {device}")
        except PermissionError:
            print(f"✗ Permission denied for {device}. Adding user to dialout group...")
            try:
                # Add current user to dialout group for serial access
                username = pwd.getpwuid(os.getuid()).pw_name
                result = subprocess.run(['sudo', 'usermod', '-a', '-G', 'dialout', username],
                                        capture_output=True, text=True)
                if result.returncode == 0:
                    print(f"✓ User {username} added to dialout group (reboot may be required)")
                    available_devices.append(device)
                else:
                    # Only report success when usermod actually succeeded.
                    print(f"✗ Failed to add user to dialout group: {result.stderr.strip()}")
            except Exception as e:
                print(f"✗ Failed to add user to dialout group: {e}")
        except Exception as e:
            print(f"Warning: Could not test {device}: {e}")

    if available_devices:
        return True

    print("✗ No serial devices found. RFID reader may not work.")
    # Enable UART if we're on Raspberry Pi
    try:
        config_file = '/boot/config.txt'
        if os.path.exists(config_file):
            print("Attempting to enable UART in Raspberry Pi config...")
            result = subprocess.run(['sudo', 'raspi-config', 'nonint', 'do_serial', '0'],
                                    capture_output=True, text=True)
            if result.returncode == 0:
                print("✓ UART enabled in config (reboot required)")
            else:
                print("Warning: Could not enable UART automatically")
    except Exception as e:
        print(f"Warning: Could not configure UART: {e}")
    return False
def check_network_connectivity(ping_host='8.8.8.8', dns_name='google.com'):
    """
    Check network connectivity and DNS resolution.

    Args:
        ping_host: Address pinged once to test basic reachability. Devices on
            an isolated network can pass an internal host instead.
        dns_name: Host name resolved to test DNS.

    Returns:
        bool: True when both the ping and the name lookup succeed.
    """
    import socket

    print("Checking network connectivity...")
    try:
        # Test basic connectivity
        result = subprocess.run(['ping', '-c', '1', ping_host],
                                capture_output=True, text=True, timeout=5)
        if result.returncode != 0:
            print("✗ No internet connectivity")
            return False
        print("✓ Internet connectivity available")
        # Test DNS resolution
        try:
            socket.gethostbyname(dns_name)
            print("✓ DNS resolution working")
            return True
        except socket.gaierror:
            print("✗ DNS resolution failed")
            return False
    except subprocess.TimeoutExpired:
        print("✗ Network timeout")
        return False
    except Exception as e:
        print(f"Warning: Could not test network: {e}")
        return False
def initialize_gpio_permissions():
    """
    Set up GPIO permissions for LED control.

    Adds the current user to the `gpio` group when that group exists and the
    user is not already a member, then looks for a GPIO device node.

    Returns:
        bool: True when /dev/gpiomem or /dev/mem exists, False when neither
        exists or the setup raised. A boolean is required because
        perform_system_initialization() counts a step as successful only
        when it returns a truthy value.
    """
    print("Setting up GPIO permissions...")
    try:
        account = pwd.getpwuid(os.getuid())
        username = account.pw_name
        # Check if gpio group exists
        try:
            gpio_group = grp.getgrnam('gpio')
        except KeyError:
            gpio_group = None
            print("Warning: gpio group not found - GPIO access may be limited")

        if gpio_group is not None:
            already_member = (username in gpio_group.gr_mem
                              or account.pw_gid == gpio_group.gr_gid)
            if already_member:
                # Avoid a needless sudo call on every start.
                print(f"✓ User {username} already in gpio group")
            else:
                result = subprocess.run(['sudo', 'usermod', '-a', '-G', 'gpio', username],
                                        capture_output=True, text=True)
                if result.returncode == 0:
                    print(f"✓ User {username} added to gpio group")
                else:
                    print(f"Warning: Could not add {username} to gpio group: {result.stderr.strip()}")

        # Set up GPIO access via /dev/gpiomem if available
        for device in ('/dev/gpiomem', '/dev/mem'):
            if os.path.exists(device):
                print(f"✓ GPIO device available: {device}")
                return True
        print("Warning: No GPIO devices found")
        return False
    except Exception as e:
        print(f"Warning: Could not set up GPIO permissions: {e}")
        return False
def perform_system_initialization():
    """
    Run every first-run preparation step and summarise the outcome.

    A step counts as successful when its function returns a truthy value;
    a falsy return is reported as "completed with warnings" and an exception
    as a failure.

    Returns:
        bool: True when at most one step was not successful.
    """
    rule = "=" * 60
    print(rule)
    print("SYSTEM INITIALIZATION - Preparing for first run")
    print(rule)

    steps = (
        ("System Requirements", check_system_requirements),
        ("Port Capabilities", check_port_capabilities),
        ("Hardware Interfaces", check_hardware_interfaces),
        ("GPIO Permissions", initialize_gpio_permissions),
        ("Network Connectivity", check_network_connectivity),
    )
    total = len(steps)
    passed = 0
    for label, runner in steps:
        print(f"\n--- {label} ---")
        try:
            outcome = runner()
        except Exception as e:
            print(f"{label} failed: {e}")
            continue
        if outcome:
            passed += 1
            print(f"{label} completed successfully")
        else:
            print(f"{label} completed with warnings")

    print("\n" + rule)
    print(f"INITIALIZATION COMPLETE: {passed}/{total} steps successful")
    print(rule)
    if passed < total:
        print("Warning: Some initialization steps failed. Application may have limited functionality.")
        print("Check the messages above for details.")
    # Allow one failure
    return passed >= total - 1
# Variable configuration
def get_device_info():
    """
    Determine the hostname and IP address of this device.

    Sources, in order: live socket lookup (the result is saved to
    ./data/device_info.txt), the values previously saved in that file, the
    [device] values held in APP_CONFIG, and finally hardcoded defaults
    (which are also saved to the file).

    Returns:
        tuple: (hostname, device_ip)
    """
    config_file = "./data/device_info.txt"
    hostname = None
    device_ip = None

    def _saved_lines():
        # Raw lines of the fallback file; I/O errors propagate to the caller.
        with open(config_file, "r") as source:
            return source.read().strip().split('\n')

    # Try to get current hostname and IP
    try:
        hostname = socket.gethostname()
        device_ip = socket.gethostbyname(hostname)
        print(f"Successfully resolved - Hostname: {hostname}, IP: {device_ip}")
        # Remember the working values for future fallback
        try:
            os.makedirs("./data", exist_ok=True)
            with open(config_file, "w") as sink:
                sink.write(f"{hostname}\n{device_ip}\n")
            print(f"Saved device info to {config_file}")
        except Exception as save_error:
            print(f"Warning: Could not save device info to file: {save_error}")
        return hostname, device_ip
    except socket.gaierror as e:
        print(f"Socket error occurred: {e}")
        print("Attempting to load device info from file...")
        try:
            saved = _saved_lines()
            if len(saved) < 2:
                print("File exists but doesn't contain valid data")
            else:
                hostname = saved[0].strip()
                device_ip = saved[1].strip()
                print(f"Loaded from file - Hostname: {hostname}, IP: {device_ip}")
                return hostname, device_ip
        except FileNotFoundError:
            print(f"No fallback file found at {config_file}")
        except Exception as read_error:
            print(f"Error reading fallback file: {read_error}")
    except Exception as e:
        print(f"Unexpected error getting device info: {e}")
        try:
            saved = _saved_lines()
            if len(saved) >= 2:
                hostname = saved[0].strip()
                device_ip = saved[1].strip()
                print(f"Loaded from file after error - Hostname: {hostname}, IP: {device_ip}")
                return hostname, device_ip
        except Exception as file_error:
            print(f"Could not load from file: {file_error}")

    # Final fallback: use values from APP_CONFIG (config.txt [device] section)
    try:
        cfg_hostname = APP_CONFIG.get("device_hostname", "")
        cfg_ip = APP_CONFIG.get("device_ip", "")
        if cfg_hostname and cfg_hostname != "unknown-device":
            hostname = cfg_hostname
            device_ip = cfg_ip or "127.0.0.1"
            print(f"Loaded from config.txt - Hostname: {hostname}, IP: {device_ip}")
            return hostname, device_ip
    except Exception:
        pass

    # Absolute last resort defaults
    print("All methods failed - Using default values")
    if not hostname:
        hostname = "unknown-device"
    device_ip = "127.0.0.1"
    # Try to save these default values for next time
    try:
        os.makedirs("./data", exist_ok=True)
        with open(config_file, "w") as sink:
            sink.write(f"{hostname}\n{device_ip}\n")
        print(f"Saved fallback values to {config_file}")
    except Exception as save_error:
        print(f"Could not save fallback values: {save_error}")
    return hostname, device_ip
# Perform system initialization (first run setup)
if not perform_system_initialization():
    print("Warning: System initialization completed with errors.")
    print("The application will continue but may have limited functionality.")
# Get device information with error handling
hostname, device_ip = get_device_info()
print(f"Final result - Hostname: {hostname}, Device IP: {device_ip}")
# Configure logging
# force=True replaces any handler already attached to the root logger. Without
# it, basicConfig() does nothing when an earlier root-level logging call has
# implicitly installed the default stderr handler, and nothing would ever be
# written to ./data/log.txt.
logging.basicConfig(
    filename='./data/log.txt',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
# function to delete old logs
def delete_old_logs():
    """
    Delete ./data/log.txt when it was last modified more than 10 days ago.

    The outcome (deleted / not old enough / missing) is reported through
    log_info_with_server(). After a deletion, file handlers of the root
    logger that write to this file are closed so that they reopen (and
    recreate) the file on the next record instead of writing to the removed
    one.
    """
    log_dir = './data/'
    log_file = 'log.txt'
    log_path = os.path.join(log_dir, log_file)
    if not os.path.exists(log_path):
        log_info_with_server(f"Log file does not exist: {log_file}")
        return

    file_mod_time = datetime.fromtimestamp(os.path.getmtime(log_path))
    if datetime.now() - file_mod_time > timedelta(days=10):
        removed_path = os.path.abspath(log_path)
        os.remove(log_path)
        # Remove first, close afterwards: a handler closed before the removal
        # could reopen the old file in between and end up unlinked again.
        for handler in logging.getLogger().handlers:
            if (isinstance(handler, logging.FileHandler)
                    and os.path.abspath(handler.baseFilename) == removed_path):
                handler.close()
        log_info_with_server(f"Deleted old log file: {log_file}")
    else:
        log_info_with_server(f"Log file is not older than 10 days: {log_file}")
# Function to read the work place name from config
def read_name_from_file():
    """
    Return the work place name held in APP_CONFIG under "device_name".

    Falls back to "unknown" when the value is empty or cannot be read.
    """
    try:
        workplace = APP_CONFIG.get("device_name", "")
    except Exception:
        workplace = ""
    return workplace or "unknown"
def _get_os_version(os_release_path: str = "/etc/os-release") -> str:
    """Read OS pretty-name from /etc/os-release (Raspberry Pi OS, etc.).

    Args:
        os_release_path: File to parse; defaults to the system os-release.

    Returns:
        The PRETTY_NAME value without surrounding single or double quotes,
        or "" when the file cannot be read or has no PRETTY_NAME entry.
    """
    try:
        # Explicit encoding + errors="replace": an odd byte must not make the
        # whole lookup fail or depend on the process locale.
        with open(os_release_path, encoding="utf-8", errors="replace") as f:
            for line in f:
                if line.startswith("PRETTY_NAME="):
                    value = line.split("=", 1)[1].strip()
                    # Values may be wrapped in either quote style.
                    return value.strip('"\'')
    except OSError:
        # Missing/unreadable file: the caller only needs "no information".
        pass
    return ""
# Function to send logs to a remote server for the Server_monitorizare APP
def send_log_to_server(log_message, n_masa, hostname, device_ip):
    """POST one log entry plus device metadata to the monitoring server.

    The target URL comes from APP_CONFIG["server_log_url"]. Request failures
    (connection errors, timeouts, HTTP error statuses) are written to the
    local log and not raised.

    Args:
        log_message: Text of the log entry.
        n_masa: Workplace name sent as "nume_masa".
        hostname: Device hostname.
        device_ip: Device IP address.
    """
    try:
        log_data = {
            "hostname": str(hostname),
            "device_ip": str(device_ip),
            "nume_masa": str(n_masa),
            "log_message": str(log_message),
            # Device metadata keeps the server record up to date automatically
            "device_type": "Raspberry Pi",
            "os_version": _get_os_version(),
            "location": APP_CONFIG.get("device_location", ""),
            "mac_address": _get_mac_address(),
            "card_presence": APP_CONFIG.get("card_presence", "enable"),
        }
        server_url = APP_CONFIG.get("server_log_url", "http://rpi-ansible:80/logs")
        # Payload dump is debug-level only; it used to be printed to stdout
        # on every single log call.
        logging.debug("Log payload: %s", log_data)
        response = requests.post(server_url, json=log_data, timeout=5)
        response.raise_for_status()
        logging.info("Log successfully sent to server: %s", log_message)
    except requests.exceptions.RequestException as e:
        logging.error("Failed to send log to server: %s", e)
# Wrapper for logging.info to also send logs to the server Monitorizare APP
def log_info_with_server(message):
    """Log *message* locally and, outside configuration mode, to the server.

    The local record is suffixed with the workplace name; the server receives
    the original message together with the workplace name, hostname and IP.

    Args:
        message: Text to log.
    """
    n_masa = read_name_from_file()  # Read work place name from config
    logging.info("%s (n_masa: %s)", message, n_masa)
    # Look the flag up without raising: a missing flag (very early start-up)
    # means "not in configuration mode". The previous `except NameError`
    # also caught NameErrors raised inside the send call and then repeated
    # the very same call, which simply raised again.
    if globals().get("CONFIGURATION_MODE", False):
        logging.info("Configuration mode: Server logging disabled")
        return
    send_log_to_server(message, n_masa, hostname, device_ip)
# Function to execute system commands with proper security
def execute_system_command(command):
    """Execute an allow-listed system command and report the outcome.

    Only exact matches of the allow-list are executed. An entry containing
    "&&" is run step by step without a shell, stopping at the first failing
    step (same semantics as the shell operator).

    Args:
        command: Command string as received from the caller.

    Returns:
        dict with "status" ("success"/"error"), "message" and "output"
        (combined stdout and stderr of the executed steps).
    """
    # Define allowed commands for security
    allowed_commands = (
        "sudo apt update",
        "sudo apt upgrade -y",
        "sudo apt update && sudo apt upgrade -y",  # Combined update and upgrade
        "sudo apt autoremove -y",
        "sudo apt autoclean",
        "sudo reboot",
        "sudo shutdown -h now",
        "df -h",
        "free -m",
        "uptime",
        "systemctl status",
        "sudo systemctl restart networking",
        "sudo systemctl restart ssh",
    )
    try:
        # Check if command is allowed
        if command not in allowed_commands:
            log_info_with_server(f"Command '{command}' is not allowed for security reasons")
            return {"status": "error", "message": f"Command '{command}' is not allowed", "output": ""}
        log_info_with_server(f"Executing command: {command}")
        # No shell is involved, so "&&" has to be interpreted here; passing
        # it to the first program as an argument made the combined command
        # fail every time.
        outputs = []
        returncode = 0
        for step in command.split("&&"):
            result = subprocess.run(
                step.split(),
                capture_output=True,
                text=True,
                timeout=300  # 5 minute timeout per step
            )
            outputs.append(result.stdout + result.stderr)
            returncode = result.returncode
            if returncode != 0:
                break
        output = "".join(outputs)
        if returncode == 0:
            log_info_with_server(f"Command '{command}' executed successfully")
            return {"status": "success", "message": "Command executed successfully", "output": output}
        log_info_with_server(f"Command '{command}' failed with return code {returncode}")
        return {"status": "error", "message": f"Command failed with return code {returncode}", "output": output}
    except subprocess.TimeoutExpired:
        log_info_with_server(f"Command '{command}' timed out")
        return {"status": "error", "message": "Command timed out", "output": ""}
    except Exception as e:
        log_info_with_server(f"Error executing command '{command}': {str(e)}")
        return {"status": "error", "message": f"Error: {str(e)}", "output": ""}
# Flask app for receiving commands (only if Flask is available)
if FLASK_AVAILABLE:
command_app = Flask(__name__)
@command_app.route('/execute_command', methods=['POST'])
def handle_command_execution():
    """
    Endpoint to receive and execute system commands.

    Expects a JSON object with a 'command' field. Responds 200 with the
    execution result on success, 400 for an invalid request or a failed /
    rejected command, and 500 for unexpected errors.

    NOTE(review): this handler performs no authentication itself; the
    allow-list inside execute_system_command is the only guard visible here.
    """
    try:
        # silent=True yields None for a missing, malformed or non-JSON body
        # instead of raising, so a bad request is answered with 400, not 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'command' not in data:
            return jsonify({"error": "Invalid request. 'command' field is required"}), 400
        # Execute the command
        result = execute_system_command(data['command'])
        return jsonify(result), 200 if result['status'] == 'success' else 400
    except Exception as e:
        log_info_with_server(f"Error handling command execution request: {str(e)}")
        return jsonify({"error": f"Server error: {str(e)}"}), 500
@command_app.route('/status', methods=['GET'])
def get_device_status():
    """
    Endpoint to get device status information.

    Returns hostname, IP, workplace name, a timestamp and the output of
    `uptime`, `df -h /` and `free -m`. A probe that cannot be run, times out
    or exits non-zero is reported as "N/A" instead of failing the request.
    """
    def _probe(argv):
        # One status probe; any failure degrades to "N/A".
        try:
            result = subprocess.run(argv, capture_output=True, text=True, timeout=10)
        except (OSError, subprocess.SubprocessError):
            return "N/A"
        return result.stdout.strip() if result.returncode == 0 else "N/A"

    try:
        n_masa = read_name_from_file()
        status_info = {
            "hostname": hostname,
            "device_ip": device_ip,
            "nume_masa": n_masa,
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "uptime": _probe(['uptime']),
            "disk_usage": _probe(['df', '-h', '/']),
            "memory_usage": _probe(['free', '-m']),
        }
        return jsonify(status_info), 200
    except Exception as e:
        log_info_with_server(f"Error getting device status: {str(e)}")
        return jsonify({"error": f"Error getting status: {str(e)}"}), 500
def _read_app_version(path):
    """Return the version number from the first line of *path*, or None.

    The first line is expected to look like "#App version 2.5"; the number is
    returned as a float. Errors from opening/reading the file propagate.
    """
    import re  # local import, as in the original implementation

    with open(path, 'r') as f:
        first_line = f.readline()
    version_match = re.search(r'version\s+(\d+\.?\d*)', first_line, re.IGNORECASE)
    if version_match is None:
        return None
    return float(version_match.group(1))


@command_app.route('/auto_update', methods=['POST'])
def auto_update_app():
    """
    Auto-update the application from the central server.

    Compares the version header of the local app.py with the one on the
    server; when the server copy is newer it backs up the local file,
    installs the new app.py, refreshes Files/reposytory and
    Files/system_packages, and schedules a reboot.

    Responds 200 with status "no_update_needed" or "success", 500 on errors.
    """
    # Defined before the try so the cleanup in `finally` can always use it.
    temp_dir = "/tmp/app_update"
    try:
        # Configuration (read from APP_CONFIG, falling back to hardcoded defaults)
        SERVER_HOST = APP_CONFIG.get("update_host", "rpi-ansible")
        SERVER_USER = APP_CONFIG.get("update_user", "pi")
        # NOTE(review): the fallback password is still hardcoded in source;
        # it should move to configuration or key-based authentication.
        SERVER_PASSWORD = APP_CONFIG.get("update_password", "Initial01!")
        SERVER_APP_PATH = "/home/pi/Desktop/prezenta/app.py"
        SERVER_REPO_PATH = "/home/pi/Desktop/prezenta/Files/reposytory"
        # Dynamically determine local paths based on current script location
        current_script_path = os.path.abspath(__file__)
        local_base_dir = os.path.dirname(current_script_path)
        LOCAL_APP_PATH = current_script_path
        LOCAL_REPO_PATH = os.path.join(local_base_dir, "Files", "reposytory")
        # `sshpass -e` takes the password from $SSHPASS, keeping it out of
        # the process list (it used to be passed with `-p` on the command line).
        scp_env = dict(os.environ, SSHPASS=str(SERVER_PASSWORD))
        scp_prefix = ['sshpass', '-e', 'scp']
        # NOTE(review): host-key checking is disabled, so the update server
        # is not authenticated.
        scp_options = ['-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
        log_info_with_server(f"Auto-update process initiated from: {LOCAL_APP_PATH}")
        # Step 1: Get current local version
        try:
            current_version = _read_app_version(LOCAL_APP_PATH)
            if current_version is not None:
                log_info_with_server(f"Current local version: {current_version}")
        except Exception as e:
            log_info_with_server(f"Could not determine local version: {e}")
            return jsonify({"error": f"Could not determine local version: {str(e)}"}), 500
        # Step 2: Get remote version via SCP
        try:
            # Create temporary directory
            subprocess.run(['mkdir', '-p', temp_dir], check=True)
            # Download remote app.py to check version
            scp_command = scp_prefix + scp_options + [
                f'{SERVER_USER}@{SERVER_HOST}:{SERVER_APP_PATH}',
                f'{temp_dir}/app.py'
            ]
            result = subprocess.run(scp_command, capture_output=True, text=True, timeout=30, env=scp_env)
            if result.returncode != 0:
                log_info_with_server(f"Failed to download remote app.py: {result.stderr}")
                return jsonify({"error": f"Failed to connect to server: {result.stderr}"}), 500
            # Check remote version
            remote_version = _read_app_version(f'{temp_dir}/app.py')
            if remote_version is not None:
                log_info_with_server(f"Remote version: {remote_version}")
        except subprocess.TimeoutExpired:
            return jsonify({"error": "Connection to server timed out"}), 500
        except Exception as e:
            log_info_with_server(f"Error checking remote version: {e}")
            return jsonify({"error": f"Error checking remote version: {str(e)}"}), 500
        # Step 3: Compare versions
        # NOTE(review): versions are compared as floats, so "2.10" counts as
        # 2.1 (older than 2.9) - confirm the project's numbering scheme.
        if remote_version is None:
            return jsonify({"error": "Could not determine remote version"}), 500
        if current_version is None or remote_version <= current_version:
            log_info_with_server(f"No update needed. Current: {current_version}, Remote: {remote_version}")
            return jsonify({
                "status": "no_update_needed",
                "current_version": current_version,
                "remote_version": remote_version,
                "message": "Application is already up to date"
            }), 200
        # Step 4: Download updated files
        log_info_with_server(f"Update available! Downloading version {remote_version}")
        backup_path = f"{LOCAL_APP_PATH}.backup.{current_version}"
        try:
            # Create backup of current app
            subprocess.run(['cp', LOCAL_APP_PATH, backup_path], check=True)
            log_info_with_server(f"Backup created: {backup_path}")
            # Install the new app.py that was fetched during the version check
            subprocess.run(['cp', f'{temp_dir}/app.py', LOCAL_APP_PATH], check=True)
            log_info_with_server("New app.py downloaded successfully")
            # Download repository folder. A staging directory left over from
            # an earlier failed attempt is removed first; otherwise `scp -r`
            # would copy into it and the later `mv` would nest the folders.
            repo_staging = f'{LOCAL_REPO_PATH}_new'
            subprocess.run(['rm', '-rf', repo_staging], check=True)
            repo_scp_command = scp_prefix + ['-r'] + scp_options + [
                f'{SERVER_USER}@{SERVER_HOST}:{SERVER_REPO_PATH}',
                repo_staging
            ]
            result = subprocess.run(repo_scp_command, capture_output=True, text=True, timeout=60, env=scp_env)
            if result.returncode == 0:
                # Replace old repository with new one
                subprocess.run(['rm', '-rf', LOCAL_REPO_PATH], check=True)
                subprocess.run(['mv', repo_staging, LOCAL_REPO_PATH], check=True)
                log_info_with_server("Repository updated successfully")
            else:
                log_info_with_server(f"Repository update failed: {result.stderr}")
            # Download system packages folder (best effort: failures are only logged)
            local_system_packages_path = os.path.join(local_base_dir, 'Files', 'system_packages')
            server_system_packages = f'{SERVER_USER}@{SERVER_HOST}:/home/pi/Desktop/prezenta/Files/system_packages'
            packages_staging = f'{local_system_packages_path}_new'
            system_scp_command = scp_prefix + ['-r'] + scp_options + [
                server_system_packages,
                packages_staging
            ]
            try:
                subprocess.run(['rm', '-rf', packages_staging], check=True)
                result = subprocess.run(system_scp_command, capture_output=True, text=True, timeout=60, env=scp_env)
                if result.returncode == 0:
                    # Replace old system packages with new ones
                    if os.path.exists(local_system_packages_path):
                        subprocess.run(['rm', '-rf', local_system_packages_path], check=True)
                    subprocess.run(['mv', packages_staging, local_system_packages_path], check=True)
                    log_info_with_server("System packages updated successfully")
                else:
                    log_info_with_server(f"System packages update failed: {result.stderr}")
            except Exception as sys_e:
                log_info_with_server(f"System packages update error: {sys_e}")
        except Exception as e:
            # Restore backup if something went wrong
            try:
                subprocess.run(['cp', backup_path, LOCAL_APP_PATH], check=True)
                log_info_with_server("Backup restored due to error")
            except Exception as restore_error:
                logging.error("Could not restore backup %s: %s", backup_path, restore_error)
            return jsonify({"error": f"Update failed: {str(e)}"}), 500
        # Step 5: Schedule device restart
        log_info_with_server("Update completed successfully. Scheduling restart...")
        # Create a restart script that will run after this response
        restart_script = '''#!/bin/bash
sleep 3
sudo reboot
'''
        with open('/tmp/restart_device.sh', 'w') as f:
            f.write(restart_script)
        subprocess.run(['chmod', '+x', '/tmp/restart_device.sh'], check=True)
        # Schedule the restart in background
        subprocess.Popen(['/tmp/restart_device.sh'],
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
        return jsonify({
            "status": "success",
            "message": f"Updated from version {current_version} to {remote_version}. Device restarting...",
            "old_version": current_version,
            "new_version": remote_version,
            "restart_scheduled": True
        }), 200
    except Exception as e:
        log_info_with_server(f"Auto-update error: {str(e)}")
        return jsonify({"error": f"Auto-update failed: {str(e)}"}), 500
    finally:
        # Cleanup temp directory (best effort)
        try:
            subprocess.run(['rm', '-rf', temp_dir], check=True)
        except Exception as cleanup_error:
            logging.warning("Could not remove %s: %s", temp_dir, cleanup_error)
@command_app.route('/update_config', methods=['POST'])
def update_config_endpoint():
    """
    Update configuration from Server_Monitorizare_v2.
    Accepts a JSON body with sections matching config.txt structure.
    Example body: {"chrome": {"chrome_url": "http://..."}}

    Only the sections/keys listed in ALLOWED_SECTIONS are written; everything
    else in the body is ignored. After writing, the in-memory APP_CONFIG is
    reloaded. Responds 200 with the list of updated keys, 400 when the body
    is not a non-empty JSON object, 500 on errors.
    """
    global APP_CONFIG
    ALLOWED_SECTIONS = {
        "chrome": ["chrome_url", "chrome_local_url", "chrome_insecure_origin"],
        "card_api": ["base_url"],
        "server": ["log_url", "update_host", "update_user", "internet_check_host"],
        "device": ["name", "hostname", "ip"],
    }
    try:
        # silent=True: malformed or non-JSON bodies become None (-> 400)
        # instead of raising and being reported as a server error.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "JSON body required"}), 400
        config_path = "./data/config.txt"
        parser = configparser.ConfigParser()
        if os.path.exists(config_path):
            parser.read(config_path)
        updated_keys = []
        for section, allowed_keys in ALLOWED_SECTIONS.items():
            values = data.get(section)
            if not isinstance(values, dict):
                continue
            if not parser.has_section(section):
                parser.add_section(section)
            for key in allowed_keys:
                if key in values:
                    parser.set(section, key, str(values[key]))
                    updated_keys.append(f"{section}.{key}")
        os.makedirs("./data", exist_ok=True)
        # Write to a sibling file and swap it in, so an interrupted write
        # can never leave a truncated config.txt behind.
        tmp_path = config_path + ".tmp"
        with open(tmp_path, "w") as f:
            f.write("# WMT Application Configuration\n")
            f.write(f"# Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            parser.write(f)
        os.replace(tmp_path, config_path)
        APP_CONFIG = load_config()
        log_info_with_server(f"Configuration updated via API: {updated_keys}")
        return jsonify({"status": "success", "updated_keys": updated_keys}), 200
    except Exception as e:
        log_info_with_server(f"Error updating config: {str(e)}")
        return jsonify({"error": f"Failed to update configuration: {str(e)}"}), 500
@command_app.route('/reload_config', methods=['POST'])
def reload_config_endpoint():
    """
    Reload configuration from data/config.txt into memory without restarting.

    Responds 200 with a summary of the reloaded values, or 500 when
    load_config() fails.
    """
    global APP_CONFIG
    try:
        APP_CONFIG = load_config()
        log_info_with_server("Configuration reloaded via API")
        return jsonify({
            "status": "success",
            "message": "Configuration reloaded",
            "chrome_url": APP_CONFIG.get("chrome_url"),
            "card_post_base_url": APP_CONFIG.get("card_post_base_url"),
            "server_log_url": APP_CONFIG.get("server_log_url"),
            "device_name": APP_CONFIG.get("device_name"),
        }), 200
    except Exception as e:
        # Log the failure like the other endpoints do, so it is not silent.
        log_info_with_server(f"Error reloading config: {str(e)}")
        return jsonify({"error": f"Failed to reload config: {str(e)}"}), 500
def start_command_server():
    """
    Start the Flask server with enhanced port handling and fallback.

    Ports are tried in order: $FLASK_PORT (default 80), 80, 5000, 8080, 3000.
    The first port that can be bound is used and the call then blocks inside
    command_app.run(). If no port works, the failure is logged and the
    function returns.
    """
    import errno
    import socket

    # A non-numeric FLASK_PORT must not take the whole server process down.
    try:
        env_port = int(os.environ.get('FLASK_PORT', 80))
    except ValueError:
        print(f"Warning: Invalid FLASK_PORT value {os.environ.get('FLASK_PORT')!r} - using 80")
        env_port = 80
    # Try different ports in order of preference; dict.fromkeys drops
    # duplicates while keeping the order, so no port is attempted twice.
    preferred_ports = list(dict.fromkeys([
        env_port,  # Use environment variable or default to 80
        80,        # Standard HTTP port
        5000,      # Flask default
        8080,      # Alternative HTTP port
        3000,      # Development port
    ]))
    for port in preferred_ports:
        try:
            print(f"Attempting to start command server on port {port}...")
            # Test if port is available; the context manager closes the
            # probe socket even when bind() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
                test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                test_socket.bind(('0.0.0.0', port))
            # Port is available, start Flask server
            print(f"Port {port} is available. Starting command server...")
            command_app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)
            return  # Success, exit function
        except PermissionError:
            print(f"✗ Permission denied for port {port}")
            if port == 80:
                print(" Hint: Port 80 requires root privileges or capabilities")
                print(" Try running: sudo setcap cap_net_bind_service=ep $(which python3)")
            continue
        except OSError as e:
            # Compare the error code rather than the (localised) message text.
            if e.errno == errno.EADDRINUSE:
                print(f"✗ Port {port} is already in use")
            else:
                print(f"✗ Port {port} error: {e}")
            continue
        except Exception as e:
            print(f"✗ Failed to start on port {port}: {e}")
            continue
    # If we get here, all ports failed
    log_info_with_server("Error: Could not start command server on any port")
    print("✗ Could not start command server on any available port")
# Start command server in a separate process with enhanced error handling
try:
    print("Initializing command server...")
    command_server_process = Process(target=start_command_server)
    command_server_process.daemon = True  # Ensure it dies with main process
    command_server_process.start()
    # Give the server a moment to start and check if it's running
    import time
    time.sleep(2)
    if command_server_process.is_alive():
        # The child may have fallen back to another port, so no specific
        # port is claimed here (and a malformed FLASK_PORT can no longer
        # turn a successful start into an error message).
        print("✓ Command server process is running (the bound port is reported in its start-up messages)")
    else:
        print("Warning: Command server process stopped unexpectedly")
except Exception as e:
    print(f"Warning: Could not start command server: {e}")
    log_info_with_server(f"Command server startup error: {str(e)}")
else:
print("Warning: Flask not available - Command server disabled")
# Call the function to delete old logs
# (removes ./data/log.txt once it has gone unmodified for more than 10 days)
delete_old_logs()
def config():
    """Run the project's `config` module.

    The module does its work at import time. A plain `import` executes it
    only once per process, so every later call would be a no-op; when the
    module is already loaded it is therefore re-executed with
    importlib.reload().
    """
    import importlib
    import sys

    loaded = sys.modules.get("config")
    if loaded is None:
        importlib.import_module("config")
    else:
        importlib.reload(loaded)
# function for posting data to the harting server
def post_backup_data():
    """Re-send the URLs queued in ./data/tag.txt, oldest first.

    Each non-empty line is POSTed as a URL. Successfully sent entries are
    dropped from the file; at the first failure the loop stops and the failed
    entry plus everything after it stays queued for the next run.

    NOTE(review): other code appends to the same file without any locking;
    an entry appended between the read and the rewrite here would be lost.
    """
    backlog_path = "./data/tag.txt"
    try:
        with open(backlog_path, "r") as file:
            lines = file.readlines()
    except FileNotFoundError:
        log_info_with_server("No backup file found.")
        return
    unsent = []
    for index, raw_line in enumerate(lines):
        url = raw_line.strip()
        if not url:
            continue
        try:
            response = requests.post(url, verify=False, timeout=3)
            response.raise_for_status()  # Raise an error for bad status codes
        except requests.exceptions.Timeout:
            log_info_with_server("Request timed out.")
            unsent = lines[index:]
            break
        except requests.exceptions.RequestException as e:
            log_info_with_server(f"An error occurred: {e}")
            unsent = lines[index:]
            break
        log_info_with_server("Data posted successfully:")
    # Track the queue by position instead of list.remove(line + "\n"), which
    # raised ValueError for a final line without a newline or with CRLF.
    # Entries are rewritten newline-terminated so later appends cannot fuse
    # with the last one.
    remaining_lines = [entry.strip() + "\n" for entry in unsent if entry.strip()]
    with open(backlog_path, "w") as file:
        file.writelines(remaining_lines)
# Function to check internet connection
def check_internet_connection():
    """Monitor connectivity forever by pinging the configured host.

    Host reachable: flush the backlog with post_backup_data() and sleep 45
    minutes. Host unreachable: block WiFi via rfkill, wait 20 minutes,
    unblock it, then kill and relaunch Chromium in kiosk mode. Any unexpected
    error is logged and the check is retried after one minute.
    """
    ping_target = APP_CONFIG.get("internet_check_host", "10.76.140.17")
    wifi_off_cmd = 'sudo rfkill block wifi'
    wifi_on_cmd = 'sudo rfkill unblock wifi'
    log_info_with_server('Internet connection check loaded')
    delete_old_logs()
    browser_pattern = "chromium"
    quiet = subprocess.DEVNULL
    while True:
        try:
            # One ICMP probe; only the exit status matters
            ping = subprocess.run(["ping", "-c", "1", ping_target], stdout=quiet, stderr=quiet)
            if ping.returncode == 0:
                log_info_with_server("Internet is up! Waiting 45 minutes.")
                post_backup_data()
                time.sleep(2700)  # 45 minutes
                continue
            log_info_with_server("Internet is down. Rebooting WiFi.")
            os.system(wifi_off_cmd)
            time.sleep(1200)  # 20 minutes
            os.system(wifi_on_cmd)
            # Restart the kiosk browser after the WiFi cycle
            log_info_with_server("Refreshing Chromium process.")
            try:
                # Terminate every running Chromium process
                subprocess.run(["pkill", "-f", browser_pattern], stdout=quiet, stderr=quiet)
                time.sleep(5)  # Wait for processes to terminate
                # Relaunch with the currently configured URL and origin
                url = APP_CONFIG.get("chrome_url", "http://10.76.140.17/iweb_v2/index.php/traceability/production")
                chrome_insecure = APP_CONFIG.get("chrome_insecure_origin", "http://10.76.140.17")
                launch_args = [
                    "chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
                    f"--unsafely-treat-insecure-origin-as-secure={chrome_insecure}", url,
                ]
                subprocess.Popen(launch_args, stdout=quiet, stderr=quiet, stdin=quiet, start_new_session=True)
                log_info_with_server("Chromium process restarted successfully.")
            except Exception as browser_error:
                log_info_with_server(f"Failed to refresh Chromium process: {browser_error}")
        except Exception as check_error:
            log_info_with_server(f"An error occurred during internet check: {check_error}")
            time.sleep(60)  # Retry after 1 minute in case of an error
# Start the internet connection check in a separate process (only if not in configuration mode)
if not CONFIGURATION_MODE:
    internet_check_process = Process(target=check_internet_connection)
    internet_check_process.start()
    print("✅ Internet connectivity monitoring started")
else:
    print("🔧 Configuration mode: Internet connectivity monitoring DISABLED")
# Launch Chromium with the specified URLs (only if not in configuration mode)
if not CONFIGURATION_MODE:
    url = APP_CONFIG.get("chrome_url", "http://10.76.140.17/iweb_v2/index.php/traceability/production")
    chrome_insecure = APP_CONFIG.get("chrome_insecure_origin", "http://10.76.140.17")
    try:
        subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", f"--unsafely-treat-insecure-origin-as-secure={chrome_insecure}", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True)
        print("✅ Main application browser launched")
    except OSError as e:
        # A missing or non-executable Chromium must not abort start-up;
        # the relaunch in check_internet_connection is guarded the same way.
        print(f"Warning: Could not launch Chromium: {e}")
        log_info_with_server(f"Failed to launch Chromium: {e}")
else:
    print("🔧 Configuration mode: Main application browser launch DISABLED")
info = "0"
#function to post info
def post_info(info):
    """POST to the URL in *info*; queue it in ./data/tag.txt on failure.

    The URL is stripped of surrounding whitespace before the request. When
    the request times out, fails, or returns an error status, the original
    *info* text is appended to the backlog file and the event is logged.
    """
    target_url = info.strip()  # drop the trailing newline and other padding
    try:
        requests.post(target_url, verify=False, timeout=3).raise_for_status()
    except requests.exceptions.RequestException:
        # Timeouts are RequestExceptions too, so one handler covers both cases
        with open("./data/tag.txt", "a") as backlog:  # append mode
            backlog.write(info)
        log_info_with_server("Value was saved to tag.txt")
def _save_failed_post(info):
    """Append *info* as one newline-terminated line to ./data/tag.txt."""
    os.makedirs("./data", exist_ok=True)
    with open("./data/tag.txt", "a") as file:
        file.write(info if info.endswith("\n") else info + "\n")


async def post_info_async(info):
    """POST to the URL in *info*; queue it in ./data/tag.txt on any failure.

    Uses aiohttp when it is available in this module's globals, otherwise
    falls back to a synchronous `requests` call. A timeout, connection
    error, HTTP error status or any other exception causes *info* to be
    appended to the backlog file so it can be re-sent later.

    Args:
        info: URL to post to; surrounding whitespace (including the trailing
            newline) is stripped before the request is made.
    """
    url = info.strip()  # the same normalisation post_info applies
    try:
        # Try to use aiohttp if available
        if 'aiohttp' in globals():
            async with aiohttp.ClientSession() as session:
                async with session.post(url, ssl=False, timeout=aiohttp.ClientTimeout(total=3)) as response:
                    # Without this check an HTTP 4xx/5xx reply counted as
                    # success and the record was silently dropped.
                    response.raise_for_status()
                    await response.text()
            log_info_with_server("Data posted successfully")
        else:
            # Fallback to synchronous requests if aiohttp not available
            response = requests.post(url, verify=False, timeout=3)
            response.raise_for_status()
            log_info_with_server("Data posted successfully (fallback)")
    except Exception:
        # Whatever went wrong, keep the record for a later retry
        _save_failed_post(info)
def post_info_thread(info):
    """Send *info* in the background via post_info_async on a daemon thread.

    If the thread cannot be set up or started, the error is printed and the
    data is posted synchronously with post_info instead.
    """
    try:
        pending_post = post_info_async(info)
        worker = threading.Thread(
            target=asyncio.run,
            args=(pending_post,),
            daemon=True,
        )
        worker.start()
    except Exception as start_error:
        # Fallback to synchronous posting if async fails
        print(f"Async posting failed, using sync fallback: {start_error}")
        post_info(info)
# Initialize LEDs only when card reader is active
class DummyLED:
    """No-op stand-in used where a real LED output is not available.

    Provides the `on()` / `off()` calls the rest of the file makes on an LED
    and remembers the pin number it was created for.
    """

    def __init__(self, pin):
        # Stored for reference only; no hardware is touched
        self.pin = pin

    def on(self):
        """Do nothing."""
        return None

    def off(self):
        """Do nothing."""
        return None
if not CARD_PRESENCE_ENABLED:
    # Card reader switched off: no GPIO access, use no-op LEDs
    print("🚫 Card presence disabled LED controls skipped")
    led1, led2 = DummyLED(23), DummyLED(24)
else:
    try:
        print("Initializing LED controls...")
        led1 = OutputDevice(23)
        led2 = OutputDevice(24)
        print("✓ LED controls initialized successfully")
        log_info_with_server("LED controls initialized")
    except Exception as led_error:
        # Any failure: both LEDs become no-op objects
        print(f"Warning: Could not initialize LED controls: {led_error}")
        print("Creating dummy LED objects - visual feedback will be disabled")
        led1, led2 = DummyLED(23), DummyLED(24)
        log_info_with_server("LED controls using dummy mode")
# Initialize table name/ID
print("Initializing device configuration...")
# Workplace name read from the configuration. The Reader class embeds it in
# the card-event URLs and treats "noconfig" as "device not configured yet"
# (card events are then not posted).
name = APP_CONFIG.get("device_name", "noconfig")
logging.info("LED controls initialized")
# Romanian log text, roughly: "the table ID variable has been initialized"
logging.info("Variabila Id Masa A fost initializata ")
print(f"✓ Device name set from config: {name}")
# The server-side copy of this message is skipped in configuration mode
if not CONFIGURATION_MODE:
    log_info_with_server(f"Device name initialized: {name}")
logging.info(name)
#clasa reader
class Reader(rdm6300.BaseReader):
    """RFID reader callbacks that report card insert/remove events.

    Each event is turned into a URL of the form
    <base>/<device name>/<card value>/<1|0>/<timestamp> and handed to
    post_info_thread. Card 12886709 is the configuration card: inserting it
    runs config(), removing it is only logged. While the device name is
    "noconfig" nothing is posted.
    """

    _WMT_CONFIG_CARD = 12886709
    _WMT_DEFAULT_BASE = "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record"

    @staticmethod
    def _wmt_event_url(card_value, state):
        """Build the newline-terminated event URL (state 1=inserted, 0=removed)."""
        stamp = time.strftime("%Y-%m-%d&%H:%M:%S")
        base_url = APP_CONFIG.get("card_post_base_url", Reader._WMT_DEFAULT_BASE)
        return f'{base_url}/{name}/{card_value}/{state}/{stamp}\n'

    def card_inserted(self, card):
        """Handle a card entering the reader field."""
        if card.value == self._WMT_CONFIG_CARD:
            config()
            return
        event_url = self._wmt_event_url(card.value, 1)
        if name != "noconfig":
            post_info_thread(event_url)
            led1.on()
            log_info_with_server(f"card inserted {card}")
            return
        # Unconfigured device: flash the LED for 5 seconds, post nothing
        led1.on()
        time.sleep(5)
        led1.off()
        log_info_with_server(f"card inserted {card} but no")

    def card_removed(self, card):
        """Handle a card leaving the reader field."""
        if card.value == self._WMT_CONFIG_CARD:
            log_info_with_server("Removing Config card")
            return
        event_url = self._wmt_event_url(card.value, 0)
        if name != "noconfig":
            post_info_thread(event_url)
        led1.off()
        log_info_with_server(f"card removed {card}")
# Initialize RFID Reader with comprehensive error handling
def initialize_rfid_reader():
    """
    Try to start the RFID reader on each known serial device in turn.

    Returns the Reader created for the first device on which construction
    and start() succeed, or None (after printing troubleshooting hints and
    logging the failure) when every device fails.
    """
    print("Initializing RFID reader...")
    # Candidate serial ports, most preferred first
    candidate_ports = ('/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0', '/dev/ttyACM0')
    for port in candidate_ports:
        try:
            print(f"Attempting to initialize RFID reader on {port}...")
            reader = Reader(port)
            reader.start()
            print(f"✓ RFID reader successfully initialized on {port}")
            log_info_with_server(f"RFID reader started on {port}")
            return reader
        except FileNotFoundError:
            print(f"✗ Device {port} not found")
        except PermissionError:
            print(f"✗ Permission denied for {port}")
            print(" Hint: Try adding user to dialout group: sudo usermod -a -G dialout $USER")
        except Exception as init_error:
            print(f"✗ Failed to initialize on {port}: {init_error}")
    # Every candidate failed: explain what can be done about it
    troubleshooting = (
        "✗ Could not initialize RFID reader on any device",
        "Available solutions:",
        " 1. Check hardware connections",
        " 2. Enable UART: sudo raspi-config -> Interface Options -> Serial",
        " 3. Add user to dialout group: sudo usermod -a -G dialout $USER",
        " 4. Reboot the system after making changes",
    )
    for hint in troubleshooting:
        print(hint)
    log_info_with_server("ERROR: RFID reader initialization failed")
    return None
# Start RFID reader only if card_presence is enabled
# Bound up front so the name exists on every path, including the one where
# initialize_rfid_reader() itself raises.
rfid_reader = None
if CARD_PRESENCE_ENABLED:
    try:
        rfid_reader = initialize_rfid_reader()
        if rfid_reader is None:
            print("WARNING: Application starting without RFID functionality")
            print("Card reading will not work until RFID reader is properly configured")
    except Exception as e:
        print(f"Critical error initializing RFID reader: {e}")
        log_info_with_server(f"Critical RFID error: {str(e)}")
        print("Application will start but RFID functionality will be disabled")
else:
    print("🚫 Card presence disabled RFID reader NOT started")
    log_info_with_server("Card presence is disabled RFID reader skipped, running without card reader")