import rdm6300 import os, time import logging from gpiozero import OutputDevice from multiprocessing import Process import requests import subprocess import urllib3 import threading import urllib.parse from datetime import datetime, timedelta # Import datetime and timedelta urllib3.disable_warnings() # Configure logging logging.basicConfig(filename='/home/pi/Desktop/Prezenta/data/log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def delete_old_logs(): log_dir = '/home/pi/Desktop/Prezenta/data/' log_file = 'log.txt' log_path = os.path.join(log_dir, log_file) if os.path.exists(log_path): file_mod_time = datetime.fromtimestamp(os.path.getmtime(log_path)) if datetime.now() - file_mod_time > timedelta(days=10): os.remove(log_path) logging.info("Deleted old log file: %s", log_file) else: logging.info("Log file is not older than 10 days: %s", log_file) else: logging.info("Log file does not exist: %s", log_file) # Call the function to delete old logs delete_old_logs() def config(): import config def post_backup_data(): logging.info("Reading backup data from tag.txt...") try: with open("./data/tag.txt", "r") as file: lines = file.readlines() remaining_lines = lines[:] for line in lines: line = line.strip() if line: logging.info(f"Posting backup data: {line}") try: response = requests.post(line, verify=False, timeout=3) logging.info("Request sent, awaiting response...") response.raise_for_status() # Raise an error for bad status codes logging.info("Data posted successfully: %s", response.text) remaining_lines.remove(line + "\n") except requests.exceptions.Timeout: logging.warning("Request timed out.") break except requests.exceptions.RequestException as e: logging.error("An error occurred: %s", e) break with open("./data/tag.txt", "w") as file: file.writelines(remaining_lines) logging.info("Backup data updated.") except FileNotFoundError: logging.warning("No backup file found.") except Exception as e: logging.error("An error occurred while reading the backup file: %s", e) def check_internet_connection(): hostname = "10.76.140.17" # "https://google.com" cmd_block_wifi = 'sudo rfkill block wifi' cmd_unblock_wifi = 'sudo rfkill unblock wifi' logging.info('Internet connection check loaded') delete_old_logs() def manage_wifi_connection(): if var1 == 0: logging.info("Internet is up! Waiting 45 minutes.") post_backup_data() time.sleep(2700) else: os.system(cmd_block_wifi) logging.info('System is rebooting WiFi, please wait until it finishes the job 20 minutes') time.sleep(1200) # 20 minutes os.system(cmd_unblock_wifi) while True: response = os.system("ping -c 1 " + hostname) if response == 0: var1 = 0 manage_wifi_connection() else: var1 = 1 manage_wifi_connection() # Start the internet connection check in a separate process internet_check_process = Process(target=check_internet_connection) internet_check_process.start() url = "10.76.140.17/iweb_v2/index.php/traceability/production" # pentru cazul in care raspberiul nu are sistem de prezenta # Launch Chromium with the specified URLs subprocess.Popen(["chromium", "--test-type", "--noerrors", "--kiosk", "--start-fullscreen", "--unsafely-treat-insecure-origin-as-secure=http://10.76.140.17", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, start_new_session=True) info = "0" def post_info(info): logging.info("Starting to post data...") info1 = info.strip() # Remove any leading/trailing whitespace, including newlines try: response = requests.post(info1, verify=False, timeout=3) logging.info("Request sent, awaiting response...") response.raise_for_status() # Raise an error for bad status codes logging.info("Data posted successfully: %s", response.text) except requests.exceptions.Timeout: logging.warning("Request timed out. Saving data to backup file.") with open("./data/tag.txt", "a") as file: # Open in append mode file.write(info) logging.info("Value %s was saved to tag.txt", info) except requests.exceptions.RequestException as e: logging.error("An error occurred: %s", e) with open("./data/tag.txt", "a") as file: # Open in append mode file.write(info) logging.info("Value %s was saved to tag.txt", info) def post_info_thread(info): thread = threading.Thread(target=post_info, args=(info,)) thread.start() led1 = OutputDevice(23) led2 = OutputDevice(24) name = "idmasa" logging.info("Variabila Id Masa A fost initializata ") f = open("./data/idmasa.txt", "r") # deschid fisierul name = f.readline().strip() # citesc toate liniile f.close() # inchid fisierul logging.info(name) class Reader(rdm6300.BaseReader): global info def card_inserted(self, card): if card.value == 12886709: logging.info("Inserting Config Card") config() return afisare = time.strftime("%Y-%m-%d&%H:%M:%S") date = 'https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/' + name + '/' + str(card.value) + "/1/" + afisare + "\n" info = date if name == "noconfig": led1.on() logging.info(info) logging.info(f"card inserted {card}") else: logging.info(info) post_info_thread(info) led1.on() logging.info(f"card inserted {card}") logging.info(f"card removed {card}") def card_removed(self, card): if card.value == 12886709: logging.info("Removing Config card") return # config() afisare=time.strftime("%Y-%m-%d&%H:%M:%S") date='https://dataswsibiusb01.sibiusb.harting.intra/RO_Quality_PRD/api/record/'+name+'/'+str(card.value)+"/0/"+afisare+"\n" info = date if name == "noconfig": led1.of() logging.info(info) logging.info(f"card card removed {card}") else: logging.info(info) post_info_thread(info) led1.off() logging.info(f"card removed {card}") logging.info(f"card removed {card}") try: r = Reader('/dev/ttyS0') except: r = Reader('/dev/ttyAMA0') r.start()