Files
prezenta_work/app.py
2025-05-29 15:52:28 +03:00

270 lines
10 KiB
Python

#App version 2.1
import rdm6300
import os, time
import logging
from gpiozero import OutputDevice
from multiprocessing import Process
import requests
import subprocess
import threading
import urllib.parse
from datetime import datetime, timedelta # Import datetime and timedelta
import socket
import signal
import aiohttp
import asyncio
#configurare variabile
hostname = socket.gethostname()
device_ip = socket.gethostbyname(hostname)
print(hostname, device_ip)
# Configure logging
logging.basicConfig(filename='./data/log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
#logging.basicConfig(
# level=logging.DEBUG, # Set the logging level to DEBUG to capture all logs
# format='%(asctime)s [%(threadName)s] %(levelname)s: %(message)s', # Include thread name in logs
# handlers=[
# logging.StreamHandler() # Print logs to the terminal
# ]
#)
# function to delete old logs
def delete_old_logs():
log_dir = './data/'
log_file = 'log.txt'
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
file_mod_time = datetime.fromtimestamp(os.path.getmtime(log_path))
if datetime.now() - file_mod_time > timedelta(days=10):
os.remove(log_path)
log_info_with_server(f"Deleted old log file: {log_file}")
else:
log_info_with_server(f"Log file is not older than 10 days: {log_file}")
else:
log_info_with_server(f"Log file does not exist: {log_file}")
# Function to read the name (idmasa) from the file
def read_name_from_file():
try:
with open("./data/idmasa.txt", "r") as file:
n_masa = file.readline().strip()
return n_masa
except FileNotFoundError:
logging.error("File ./data/idmasa.txt not found.")
return "unknown"
# Function to send logs to a remote server for the Server_monitorizare APP
def send_log_to_server(log_message, n_masa, hostname, device_ip):
host = hostname
device = device_ip
try:
log_data = {
"hostname": str(host),
"device_ip": str(device),
"nume_masa": str(n_masa),
"log_message": str(log_message)
}
server_url = "http://rpi-ansible:80/logs" # Replace with your server's URL
print(log_data) # Debugging: Print log_data to verify its contents
response = requests.post(server_url, json=log_data, timeout=5)
response.raise_for_status()
logging.info("Log successfully sent to server: %s", log_message)
except requests.exceptions.RequestException as e:
logging.error("Failed to send log to server: %s", e)
# Wrapper for logging.info to also send logs to the server Monitorizare APP
def log_info_with_server(message):
n_masa = read_name_from_file() # Read name (idmasa) from the file
formatted_message = f"{message} (n_masa: {n_masa})" # Format the message
logging.info(formatted_message) # Log the formatted message
send_log_to_server(message, n_masa, hostname, device_ip) # Send the original message to the server
# Call the function to delete old logs
delete_old_logs()
def config():
import config
# function for posting data to the harting server
def post_backup_data():
try:
with open("./data/tag.txt", "r") as file:
lines = file.readlines()
remaining_lines = lines[:]
for line in lines:
line = line.strip()
if line:
try:
response = requests.post(line, verify=False, timeout=3)
#
response.raise_for_status() # Raise an error for bad status codes
log_info_with_server(f"Data posted successfully:")
remaining_lines.remove(line + "\n")
except requests.exceptions.Timeout:
log_info_with_server("Request timed out.")
break
except requests.exceptions.RequestException as e:
log_info_with_server(f"An error occurred: {e}")
break
with open("./data/tag.txt", "w") as file:
file.writelines(remaining_lines)
#log_info_with_server("Backup data updated.")
except FileNotFoundError:
log_info_with_server("No backup file found.")
# Function to check internet connection
def check_internet_connection():
hostname = "10.76.140.17"
cmd_block_wifi = 'sudo rfkill block wifi'
cmd_unblock_wifi = 'sudo rfkill unblock wifi'
log_info_with_server('Internet connection check loaded')
delete_old_logs()
chromium_process_name = "chromium"
while True:
try:
# Use subprocess to execute the ping command
response = subprocess.run(
["ping", "-c", "1", hostname],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
if response.returncode == 0:
log_info_with_server("Internet is up! Waiting 45 minutes.")
post_backup_data()
time.sleep(2700) # 45 minutes
else:
log_info_with_server("Internet is down. Rebooting WiFi.")
os.system(cmd_block_wifi)
time.sleep(1200) # 20 minutes
os.system(cmd_unblock_wifi)
# Refresh Chromium process
log_info_with_server("Refreshing Chromium process.")
try:
# Find and terminate Chromium processes
subprocess.run(["pkill", "-f", chromium_process_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5) # Wait for processes to terminate
# Relaunch Chromium
url = "10.76.140.17/iweb_v2/index.php/traceability/production"
subprocess.Popen(
["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen",
"--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True
)
log_info_with_server("Chromium process restarted successfully.")
except Exception as e:
log_info_with_server(f"Failed to refresh Chromium process: {e}")
except Exception as e:
log_info_with_server(f"An error occurred during internet check: {e}")
time.sleep(60) # Retry after 1 minute in case of an error
# Start the internet connection check in a separate process
internet_check_process = Process(target=check_internet_connection)
internet_check_process.start()
url = "10.76.140.17/iweb_v2/index.php/traceability/production" # pentru cazul in care raspberiul nu are sistem de prezenta
# Launch Chromium with the specified URLs
subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", "--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True)
info = "0"
#function to post info
def post_info(info):
#log_info_with_server("Starting to post data...")
info1 = info.strip() # Remove any leading/trailing whitespace, including newlines
try:
response = requests.post(info1, verify=False, timeout=3)
response.raise_for_status() # Raise an error for bad status codes
log_info_with_server("Data posted successfully")
except requests.exceptions.Timeout:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server(f"Value {info} was saved to tag.txt")
except requests.exceptions.RequestException as e:
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
log_info_with_server("Value was saved to tag.txt")
async def post_info_async(info):
async with aiohttp.ClientSession() as session:
try:
async with session.post(info, ssl=False, timeout=3) as response:
response_text = await response.text()
log_info_with_server(f"Data posted successfully")
except asyncio.TimeoutError:
log_info_with_server("Request timed out. Saving data to backup file.")
with open("./data/tag.txt", "a") as file:
file.write(info)
log_info_with_server(f"Value was saved to tag.txt due to timeout error ")
except Exception as e:
with open("./data/tag.txt", "a") as file:
file.write(info)
log_info_with_server(f"Value was saved to tag.txt due to an error {e}")
def post_info_thread(info):
thread = threading.Thread(target=asyncio.run, args=(post_info_async(info),), daemon=True)
thread.start()
led1 = OutputDevice(23)
led2 = OutputDevice(24)
name = "idmasa"
logging.info("Variabila Id Masa A fost initializata ")
f = open("./data/idmasa.txt", "r") # deschid fisierul
name = f.readline().strip() # citesc toate liniile
f.close() # inchid fisierul
logging.info(name)
#clasa reader
class Reader(rdm6300.BaseReader):
global info
def card_inserted(self, card):
if card.value == 12886709:
config()
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = f'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/{name}/{card.value}/1/{afisare}\n'
info = date
if name == "noconfig":
led1.on()
time.sleep(5)
led1.off()
log_info_with_server(f"card inserted {card} but no")
else:
post_info_thread(info)
led1.on()
#log_info_with_server(f"card inserted {card}")
def card_removed(self, card):
if card.value == 12886709:
log_info_with_server("Removing Config card")
return
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = f'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/{name}/{card.value}/0/{afisare}\n'
info = date
if name == "noconfig":
led1.off()
log_info_with_server(f"card removed {card}")
else:
post_info_thread(info)
led1.off()
log_info_with_server(f"card removed {card}")
try:
r = Reader('/dev/ttyS0')
except:
r = Reader('/dev/ttyAMA0')
r.start()