Files
prezenta_work/app.py
2025-02-28 09:03:50 +02:00

155 lines
5.6 KiB
Python

import rdm6300
import os, time
import logging
from gpiozero import OutputDevice
from multiprocessing import Process
import requests
import subprocess
import urllib3
import threading
import urllib.parse
urllib3.disable_warnings()
# Configure logging
logging.basicConfig(filename='./data/log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def config():
import config
def post_backup_data():
logging.info("Reading backup data from tag.txt...")
try:
with open("./data/tag.txt", "r") as file:
lines = file.readlines()
remaining_lines = lines[:]
for line in lines:
line = line.strip()
if line:
logging.info(f"Posting backup data: {line}")
try:
response = requests.post(line, verify=False, timeout=3)
logging.info("Request sent, awaiting response...")
response.raise_for_status() # Raise an error for bad status codes
logging.info("Data posted successfully: %s", response.text)
remaining_lines.remove(line + "\n")
except requests.exceptions.Timeout:
logging.warning("Request timed out.")
break
except requests.exceptions.RequestException as e:
logging.error("An error occurred: %s", e)
break
with open("./data/tag.txt", "w") as file:
file.writelines(remaining_lines)
logging.info("Backup data updated.")
except FileNotFoundError:
logging.warning("No backup file found.")
except Exception as e:
logging.error("An error occurred while reading the backup file: %s", e)
def check_internet_connection():
hostname = "10.76.140.17" # "https://google.com"
cmd_block_wifi = 'sudo rfkill block wifi'
cmd_unblock_wifi = 'sudo rfkill unblock wifi'
logging.info('Internet connection check loaded')
def manage_wifi_connection():
if var1 == 0:
logging.info("Internet is up! Waiting 45 minutes.")
post_backup_data()
time.sleep(2700)
else:
os.system(cmd_block_wifi)
logging.info('System is rebooting WiFi, please wait until it finishes the job 20 minutes')
time.sleep(1200) # 20 minutes
os.system(cmd_unblock_wifi)
while True:
response = os.system("ping -c 1 " + hostname)
if response == 0:
var1 = 0
manage_wifi_connection()
else:
var1 = 1
manage_wifi_connection()
# Start the internet connection check in a separate process
internet_check_process = Process(target=check_internet_connection)
internet_check_process.start()
url = "10.76.140.17/iweb_v2/index.php/traceability/production" # pentru cazul in care raspberiul nu are sistem de prezenta
# Launch Chromium with the specified URLs
subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", "--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True)
info = "0"
def post_info(info):
logging.info("Starting to post data...")
info1 = info.strip() # Remove any leading/trailing whitespace, including newlines
try:
response = requests.post(info1, verify=False, timeout=3)
logging.info("Request sent, awaiting response...")
response.raise_for_status() # Raise an error for bad status codes
logging.info("Data posted successfully: %s", response.text)
except requests.exceptions.Timeout:
logging.warning("Request timed out. Saving data to backup file.")
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
logging.info("Value %s was saved to tag.txt", info)
except requests.exceptions.RequestException as e:
logging.error("An error occurred: %s", e)
with open("./data/tag.txt", "a") as file: # Open in append mode
file.write(info)
logging.info("Value %s was saved to tag.txt", info)
def post_info_thread(info):
thread = threading.Thread(target=post_info, args=(info,))
thread.start()
led1 = OutputDevice(23)
led2 = OutputDevice(24)
name = "idmasa"
logging.info(f"Variabila Id Masa A fost initializata", name)
f = open("./data/idmasa.txt", "r") # deschid fisierul
name = f.readline().strip() # citesc toate liniile
f.close() # inchid fisierul
logging.info(name)
class Reader(rdm6300.BaseReader):
global info
def card_inserted(self, card):
if card.value == 12886709:
logging.info("Inserting Config Card")
return
# config()
afisare = time.strftime("%Y-%m-%d&%H:%M:%S")
date = 'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/' + name + '/' + str(card.value) + "/1/" + afisare + "\n"
info = date
logging.info(info)
post_info_thread(info)
led1.on()
logging.info(f"card inserted {card}")
def card_removed(self, card):
if card.value == 12886709:
logging.info("Removing Config card")
return
# config()
afisare=time.strftime("%Y-%m-%d&%H:%M:%S")
date='https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/'+name+'/'+str(card.value)+"/0/"+afisare+"\n"
info = date
logging.info(info)
post_info_thread(info)
led1.off()
logging.info("Card removed")
try:
r = Reader('/dev/ttyS0')
except:
r = Reader('/dev/ttyAMA0')
r.start()