Initial commit: Kivy database interface application with search, add/update, delete functionality and Windows build support
This commit is contained in:
149
database_manager.py
Normal file
149
database_manager.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from typing import List, Tuple, Optional
|
||||
|
||||
class DatabaseManager:
|
||||
"""
|
||||
Database manager class for handling MariaDB operations.
|
||||
Connects to MariaDB server with table offsystemsCounting containing id (VARCHAR(20)) and mass (REAL).
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.host = "localhost"
|
||||
self.database = "cantare_injectie"
|
||||
self.user = "omron"
|
||||
self.password = "Initial01!"
|
||||
self.connection = None
|
||||
self.init_database()
|
||||
|
||||
def get_connection(self):
|
||||
"""Get a database connection."""
|
||||
try:
|
||||
if self.connection is None or not self.connection.is_connected():
|
||||
self.connection = mysql.connector.connect(
|
||||
host=self.host,
|
||||
database=self.database,
|
||||
user=self.user,
|
||||
password=self.password
|
||||
)
|
||||
return self.connection
|
||||
except Error as e:
|
||||
print(f"Database connection error: {e}")
|
||||
return None
|
||||
|
||||
def init_database(self):
|
||||
"""Initialize the database connection and create the table if it doesn't exist."""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
if conn and conn.is_connected():
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS offsystemsCounting (
|
||||
id VARCHAR(20) PRIMARY KEY,
|
||||
mass REAL NOT NULL
|
||||
)
|
||||
''')
|
||||
conn.commit()
|
||||
print(f"Connected to MariaDB database: {self.database}")
|
||||
print("Table 'offsystemsCounting' ready")
|
||||
except Error as e:
|
||||
print(f"Database initialization error: {e}")
|
||||
|
||||
def read_all_data(self) -> List[Tuple[str, float]]:
|
||||
"""Read all data from the database."""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
if conn and conn.is_connected():
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT id, mass FROM offsystemsCounting ORDER BY id")
|
||||
return cursor.fetchall()
|
||||
except Error as e:
|
||||
print(f"Error reading data: {e}")
|
||||
return []
|
||||
|
||||
def search_by_id(self, record_id: str) -> Optional[Tuple[str, float]]:
|
||||
"""Search for a record by ID."""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
if conn and conn.is_connected():
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT id, mass FROM offsystemsCounting WHERE id = %s", (record_id,))
|
||||
return cursor.fetchone()
|
||||
except Error as e:
|
||||
print(f"Error searching data: {e}")
|
||||
return None
|
||||
|
||||
def add_or_update_record(self, record_id: str, mass: float) -> bool:
|
||||
"""Add a new record or update existing one if ID already exists."""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
if conn and conn.is_connected():
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Check if record exists
|
||||
existing = self.search_by_id(record_id)
|
||||
|
||||
if existing:
|
||||
# Update existing record
|
||||
cursor.execute(
|
||||
"UPDATE offsystemsCounting SET mass = %s WHERE id = %s",
|
||||
(mass, record_id)
|
||||
)
|
||||
print(f"Updated record: {record_id} = {mass}")
|
||||
else:
|
||||
# Insert new record
|
||||
cursor.execute(
|
||||
"INSERT INTO offsystemsCounting (id, mass) VALUES (%s, %s)",
|
||||
(record_id, mass)
|
||||
)
|
||||
print(f"Added new record: {record_id} = {mass}")
|
||||
|
||||
conn.commit()
|
||||
return True
|
||||
except Error as e:
|
||||
print(f"Error adding/updating record: {e}")
|
||||
return False
|
||||
|
||||
def delete_record(self, record_id: str) -> bool:
|
||||
"""Delete a record by ID."""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
if conn and conn.is_connected():
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DELETE FROM offsystemsCounting WHERE id = %s", (record_id,))
|
||||
|
||||
if cursor.rowcount > 0:
|
||||
conn.commit()
|
||||
print(f"Deleted record: {record_id}")
|
||||
return True
|
||||
else:
|
||||
print(f"No record found with ID: {record_id}")
|
||||
return False
|
||||
except Error as e:
|
||||
print(f"Error deleting record: {e}")
|
||||
return False
|
||||
|
||||
def get_record_count(self) -> int:
|
||||
"""Get the total number of records in the database."""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
if conn and conn.is_connected():
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT COUNT(*) FROM offsystemsCounting")
|
||||
return cursor.fetchone()[0]
|
||||
except Error as e:
|
||||
print(f"Error getting record count: {e}")
|
||||
return 0
|
||||
|
||||
def close_connection(self):
|
||||
"""Close the database connection."""
|
||||
try:
|
||||
if self.connection and self.connection.is_connected():
|
||||
self.connection.close()
|
||||
print("MariaDB connection closed")
|
||||
except Error as e:
|
||||
print(f"Error closing connection: {e}")
|
||||
|
||||
def __del__(self):
|
||||
"""Destructor to ensure connection is closed."""
|
||||
self.close_connection()
|
||||
Reference in New Issue
Block a user