Files
db_interface/database_manager.py
ske087 704e01669f fix: db update bug, add action log with 30-day purge, rebuild exe
- main.py: _pending_record_id locks resolved DB key at Add/Update time;
  show original barcode in update frame; auto-focus mass field on open;
  clear all fields and return focus to ID input after confirm/reset
- database_manager.py: buffered=True cursors on all SELECTs; no
  fetchall() after DML; replace ON DUPLICATE KEY UPDATE VALUES() with
  explicit UPDATE then INSERT fallback; add app_actions.log with
  structured per-action entries; purge_old_action_logs(30) on startup
- dist/DatabaseApp.exe: rebuilt single-file Windows binary (30.9 MB)
- remove unused files: README, WINDOWS_README, run_app.sh,
  setup_database.sh, setup_user.sql, test_database.py, sept.csv"
2026-04-09 11:00:37 +03:00

324 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import mysql.connector
from mysql.connector import Error
from typing import List, Tuple, Optional
import json
import os
import sys
import logging
from datetime import datetime, timedelta
# When frozen by PyInstaller, __file__ points to a temp folder that is deleted on exit.
# sys.executable points to the .exe location, which is persistent.
if getattr(sys, 'frozen', False):
_BASE_DIR = os.path.dirname(sys.executable)
else:
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(_BASE_DIR, 'config.json')
LOG_FILE = os.path.join(_BASE_DIR, 'db_debug.log')
ACTION_LOG_FILE = os.path.join(_BASE_DIR, 'app_actions.log')
# ---- Action log helpers (module-level, no class dependency) ----
def _log_action(action: str, record_id: str = '', detail: str = '') -> None:
"""Append one structured line to app_actions.log."""
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
line = f"{ts} | {action:<18} | id={record_id:<20} | {detail}\n"
try:
with open(ACTION_LOG_FILE, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
print(f"Action log write error: {e}")
def purge_old_action_logs(days: int = 30) -> None:
"""Remove lines older than *days* from app_actions.log."""
if not os.path.exists(ACTION_LOG_FILE):
return
cutoff = datetime.now() - timedelta(days=days)
kept = []
removed = 0
try:
with open(ACTION_LOG_FILE, 'r', encoding='utf-8') as f:
for line in f:
# Every valid line starts with YYYY-MM-DD HH:MM:SS
try:
ts = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S')
if ts >= cutoff:
kept.append(line)
else:
removed += 1
except ValueError:
kept.append(line) # malformed line — keep it
with open(ACTION_LOG_FILE, 'w', encoding='utf-8') as f:
f.writelines(kept)
if removed:
_log_action('LOG_PURGE', '', f'removed {removed} entries older than {days} days')
print(f"Action log purged: {removed} old entries removed")
except Exception as e:
print(f"Action log purge error: {e}")
# File logger appends on every run so history is preserved
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
log = logging.getLogger('db_manager')
log.info('=== DatabaseManager module loaded ===')
class DatabaseManager:
"""
Database manager class for handling MariaDB operations.
Connects to MariaDB server with table offsystemsCounting containing id (VARCHAR(20)) and mass (REAL).
"""
def __init__(self):
self.host = self._load_host()
self.database = "cantare_injectie"
self.user = "omron"
self.password = "Initial01!"
self.connection = None
# init_database() is called asynchronously from the UI layer to avoid blocking
def _load_host(self) -> str:
"""Load the database host from the config file, falling back to localhost."""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
data = json.load(f)
return data.get('host', 'localhost')
except Exception as e:
print(f"Could not read config file: {e}")
return 'localhost'
def save_host(self, host: str):
"""Persist the database host to the config file."""
try:
data = {}
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
data = json.load(f)
data['host'] = host
with open(CONFIG_FILE, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Could not save config file: {e}")
def test_connection(self, host: str) -> tuple:
"""Test connectivity to the database using the given host. Returns (success: bool, message: str)."""
try:
test_conn = mysql.connector.connect(
host=host,
database=self.database,
user=self.user,
password=self.password,
connection_timeout=5,
use_pure=True,
autocommit=True
)
if test_conn.is_connected():
test_conn.close()
return True, f"Connected successfully to '{host}'"
except Error as e:
return False, str(e)
return False, "Connection failed"
def _new_conn(self):
"""Open and return a fresh connection. Caller must close it."""
return mysql.connector.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
connection_timeout=5,
use_pure=True,
autocommit=True
)
def get_connection(self):
"""Get a reusable connection (kept for test_connection compatibility)."""
try:
if self.connection is None or not self.connection.is_connected():
log.info(f'Opening persistent connection to {self.host}/{self.database}')
self.connection = self._new_conn()
log.info('Connection opened OK')
return self.connection
except Error as e:
log.error(f'Connection error: {e}')
print(f"Database connection error: {e}")
return None
def init_database(self):
"""Initialize the database connection and create the table if it doesn't exist."""
# Purge action log entries older than 30 days on every startup
purge_old_action_logs(30)
_log_action('APP_START', '', f'host={self.host}')
try:
conn = self._new_conn()
cursor = conn.cursor(buffered=True)
cursor.execute('''
CREATE TABLE IF NOT EXISTS offsystemsCounting (
id VARCHAR(20) PRIMARY KEY,
mass REAL NOT NULL
)
''')
# Add t_update column if it doesn't exist yet (MySQL-compatible check)
cursor.execute("""
SELECT COUNT(*) FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = 'offsystemsCounting'
AND COLUMN_NAME = 't_update'
""", (self.database,))
col_exists = cursor.fetchone()[0]
cursor.close()
if not col_exists:
log.info("Adding t_update column to offsystemsCounting")
c2 = conn.cursor()
c2.execute("ALTER TABLE offsystemsCounting ADD COLUMN t_update DATETIME DEFAULT NULL")
c2.close()
log.info("t_update column added")
conn.close()
log.info("init_database complete")
print(f"Connected to MariaDB database: {self.database}")
print("Table 'offsystemsCounting' ready")
except Error as e:
log.error(f"Database initialization error: {e}")
print(f"Database initialization error: {e}")
def read_all_data(self) -> List[Tuple[str, float]]:
"""Read all data from the database."""
try:
conn = self._new_conn()
cursor = conn.cursor(buffered=True)
cursor.execute("SELECT id, mass FROM offsystemsCounting ORDER BY id")
rows = cursor.fetchall()
cursor.close()
conn.close()
return rows
except Error as e:
log.error(f"read_all_data error: {e}")
print(f"Error reading data: {e}")
return []
def search_by_id(self, record_id: str) -> Optional[Tuple]:
"""Search for a record by ID. Returns (id, mass, t_update) or None."""
log.info(f'search_by_id: looking up id={record_id!r}')
try:
conn = self._new_conn()
cursor = conn.cursor(buffered=True)
cursor.execute("SELECT id, mass, t_update FROM offsystemsCounting WHERE id = %s", (record_id,))
row = cursor.fetchone()
# buffered=True already fetched the full result; no extra drain needed
cursor.close()
conn.close()
log.info(f'search_by_id: result={row}')
if row:
_log_action('SEARCH_FOUND', record_id, f'mass={row[1]}, t_update={row[2]}')
else:
_log_action('SEARCH_NOT_FOUND', record_id, '')
return row
except Error as e:
log.error(f'search_by_id error: {e}')
_log_action('SEARCH_ERROR', record_id, str(e))
print(f"Error searching data: {e}")
return None
def add_or_update_record(self, record_id: str, mass: float) -> bool:
"""Update mass/t_update for an existing record, or INSERT if it doesn't exist yet."""
log.info(f'add_or_update_record: id={record_id!r} mass={mass}')
try:
conn = self._new_conn()
cursor = conn.cursor(buffered=True)
# Try UPDATE first
update_sql = (
"UPDATE offsystemsCounting "
"SET mass = %s, t_update = NOW() "
"WHERE id = %s"
)
log.debug(f'Executing SQL: {update_sql} | params=({mass}, {record_id!r})')
cursor.execute(update_sql, (mass, record_id))
affected = cursor.rowcount
if affected == 0:
# Record does not exist yet — INSERT it
insert_sql = (
"INSERT INTO offsystemsCounting (id, mass, t_update) "
"VALUES (%s, %s, NOW())"
)
log.debug(f'Executing SQL: {insert_sql} | params=({record_id!r}, {mass})')
cursor.execute(insert_sql, (record_id, mass))
affected = cursor.rowcount
log.info(f'add_or_update_record: inserted new record, rowcount={affected}')
_log_action('INSERT', record_id, f'mass={mass}')
print(f"Inserted new record: {record_id} = {mass}")
else:
log.info(f'add_or_update_record: updated existing record, rowcount={affected}')
_log_action('UPDATE', record_id, f'mass={mass}')
print(f"Updated record: {record_id} = {mass} (rowcount={affected})")
cursor.close()
conn.close()
return True
except Error as e:
log.error(f'add_or_update_record error: {e}')
_log_action('UPDATE_ERROR', record_id, str(e))
print(f"Error adding/updating record: {e}")
return False
def delete_record(self, record_id: str) -> bool:
"""Delete a record by ID."""
log.info(f'delete_record: id={record_id!r}')
try:
conn = self._new_conn()
cursor = conn.cursor(buffered=True)
cursor.execute("DELETE FROM offsystemsCounting WHERE id = %s", (record_id,))
deleted = cursor.rowcount
# DML produces no result set — do NOT fetchall()
cursor.close()
conn.close()
if deleted > 0:
log.info(f'delete_record: deleted {deleted} row(s)')
_log_action('DELETE', record_id, f'rows_deleted={deleted}')
print(f"Deleted record: {record_id}")
return True
else:
log.info(f'delete_record: no row found for id={record_id!r}')
_log_action('DELETE_NOT_FOUND', record_id, '')
print(f"No record found with ID: {record_id}")
return False
except Error as e:
log.error(f'delete_record error: {e}')
_log_action('DELETE_ERROR', record_id, str(e))
print(f"Error deleting record: {e}")
return False
def get_record_count(self) -> int:
"""Get the total number of records in the database."""
try:
conn = self._new_conn()
cursor = conn.cursor(buffered=True)
cursor.execute("SELECT COUNT(*) FROM offsystemsCounting")
count = cursor.fetchone()[0]
# fetchone() on a buffered cursor is fully consumed — no drain needed
cursor.close()
conn.close()
return count
except Error as e:
print(f"Error getting record count: {e}")
return 0
def close_connection(self):
"""Close the database connection."""
try:
if self.connection and self.connection.is_connected():
self.connection.close()
print("MariaDB connection closed")
except Error as e:
print(f"Error closing connection: {e}")
def __del__(self):
"""Destructor to ensure connection is closed."""
self.close_connection()