Files
WMT/app.py
T
ske087 b17b9bb8da feat: HTTP-based auto-update; rename device_name->work_place; auto-configure from idmasa.txt
- Replace SCP/sshpass auto_update_app with HTTP-based _check_and_apply_update()
  - Downloads zip from /api/wmt/client/version + /api/wmt/client/download
  - Extracts over WMT dir, skipping data/ to preserve device config
  - Backs up app.py as app.py.bak.<version>
  - Restarts via systemctl restart wmt
- Startup and periodic (5-min) version check threads
- _try_autoconfigure_from_prezenta(): auto-write config.txt from idmasa.txt on first boot
- _send_first_registration_log(): POST to update_request after auto-configure
- Renamed all APP_CONFIG 'device_name' keys to 'work_place' (12 occurrences)
2026-05-13 16:36:33 +03:00

1844 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#App version 2.9 - Added configuration mode for "notconfig" devices
import os
import sys
import subprocess
import stat
import pwd
import grp
# Import dependency check/install functions from dependency_utils
from dependency_utils import install_package_from_wheel, check_and_install_dependencies
# Global configuration mode flag
CONFIGURATION_MODE = False
# Run dependency check before importing anything else
try:
check_and_install_dependencies({
'rdm6300': 'rdm6300-0.1.1-py3-none-any.whl',
'gpiozero': None,
'requests': 'requests-2.32.3-py3-none-any.whl',
'aiohttp': 'aiohttp-3.11.18-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'flask': None,
'urllib3': 'urllib3-2.3.0-py3-none-any.whl',
'certifi': 'certifi-2025.1.31-py3-none-any.whl',
'charset_normalizer': 'charset_normalizer-3.4.1-py3-none-any.whl',
'idna': 'idna-3.10-py3-none-any.whl',
'multidict': 'multidict-6.4.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'aiosignal': 'aiosignal-1.3.2-py2.py3-none-any.whl',
'frozenlist': 'frozenlist-1.6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'attrs': 'attrs-25.3.0-py3-none-any.whl',
'yarl': 'yarl-1.20.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl',
'aiohappyeyeballs': 'aiohappyeyeballs-2.6.1-py3-none-any.whl',
'propcache': 'propcache-0.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl'
}, "./Files/reposytory")
except Exception as e:
print(f"Warning: Dependency check failed: {e}")
print("Continuing with existing packages...")
def safe_import(module_name, package_name=None):
"""
Safely import a module with error handling
"""
try:
if package_name:
module = __import__(package_name)
return getattr(module, module_name)
else:
return __import__(module_name)
except ImportError as e:
print(f"Warning: Could not import {module_name}: {e}")
return None
# Now import required modules with fallbacks
rdm6300 = safe_import('rdm6300')
if rdm6300 is None:
print("ERROR: rdm6300 is required for this application to work!")
print("Please ensure rdm6300 is installed from the repository.")
sys.exit(1)
# Import other required modules
import time
import logging
# Try to import GPIO-related modules
try:
from gpiozero import OutputDevice
print("✓ gpiozero imported successfully")
except ImportError as e:
print(f"Warning: Could not import gpiozero: {e}")
print("LED functionality will be disabled")
# Create a dummy OutputDevice class
class OutputDevice:
def __init__(self, pin):
self.pin = pin
def on(self):
print(f"LED {self.pin} would turn ON")
def off(self):
print(f"LED {self.pin} would turn OFF")
from multiprocessing import Process
# Import network-related modules
try:
import requests
print("✓ requests imported successfully")
except ImportError as e:
print(f"ERROR: requests is required: {e}")
sys.exit(1)
import threading
import urllib.parse
from datetime import datetime, timedelta
import socket
import signal
# Import async modules
try:
import aiohttp
import asyncio
print("✓ aiohttp and asyncio imported successfully")
except ImportError as e:
print(f"Warning: Could not import aiohttp: {e}")
print("Async functionality may be limited")
import asyncio
# Import Flask for command server
try:
from flask import Flask, request, jsonify
print("✓ Flask imported successfully")
FLASK_AVAILABLE = True
except ImportError as e:
print(f"Warning: Could not import Flask: {e}")
print("Command server functionality will be disabled")
FLASK_AVAILABLE = False
# Create dummy Flask classes
class Flask:
def __init__(self, name):
pass
def route(self, *args, **kwargs):
def decorator(f):
return f
return decorator
def run(self, *args, **kwargs):
pass
def request():
pass
def jsonify(data):
return data
import configparser
import json
def load_config():
"""
Load application configuration from data/config.txt (INI format).
Falls back to legacy file (device_info.txt) and hardcoded defaults.
"""
config_path = "./data/config.txt"
defaults = {
"chrome_url": "http://filesibiusb05.sibiusb.harting.intra/iweb_v2/index.php/traceability/production",
"chrome_local_url": "",
"chrome_insecure_origin": "http://filesibiusb05.sibiusb.harting.intra",
"card_post_base_url": "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record",
"server_log_url": "http://rpi-ansible:80/logs",
"update_host": "rpi-ansible",
"update_user": "pi",
"internet_check_host": "filesibiusb05.sibiusb.harting.intra",
"work_place": "notconfig",
"device_hostname": "unknown-device",
"device_ip": "127.0.0.1",
"device_location": "",
"device_info_reviewed_at": "1970-01-01T00:00:00",
"card_presence": "enable",
"last_synced": "1970-01-01T00:00:00",
}
cfg = dict(defaults)
parser = configparser.ConfigParser()
if os.path.exists(config_path):
try:
parser.read(config_path)
if parser.has_section("chrome"):
if parser.has_option("chrome", "chrome_url"):
cfg["chrome_url"] = parser.get("chrome", "chrome_url")
if parser.has_option("chrome", "chrome_local_url"):
cfg["chrome_local_url"] = parser.get("chrome", "chrome_local_url")
if parser.has_option("chrome", "chrome_insecure_origin"):
cfg["chrome_insecure_origin"] = parser.get("chrome", "chrome_insecure_origin")
if parser.has_section("card_api"):
if parser.has_option("card_api", "base_url"):
cfg["card_post_base_url"] = parser.get("card_api", "base_url")
if parser.has_section("server"):
if parser.has_option("server", "log_url"):
cfg["server_log_url"] = parser.get("server", "log_url")
if parser.has_option("server", "update_host"):
cfg["update_host"] = parser.get("server", "update_host")
if parser.has_option("server", "update_user"):
cfg["update_user"] = parser.get("server", "update_user")
if parser.has_option("server", "internet_check_host"):
cfg["internet_check_host"] = parser.get("server", "internet_check_host")
if parser.has_section("device"):
if parser.has_option("device", "work_place"):
cfg["work_place"] = parser.get("device", "work_place")
elif parser.has_option("device", "name"):
cfg["work_place"] = parser.get("device", "name")
if parser.has_option("device", "hostname"):
cfg["device_hostname"] = parser.get("device", "hostname")
if parser.has_option("device", "ip"):
cfg["device_ip"] = parser.get("device", "ip")
if parser.has_option("device", "info_reviewed_at"):
cfg["device_info_reviewed_at"] = parser.get("device", "info_reviewed_at")
if parser.has_option("device", "location"):
cfg["device_location"] = parser.get("device", "location")
if parser.has_option("device", "card_presence"):
cfg["card_presence"] = parser.get("device", "card_presence")
if parser.has_section("meta"):
if parser.has_option("meta", "last_synced"):
cfg["last_synced"] = parser.get("meta", "last_synced")
print(f"\u2713 Configuration loaded from {config_path}")
except Exception as e:
print(f"Warning: Could not parse {config_path}: {e}. Using defaults.")
else:
# Fall back to legacy individual files
try:
with open("./data/device_info.txt", "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 1 and lines[0].strip():
cfg["device_hostname"] = lines[0].strip()
if len(lines) >= 2 and lines[1].strip():
cfg["device_ip"] = lines[1].strip()
except Exception:
pass
print("config.txt not found - using legacy files and defaults")
return cfg
# ---------------------------------------------------------------------------
# Server config sync (runs once at startup)
# ---------------------------------------------------------------------------
def _get_mac_address():
"""Return the MAC address of the primary network interface."""
for iface in ['eth0', 'wlan0', 'eth1']:
mac_path = f'/sys/class/net/{iface}/address'
try:
if os.path.exists(mac_path):
with open(mac_path) as f:
mac = f.read().strip()
if mac and mac != '00:00:00:00:00:00':
return mac
except Exception:
continue
# Fallback: derive from UUID node
import uuid as _uuid
raw = _uuid.getnode()
return ':'.join(f'{(raw >> i) & 0xff:02x}' for i in range(40, -1, -8))
def _write_config_from_server(new_cfg):
"""Overwrite data/config.txt with config received from the server."""
import configparser as _cp
p = _cp.ConfigParser()
p.add_section("chrome")
p.set("chrome", "chrome_url", new_cfg.get("chrome_url", ""))
p.set("chrome", "chrome_local_url", new_cfg.get("chrome_local_url", ""))
p.set("chrome", "chrome_insecure_origin", new_cfg.get("chrome_insecure_origin", ""))
p.add_section("card_api")
p.set("card_api", "base_url", new_cfg.get("card_api_base_url", ""))
p.add_section("server")
p.set("server", "log_url", new_cfg.get("server_log_url", ""))
p.set("server", "update_host", new_cfg.get("update_host", ""))
p.set("server", "update_user", new_cfg.get("update_user", ""))
p.set("server", "internet_check_host", new_cfg.get("internet_check_host", ""))
p.add_section("device")
# Only overwrite work_place if the server provides a non-empty value.
# This prevents a freshly migrated device from losing its idmasa-sourced
# work_place on first startup when the server doesn't know the device yet.
server_device_name = new_cfg.get("device_name", "").strip()
local_work_place = APP_CONFIG.get("work_place", "").strip()
effective_work_place = server_device_name if server_device_name else (local_work_place or "notconfig")
p.set("device", "work_place", effective_work_place)
# Same guard for hostname: prefer server value, fall back to OS hostname,
# then whatever was in local config. Never blank it.
server_hostname = new_cfg.get("hostname", "").strip()
if not server_hostname:
try:
server_hostname = socket.gethostname()
except Exception:
server_hostname = APP_CONFIG.get("device_hostname", "").strip()
p.set("device", "hostname", server_hostname)
p.set("device", "ip", new_cfg.get("device_ip", ""))
p.set("device", "location", new_cfg.get("location") or new_cfg.get("device_location", ""))
p.set("device", "info_reviewed_at", new_cfg.get("info_reviewed_at") or "1970-01-01T00:00:00")
p.set("device", "card_presence", new_cfg.get("card_presence", "enable"))
p.add_section("meta")
sync_ts = new_cfg.get("config_updated_at") or datetime.now().isoformat()
p.set("meta", "last_synced", sync_ts)
os.makedirs("./data", exist_ok=True)
with open("./data/config.txt", "w") as f:
f.write("# WMT Application Configuration\n")
f.write(f"# Synced from server: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
p.write(f)
print(f"\u2713 config.txt written from server (server ts: {sync_ts})")
def sync_config_with_server():
"""
Called once at startup.
- Derives base URL from server_log_url in APP_CONFIG.
- If server config is newer than last_synced → pull and overwrite config.txt, reload APP_CONFIG.
- Otherwise → send a device-info update request to the server.
Returns True if config was pulled from server.
"""
global APP_CONFIG
try:
import requests as _req
import urllib.parse as _up
parsed = _up.urlparse(APP_CONFIG.get("server_log_url", "http://rpi-ansible:80/logs"))
server_base = f"{parsed.scheme}://{parsed.netloc}"
mac = _get_mac_address()
last_synced_str = APP_CONFIG.get("last_synced", "1970-01-01T00:00:00")
try:
last_synced = datetime.fromisoformat(last_synced_str)
except Exception:
last_synced = datetime(1970, 1, 1)
local_info_reviewed_str = APP_CONFIG.get("device_info_reviewed_at", "1970-01-01T00:00:00")
try:
local_info_reviewed = datetime.fromisoformat(local_info_reviewed_str)
except Exception:
local_info_reviewed = datetime(1970, 1, 1)
print(f"Checking server config (MAC={mac}, last_synced={last_synced_str}, info_reviewed_at={local_info_reviewed_str}) ...")
# --- Step 1: get server timestamp ---
ts_resp = _req.get(
f"{server_base}/api/wmt/config/timestamp",
params={"mac": mac}, timeout=5
)
ts_resp.raise_for_status()
ts_data = ts_resp.json()
server_ts_str = ts_data.get("latest_updated_at", "1970-01-01T00:00:00")
try:
server_ts = datetime.fromisoformat(server_ts_str)
except Exception:
server_ts = datetime(1970, 1, 1)
server_info_reviewed_str = ts_data.get("device_info_reviewed_at") or "1970-01-01T00:00:00"
try:
server_info_reviewed = datetime.fromisoformat(server_info_reviewed_str)
except Exception:
server_info_reviewed = datetime(1970, 1, 1)
# Pull if global settings are newer OR if admin has reviewed device info more recently
needs_pull = server_ts > last_synced or server_info_reviewed > local_info_reviewed
if needs_pull:
# --- Step 2a: pull config ---
print(f"Server config is newer ({server_ts_str}), pulling ...")
cfg_resp = _req.get(f"{server_base}/api/wmt/config/{mac}", timeout=5)
cfg_resp.raise_for_status()
_write_config_from_server(cfg_resp.json())
APP_CONFIG = load_config()
print("\u2705 Config synced from server.")
return True
else:
# --- Step 2b: push device info ---
print("Local config is current, sending device info to server ...")
try:
_hostname = socket.gethostname()
_ip = socket.gethostbyname(_hostname)
except Exception:
_hostname = APP_CONFIG.get("device_hostname", "")
_ip = APP_CONFIG.get("device_ip", "")
_req.post(
f"{server_base}/api/wmt/config/update_request",
json={
"mac_address": mac,
"device_name": APP_CONFIG.get("work_place", ""),
"hostname": _hostname,
"device_ip": _ip,
"client_config_mtime": last_synced_str,
"client_info_reviewed_at": local_info_reviewed_str,
"card_presence": APP_CONFIG.get("card_presence", "enable"),
},
timeout=5,
)
print("\u2705 Device info update request sent.")
return False
except Exception as e:
print(f"Config sync skipped (server unreachable or error): {e}")
return False
# Load global application configuration
APP_CONFIG = load_config()
# Attempt to sync with server at startup (non-blocking failures are logged and ignored)
sync_config_with_server()
# ---------------------------------------------------------------------------
# Card presence flag controls whether RFID functions are started
# ---------------------------------------------------------------------------
CARD_PRESENCE_ENABLED = APP_CONFIG.get("card_presence", "enable").strip().lower() == "enable"
print(f"Card presence: {'ENABLED' if CARD_PRESENCE_ENABLED else 'DISABLED'}")
# Early configuration mode detection (before heavy initialization)
def early_launch_configuration_mode():
"""
Early launch of configuration mode with chromium displaying Screen.html
"""
try:
print("🔧 Configuration mode detected - launching Screen.html in Chromium")
# Path to Screen.html relative to the WMT working directory
screen_html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Files", "Screen.html")
screen_html_relative = "./Files/Screen.html"
if os.path.exists(screen_html_relative):
screen_html_path = os.path.abspath(screen_html_relative)
if not os.path.exists(screen_html_path):
print(f"❌ Screen.html not found at: {screen_html_path}")
return False
print(f"📄 Loading Screen.html from: {screen_html_path}")
# Terminate any existing Chromium processes
chromium_process_name = "chromium"
# Local log only in configuration mode
logging.info("Refreshing Chromium process (configuration mode)")
try:
subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5) # Wait for processes to terminate
# Launch Chromium with Screen.html
url = f"file://{screen_html_path}"
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
)
logging.info("Chromium process restarted successfully (configuration mode)")
except Exception as e:
logging.error(f"Failed to refresh Chromium process (configuration mode): {e}")
return False
print("✅ Configuration mode launched successfully")
print("️ Network connectivity checks disabled")
print("️ Server status messages disabled")
print("🔧 Device running in configuration mode")
print("🔧 To exit configuration mode, change work_place in config.txt")
return True
except FileNotFoundError:
print("❌ Chromium browser not found. Please install chromium-browser:")
print(" sudo apt update && sudo apt install chromium-browser")
return False
except Exception as e:
print(f"❌ Error launching configuration mode: {e}")
return False
# EARLY CONFIGURATION MODE DETECTION
print("=" * 60)
print("CONFIGURATION MODE DETECTION")
print("=" * 60)
def _try_autoconfigure_from_prezenta():
"""
If work_place is still 'notconfig', look for the idmasa.txt left behind by
the migration playbook in Prezenta_Old_Data and use it to self-configure.
Updates config.txt and reloads APP_CONFIG.
Returns True if a valid work_place was found and written.
"""
global APP_CONFIG
idmasa_candidates = [
"/home/pi/Desktop/Prezenta_Old_Data/data/idmasa.txt",
"/home/pi/Desktop/Prezenta_Old_Data/Files/idmasa.txt",
"/home/pi/Desktop/Prezenta/data/idmasa.txt",
"/home/pi/Desktop/Prezenta/Files/idmasa.txt",
]
for path in idmasa_candidates:
if os.path.exists(path):
try:
with open(path, "r") as f:
value = f.read().strip()
if value and value.lower() != "notconfig":
print(f"✅ Auto-configure: found work_place='{value}' in {path}")
# Write into config.txt [device] section
import configparser as _cp
p = _cp.ConfigParser()
p.read("./data/config.txt")
if not p.has_section("device"):
p.add_section("device")
p.set("device", "work_place", value)
# Also capture real hostname/IP
try:
_hn = socket.gethostname()
_ip = socket.gethostbyname(_hn)
except Exception:
_hn = APP_CONFIG.get("device_hostname", "")
_ip = APP_CONFIG.get("device_ip", "")
p.set("device", "hostname", _hn)
p.set("device", "ip", _ip)
os.makedirs("./data", exist_ok=True)
with open("./data/config.txt", "w") as f:
f.write("# WMT Application Configuration\n")
f.write(f"# Auto-configured from {path}\n\n")
p.write(f)
APP_CONFIG = load_config()
print(f"✅ config.txt updated: work_place={value}, hostname={_hn}, ip={_ip}")
return True
except Exception as e:
print(f"Warning: could not read {path}: {e}")
return False
def _send_first_registration_log():
"""
Send device info to the server immediately after auto-configuration so the
server record is created before the periodic sync runs.
"""
try:
import requests as _req
import urllib.parse as _up
parsed = _up.urlparse(APP_CONFIG.get("server_log_url", "http://rpi-ansible:80/logs"))
server_base = f"{parsed.scheme}://{parsed.netloc}"
mac = _get_mac_address()
try:
_hn = socket.gethostname()
_ip = socket.gethostbyname(_hn)
except Exception:
_hn = APP_CONFIG.get("device_hostname", "")
_ip = APP_CONFIG.get("device_ip", "")
_req.post(
f"{server_base}/api/wmt/config/update_request",
json={
"mac_address": mac,
"device_name": APP_CONFIG.get("work_place", ""),
"hostname": _hn,
"device_ip": _ip,
"card_presence": APP_CONFIG.get("card_presence", "enable"),
"client_config_mtime": APP_CONFIG.get("last_synced", "1970-01-01T00:00:00"),
},
timeout=5,
)
print(f"✅ Registration sent to server (work_place={APP_CONFIG.get('work_place')}, hostname={_hn})")
except Exception as e:
print(f"Warning: could not send first registration to server: {e}")
try:
name = APP_CONFIG.get("work_place", "noconfig")
print(f"✓ Device name loaded: {name}")
# Check if device is in configuration mode
if name.lower() == "notconfig":
print("🔧 Device configured for setup mode (notconfig)")
# --- Try to self-configure from Prezenta_Old_Data before entering config mode ---
if _try_autoconfigure_from_prezenta():
name = APP_CONFIG.get("work_place", "notconfig")
print(f"✅ Auto-configuration successful — work_place='{name}', skipping config mode")
_send_first_registration_log()
CONFIGURATION_MODE = False
else:
# No idmasa.txt found — fall through to manual configuration mode
if early_launch_configuration_mode():
print("🚀 Configuration mode active - application will run in setup mode")
print("⚠️ Network connectivity checks are DISABLED")
print("⚠️ Server status messages are DISABLED")
print("🔧 Change config.txt [device] name to exit configuration mode")
CONFIGURATION_MODE = True
print("🔧 Configuration mode activated - continuing with RFID reader initialization")
print("🔧 Network and server monitoring will remain disabled")
else:
print("❌ Failed to launch configuration mode, continuing with normal operation")
CONFIGURATION_MODE = False
else:
print("✅ Device in normal operation mode")
CONFIGURATION_MODE = False
except Exception as e:
print(f"Error in configuration mode detection: {e}")
name = "noconfig"
CONFIGURATION_MODE = False
print("=" * 60)
print("CONTINUING WITH NORMAL INITIALIZATION" if not CONFIGURATION_MODE else "CONFIGURATION MODE ACTIVE")
print("=" * 60)
# ---------------------------------------------------------------------------
# Auto-printer setup
# ---------------------------------------------------------------------------
def _ensure_printer(device_name):
"""
Check whether a CUPS printer named *device_name* is installed.
If not found, install it automatically using lpadmin.
Only called when work_place is set to a real name (not 'notconfig').
"""
print(f"🖨 Checking printer for work_place='{device_name}' ...")
try:
import cups
conn = cups.Connection()
printers = conn.getPrinters()
if device_name in printers:
print(f"✓ Printer '{device_name}' already installed — no action needed.")
return
# Printer not found — install it
print(f"🖨 Printer '{device_name}' not found. Starting installation ...")
cmd = [
"sudo", "/usr/sbin/lpadmin",
"-p", device_name,
"-E",
"-v", "usb://CITIZEN/CT-S310II?serial=00000000",
"-m", "CTS310II.ppd",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
print(f"✓ Printer '{device_name}' installed successfully.")
else:
print(f"✗ lpadmin returned code {result.returncode}: {result.stderr.strip()}")
except ImportError:
print("⚠ python3-cups not available — printer check skipped. "
"Install with: sudo apt install python3-cups")
except Exception as e:
print(f"⚠ Printer setup skipped: {e}")
# Run auto-printer setup only when device has a real work_place
_auto_printer_name = APP_CONFIG.get("work_place", "notconfig")
if not CONFIGURATION_MODE and _auto_printer_name.lower() != "notconfig":
_ensure_printer(_auto_printer_name)
else:
if _auto_printer_name.lower() == "notconfig":
print("🖨 Printer setup skipped (work_place=notconfig).")
def check_system_requirements():
"""
Check and set up system requirements for the application
"""
print("Checking system requirements...")
# 1. Check and install required system packages
system_packages = {
'sshpass': 'sshpass_1.09-1_armhf.deb' # Required for auto-update functionality
}
for package, deb_file in system_packages.items():
try:
result = subprocess.run(['which', package], capture_output=True, text=True)
if result.returncode == 0:
print(f"{package} is installed")
else:
print(f"Installing {package}...")
# Try online installation first
try:
install_result = subprocess.run(['sudo', 'apt', 'update'], capture_output=True, text=True, timeout=120)
install_result = subprocess.run(['sudo', 'apt', 'install', '-y', package],
capture_output=True, text=True, timeout=300)
if install_result.returncode == 0:
print(f"{package} installed successfully (online)")
continue
else:
print(f"Online installation failed, trying offline...")
except Exception as online_error:
print(f"Online installation failed: {online_error}, trying offline...")
# Try offline installation from local .deb file
deb_path = f"./Files/system_packages/{deb_file}"
if os.path.exists(deb_path):
try:
print(f"Installing {package} from local package: {deb_path}")
offline_result = subprocess.run(['sudo', 'dpkg', '-i', deb_path],
capture_output=True, text=True, timeout=120)
if offline_result.returncode == 0:
print(f"{package} installed successfully (offline)")
else:
print(f"✗ Offline installation failed: {offline_result.stderr}")
# Try to fix dependencies
print("Attempting to fix dependencies...")
subprocess.run(['sudo', 'apt', '--fix-broken', 'install', '-y'],
capture_output=True, text=True, timeout=300)
except Exception as offline_error:
print(f"✗ Offline installation error: {offline_error}")
else:
print(f"✗ Local package not found: {deb_path}")
print(f" To add offline support, download with: apt download {package}")
except Exception as e:
print(f"Warning: Could not check/install {package}: {e}")
# 2. Check and create required directories
required_dirs = ['./data', './Files', './Files/reposytory', './Files/system_packages']
for dir_path in required_dirs:
try:
os.makedirs(dir_path, exist_ok=True)
print(f"✓ Directory ensured: {dir_path}")
except Exception as e:
print(f"✗ Failed to create directory {dir_path}: {e}")
return False
# 2. Check required files and create defaults if missing
required_files = {
'./data/log.txt': '',
'./data/tag.txt': '',
'./data/device_info.txt': 'unknown-device\n127.0.0.1\n'
}
for file_path, default_content in required_files.items():
try:
if not os.path.exists(file_path):
with open(file_path, 'w') as f:
f.write(default_content)
print(f"✓ Created default file: {file_path}")
else:
print(f"✓ File exists: {file_path}")
except Exception as e:
print(f"✗ Failed to create file {file_path}: {e}")
# 3. Check file permissions
try:
for file_path in required_files.keys():
if os.path.exists(file_path):
# Ensure read/write permissions for the application
os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
print("✓ File permissions set correctly")
except Exception as e:
print(f"Warning: Could not set file permissions: {e}")
return True
def check_port_capabilities():
"""
Check if the application can bind to port 80 and set up capabilities if needed
"""
print("Checking port 80 capabilities...")
try:
# Check if we're running as root
if os.geteuid() == 0:
print("✓ Running as root - port 80 access available")
return True
# Check if capabilities are set
python_path = sys.executable
result = subprocess.run(['getcap', python_path], capture_output=True, text=True)
if 'cap_net_bind_service=ep' in result.stdout:
print("✓ Port binding capabilities already set")
return True
# Try to set capabilities
print("Setting up port 80 binding capabilities...")
setup_script = './setup_port_capability.sh'
if os.path.exists(setup_script):
result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
if result.returncode == 0:
print("✓ Port capabilities set successfully")
return True
else:
print(f"✗ Failed to set capabilities: {result.stderr}")
else:
# Create the setup script if it doesn't exist
script_content = f'''#!/bin/bash
# Set port binding capability for Python to allow port 80 access
echo "Setting port binding capability for Python..."
sudo setcap cap_net_bind_service=ep {python_path}
echo "Capability set successfully"
'''
try:
with open(setup_script, 'w') as f:
f.write(script_content)
os.chmod(setup_script, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
result = subprocess.run(['sudo', 'bash', setup_script], capture_output=True, text=True)
if result.returncode == 0:
print("✓ Port capabilities set successfully")
return True
else:
print(f"✗ Failed to set capabilities: {result.stderr}")
except Exception as e:
print(f"✗ Failed to create setup script: {e}")
except Exception as e:
print(f"Warning: Could not check port capabilities: {e}")
print("Warning: Port 80 may not be accessible. App will try to run on default port.")
return False
def check_hardware_interfaces():
"""
Check hardware interfaces (UART/Serial) required for RFID reader
"""
print("Checking hardware interfaces...")
# Check for serial devices
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0']
available_devices = []
for device in serial_devices:
if os.path.exists(device):
try:
# Check if we can access the device
with open(device, 'r'):
pass
available_devices.append(device)
print(f"✓ Serial device available: {device}")
except PermissionError:
print(f"✗ Permission denied for {device}. Adding user to dialout group...")
try:
# Add current user to dialout group for serial access
username = pwd.getpwuid(os.getuid()).pw_name
subprocess.run(['sudo', 'usermod', '-a', '-G', 'dialout', username],
capture_output=True, text=True)
print(f"✓ User {username} added to dialout group (reboot may be required)")
available_devices.append(device)
except Exception as e:
print(f"✗ Failed to add user to dialout group: {e}")
except Exception as e:
print(f"Warning: Could not test {device}: {e}")
if not available_devices:
print("✗ No serial devices found. RFID reader may not work.")
# Enable UART if we're on Raspberry Pi
try:
config_file = '/boot/config.txt'
if os.path.exists(config_file):
print("Attempting to enable UART in Raspberry Pi config...")
result = subprocess.run(['sudo', 'raspi-config', 'nonint', 'do_serial', '0'],
capture_output=True, text=True)
if result.returncode == 0:
print("✓ UART enabled in config (reboot required)")
else:
print("Warning: Could not enable UART automatically")
except Exception as e:
print(f"Warning: Could not configure UART: {e}")
return False
return True
def check_network_connectivity():
"""
Check network connectivity and DNS resolution
"""
print("Checking network connectivity...")
try:
# Test basic connectivity
result = subprocess.run(['ping', '-c', '1', '8.8.8.8'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print("✓ Internet connectivity available")
# Test DNS resolution
try:
import socket
socket.gethostbyname('google.com')
print("✓ DNS resolution working")
return True
except socket.gaierror:
print("✗ DNS resolution failed")
return False
else:
print("✗ No internet connectivity")
return False
except subprocess.TimeoutExpired:
print("✗ Network timeout")
return False
except Exception as e:
print(f"Warning: Could not test network: {e}")
return False
def initialize_gpio_permissions():
"""
Set up GPIO permissions for LED control
"""
print("Setting up GPIO permissions...")
try:
# Add user to gpio group if it exists
username = pwd.getpwuid(os.getuid()).pw_name
# Check if gpio group exists
try:
grp.getgrnam('gpio')
subprocess.run(['sudo', 'usermod', '-a', '-G', 'gpio', username],
capture_output=True, text=True)
print(f"✓ User {username} added to gpio group")
except KeyError:
print("Warning: gpio group not found - GPIO access may be limited")
# Set up GPIO access via /dev/gpiomem if available
gpio_devices = ['/dev/gpiomem', '/dev/mem']
for device in gpio_devices:
if os.path.exists(device):
print(f"✓ GPIO device available: {device}")
break
else:
print("Warning: No GPIO devices found")
except Exception as e:
print(f"Warning: Could not set up GPIO permissions: {e}")
def perform_system_initialization():
"""
Perform complete system initialization for first run
"""
print("=" * 60)
print("SYSTEM INITIALIZATION - Preparing for first run")
print("=" * 60)
initialization_steps = [
("System Requirements", check_system_requirements),
("Port Capabilities", check_port_capabilities),
("Hardware Interfaces", check_hardware_interfaces),
("GPIO Permissions", initialize_gpio_permissions),
("Network Connectivity", check_network_connectivity)
]
success_count = 0
total_steps = len(initialization_steps)
for step_name, step_function in initialization_steps:
print(f"\n--- {step_name} ---")
try:
if step_function():
success_count += 1
print(f"{step_name} completed successfully")
else:
print(f"{step_name} completed with warnings")
except Exception as e:
print(f"{step_name} failed: {e}")
print("\n" + "=" * 60)
print(f"INITIALIZATION COMPLETE: {success_count}/{total_steps} steps successful")
print("=" * 60)
if success_count < total_steps:
print("Warning: Some initialization steps failed. Application may have limited functionality.")
print("Check the messages above for details.")
return success_count >= (total_steps - 1) # Allow one failure
#configurare variabile
def get_device_info():
"""
Get hostname and device IP with file-based fallback to avoid socket errors
"""
config_file = "./data/device_info.txt"
hostname = None
device_ip = None
# Try to get current hostname and IP
try:
hostname = socket.gethostname()
device_ip = socket.gethostbyname(hostname)
print(f"Successfully resolved - Hostname: {hostname}, IP: {device_ip}")
# Save the working values to file for future fallback
try:
os.makedirs("./data", exist_ok=True) # Create data directory if it doesn't exist
with open(config_file, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
print(f"Saved device info to {config_file}")
except Exception as e:
print(f"Warning: Could not save device info to file: {e}")
return hostname, device_ip
except socket.gaierror as e:
print(f"Socket error occurred: {e}")
print("Attempting to load device info from file...")
# Try to load from file
try:
with open(config_file, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
print(f"Loaded from file - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
else:
print("File exists but doesn't contain valid data")
except FileNotFoundError:
print(f"No fallback file found at {config_file}")
except Exception as e:
print(f"Error reading fallback file: {e}")
except Exception as e:
print(f"Unexpected error getting device info: {e}")
# Try to load from file as fallback
try:
with open(config_file, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
print(f"Loaded from file after error - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
except Exception as file_error:
print(f"Could not load from file: {file_error}")
# Final fallback: use values from APP_CONFIG (config.txt [device] section)
try:
cfg_hostname = APP_CONFIG.get("device_hostname", "")
cfg_ip = APP_CONFIG.get("device_ip", "")
if cfg_hostname and cfg_hostname != "unknown-device":
hostname = cfg_hostname
device_ip = cfg_ip or "127.0.0.1"
print(f"Loaded from config.txt - Hostname: {hostname}, IP: {device_ip}")
return hostname, device_ip
except Exception:
pass
# Absolute last resort defaults
print("All methods failed - Using default values")
hostname = hostname or "unknown-device"
device_ip = "127.0.0.1"
# Try to save these default values for next time
try:
os.makedirs("./data", exist_ok=True)
with open(config_file, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
print(f"Saved fallback values to {config_file}")
except Exception as e:
print(f"Could not save fallback values: {e}")
return hostname, device_ip
# Perform system initialization (first run setup)
if not perform_system_initialization():
print("Warning: System initialization completed with errors.")
print("The application will continue but may have limited functionality.")
# Get device information with error handling
hostname, device_ip = get_device_info()
print(f"Final result - Hostname: {hostname}, Device IP: {device_ip}")
# Configure logging
logging.basicConfig(filename='./data/log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# function to delete old logs
def delete_old_logs():
log_dir = './data/'
log_file = 'log.txt'
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
file_mod_time = datetime.fromtimestamp(os.path.getmtime(log_path))
if datetime.now() - file_mod_time > timedelta(days=10):
os.remove(log_path)
log_info_with_server(f"Deleted old log file: {log_file}")
else:
log_info_with_server(f"Log file is not older than 10 days: {log_file}")
else:
log_info_with_server(f"Log file does not exist: {log_file}")
# Function to read the work place name from config
def read_name_from_file():
try:
n_masa = APP_CONFIG.get("work_place", "")
if n_masa:
return n_masa
except Exception:
pass
return "unknown"
def _get_os_version() -> str:
"""Read OS pretty-name from /etc/os-release (Raspberry Pi OS, etc.)."""
try:
with open("/etc/os-release") as f:
for line in f:
if line.startswith("PRETTY_NAME="):
return line.split("=", 1)[1].strip().strip('"')
except Exception:
pass
return ""
# Function to send logs to a remote server for the Server_monitorizare APP
def send_log_to_server(log_message, n_masa, hostname, device_ip):
try:
log_data = {
"hostname": str(hostname),
"device_ip": str(device_ip),
"nume_masa": str(n_masa),
"log_message": str(log_message),
# Device metadata keeps the server record up to date automatically
"device_type": "Raspberry Pi",
"os_version": _get_os_version(),
"location": APP_CONFIG.get("device_location", ""),
"mac_address": _get_mac_address(),
"card_presence": APP_CONFIG.get("card_presence", "enable"),
}
server_url = APP_CONFIG.get("server_log_url", "http://rpi-ansible:80/logs")
print(log_data) # Debugging: Print log_data to verify its contents
response = requests.post(server_url, json=log_data, timeout=5)
response.raise_for_status()
logging.info("Log successfully sent to server: %s", log_message)
except requests.exceptions.RequestException as e:
logging.error("Failed to send log to server: %s", e)
# Wrapper for logging.info to also send logs to the server Monitorizare APP
def log_info_with_server(message):
n_masa = read_name_from_file() # Read work place name from config
formatted_message = f"{message} (n_masa: {n_masa})" # Format the message
logging.info(formatted_message) # Log the formatted message
# Only send to server if not in configuration mode
try:
if not CONFIGURATION_MODE:
send_log_to_server(message, n_masa, hostname, device_ip) # Send the original message to the server
else:
logging.info("Configuration mode: Server logging disabled")
except NameError:
# CONFIGURATION_MODE not defined yet (during initialization)
send_log_to_server(message, n_masa, hostname, device_ip) # Send the original message to the server
# Function to execute system commands with proper security
def execute_system_command(command):
"""
Execute system commands with proper logging and security checks
"""
# Define allowed commands for security
allowed_commands = [
"sudo apt update",
"sudo apt upgrade -y",
"sudo apt update && sudo apt upgrade -y", # Combined update and upgrade
"sudo apt autoremove -y",
"sudo apt autoclean",
"sudo reboot",
"sudo shutdown -h now",
"df -h",
"free -m",
"uptime",
"systemctl status",
"sudo systemctl restart networking",
"sudo systemctl restart ssh"
]
try:
# Check if command is allowed
if command not in allowed_commands:
log_info_with_server(f"Command '{command}' is not allowed for security reasons")
return {"status": "error", "message": f"Command '{command}' is not allowed", "output": ""}
log_info_with_server(f"Executing command: {command}")
# Execute the command
result = subprocess.run(
command.split(),
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
output = result.stdout + result.stderr
if result.returncode == 0:
log_info_with_server(f"Command '{command}' executed successfully")
return {"status": "success", "message": "Command executed successfully", "output": output}
else:
log_info_with_server(f"Command '{command}' failed with return code {result.returncode}")
return {"status": "error", "message": f"Command failed with return code {result.returncode}", "output": output}
except subprocess.TimeoutExpired:
log_info_with_server(f"Command '{command}' timed out")
return {"status": "error", "message": "Command timed out", "output": ""}
except Exception as e:
log_info_with_server(f"Error executing command '{command}': {str(e)}")
return {"status": "error", "message": f"Error: {str(e)}", "output": ""}
# Flask app for receiving commands (only if Flask is available)
if FLASK_AVAILABLE:
command_app = Flask(__name__)
@command_app.route('/execute_command', methods=['POST'])
def handle_command_execution():
"""
Endpoint to receive and execute system commands
"""
try:
data = request.json
if not data or 'command' not in data:
return jsonify({"error": "Invalid request. 'command' field is required"}), 400
command = data.get('command')
# Execute the command
result = execute_system_command(command)
return jsonify(result), 200 if result['status'] == 'success' else 400
except Exception as e:
log_info_with_server(f"Error handling command execution request: {str(e)}")
return jsonify({"error": f"Server error: {str(e)}"}), 500
@command_app.route('/status', methods=['GET'])
def get_device_status():
"""
Endpoint to get device status information
"""
try:
n_masa = read_name_from_file()
# Get system information
uptime_result = subprocess.run(['uptime'], capture_output=True, text=True)
df_result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True)
free_result = subprocess.run(['free', '-m'], capture_output=True, text=True)
status_info = {
"hostname": hostname,
"device_ip": device_ip,
"nume_masa": n_masa,
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"uptime": uptime_result.stdout.strip() if uptime_result.returncode == 0 else "N/A",
"disk_usage": df_result.stdout.strip() if df_result.returncode == 0 else "N/A",
"memory_usage": free_result.stdout.strip() if free_result.returncode == 0 else "N/A"
}
return jsonify(status_info), 200
except Exception as e:
log_info_with_server(f"Error getting device status: {str(e)}")
return jsonify({"error": f"Error getting status: {str(e)}"}), 500
@command_app.route('/auto_update', methods=['POST'])
def auto_update_app():
"""
Trigger an immediate WMT client update check against the monitoring server.
Delegates to _check_and_apply_update().
"""
result = _check_and_apply_update()
if result.get('updated'):
return jsonify({"status": "success", "message": result['message']}), 200
elif result.get('error'):
return jsonify({"error": result['message']}), 500
else:
return jsonify({"status": "no_update", "message": result['message']}), 200
@command_app.route('/update_config', methods=['POST'])
def update_config_endpoint():
"""
Update configuration from Server_Monitorizare_v2.
Accepts a JSON body with sections matching config.txt structure.
Example body: {"chrome": {"chrome_url": "http://..."}}
"""
global APP_CONFIG
ALLOWED_SECTIONS = {
"chrome": ["chrome_url", "chrome_local_url", "chrome_insecure_origin"],
"card_api": ["base_url"],
"server": ["log_url", "update_host", "update_user", "internet_check_host"],
"device": ["name", "hostname", "ip"],
}
try:
data = request.json
if not data:
return jsonify({"error": "JSON body required"}), 400
config_path = "./data/config.txt"
parser = configparser.ConfigParser()
if os.path.exists(config_path):
parser.read(config_path)
updated_keys = []
for section, allowed_keys in ALLOWED_SECTIONS.items():
if section in data and isinstance(data[section], dict):
if not parser.has_section(section):
parser.add_section(section)
for key in allowed_keys:
if key in data[section]:
parser.set(section, key, str(data[section][key]))
updated_keys.append(f"{section}.{key}")
os.makedirs("./data", exist_ok=True)
with open(config_path, "w") as f:
f.write("# WMT Application Configuration\n")
f.write(f"# Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
parser.write(f)
APP_CONFIG = load_config()
log_info_with_server(f"Configuration updated via API: {updated_keys}")
return jsonify({"status": "success", "updated_keys": updated_keys}), 200
except Exception as e:
log_info_with_server(f"Error updating config: {str(e)}")
return jsonify({"error": f"Failed to update configuration: {str(e)}"}), 500
@command_app.route('/reload_config', methods=['POST'])
def reload_config_endpoint():
"""
Reload configuration from data/config.txt into memory without restarting.
"""
global APP_CONFIG
try:
APP_CONFIG = load_config()
log_info_with_server("Configuration reloaded via API")
return jsonify({
"status": "success",
"message": "Configuration reloaded",
"chrome_url": APP_CONFIG.get("chrome_url"),
"card_post_base_url": APP_CONFIG.get("card_post_base_url"),
"server_log_url": APP_CONFIG.get("server_log_url"),
"device_name": APP_CONFIG.get("work_place"),
}), 200
except Exception as e:
return jsonify({"error": f"Failed to reload config: {str(e)}"}), 500
def start_command_server():
"""
Start the Flask server with enhanced port handling and fallback
"""
# Try different ports in order of preference
preferred_ports = [
int(os.environ.get('FLASK_PORT', 80)), # Use environment variable or default to 80
80, # Standard HTTP port
5000, # Flask default
8080, # Alternative HTTP port
3000 # Development port
]
for port in preferred_ports:
try:
print(f"Attempting to start command server on port {port}...")
# Test if port is available
import socket
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind(('0.0.0.0', port))
test_socket.close()
# Port is available, start Flask server
print(f"Port {port} is available. Starting command server...")
command_app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)
return # Success, exit function
except PermissionError:
print(f"✗ Permission denied for port {port}")
if port == 80:
print(" Hint: Port 80 requires root privileges or capabilities")
print(" Try running: sudo setcap cap_net_bind_service=ep $(which python3)")
continue
except OSError as e:
if "Address already in use" in str(e):
print(f"✗ Port {port} is already in use")
else:
print(f"✗ Port {port} error: {e}")
continue
except Exception as e:
print(f"✗ Failed to start on port {port}: {e}")
continue
# If we get here, all ports failed
log_info_with_server("Error: Could not start command server on any port")
print("✗ Could not start command server on any available port")
# Start command server in a separate process with enhanced error handling
try:
print("Initializing command server...")
command_server_process = Process(target=start_command_server)
command_server_process.daemon = True # Ensure it dies with main process
command_server_process.start()
# Give the server a moment to start and check if it's running
import time
time.sleep(2)
if command_server_process.is_alive():
port = int(os.environ.get('FLASK_PORT', 80))
print(f"✓ Command server started successfully on port {port}")
else:
print("Warning: Command server process stopped unexpectedly")
except Exception as e:
print(f"Warning: Could not start command server: {e}")
log_info_with_server(f"Command server startup error: {str(e)}")
else:
print("Warning: Flask not available - Command server disabled")
# Call the function to delete old logs
delete_old_logs()
def config():
import config
# function for posting data to the harting server
def post_backup_data():
try:
with open("./data/tag.txt", "r") as file:
lines = file.readlines()
remaining_lines = lines[:]
for line in lines:
line = line.strip()
if line:
try:
response = requests.post(line, verify=False, timeout=3)
#
response.raise_for_status() # Raise an error for bad status codes
log_info_with_server(f"Data posted successfully:")
remaining_lines.remove(line + "\n")
except requests.exceptions.Timeout:
log_info_with_server("Request timed out.")
break
except requests.exceptions.RequestException as e:
log_info_with_server(f"An error occurred: ")
break
with open("./data/tag.txt", "w") as file:
file.writelines(remaining_lines)
#log_info_with_server("Backup data updated.")
except FileNotFoundError:
log_info_with_server("No backup file found.")
# Function to check internet connection
def check_internet_connection():
hostname = APP_CONFIG.get("internet_check_host", "filesibiusb05.sibiusb.harting.intra")
cmd_block_wifi = 'sudo rfkill block wifi'
cmd_unblock_wifi = 'sudo rfkill unblock wifi'
log_info_with_server('Internet connection check loaded')
delete_old_logs()
chromium_process_name = "chromium"
while True:
try:
# Use subprocess to execute the ping command
response = subprocess.run(
["ping", "-c", "1", hostname],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
if response.returncode == 0:
log_info_with_server("Internet is up! Waiting 45 minutes.")
post_backup_data()
time.sleep(2700) # 45 minutes
else:
log_info_with_server("Internet is down. Rebooting WiFi.")
os.system(cmd_block_wifi)
time.sleep(1200) # 20 minutes
os.system(cmd_unblock_wifi)
# Refresh Chromium process
log_info_with_server("Refreshing Chromium process.")
try:
# Find and terminate Chromium processes
subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5) # Wait for processes to terminate
# Relaunch Chromium
url = APP_CONFIG.get("chrome_url", "http://filesibiusb05.sibiusb.harting.intra/iweb_v2/index.php/traceability/production")
chrome_insecure = APP_CONFIG.get("chrome_insecure_origin", "http://filesibiusb05.sibiusb.harting.intra")
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
f"--unsafely-treat-insecure-origin-as-secure={chrome_insecure}", url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
)
log_info_with_server("Chromium process restarted successfully.")
except Exception as e:
log_info_with_server(f"Failed to refresh Chromium process: {e}")
except Exception as e:
log_info_with_server(f"An error occurred during internet check: {e}")
time.sleep(60) # Retry after 1 minute in case of an error
# Start the internet connection check in a separate process (only if not in configuration mode)
if not CONFIGURATION_MODE:
internet_check_process = Process(target=check_internet_connection)
internet_check_process.start()
print("✅ Internet connectivity monitoring started")
else:
print("🔧 Configuration mode: Internet connectivity monitoring DISABLED")
# ---------------------------------------------------------------------------
# HTTP-based client auto-update helpers
# ---------------------------------------------------------------------------
def _get_local_version():
"""Return local app version as float by reading the first line of this file."""
try:
with open(os.path.abspath(__file__), 'r') as f:
first_line = f.readline()
m = re.search(r'version\s+(\d+\.?\d*)', first_line, re.IGNORECASE)
if m:
return float(m.group(1))
except Exception:
pass
return None
def _check_and_apply_update():
"""
Query the monitoring server for the latest WMT release.
If a newer version is available, download the zip, back up app.py,
extract into the WMT directory, and schedule a systemd service restart.
Returns a dict: {updated, message, error}.
"""
server_host = APP_CONFIG.get("server_host", "")
server_port = APP_CONFIG.get("server_port", "5000")
if not server_host:
return {"updated": False, "error": False, "message": "server_host not configured skipping update check"}
base_url = f"http://{server_host}:{server_port}"
local_version = _get_local_version()
try:
resp = requests.get(f"{base_url}/api/wmt/client/version", timeout=10)
if resp.status_code != 200:
return {"updated": False, "error": True, "message": f"Version endpoint returned {resp.status_code}"}
meta = resp.json()
server_version = float(meta.get("version", 0))
except Exception as e:
return {"updated": False, "error": True, "message": f"Could not reach version endpoint: {e}"}
if local_version is not None and server_version <= local_version:
return {"updated": False, "error": False, "message": f"Already on latest version {local_version}"}
log_info_with_server(f"WMT update available: local={local_version} server={server_version} downloading …")
wmt_dir = os.path.dirname(os.path.abspath(__file__))
tmp_zip = "/tmp/wmt_update.zip"
app_py = os.path.join(wmt_dir, "app.py")
try:
# Download the zip
dl = requests.get(f"{base_url}/api/wmt/client/download", timeout=60, stream=True)
if dl.status_code != 200:
return {"updated": False, "error": True, "message": f"Download endpoint returned {dl.status_code}"}
with open(tmp_zip, 'wb') as f:
for chunk in dl.iter_content(chunk_size=65536):
if chunk:
f.write(chunk)
# Validate the zip
import zipfile
if not zipfile.is_zipfile(tmp_zip):
return {"updated": False, "error": True, "message": "Downloaded file is not a valid zip"}
# Backup current app.py
bak = f"{app_py}.bak.{local_version or 'old'}"
try:
import shutil
shutil.copy2(app_py, bak)
except Exception as e:
log_info_with_server(f"Warning: could not back up app.py: {e}")
# Extract into WMT directory skip anything inside data/ to preserve device config
with zipfile.ZipFile(tmp_zip, 'r') as zf:
for member in zf.infolist():
# Normalise path separators and skip the data folder
member_path = member.filename.replace('\\', '/')
if member_path.startswith('data/') or member_path == 'data':
continue
zf.extract(member, wmt_dir)
log_info_with_server(f"WMT updated to version {server_version} scheduling service restart")
# Schedule restart via systemd (non-blocking)
subprocess.Popen(
["bash", "-c", "sleep 3 && sudo systemctl restart wmt"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL, start_new_session=True
)
return {"updated": True, "error": False,
"message": f"Updated from {local_version} to {server_version}. Restarting service …"}
except Exception as e:
log_info_with_server(f"WMT auto-update failed: {e}")
return {"updated": False, "error": True, "message": str(e)}
finally:
try:
os.remove(tmp_zip)
except OSError:
pass
# ---------------------------------------------------------------------------
# Periodic server config sync (background thread)
# ---------------------------------------------------------------------------
def _periodic_config_sync():
"""
Background thread: re-checks the server for config updates every 5 minutes.
When a new config is pulled, Chromium is restarted with the updated URLs.
"""
sync_interval = 300 # seconds (5 minutes)
while True:
time.sleep(sync_interval)
try:
config_changed = sync_config_with_server()
if config_changed and not CONFIGURATION_MODE:
print("🔄 Config changed restarting Chromium with updated settings.")
try:
subprocess.run(["pkill", "-f", "chromium"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5)
_url = APP_CONFIG.get("chrome_url", "")
_insecure = APP_CONFIG.get("chrome_insecure_origin", "")
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
f"--unsafely-treat-insecure-origin-as-secure={_insecure}", _url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL, start_new_session=True
)
print("✅ Chromium restarted with new config.")
except Exception as e:
print(f"Warning: Could not restart Chromium after config update: {e}")
except Exception as e:
print(f"Periodic config sync error: {e}")
# Check for WMT client update every cycle (non-fatal)
try:
_check_and_apply_update()
except Exception as e:
print(f"Periodic update check error: {e}")
if not CONFIGURATION_MODE:
_sync_thread = threading.Thread(target=_periodic_config_sync, daemon=True, name="config-sync")
_sync_thread.start()
print("✅ Periodic server config sync started (every 5 min)")
# Startup version check (non-blocking)
_update_thread = threading.Thread(target=_check_and_apply_update, daemon=True, name="startup-update-check")
_update_thread.start()
else:
print("🔧 Configuration mode: Periodic config sync DISABLED")
# Launch Chromium with the specified URLs (only if not in configuration mode)
if not CONFIGURATION_MODE:
url = APP_CONFIG.get("chrome_url", "http://filesibiusb05.sibiusb.harting.intra/iweb_v2/index.php/traceability/production")
chrome_insecure = APP_CONFIG.get("chrome_insecure_origin", "http://filesibiusb05.sibiusb.harting.intra")
subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", f"--unsafely-treat-insecure-origin-as-secure={chrome_insecure}", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True)
print("✅ Main application browser launched")
else:
print("🔧 Configuration mode: Main application browser launch DISABLED")
info = "0"
#function to post info
def post_info(info):
#log_info_with_server("Starting to post data...")
info1 = info.strip() # Remove any leading/trailing whitespace, including newlines
try:
response = requests.post(info1, verify=False, timeout=3)
response.raise_for_status() # Raise an error for bad status codes
#log_info_with_server("Data posted successfully")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server(f"Value was saved to tag.txt")
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server("Value was saved to tag.txt")
async def post_info_async(info):
try:
# Try to use aiohttp if available
if 'aiohttp' in globals():
async with aiohttp.ClientSession() as session:
try:
async with session.post(info, ssl=False, timeout=3) as response:
response_text = await response.text()
log_info_with_server(f"Data posted successfully")
except asyncio.TimeoutError:
with open("./data/tag.txt", "a") as file:
file.write(info)
except Exception as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
else:
# Fallback to synchronous requests if aiohttp not available
try:
response = requests.post(info.strip(), verify=False, timeout=3)
response.raise_for_status()
log_info_with_server(f"Data posted successfully (fallback)")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file:
file.write(info)
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
except Exception as e:
# Final fallback - save to file
with open("./data/tag.txt", "a") as file:
file.write(info)
def post_info_thread(info):
try:
thread = threading.Thread(target=asyncio.run, args=(post_info_async(info),), daemon=True)
thread.start()
except Exception as e:
# Fallback to synchronous posting if async fails
print(f"Async posting failed, using sync fallback: {e}")
post_info(info)
# Initialize LEDs only when card reader is active
class DummyLED:
def __init__(self, pin):
self.pin = pin
def on(self):
pass
def off(self):
pass
if CARD_PRESENCE_ENABLED:
try:
print("Initializing LED controls...")
led1 = OutputDevice(23)
led2 = OutputDevice(24)
print("✓ LED controls initialized successfully")
log_info_with_server("LED controls initialized")
except Exception as e:
print(f"Warning: Could not initialize LED controls: {e}")
print("Creating dummy LED objects - visual feedback will be disabled")
led1 = DummyLED(23)
led2 = DummyLED(24)
log_info_with_server("LED controls using dummy mode")
else:
print("🚫 Card presence disabled LED controls skipped")
led1 = DummyLED(23)
led2 = DummyLED(24)
# Initialize table name/ID
print("Initializing device configuration...")
name = APP_CONFIG.get("work_place", "noconfig")
logging.info("LED controls initialized")
logging.info("Variabila Id Masa A fost initializata ")
print(f"✓ Device name set from config: {name}")
if not CONFIGURATION_MODE:
log_info_with_server(f"Device name initialized: {name}")
logging.info(name)
#clasa reader
class Reader(rdm6300.BaseReader):
global info
def card_inserted(self, card):
if card.value == 12886709:
config()
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
_base = APP_CONFIG.get("card_post_base_url", "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record")
date = f'{_base}/{name}/{card.value}/1/{afisare}\n'
info = date
if name == "noconfig":
led1.on()
time.sleep(5)
led1.off()
log_info_with_server(f"card inserted {card} but no")
else:
post_info_thread(info)
led1.on()
log_info_with_server(f"card inserted {card}")
def card_removed(self, card):
if card.value == 12886709:
log_info_with_server("Removing Config card")
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
_base = APP_CONFIG.get("card_post_base_url", "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record")
date = f'{_base}/{name}/{card.value}/0/{afisare}\n'
info = date
if name == "noconfig":
led1.off()
log_info_with_server(f"card removed {card}")
else:
post_info_thread(info)
led1.off()
log_info_with_server(f"card removed {card}")
# Initialize RFID Reader with comprehensive error handling
def initialize_rfid_reader():
"""
Initialize RFID reader with multiple device attempts and error handling
"""
print("Initializing RFID reader...")
# List of possible serial devices in order of preference
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0', '/dev/ttyACM0']
for device in serial_devices:
try:
print(f"Attempting to initialize RFID reader on {device}...")
r = Reader(device)
r.start()
print(f"✓ RFID reader successfully initialized on {device}")
log_info_with_server(f"RFID reader started on {device}")
return r
except FileNotFoundError:
print(f"✗ Device {device} not found")
continue
except PermissionError:
print(f"✗ Permission denied for {device}")
print(f" Hint: Try adding user to dialout group: sudo usermod -a -G dialout $USER")
continue
except Exception as e:
print(f"✗ Failed to initialize on {device}: {e}")
continue
# If we get here, all devices failed
print("✗ Could not initialize RFID reader on any device")
print("Available solutions:")
print(" 1. Check hardware connections")
print(" 2. Enable UART: sudo raspi-config -> Interface Options -> Serial")
print(" 3. Add user to dialout group: sudo usermod -a -G dialout $USER")
print(" 4. Reboot the system after making changes")
log_info_with_server("ERROR: RFID reader initialization failed")
return None
# Start RFID reader only if card_presence is enabled
if CARD_PRESENCE_ENABLED:
try:
rfid_reader = initialize_rfid_reader()
if rfid_reader is None:
print("WARNING: Application starting without RFID functionality")
print("Card reading will not work until RFID reader is properly configured")
except Exception as e:
print(f"Critical error initializing RFID reader: {e}")
log_info_with_server(f"Critical RFID error: {str(e)}")
print("Application will start but RFID functionality will be disabled")
else:
rfid_reader = None
print("🚫 Card presence disabled RFID reader NOT started")
log_info_with_server("Card presence is disabled RFID reader skipped, running without card reader")