"""MariaDB access layer and action-log helpers for the offsystemsCounting table.

The module provides:

* ``_log_action`` / ``purge_old_action_logs`` - a plain-text, line-oriented
  action log (``app_actions.log``) stored next to the application.
* ``DatabaseManager`` - short-lived-connection CRUD operations against the
  ``offsystemsCounting`` table.

Importing the module configures the root logger to append to ``db_debug.log``.
"""
import json
import logging
import os
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Iterator, List, Optional, Tuple

import mysql.connector
from mysql.connector import Error

# When frozen by PyInstaller, __file__ points to a temp folder that is deleted on exit.
# sys.executable points to the .exe location, which is persistent.
if getattr(sys, 'frozen', False):
    _BASE_DIR = os.path.dirname(sys.executable)
else:
    _BASE_DIR = os.path.dirname(os.path.abspath(__file__))

CONFIG_FILE = os.path.join(_BASE_DIR, 'config.json')
LOG_FILE = os.path.join(_BASE_DIR, 'db_debug.log')
ACTION_LOG_FILE = os.path.join(_BASE_DIR, 'app_actions.log')


# ---- Action log helpers (module-level, no class dependency) ----

def _log_action(action: str, record_id: str = '', detail: str = '') -> None:
    """Append one structured line to app_actions.log.

    The line has the form ``<timestamp> | <action> | id=<record_id> | <detail>``.
    This function never raises: any failure is reported on stdout only, so a
    logging problem cannot break the database operation being logged.
    """
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    try:
        # str() coercion: a None record_id would otherwise raise TypeError on
        # the '<20' format spec. Flattening the detail keeps one entry per
        # line, which purge_old_action_logs relies on to date every line.
        flat_detail = ' '.join(str(detail).splitlines())
        line = f"{ts} | {str(action):<18} | id={str(record_id):<20} | {flat_detail}\n"
        with open(ACTION_LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(line)
    except Exception as e:
        print(f"Action log write error: {e}")


def purge_old_action_logs(days: int = 30) -> None:
    """Remove lines older than *days* from app_actions.log.

    Lines that do not start with a ``YYYY-MM-DD HH:MM:SS`` timestamp are kept.
    The file is only rewritten when at least one line actually expired.
    Errors are reported on stdout and never raised.
    """
    if not os.path.exists(ACTION_LOG_FILE):
        return

    cutoff = datetime.now() - timedelta(days=days)
    kept = []
    removed = 0
    try:
        with open(ACTION_LOG_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                # Every valid line starts with YYYY-MM-DD HH:MM:SS
                try:
                    ts = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S')
                except ValueError:
                    kept.append(line)  # malformed line - keep it
                    continue
                if ts >= cutoff:
                    kept.append(line)
                else:
                    removed += 1

        if not removed:
            # Nothing expired: leave the file untouched instead of truncating
            # and rewriting identical content.
            return

        with open(ACTION_LOG_FILE, 'w', encoding='utf-8') as f:
            f.writelines(kept)
        _log_action('LOG_PURGE', '', f'removed {removed} entries older than {days} days')
        print(f"Action log purged: {removed} old entries removed")
    except Exception as e:
        print(f"Action log purge error: {e}")


# File logger - appends on every run so history is preserved
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
log = logging.getLogger('db_manager')
log.info('=== DatabaseManager module loaded ===')


class DatabaseManager:
    """
    Database manager class for handling MariaDB operations.

    Connects to a MariaDB server holding the table offsystemsCounting with
    columns id (VARCHAR(20), primary key), mass (REAL) and t_update (DATETIME).

    Every data operation opens its own short-lived connection and closes it
    again, including on error paths. Database errors are logged and reported
    through the return value rather than raised.
    """

    def __init__(self):
        self.host = self._load_host()
        self.database = "cantare_injectie"
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to the config file or the environment.
        self.user = "omron"
        self.password = "Initial01!"
        self.connection = None
        # init_database() is called asynchronously from the UI layer to avoid blocking

    def _load_host(self) -> str:
        """Load the database host from the config file, falling back to localhost."""
        try:
            if os.path.exists(CONFIG_FILE):
                with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data.get('host', 'localhost')
        except Exception as e:
            print(f"Could not read config file: {e}")
        return 'localhost'

    def save_host(self, host: str):
        """Persist the database host to the config file.

        Other keys already present in the config file are preserved. Failures
        are reported on stdout and not raised.
        """
        try:
            data = {}
            if os.path.exists(CONFIG_FILE):
                with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            data['host'] = host
            with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"Could not save config file: {e}")

    def _connect(self, host: str):
        """Open a connection to *host* using this manager's database and credentials."""
        return mysql.connector.connect(
            host=host,
            database=self.database,
            user=self.user,
            password=self.password,
            connection_timeout=5,
            use_pure=True,
            autocommit=True,
        )

    def test_connection(self, host: str) -> Tuple[bool, str]:
        """Test connectivity to the database using the given host.

        Returns (success: bool, message: str). The probe connection is always
        closed, whatever the outcome.
        """
        test_conn = None
        try:
            test_conn = self._connect(host)
            if test_conn.is_connected():
                return True, f"Connected successfully to '{host}'"
        except Error as e:
            return False, str(e)
        finally:
            if test_conn is not None:
                try:
                    test_conn.close()
                except Error:
                    # Best-effort cleanup: the verdict was already decided.
                    pass
        return False, "Connection failed"

    def _new_conn(self):
        """Open and return a fresh connection. Caller must close it."""
        return self._connect(self.host)

    @contextmanager
    def _open_cursor(self) -> Iterator:
        """Yield a buffered cursor on a fresh connection; close both on exit.

        Cleanup runs in ``finally`` blocks so a failing statement cannot leak
        the cursor or the connection.
        """
        conn = self._new_conn()
        try:
            cursor = conn.cursor(buffered=True)
            try:
                yield cursor
            finally:
                cursor.close()
        finally:
            conn.close()

    def get_connection(self):
        """Get a reusable connection (kept for test_connection compatibility).

        Returns the persistent connection, opening it if needed, or None when
        the connection attempt fails.
        """
        try:
            if self.connection is None or not self.connection.is_connected():
                log.info('Opening persistent connection to %s/%s', self.host, self.database)
                self.connection = self._new_conn()
                log.info('Connection opened OK')
            return self.connection
        except Error as e:
            log.error('Connection error: %s', e)
            print(f"Database connection error: {e}")
            return None

    def init_database(self):
        """Initialize the database connection and create the table if it doesn't exist.

        Also purges old action-log entries, records an APP_START action, and
        adds the t_update column to tables created before it existed.
        """
        # Purge action log entries older than 30 days on every startup
        purge_old_action_logs(30)
        _log_action('APP_START', '', f'host={self.host}')
        try:
            with self._open_cursor() as cursor:
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS offsystemsCounting (
                        id VARCHAR(20) PRIMARY KEY,
                        mass REAL NOT NULL
                    )
                ''')
                # Add t_update column if it doesn't exist yet (MySQL-compatible check)
                cursor.execute("""
                    SELECT COUNT(*) FROM information_schema.COLUMNS
                    WHERE TABLE_SCHEMA = %s
                      AND TABLE_NAME = 'offsystemsCounting'
                      AND COLUMN_NAME = 't_update'
                """, (self.database,))
                col_exists = cursor.fetchone()[0]
                if not col_exists:
                    log.info("Adding t_update column to offsystemsCounting")
                    cursor.execute(
                        "ALTER TABLE offsystemsCounting ADD COLUMN t_update DATETIME DEFAULT NULL"
                    )
                    log.info("t_update column added")
            log.info("init_database complete")
            print(f"Connected to MariaDB database: {self.database}")
            print("Table 'offsystemsCounting' ready")
        except Error as e:
            log.error('Database initialization error: %s', e)
            print(f"Database initialization error: {e}")

    def read_all_data(self) -> List[Tuple[str, float]]:
        """Read all (id, mass) rows ordered by id; returns [] on database error."""
        try:
            with self._open_cursor() as cursor:
                cursor.execute("SELECT id, mass FROM offsystemsCounting ORDER BY id")
                return cursor.fetchall()
        except Error as e:
            log.error('read_all_data error: %s', e)
            print(f"Error reading data: {e}")
            return []

    def search_by_id(self, record_id: str) -> Optional[Tuple]:
        """Search for a record by ID. Returns (id, mass, t_update) or None.

        None is returned both when no row matches and when the lookup fails;
        the action log distinguishes the two (SEARCH_NOT_FOUND / SEARCH_ERROR).
        """
        log.info('search_by_id: looking up id=%r', record_id)
        try:
            with self._open_cursor() as cursor:
                cursor.execute(
                    "SELECT id, mass, t_update FROM offsystemsCounting WHERE id = %s",
                    (record_id,),
                )
                # buffered=True already fetched the full result; no extra drain needed
                row = cursor.fetchone()
            log.info('search_by_id: result=%s', row)
            if row:
                _log_action('SEARCH_FOUND', record_id, f'mass={row[1]}, t_update={row[2]}')
            else:
                _log_action('SEARCH_NOT_FOUND', record_id, '')
            return row
        except Error as e:
            log.error('search_by_id error: %s', e)
            _log_action('SEARCH_ERROR', record_id, str(e))
            print(f"Error searching data: {e}")
            return None

    def add_or_update_record(self, record_id: str, mass: float) -> bool:
        """Update mass/t_update for an existing record, or INSERT if it doesn't exist yet.

        Returns True on success and False on database error.
        """
        log.info('add_or_update_record: id=%r mass=%s', record_id, mass)
        update_sql = (
            "UPDATE offsystemsCounting "
            "SET mass = %s, t_update = NOW() "
            "WHERE id = %s"
        )
        # The fallback must tolerate an existing row: the UPDATE above reports
        # rowcount 0 not only when the id is missing but also when the row
        # exists and nothing changed (same mass within the same second). A
        # plain INSERT would then fail with a duplicate-key error.
        upsert_sql = (
            "INSERT INTO offsystemsCounting (id, mass, t_update) "
            "VALUES (%s, %s, NOW()) "
            "ON DUPLICATE KEY UPDATE mass = %s, t_update = NOW()"
        )
        try:
            inserted = False
            with self._open_cursor() as cursor:
                # Try UPDATE first
                log.debug('Executing SQL: %s | params=(%s, %r)', update_sql, mass, record_id)
                cursor.execute(update_sql, (mass, record_id))
                affected = cursor.rowcount
                if affected == 0:
                    log.debug(
                        'Executing SQL: %s | params=(%r, %s, %s)',
                        upsert_sql, record_id, mass, mass,
                    )
                    cursor.execute(upsert_sql, (record_id, mass, mass))
                    affected = cursor.rowcount
                    # For ON DUPLICATE KEY UPDATE: 1 = new row inserted,
                    # 0 or 2 = the row already existed.
                    inserted = affected == 1

            if inserted:
                log.info('add_or_update_record: inserted new record, rowcount=%s', affected)
                _log_action('INSERT', record_id, f'mass={mass}')
                print(f"Inserted new record: {record_id} = {mass}")
            else:
                log.info('add_or_update_record: updated existing record, rowcount=%s', affected)
                _log_action('UPDATE', record_id, f'mass={mass}')
                print(f"Updated record: {record_id} = {mass} (rowcount={affected})")
            return True
        except Error as e:
            log.error('add_or_update_record error: %s', e)
            _log_action('UPDATE_ERROR', record_id, str(e))
            print(f"Error adding/updating record: {e}")
            return False

    def delete_record(self, record_id: str) -> bool:
        """Delete a record by ID.

        Returns True when a row was deleted, False when no row matched or the
        statement failed.
        """
        log.info('delete_record: id=%r', record_id)
        try:
            with self._open_cursor() as cursor:
                cursor.execute("DELETE FROM offsystemsCounting WHERE id = %s", (record_id,))
                # DML produces no result set - do NOT fetchall()
                deleted = cursor.rowcount

            if deleted > 0:
                log.info('delete_record: deleted %s row(s)', deleted)
                _log_action('DELETE', record_id, f'rows_deleted={deleted}')
                print(f"Deleted record: {record_id}")
                return True

            log.info('delete_record: no row found for id=%r', record_id)
            _log_action('DELETE_NOT_FOUND', record_id, '')
            print(f"No record found with ID: {record_id}")
            return False
        except Error as e:
            log.error('delete_record error: %s', e)
            _log_action('DELETE_ERROR', record_id, str(e))
            print(f"Error deleting record: {e}")
            return False

    def get_record_count(self) -> int:
        """Get the total number of records in the database; returns 0 on error."""
        try:
            with self._open_cursor() as cursor:
                cursor.execute("SELECT COUNT(*) FROM offsystemsCounting")
                # fetchone() on a buffered cursor is fully consumed - no drain needed
                return cursor.fetchone()[0]
        except Error as e:
            log.error('get_record_count error: %s', e)
            print(f"Error getting record count: {e}")
            return 0

    def close_connection(self):
        """Close the persistent connection opened by get_connection(), if any."""
        # getattr: the attribute is missing when __init__ never completed.
        conn = getattr(self, 'connection', None)
        try:
            if conn and conn.is_connected():
                conn.close()
                print("MariaDB connection closed")
        except Error as e:
            print(f"Error closing connection: {e}")
        finally:
            # Drop the stale handle so get_connection() reopens cleanly.
            self.connection = None

    def __del__(self):
        """Destructor to ensure connection is closed."""
        try:
            self.close_connection()
        except Exception:
            # A destructor must not raise, e.g. during interpreter shutdown
            # when module globals may already be torn down.
            pass