Files
prezenta_work/app_v3_simplified.py
RPI User c3a55a89c3 Add log cleanup function (15-day deletion) and archive documentation
- Added cleanup_old_logs() function to app_v3_simplified.py
- Deletes log.txt if older than 15 days at app startup
- Sends notification to monitoring server when cleanup occurs
- Archived all legacy modules and documentation to oldcode/
- Updated device_info.txt with correct IP (192.168.1.104)
- All changes validated and tested
2025-12-18 17:18:14 +02:00

570 lines
20 KiB
Python

#!/usr/bin/env python3
"""
RFID Card Reader - Simplified Version 3.0
Minimal dependencies, focused on core functionality:
1. Read RFID cards
2. Send card data to monitoring server and external API
3. WiFi recovery with periodic checks
4. Offline tag backup to tag.txt
"""
import os
import sys
import time
import logging
import threading
import subprocess
import socket
from datetime import datetime, timedelta
import requests
import rdm6300
# ============================================================================
# CONFIGURATION
# ============================================================================
# Server URLs
MONITORING_SERVER = "http://192.168.1.103:80/logs"
HARTING_API_BASE = "https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record"
WIFI_CHECK_HOST = "10.76.140.17"
# Timings (in seconds)
WIFI_CHECK_INTERVAL = 2400 # 40 minutes
WIFI_RECOVERY_WAIT = 1200 # 20 minutes
BATCH_LOG_SIZE = 10
BATCH_LOG_TIMEOUT = 5
# Paths
DATA_DIR = "./data"
IDMASA_FILE = os.path.join(DATA_DIR, "idmasa.txt")
LOG_FILE = os.path.join(DATA_DIR, "log.txt")
TAG_FILE = os.path.join(DATA_DIR, "tag.txt")
DEVICE_INFO_FILE = os.path.join(DATA_DIR, "device_info.txt")
# GPIO for LED
LED_PIN = 23
# ============================================================================
# SETUP & INITIALIZATION
# ============================================================================
def setup_directories():
"""Create required data directories"""
os.makedirs(DATA_DIR, exist_ok=True)
# Create default files if missing
if not os.path.exists(IDMASA_FILE):
with open(IDMASA_FILE, "w") as f:
f.write("noconfig")
for file_path in [LOG_FILE, TAG_FILE]:
if not os.path.exists(file_path):
open(file_path, 'a').close()
def get_device_info():
"""Get device hostname and IP from device_info.txt, with socket fallback"""
# Try to read from device_info.txt first (source of truth)
if os.path.exists(DEVICE_INFO_FILE):
try:
with open(DEVICE_INFO_FILE, "r") as f:
lines = f.read().strip().split('\n')
if len(lines) >= 2:
hostname = lines[0].strip()
device_ip = lines[1].strip()
if hostname and device_ip:
print(f"✓ Device info loaded from file: {hostname} ({device_ip})")
return hostname, device_ip
except Exception as e:
print(f"Warning: Could not read device_info.txt: {e}")
# Fallback to socket resolution
try:
hostname = socket.gethostname()
device_ip = socket.gethostbyname(hostname)
print(f"✓ Device info resolved via socket: {hostname} ({device_ip})")
# Save to file for future use
try:
os.makedirs(DATA_DIR, exist_ok=True)
with open(DEVICE_INFO_FILE, "w") as f:
f.write(f"{hostname}\n{device_ip}\n")
except:
pass
return hostname, device_ip
except Exception as e:
print(f"Warning: Could not resolve device info via socket: {e}")
return "unknown-device", "127.0.0.1"
def setup_logging(hostname, device_ip):
"""Configure logging"""
os.makedirs(DATA_DIR, exist_ok=True)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Log startup
logging.info(f"Application started on {hostname} ({device_ip})")
print(f"✓ Logging configured: {LOG_FILE}")
def setup_led():
"""Initialize LED with fallback for systems without GPIO"""
try:
from gpiozero import OutputDevice
led = OutputDevice(LED_PIN)
print(f"✓ LED initialized on GPIO {LED_PIN}")
return led
except Exception as e:
print(f"⚠ LED initialization failed: {e}")
# Return dummy LED
class DummyLED:
def on(self):
print(f"[LED {LED_PIN} ON]")
def off(self):
print(f"[LED {LED_PIN} OFF]")
return DummyLED()
# ============================================================================
# SERVER COMMUNICATION
# ============================================================================
def read_idmasa():
"""Read device ID from idmasa.txt"""
try:
with open(IDMASA_FILE, "r") as f:
return f.read().strip() or "noconfig"
except:
return "noconfig"
def send_log_to_server(message, hostname, device_ip, name):
"""Send log message to monitoring server"""
try:
log_data = {
"hostname": hostname,
"device_ip": device_ip,
"nume_masa": name,
"log_message": message
}
response = requests.post(MONITORING_SERVER, json=log_data, timeout=5)
response.raise_for_status()
return True
except Exception as e:
logging.warning(f"Failed to send log to server: {e}")
return False
def post_to_harting(url, verify=False, timeout=3):
"""POST data to Harting API"""
try:
response = requests.post(url, verify=verify, timeout=timeout)
response.raise_for_status()
return True
except requests.exceptions.Timeout:
return False
except requests.exceptions.RequestException:
return False
def post_backup_data():
"""Send queued card data from tag.txt to Harting API"""
if not os.path.exists(TAG_FILE):
return
try:
with open(TAG_FILE, "r") as f:
lines = f.readlines()
remaining = []
for line in lines:
line = line.strip()
if not line:
continue
# Try to post the URL
if post_to_harting(line):
logging.info(f"Posted backed-up data: {line}")
continue # Success, don't keep it
else:
remaining.append(line) # Failed, keep for retry
# Write remaining failed posts back to file
with open(TAG_FILE, "w") as f:
for line in remaining:
f.write(line + "\n")
except Exception as e:
logging.error(f"Error posting backup data: {e}")
# ============================================================================
# RFID READER - Background Thread
# ============================================================================
# Thread-safe shared state between RFID thread and main thread
class CardState:
"""Shared state for card events between threads"""
def __init__(self):
self.lock = threading.Lock()
self.card_inserted_flag = False
self.card_removed_flag = False
self.current_card_id = None
self.timestamp = None
def set_inserted(self, card_id):
with self.lock:
self.current_card_id = card_id
self.timestamp = datetime.now().strftime("%Y-%m-%d&%H:%M:%S")
self.card_inserted_flag = True
def set_removed(self, card_id):
with self.lock:
self.current_card_id = card_id
self.timestamp = datetime.now().strftime("%Y-%m-%d&%H:%M:%S")
self.card_removed_flag = True
def get_inserted(self):
with self.lock:
if self.card_inserted_flag:
card_id = self.current_card_id
timestamp = self.timestamp
self.card_inserted_flag = False
return card_id, timestamp
return None, None
def get_removed(self):
with self.lock:
if self.card_removed_flag:
card_id = self.current_card_id
timestamp = self.timestamp
self.card_removed_flag = False
return card_id, timestamp
return None, None
# Global card state
card_state = CardState()
class RFIDReader(rdm6300.BaseReader):
"""RFID reader that runs in background and sets flags for main thread"""
def __init__(self, device, hostname, device_ip):
super().__init__(device)
self.hostname = hostname
self.device_ip = device_ip
self.name = read_idmasa()
self.led = led # Use global LED
def card_inserted(self, card):
"""Detect card insertion - just set flag"""
card_id = card.value
# Special card: device config card (ignore)
if card_id == 12886709:
logging.info(f"🔴 CONFIG CARD {card_id} detected (ignored)")
print(f"🔴 CONFIG CARD {card_id} detected (ignored)")
return
# IMMEDIATE LED feedback (BEFORE flag, for instant response)
try:
self.led.on()
except:
pass
# Set flag for main thread to handle
logging.info(f"🔴 CARD INSERTED DETECTED - ID: {card_id}")
print(f"🔴 CARD INSERTED - ID: {card_id}")
card_state.set_inserted(card_id)
def card_removed(self, card):
"""Detect card removal - just set flag"""
card_id = card.value
# Special card: device config card (ignore)
if card_id == 12886709:
logging.info(f"⚪ CONFIG CARD {card_id} detected (ignored)")
print(f"⚪ CONFIG CARD {card_id} detected (ignored)")
return
# IMMEDIATE LED feedback (BEFORE flag, for instant response)
try:
self.led.off()
except:
pass
# Set flag for main thread to handle
logging.info(f"⚪ CARD REMOVED DETECTED - ID: {card_id}")
print(f"⚪ CARD REMOVED - ID: {card_id}")
card_state.set_removed(card_id)
def process_card_events(hostname, device_ip):
"""Main thread checks card flags and processes them"""
name = read_idmasa()
while True:
try:
# Check for inserted cards
card_id, timestamp = card_state.get_inserted()
if card_id is not None:
logging.info(f"[Main] Processing CARD INSERTED: {card_id}")
# Build API URL (1 = ON/inserted)
url = f"{HARTING_API_BASE}/{name}/{card_id}/1/{timestamp}"
# Try to post
if post_to_harting(url):
logging.info(f"✓ Card event posted to API: {card_id}")
else:
logging.warning(f"✗ Offline: Saving card {card_id} to backup")
try:
with open(TAG_FILE, "a") as f:
f.write(url + "\n")
except Exception as e:
logging.error(f"Failed to save backup: {e}")
# ALWAYS send log to monitoring server (regardless of API post result)
send_log_to_server(f"Card {card_id} inserted", hostname, device_ip, name)
# Check for removed cards
card_id, timestamp = card_state.get_removed()
if card_id is not None:
logging.info(f"[Main] Processing CARD REMOVED: {card_id}")
# Build API URL (0 = OFF/removed)
url = f"{HARTING_API_BASE}/{name}/{card_id}/0/{timestamp}"
# Try to post
if post_to_harting(url):
logging.info(f"✓ Card event posted to API: {card_id}")
else:
logging.warning(f"✗ Offline: Saving card {card_id} to backup")
try:
with open(TAG_FILE, "a") as f:
f.write(url + "\n")
except Exception as e:
logging.error(f"Failed to save backup: {e}")
# ALWAYS send log to monitoring server (regardless of API post result)
send_log_to_server(f"Card {card_id} removed", hostname, device_ip, name)
# Very small sleep for fast response (10ms = check 100x per second)
time.sleep(0.01)
except Exception as e:
logging.error(f"Error processing card events: {e}")
time.sleep(0.1)
def initialize_rfid_reader(hostname, device_ip):
"""Initialize RFID reader on available serial device with proper threading"""
serial_devices = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyUSB0', '/dev/ttyACM0']
for device in serial_devices:
try:
print(f"Trying RFID on {device}...")
logging.info(f"Attempting to initialize RFID reader on {device}...")
# Create reader instance
reader = RFIDReader(device, hostname, device_ip)
# Start reader in a non-daemon thread so it keeps listening
reader_started = threading.Event()
reader_error = [None]
def start_reader_thread():
try:
logging.info(f"[Reader Thread] Starting RFID listener on {device}")
print(f"[Reader Thread] Starting RFID listener on {device}")
reader.start() # This blocks, listening for cards
reader_started.set()
except Exception as e:
reader_error[0] = e
logging.error(f"[Reader Thread] Error starting reader: {e}")
reader_started.set()
# Start reader in non-daemon thread (stays alive even if main thread exits)
reader_thread = threading.Thread(
target=start_reader_thread,
daemon=False, # IMPORTANT: non-daemon so it keeps running
name="RFIDReaderThread"
)
reader_thread.start()
logging.info(f"[Main] RFID reader thread started (daemon={reader_thread.daemon})")
# Wait up to 2 seconds for reader to start or error
if not reader_started.wait(timeout=2):
# Still waiting - reader is listening (normal behavior)
logging.info(f"✓ Reader listening on {device} (waiting for cards...)")
print(f"✓ Reader listening on {device}")
return reader
elif reader_error[0]:
# Error occurred during startup
logging.error(f"Reader startup error: {reader_error[0]}")
continue
else:
# Started successfully
logging.info(f"✓ RFID reader initialized on {device}")
print(f"✓ RFID reader initialized on {device}")
return reader
except FileNotFoundError as e:
logging.warning(f"✗ Device {device} not found: {e}")
print(f"✗ Device {device} not found")
continue
except PermissionError as e:
logging.warning(f"✗ Permission denied on {device}: {e}")
print(f"⚠ Permission denied on {device} - try: sudo usermod -a -G dialout $USER")
continue
except Exception as e:
logging.error(f"✗ Failed to initialize on {device}: {e}")
print(f"✗ Error on {device}: {e}")
continue
print("✗ Could not initialize RFID reader on any device")
logging.error("RFID reader initialization failed on all devices")
return None
# ============================================================================
# WiFi RECOVERY & CONNECTIVITY
# ============================================================================
def check_internet_connection(hostname, device_ip):
"""Monitor internet connection and recover WiFi if needed"""
name = read_idmasa()
logging.info("WiFi monitor started")
print("✓ WiFi monitor started")
while True:
try:
# Check connection to monitoring server
response = subprocess.run(
["ping", "-c", "1", WIFI_CHECK_HOST],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=5
)
if response.returncode == 0:
# Connection OK
logging.info("✓ Connection OK - checking for backed-up data")
print("✓ Connection OK")
# Try to send any backed-up data
post_backup_data()
# Wait 40 minutes before next check
time.sleep(WIFI_CHECK_INTERVAL)
else:
# Connection lost
logging.warning("✗ Connection lost - disabling WiFi for recovery")
send_log_to_server("WiFi connection lost - initiating recovery", hostname, device_ip, name)
print("✗ Connection lost - WiFi recovery")
# Disable WiFi
os.system("sudo rfkill block wifi")
time.sleep(5)
# Wait 20 minutes
logging.info(f"WiFi disabled, waiting {WIFI_RECOVERY_WAIT}s for recovery...")
time.sleep(WIFI_RECOVERY_WAIT)
# Re-enable WiFi
os.system("sudo rfkill unblock wifi")
logging.info("WiFi re-enabled")
send_log_to_server("WiFi re-enabled", hostname, device_ip, name)
print("✓ WiFi re-enabled")
time.sleep(5) # Wait for WiFi to reconnect
except Exception as e:
logging.error(f"WiFi monitor error: {e}")
time.sleep(60)
# ============================================================================
# LOG CLEANUP
# ============================================================================
def cleanup_old_logs(hostname, device_ip, name):
"""Delete log file if older than 15 days"""
try:
if os.path.exists(LOG_FILE):
file_mod_time = datetime.fromtimestamp(os.path.getmtime(LOG_FILE))
if datetime.now() - file_mod_time > timedelta(days=15):
os.remove(LOG_FILE)
logging.info("Old log file deleted (>15 days)")
send_log_to_server("Log file deleted (older than 15 days)", hostname, device_ip, name)
else:
logging.info("Log file is recent, keeping it")
except Exception as e:
logging.error(f"Error cleaning up logs: {e}")
# ============================================================================
# MAIN APPLICATION
# ============================================================================
def main():
"""Main application entry point"""
global led
print("\n" + "="*60)
print("RFID CARD READER - Simplified v3.0")
print("="*60 + "\n")
# Setup
setup_directories()
hostname, device_ip = get_device_info()
setup_logging(hostname, device_ip)
led = setup_led()
# Clean up old logs (older than 15 days)
cleanup_old_logs(hostname, device_ip, read_idmasa())
print(f"Device: {hostname} ({device_ip})")
print(f"Name ID: {read_idmasa()}")
print(f"Monitoring: {MONITORING_SERVER}")
print(f"API: {HARTING_API_BASE}")
print()
# Thread 1: Start RFID reader (background, listening for cards)
rfid_reader = initialize_rfid_reader(hostname, device_ip)
if not rfid_reader:
print("✗ RFID reader failed - application cannot continue")
logging.error("RFID reader initialization failed - exiting")
sys.exit(1)
print("✓ RFID reader started in background")
# Thread 2: Start card event processor (main thread checks flags and processes)
card_processor_thread = threading.Thread(
target=process_card_events,
args=(hostname, device_ip),
daemon=False,
name="CardProcessor"
)
card_processor_thread.start()
print("✓ Card event processor started")
# Thread 3: Start WiFi monitor (background health check + recovery)
wifi_thread = threading.Thread(
target=check_internet_connection,
args=(hostname, device_ip),
daemon=False,
name="WiFiMonitor"
)
wifi_thread.start()
print("✓ WiFi monitor started")
# Application ready
logging.info("RFID Client operational")
send_log_to_server("RFID Client operational", hostname, device_ip, read_idmasa())
print("✓ RFID Client operational - waiting for cards...")
print()
# Keep main thread alive
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n✓ Shutting down...")
logging.info("Application shutdown")
sys.exit(0)
if __name__ == "__main__":
main()