Files
prezenta_work/app.py
RPI User e0f7ec34f2 Implement dual logging workflow: noconfig vs configured mode
NOCONFIG MODE (device not configured):
- Sends 'App initialized' message on startup
- Sends 'RFID initialized' message after RFID ready
- Sends 'App working' message on startup
- Sends periodic 'App working' health status every 5 minutes
- Card inserted/removed events sent directly to monitoring server

CONFIGURED MODE (table ID set):
- Sends 'App initialized' and startup messages
- Card events POST to Harting API first
- Only sends to monitoring server on successful API post
- Failed posts saved to tag.txt backup file
- Backup data sent with summary when internet restored

Changes:
- Added send_health_status() function for periodic monitoring
- Updated process_card_events() to handle both workflows
- Updated main() to send startup messages and start health monitor
- Improved monitoring server URL to use rpi-ansible hostname
- Added retry logic for monitoring server requests
2025-12-19 15:08:02 +02:00

756 lines
29 KiB
Python

#!/usr/bin/env python3
"""
RFID Card Reader - Simplified Version 3.0
Minimal dependencies, focused on core functionality:
1. Read RFID cards
2. Send card data to monitoring server and external API
3. WiFi recovery with periodic checks
4. Offline tag backup to tag.txt
"""
import os
import sys
import time
import logging
import threading
import subprocess
import socket
from datetime import datetime, timedelta
import requests
import rdm6300
# ============================================================================
# CONFIGURATION
# ============================================================================
# Server URLs
# Endpoint that send_log_to_server() POSTs JSON log records to.
MONITORING_SERVER = "http://rpi-ansible:80/logs"
# Base URL for card events; process_card_events() appends
# /<table name>/<card id>/<1 or 0>/<timestamp> to it.
HARTING_API_BASE = "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record"
# Host pinged by check_internet_connection() to decide whether the network is up.
WIFI_CHECK_HOST = "10.76.140.17"
# Timings (in seconds)
WIFI_CHECK_INTERVAL = 2400 # 40 minutes between checks while the connection is OK
WIFI_RECOVERY_WAIT = 1200 # 20 minutes with WiFi blocked before it is re-enabled
# NOTE(review): BATCH_LOG_SIZE and BATCH_LOG_TIMEOUT are not referenced anywhere
# in the visible code - confirm whether they are still needed.
BATCH_LOG_SIZE = 10
BATCH_LOG_TIMEOUT = 5
# Paths
# All data files live under DATA_DIR, which is relative to the working directory.
DATA_DIR = "./data"
# Table/device ID; the literal content "noconfig" means "not configured yet".
IDMASA_FILE = os.path.join(DATA_DIR, "idmasa.txt")
# Target file of the logging module (see setup_logging).
LOG_FILE = os.path.join(DATA_DIR, "log.txt")
# Backup queue: one Harting API URL per line for posts that failed.
TAG_FILE = os.path.join(DATA_DIR, "tag.txt")
# Cached hostname (line 1) and IP address (line 2), read by get_device_info().
DEVICE_INFO_FILE = os.path.join(DATA_DIR, "device_info.txt")
# GPIO for LED
# Pin number handed to gpiozero.OutputDevice in setup_led().
LED_PIN = 23
# ============================================================================
# SETUP & INITIALIZATION
# ============================================================================
def setup_directories():
    """Make sure the data directory and its default files exist.

    Creates DATA_DIR when needed, seeds idmasa.txt with "noconfig" if the
    file is absent, and creates empty log and tag files if they are absent.
    Existing files are left untouched.
    """
    os.makedirs(DATA_DIR, exist_ok=True)

    # A missing ID file means the device has never been configured
    idmasa_missing = not os.path.exists(IDMASA_FILE)
    if idmasa_missing:
        with open(IDMASA_FILE, "w") as handle:
            handle.write("noconfig")

    # Touch the remaining files so later readers never hit a missing file
    for path in (LOG_FILE, TAG_FILE):
        if os.path.exists(path):
            continue
        with open(path, "a"):
            pass
def get_device_info():
    """Return ``(hostname, device_ip)`` for this device.

    Lookup order:
      1. device_info.txt (first line hostname, second line IP) when it exists
         and both values are non-empty.
      2. ``socket.gethostname()`` / ``socket.gethostbyname()``; a successful
         result is written back to device_info.txt for later runs.
      3. The constants ``("unknown-device", "127.0.0.1")`` if both fail.

    The function never raises for lookup problems; it prints a warning and
    falls through to the next source instead.
    """
    # Try to read from device_info.txt first (source of truth)
    if os.path.exists(DEVICE_INFO_FILE):
        try:
            with open(DEVICE_INFO_FILE, "r") as f:
                lines = f.read().strip().split('\n')
            if len(lines) >= 2:
                hostname = lines[0].strip()
                device_ip = lines[1].strip()
                if hostname and device_ip:
                    print(f"✓ Device info loaded from file: {hostname} ({device_ip})")
                    return hostname, device_ip
        except Exception as e:
            print(f"Warning: Could not read device_info.txt: {e}")

    # Fallback to socket resolution
    try:
        hostname = socket.gethostname()
        device_ip = socket.gethostbyname(hostname)
        print(f"✓ Device info resolved via socket: {hostname} ({device_ip})")
        # Save to file for future use. Caching is best-effort: a failed write
        # must not discard the values that were just resolved, but it is
        # reported instead of being silently swallowed by a bare ``except``.
        try:
            os.makedirs(DATA_DIR, exist_ok=True)
            with open(DEVICE_INFO_FILE, "w") as f:
                f.write(f"{hostname}\n{device_ip}\n")
        except OSError as e:
            print(f"Warning: Could not save device_info.txt: {e}")
        return hostname, device_ip
    except Exception as e:
        print(f"Warning: Could not resolve device info via socket: {e}")
        return "unknown-device", "127.0.0.1"
def setup_logging(hostname, device_ip):
    """Point the root logger at LOG_FILE and record the startup line.

    Args:
        hostname: Device hostname, used only in the startup log message.
        device_ip: Device IP address, used only in the startup log message.
    """
    # The log file lives in DATA_DIR, so the directory must exist first
    os.makedirs(DATA_DIR, exist_ok=True)

    log_settings = {
        "filename": LOG_FILE,
        "level": logging.INFO,
        "format": '%(asctime)s - %(levelname)s - %(message)s',
    }
    logging.basicConfig(**log_settings)

    # First record written on every start
    logging.info(f"Application started on {hostname} ({device_ip})")
    print(f"✓ Logging configured: {LOG_FILE}")
def setup_led():
    """Return an object with ``on()`` / ``off()`` that drives the status LED.

    A gpiozero ``OutputDevice`` on LED_PIN is returned when it can be
    created. If importing gpiozero or creating the device fails for any
    reason, a console-printing stand-in is returned instead so the rest of
    the application keeps working on machines without GPIO.
    """
    try:
        from gpiozero import OutputDevice
        device = OutputDevice(LED_PIN)
        print(f"✓ LED initialized on GPIO {LED_PIN}")
        return device
    except Exception as err:
        print(f"⚠ LED initialization failed: {err}")

    # Only reached when the real LED could not be set up
    class DummyLED:
        """Stand-in LED that only prints its state changes."""

        def on(self):
            print(f"[LED {LED_PIN} ON]")

        def off(self):
            print(f"[LED {LED_PIN} OFF]")

    return DummyLED()
# ============================================================================
# SERVER COMMUNICATION
# ============================================================================
def read_idmasa():
    """Return the table/device ID stored in idmasa.txt.

    Returns:
        The stripped file content, or ``"noconfig"`` when the file is empty,
        missing, unreadable or not decodable as text.
    """
    try:
        with open(IDMASA_FILE, "r") as f:
            return f.read().strip() or "noconfig"
    except (OSError, UnicodeDecodeError):
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; file problems still mean "not configured".
        return "noconfig"
def send_log_to_server(message, hostname, device_ip, name):
    """POST one log record to the monitoring server.

    Connection errors and timeouts are retried (three attempts in total,
    one second apart). Any other failure, including an HTTP error status,
    is not retried.

    Args:
        message: Text sent as ``log_message``.
        hostname: Device hostname sent as ``hostname``.
        device_ip: Device IP sent as ``device_ip``.
        name: Table/device ID sent as ``nume_masa``.

    Returns:
        True when the server accepted the record, False otherwise. The
        function never raises; failures are logged as warnings.
    """
    max_attempts = 3
    try:
        log_data = {
            "hostname": hostname,
            "device_ip": device_ip,
            "nume_masa": name,
            "log_message": message
        }
        logging.debug(f"[LOG] Sending to {MONITORING_SERVER}: {log_data}")
        print(f"[LOG] Sending: {log_data}")

        for attempt in range(1, max_attempts + 1):
            try:
                response = requests.post(MONITORING_SERVER, json=log_data, timeout=5)
                response.raise_for_status()
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                if attempt == max_attempts:
                    # Out of retries: let the outer handlers log the failure
                    raise
                time.sleep(1)  # Wait 1 second before retry
            else:
                logging.info(f"✓ Log sent to server: {message}")
                print("✓ Log sent successfully")
                return True
    except requests.exceptions.Timeout:
        logging.warning(f"✗ Log request timeout after retries: {message}")
        print("✗ Timeout sending log")
    except requests.exceptions.ConnectionError as e:
        logging.warning(f"✗ Connection error sending log: {e}")
        print(f"✗ Connection error: {e}")
    except Exception as e:
        logging.warning(f"✗ Failed to send log to server: {e}")
        print(f"✗ Failed to send log: {e}")
    # Every path that did not return True above is a failure
    return False
def post_to_harting(url, verify=False, timeout=3):
    """Send an empty-body POST to a Harting API URL.

    Args:
        url: Fully built record URL.
        verify: Forwarded to ``requests.post``; the default False disables
            TLS certificate verification.
        timeout: Request timeout in seconds.

    Returns:
        True if the request completed with a non-error HTTP status, False on
        a timeout or any other ``requests`` failure.
    """
    try:
        requests.post(url, verify=verify, timeout=timeout).raise_for_status()
    except requests.exceptions.RequestException:
        # Timeout is a RequestException subclass, so this covers both cases
        return False
    return True
def post_backup_data(hostname=None, device_ip=None, name=None):
    """Replay queued card URLs from tag.txt against the Harting API.

    Each non-empty line of tag.txt is a URL that previously failed to post.
    URLs that post successfully are dropped from the file; the rest are kept
    for the next attempt. When at least one URL was posted and all three
    arguments are given, a summary is sent to the monitoring server.

    Args:
        hostname: Device hostname for the summary message (optional).
        device_ip: Device IP for the summary message (optional).
        name: Table/device ID for the summary message (optional).

    Errors are logged, never raised.
    """
    if not os.path.exists(TAG_FILE):
        return
    try:
        with open(TAG_FILE, "r") as f:
            lines = f.readlines()
        if not lines:
            return

        remaining = []
        posted_count = 0
        for line in lines:
            url = line.strip()
            if not url:
                continue
            if post_to_harting(url):
                logging.info(f"✓ Posted backed-up data: {url}")
                posted_count += 1
            else:
                remaining.append(url)  # Failed, keep for retry

        if posted_count > 0:
            # Posting can take seconds per line, and the card processor
            # appends to this file from another thread. Re-read it and carry
            # over whatever was appended after the snapshot, so rewriting
            # the file does not drop those new entries. This narrows the
            # race to the instant between re-read and write; it does not
            # remove it entirely.
            with open(TAG_FILE, "r") as f:
                current = f.readlines()
            appended = [
                entry.strip() for entry in current[len(lines):] if entry.strip()
            ]
            with open(TAG_FILE, "w") as f:
                f.writelines(entry + "\n" for entry in remaining + appended)
        # When nothing was posted the file content is already correct, so it
        # is left untouched.

        # Send summary to monitoring server only if backups were posted
        if posted_count > 0 and hostname and device_ip and name:
            summary_msg = f"Backup card data pushed to server ({posted_count} records)"
            send_log_to_server(summary_msg, hostname, device_ip, name)
            logging.info(f"{summary_msg}")
    except Exception as e:
        logging.error(f"Error posting backup data: {e}")
def load_config():
    """Start the configuration UI script as a detached child process.

    Looks for ``./config.py`` relative to the current working directory and
    runs it with the current Python interpreter, discarding its output.

    Returns:
        True when the child process was started, False when the script is
        missing or starting it failed.
    """
    script = "./config.py"
    try:
        if not os.path.exists(script):
            logging.error("Config file not found at ./config.py")
            print("✗ Config file not found at ./config.py")
            return False

        logging.info("🔧 Launching configuration interface...")
        print("🔧 Launching configuration interface...")
        # Fire and forget: the UI runs independently of this process
        command = [sys.executable, script]
        subprocess.Popen(command,
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
        logging.info("✓ Configuration UI launched successfully")
        print("✓ Configuration UI launched successfully")
        return True
    except Exception as err:
        logging.error(f"Error launching configuration: {err}")
        print(f"✗ Error launching configuration: {err}")
        return False
# ============================================================================
# RFID READER - Background Thread
# ============================================================================
# Thread-safe shared state between RFID thread and main thread
class CardState:
    """Thread-safe hand-off of card events from the RFID thread to the processor.

    Insert and remove events are kept in two separate FIFO queues. Previously
    both kinds shared a single ``current_card_id`` / ``timestamp`` slot, so a
    removal that arrived before the pending insert had been consumed
    overwrote the insert's card ID and timestamp, and a second event of the
    same kind replaced the first. With queues every event is delivered once,
    in order, with its own timestamp.

    Timestamps are formatted as ``YYYY-MM-DD&HH:MM:SS`` (local time).
    """

    def __init__(self):
        self.lock = threading.Lock()
        # Pending (card_id, timestamp) tuples, oldest first, one list per kind
        self._inserted_events = []
        self._removed_events = []
        # Most recent event of either kind; kept so existing readers of these
        # attributes still work
        self.current_card_id = None
        self.timestamp = None

    @property
    def card_inserted_flag(self):
        """True while at least one insert event is waiting to be consumed."""
        return bool(self._inserted_events)

    @property
    def card_removed_flag(self):
        """True while at least one remove event is waiting to be consumed."""
        return bool(self._removed_events)

    def _record(self, queue, card_id):
        """Append a timestamped event for ``card_id`` to ``queue``."""
        with self.lock:
            stamp = datetime.now().strftime("%Y-%m-%d&%H:%M:%S")
            self.current_card_id = card_id
            self.timestamp = stamp
            queue.append((card_id, stamp))

    def _take(self, queue):
        """Pop the oldest event from ``queue``, or return ``(None, None)``."""
        with self.lock:
            if queue:
                return queue.pop(0)
            return None, None

    def set_inserted(self, card_id):
        """Record that ``card_id`` was inserted now."""
        self._record(self._inserted_events, card_id)

    def set_removed(self, card_id):
        """Record that ``card_id`` was removed now."""
        self._record(self._removed_events, card_id)

    def get_inserted(self):
        """Return and consume the oldest pending insert as ``(card_id, timestamp)``.

        Returns ``(None, None)`` when no insert is pending.
        """
        return self._take(self._inserted_events)

    def get_removed(self):
        """Return and consume the oldest pending removal as ``(card_id, timestamp)``.

        Returns ``(None, None)`` when no removal is pending.
        """
        return self._take(self._removed_events)


# Global card state
card_state = CardState()
class RFIDReader(rdm6300.BaseReader):
    """RFID reader whose callbacks only flag events for the processor thread.

    ``card_inserted`` / ``card_removed`` are presumably invoked by
    ``rdm6300.BaseReader`` from whichever thread runs ``start()`` - confirm
    against the rdm6300 documentation. They switch the LED and push the event
    into the global ``card_state``; all network work happens elsewhere.
    """

    # Card that opens the configuration UI instead of producing a card event
    CONFIG_CARD_ID = 12886709

    def __init__(self, device, hostname, device_ip):
        """Create a reader on serial ``device`` and remember the device identity."""
        super().__init__(device)
        self.hostname = hostname
        self.device_ip = device_ip
        self.name = read_idmasa()
        self.led = led  # Use global LED (assigned in main() before readers are built)

    def _set_led(self, turn_on):
        """Switch the LED; a failing LED must never block card handling."""
        try:
            if turn_on:
                self.led.on()
            else:
                self.led.off()
        except Exception as e:
            # Was a bare ``except: pass``; still best-effort, but now visible
            logging.warning(f"LED update failed: {e}")

    def card_inserted(self, card):
        """Detect card insertion - just set flag"""
        card_id = card.value
        # Special card: device config card
        if card_id == self.CONFIG_CARD_ID:
            logging.info(f"🔴 CONFIG CARD {card_id} detected - Loading configuration")
            print(f"🔴 CONFIG CARD {card_id} detected - Loading configuration")
            load_config()
            return
        # IMMEDIATE LED feedback (BEFORE flag, for instant response)
        self._set_led(True)
        # Set flag for main thread to handle
        logging.info(f"🔴 CARD INSERTED DETECTED - ID: {card_id}")
        print(f"🔴 CARD INSERTED - ID: {card_id}")
        card_state.set_inserted(card_id)

    def card_removed(self, card):
        """Detect card removal - just set flag"""
        card_id = card.value
        # Special card: device config card (ignore)
        if card_id == self.CONFIG_CARD_ID:
            logging.info(f"⚪ CONFIG CARD {card_id} detected (ignored)")
            print(f"⚪ CONFIG CARD {card_id} detected (ignored)")
            return
        # IMMEDIATE LED feedback (BEFORE flag, for instant response)
        self._set_led(False)
        # Set flag for main thread to handle
        logging.info(f"⚪ CARD REMOVED DETECTED - ID: {card_id}")
        print(f"⚪ CARD REMOVED - ID: {card_id}")
        card_state.set_removed(card_id)
def process_card_events(hostname, device_ip):
    """Poll ``card_state`` forever and dispatch insert/remove events.

    Runs as the body of the card-processor thread and never returns. Pending
    inserts are checked before pending removals on every pass; the loop then
    sleeps 10 ms. Any unexpected error is logged and the loop continues after
    a 100 ms pause.

    Args:
        hostname: Device hostname forwarded to the monitoring server.
        device_ip: Device IP forwarded to the monitoring server.
    """
    while True:
        try:
            # Check for inserted cards
            card_id, timestamp = card_state.get_inserted()
            if card_id is not None:
                logging.info(f"[Main] Processing CARD INSERTED: {card_id}")
                _handle_card_event(card_id, timestamp, True, hostname, device_ip)

            # Check for removed cards
            card_id, timestamp = card_state.get_removed()
            if card_id is not None:
                logging.info(f"[Main] Processing CARD REMOVED: {card_id}")
                _handle_card_event(card_id, timestamp, False, hostname, device_ip)

            # Very small sleep for fast response (10ms = check 100x per second)
            time.sleep(0.01)
        except Exception as e:
            logging.error(f"Error processing card events: {e}")
            time.sleep(0.1)


def _handle_card_event(card_id, timestamp, inserted, hostname, device_ip):
    """Deliver one card event according to the current configuration mode.

    The table name is read here, i.e. only when an event actually exists, so
    configuration changes are still picked up without reading idmasa.txt on
    every 10 ms polling pass.

    NOCONFIG mode: the event goes straight to the monitoring server.
    CONFIGURED mode: the event is POSTed to the Harting API first; only on
    success is it mirrored to the monitoring server, otherwise the URL is
    appended to tag.txt for a later retry.
    """
    name = read_idmasa()
    action = "inserted" if inserted else "removed"
    message = f"Card {card_id} {action}"

    if name == "noconfig":
        # Report success only when the server really accepted the record;
        # send_log_to_server logs its own warning on failure.
        if send_log_to_server(message, hostname, device_ip, name):
            verb = "insert" if inserted else "remove"
            logging.info(f"✓ Card {verb} logged to monitoring: {message}")
        return

    # Build API URL (1 = ON/inserted, 0 = OFF/removed)
    url = f"{HARTING_API_BASE}/{name}/{card_id}/{1 if inserted else 0}/{timestamp}"
    if post_to_harting(url):
        logging.info(f"✓ Card {card_id} {action} - Posted to API")
        # Send to monitoring server ONLY on successful API post
        send_log_to_server(message, hostname, device_ip, name)
        return

    logging.warning(f"✗ Offline: Saving card {card_id} to backup")
    try:
        with open(TAG_FILE, "a") as f:
            f.write(url + "\n")
    except Exception as e:
        logging.error(f"Failed to save backup: {e}")
def initialize_rfid_reader(hostname, device_ip):
    """Start an RFIDReader on the first serial port that works.

    The candidate ports are tried in a fixed order. For each one a reader is
    created and its ``start()`` is run in a non-daemon thread. The caller
    then waits up to two seconds:

    * no signal within two seconds: ``start()`` is still running, which is
      treated as "listening", and the reader is returned;
    * signal with an error recorded: the next port is tried;
    * signal without an error: the reader is returned.

    Args:
        hostname: Device hostname passed to the reader.
        device_ip: Device IP passed to the reader.

    Returns:
        The started RFIDReader, or None when every port failed.
    """
    candidate_ports = ('/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0', '/dev/ttyACM0')

    for device in candidate_ports:
        try:
            print(f"Trying RFID on {device}...")
            logging.info(f"Attempting to initialize RFID reader on {device}...")

            reader = RFIDReader(device, hostname, device_ip)

            # Set by the listener thread once start() has returned or failed
            finished = threading.Event()
            failure = {"error": None}

            def _listen():
                try:
                    logging.info(f"[Reader Thread] Starting RFID listener on {device}")
                    print(f"[Reader Thread] Starting RFID listener on {device}")
                    reader.start()  # This blocks, listening for cards
                except Exception as exc:
                    failure["error"] = exc
                    logging.error(f"[Reader Thread] Error starting reader: {exc}")
                finished.set()

            # Non-daemon on purpose: the listener outlives the main thread
            listener = threading.Thread(
                target=_listen,
                name="RFIDReaderThread",
                daemon=False,
            )
            listener.start()
            logging.info(f"[Main] RFID reader thread started (daemon={listener.daemon})")

            signalled = finished.wait(timeout=2)
            startup_error = failure["error"] if signalled else None
            if signalled and startup_error:
                logging.error(f"Reader startup error: {startup_error}")
                continue

            if signalled:
                logging.info(f"✓ RFID reader initialized on {device}")
                print(f"✓ RFID reader initialized on {device}")
            else:
                # Still blocked inside start(): the normal, healthy case
                logging.info(f"✓ Reader listening on {device} (waiting for cards...)")
                print(f"✓ Reader listening on {device}")
            return reader
        except FileNotFoundError as err:
            logging.warning(f"✗ Device {device} not found: {err}")
            print(f"✗ Device {device} not found")
        except PermissionError as err:
            logging.warning(f"✗ Permission denied on {device}: {err}")
            print(f"⚠ Permission denied on {device} - try: sudo usermod -a -G dialout $USER")
        except Exception as err:
            logging.error(f"✗ Failed to initialize on {device}: {err}")
            print(f"✗ Error on {device}: {err}")

    print("✗ Could not initialize RFID reader on any device")
    logging.error("RFID reader initialization failed on all devices")
    return None
# ============================================================================
# WiFi RECOVERY & CONNECTIVITY
# ============================================================================
def check_internet_connection(hostname, device_ip):
    """Watch connectivity forever and power-cycle WiFi when it is lost.

    Runs as the body of the WiFi-monitor thread and never returns.

    * Host reachable: queued backup data is flushed, then the loop sleeps
      WIFI_CHECK_INTERVAL seconds.
    * Host unreachable (ping failed or produced no answer in time): WiFi is
      blocked with rfkill, left off for WIFI_RECOVERY_WAIT seconds,
      unblocked again, and the check repeats.

    Args:
        hostname: Device hostname forwarded to the monitoring server.
        device_ip: Device IP forwarded to the monitoring server.
    """
    logging.info("WiFi monitor started")
    print("✓ WiFi monitor started")
    while True:
        try:
            # Re-read every cycle so a configuration change is picked up
            name = read_idmasa()

            if _host_reachable(WIFI_CHECK_HOST):
                logging.info("✓ Connection OK - checking for backed-up data")
                print("✓ Connection OK")
                # Try to send any backed-up data
                post_backup_data(hostname, device_ip, name)
                # Wait 40 minutes before next check
                time.sleep(WIFI_CHECK_INTERVAL)
                continue

            # Connection lost
            logging.warning("✗ Connection lost - disabling WiFi for recovery")
            send_log_to_server("WiFi connection lost - initiating recovery", hostname, device_ip, name)
            print("✗ Connection lost - WiFi recovery")

            os.system("sudo rfkill block wifi")
            try:
                time.sleep(5)
                # Wait 20 minutes
                logging.info(f"WiFi disabled, waiting {WIFI_RECOVERY_WAIT}s for recovery...")
                time.sleep(WIFI_RECOVERY_WAIT)
            finally:
                # Always lift the block, even if the wait is interrupted
                os.system("sudo rfkill unblock wifi")
            logging.info("WiFi re-enabled")
            print("✓ WiFi re-enabled")
            time.sleep(5)  # Wait for WiFi to reconnect
            # Sent after the reconnect wait so the message has a chance to arrive
            send_log_to_server("WiFi re-enabled", hostname, device_ip, name)
        except Exception as e:
            logging.error(f"WiFi monitor error: {e}")
            time.sleep(60)


def _host_reachable(host, timeout=5):
    """Return True when a single ping to ``host`` succeeds within ``timeout`` seconds.

    A ping that does not finish in time counts as unreachable. Before, the
    resulting ``subprocess.TimeoutExpired`` fell into the generic error
    handler of the monitor loop, so an unanswered ping never started the
    WiFi recovery.
    """
    try:
        result = subprocess.run(
            ["ping", "-c", "1", host],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0
# ============================================================================
# LOG CLEANUP
# ============================================================================
def cleanup_old_logs(hostname, device_ip, name):
    """Clear the log file when its last modification is more than 15 days old.

    The age is taken from the file's modification time, so it measures the
    time since the last write, not since the file was created.

    The file is truncated in place rather than removed. main() configures
    logging on this same file before calling this function, so the logging
    handler already holds it open; removing the file would leave the handler
    writing to an unlinked file and every later record would be lost.

    Args:
        hostname: Device hostname forwarded to the monitoring server.
        device_ip: Device IP forwarded to the monitoring server.
        name: Table/device ID forwarded to the monitoring server.

    Errors are logged, never raised.
    """
    try:
        if not os.path.exists(LOG_FILE):
            return
        file_mod_time = datetime.fromtimestamp(os.path.getmtime(LOG_FILE))
        if datetime.now() - file_mod_time > timedelta(days=15):
            # Opening with "w" empties the file; nothing needs to be written
            with open(LOG_FILE, "w"):
                pass
            logging.info("Old log file deleted (>15 days)")
            send_log_to_server("Log file deleted (older than 15 days)", hostname, device_ip, name)
        else:
            logging.info("Log file is recent, keeping it")
    except Exception as e:
        logging.error(f"Error cleaning up logs: {e}")
# ============================================================================
# CHROMIUM LAUNCHER
# ============================================================================
def launch_chromium(name):
    """Open Chromium in kiosk mode on the page matching the device state.

    Args:
        name: Table/device ID. ``"noconfig"`` selects the local
            ``Screen.html`` fallback page under DATA_DIR/html; any other
            value selects the production URL.

    The browser is started as a detached process with all standard streams
    discarded. Failures are printed and logged, never raised.
    """
    try:
        # Debug logging
        print(f"DEBUG: launch_chromium called with name='{name}'")
        print(f"DEBUG: name type={type(name)}, len={len(name)}")
        print(f"DEBUG: name != 'noconfig' = {name != 'noconfig'}")
        logging.debug(f"DEBUG: launch_chromium called with name='{name}' (type={type(name).__name__}, len={len(name)})")

        if name == "noconfig":
            # Device not configured - show the local HTML file
            print("Device NOT configured (name='noconfig'). Using local HTML.")
            html_dir = os.path.join(DATA_DIR, "html")
            html_file = os.path.join(html_dir, "Screen.html")
            os.makedirs(html_dir, exist_ok=True)

            if not os.path.exists(html_file):
                print(f"Warning: Local HTML file not found at {html_file}")
                print("Please create the file or copy it from the data/html directory")
                logging.warning(f"Local HTML file not found: {html_file}")

            # Chromium needs an absolute file:// URL for a local page
            abs_path = os.path.abspath(html_file)
            url = f"file://{abs_path}"
            print(f"Device not configured. Launching local fallback page: {url}")
            logging.info(f"Loading Screen.html from: {abs_path}")
            print("Starting Chromium with local HTML file...")
            mode_flags = ["--no-first-run", "--no-default-browser-check"]
        else:
            # Device is configured - show the production page
            url = "http://10.76.140.17/iweb_v2/index.php/traceability/production"
            print(f"Device configured as '{name}'. Launching production URL: {url}")
            logging.info(f"Launching Chromium with production URL for device: {name}")
            print("Starting Chromium with production URL...")
            mode_flags = ["--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17"]

        # Flags shared by both modes come first, then the mode-specific ones
        command = ["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen"]
        command.extend(mode_flags)
        command.append(url)
        subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
            start_new_session=True
        )

        print("✓ Chromium launched successfully")
        logging.info("Chromium launched successfully")
    except Exception as err:
        print(f"✗ Failed to launch Chromium: {err}")
        logging.error(f"Failed to launch Chromium: {err}")
def send_health_status(hostname, device_ip):
    """Send an "App working" heartbeat every 5 minutes while unconfigured.

    Runs as the body of the health-monitor thread and never returns. The
    table name is re-read on every cycle; nothing is sent once the device is
    configured. Unexpected errors are logged and retried after 30 seconds.

    Args:
        hostname: Device hostname forwarded to the monitoring server.
        device_ip: Device IP forwarded to the monitoring server.
    """
    logging.info("Health status monitor started")
    while True:
        try:
            name = read_idmasa()
            # Only send health status when device is NOT configured
            if name == "noconfig":
                message = "App working - RFID monitoring active"
                # Report success only when the server accepted the record;
                # before, "sent" was logged even when delivery had failed.
                if send_log_to_server(message, hostname, device_ip, name):
                    logging.info(f"✓ Health check sent: {message}")
                else:
                    logging.warning(f"✗ Health check not delivered: {message}")
            # Send every 5 minutes
            time.sleep(300)
        except Exception as e:
            logging.error(f"Error in health status monitor: {e}")
            time.sleep(30)
# ============================================================================
# MAIN APPLICATION
# ============================================================================
def main():
    """Main application entry point.

    Start-up order: data directories, device identity, logging, LED, log
    cleanup, startup message, RFID reader, the three worker threads (card
    processor, WiFi monitor, health monitor), Chromium. The main thread then
    idles until Ctrl-C.

    Exits with status 1 when no RFID reader could be started and with
    status 0 on Ctrl-C.
    """
    global led
    print("\n" + "="*60)
    print("RFID CARD READER - Simplified v3.0")
    print("="*60 + "\n")

    # Setup
    setup_directories()
    hostname, device_ip = get_device_info()
    setup_logging(hostname, device_ip)
    led = setup_led()

    # Clean up old logs (older than 15 days)
    cleanup_old_logs(hostname, device_ip, read_idmasa())

    print(f"Device: {hostname} ({device_ip})")
    print(f"Name ID: {read_idmasa()}")
    print(f"Monitoring: {MONITORING_SERVER}")
    print(f"API: {HARTING_API_BASE}")
    print()

    # Send startup message to monitoring server
    name = read_idmasa()
    send_log_to_server("App initialized", hostname, device_ip, name)

    # Thread 1: Start RFID reader (background, listening for cards)
    rfid_reader = initialize_rfid_reader(hostname, device_ip)
    if not rfid_reader:
        print("✗ RFID reader failed - application cannot continue")
        logging.error("RFID reader initialization failed - exiting")
        sys.exit(1)
    print("✓ RFID reader started in background")
    send_log_to_server("RFID initialized", hostname, device_ip, name)

    # Threads 2-4: card event processor, WiFi monitor, health status monitor.
    # They run endless loops, so they are daemon threads: as non-daemon
    # threads they kept the interpreter alive forever after sys.exit(0).
    # The main thread stays in the idle loop below, so they live as long as
    # the application does.
    workers = (
        (process_card_events, "CardProcessor", "✓ Card event processor started"),
        (check_internet_connection, "WiFiMonitor", "✓ WiFi monitor started"),
        (send_health_status, "HealthMonitor", "✓ Health status monitor started"),
    )
    for target, thread_name, started_message in workers:
        worker = threading.Thread(
            target=target,
            args=(hostname, device_ip),
            daemon=True,
            name=thread_name
        )
        worker.start()
        print(started_message)

    # Launch Chromium browser
    print("\nInitializing Chromium launcher...")
    time.sleep(2)  # Give system time to stabilize
    launch_chromium(read_idmasa())
    print()
    logging.info("RFID Client operational")
    send_log_to_server("App working", hostname, device_ip, name)
    print("✓ RFID Client operational - waiting for cards...")
    print()

    # Keep main thread alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n✓ Shutting down...")
        logging.info("Application shutdown")
        # The reader listens in its own non-daemon thread; ask it to stop so
        # the process can end. NOTE(review): assumes rdm6300.BaseReader
        # offers stop() - confirm; when it does not, this step is skipped.
        stop_reader = getattr(rfid_reader, "stop", None)
        if callable(stop_reader):
            try:
                stop_reader()
            except Exception as e:
                logging.warning(f"Could not stop RFID reader cleanly: {e}")
        sys.exit(0)


if __name__ == "__main__":
    main()