Files
quality_app-v2/app/database.py
2026-01-25 22:25:18 +02:00

142 lines
3.6 KiB
Python

"""
Database connection management and initialization.

Uses connection pooling to manage database connections efficiently.
"""
import pymysql
import logging
from dbutils.pooled_db import PooledDB
from flask import g
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Global database pool shared by the whole process. It stays None until
# init_db() builds the pool; get_db() raises RuntimeError while it is None.
db_pool = None
def init_db(app):
    """Build the module-wide connection pool from the app's configuration.

    Args:
        app: Application object whose ``config`` mapping provides
            ``DB_USER``, ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and
            ``DB_NAME`` (all required) and optionally ``DB_POOL_SIZE``
            (defaults to 10).

    Raises:
        Exception: Any error raised while reading the configuration or
            constructing the pool is logged and re-raised unchanged.
    """
    global db_pool
    try:
        settings = app.config

        # Options consumed by the pool itself.
        pool_options = {
            'creator': pymysql,
            'maxconnections': settings.get('DB_POOL_SIZE', 10),
            'mincached': 2,
            'maxcached': 5,
            'maxshared': 3,
            'blocking': True,
            'maxusage': None,
            'setsession': [],
            'ping': 1,
        }

        # PyMySQL connection parameters, forwarded to the creator.
        connection_options = {
            'user': settings['DB_USER'],
            'password': settings['DB_PASSWORD'],
            'host': settings['DB_HOST'],
            'port': settings['DB_PORT'],
            'database': settings['DB_NAME'],
        }

        db_pool = PooledDB(**pool_options, **connection_options)
        logger.info(f"Database pool initialized: {app.config['DB_HOST']}:{app.config['DB_PORT']}/{app.config['DB_NAME']}")
    except Exception as e:
        logger.error(f"Failed to initialize database pool: {e}")
        raise
def get_db():
    """Return the connection cached on ``g``, borrowing one from the pool if needed.

    The first call stores a pooled connection as ``g.db``; later calls
    return that same object until ``close_db`` removes it.

    Returns:
        The connection object stored in ``g.db``.

    Raises:
        RuntimeError: If ``init_db`` has not created the pool yet.
        Exception: Any error raised while obtaining a connection from the
            pool is logged and re-raised unchanged.
    """
    if db_pool is None:
        raise RuntimeError("Database pool not initialized")

    # Fast path: a connection was already borrowed and cached on ``g``.
    if 'db' in g:
        return g.db

    try:
        g.db = db_pool.connection()
        logger.debug("Database connection obtained from pool")
    except Exception as e:
        logger.error(f"Failed to get database connection: {e}")
        raise
    return g.db
def close_db(e=None):
    """Close the connection cached on ``g``, if there is one.

    ``init_app`` registers this function as an app-context teardown
    callback. Errors raised by ``close()`` are logged and deliberately not
    propagated.

    Args:
        e: Not used by this function; accepted so it can be called with an
            optional argument by the teardown machinery.
    """
    # Remove the cached connection first so it cannot be reused afterwards.
    connection = g.pop('db', None)
    if connection is None:
        return

    try:
        connection.close()
        logger.debug("Database connection closed")
    except Exception as e:
        logger.error(f"Error closing database connection: {e}")
def execute_query(query, params=None, fetch_one=False, fetch_all=True):
    """Execute a query on the current connection and return its rows.

    The cursor is always closed, including when ``execute()`` or a fetch
    raises, so a failed query no longer leaves an open cursor behind.

    Args:
        query: SQL query string.
        params: Query parameters (tuple or list). A falsy value (``None``
            or an empty sequence) runs the query without parameters.
        fetch_one: Return only the result of ``cursor.fetchone()``.
        fetch_all: Return the result of ``cursor.fetchall()``; only
            consulted when ``fetch_one`` is false.

    Returns:
        The ``fetchone()`` result if ``fetch_one`` is true, otherwise the
        ``fetchall()`` result if ``fetch_all`` is true, otherwise ``None``.

    Raises:
        Exception: Any error from obtaining the connection or running the
            query is logged (with the query and its parameters) and
            re-raised unchanged.
    """
    try:
        db = get_db()
        cursor = db.cursor()
        try:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            if fetch_one:
                return cursor.fetchone()
            if fetch_all:
                return cursor.fetchall()
            return None
        finally:
            # Runs on success and on failure, so the cursor is never leaked.
            cursor.close()
    except Exception as e:
        logger.error(f"Database query error: {e}\nQuery: {query}\nParams: {params}")
        raise
def execute_update(query, params=None):
    """Execute an UPDATE, INSERT, or DELETE query and commit it.

    The cursor is always closed, including when the statement or the
    commit raises. On failure the connection is also rolled back, because
    ``get_db`` hands the same cached connection to later calls and any
    uncommitted work left on it could otherwise be committed by them.

    Args:
        query: SQL query string.
        params: Query parameters (tuple or list). A falsy value (``None``
            or an empty sequence) runs the query without parameters.

    Returns:
        Number of affected rows, as reported by ``cursor.rowcount``.

    Raises:
        Exception: Any error from obtaining the connection, executing the
            statement or committing is logged (with the query and its
            parameters) and re-raised unchanged.
    """
    db = None
    try:
        db = get_db()
        cursor = db.cursor()
        try:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            affected_rows = cursor.rowcount
            db.commit()
        finally:
            # Runs on success and on failure, so the cursor is never leaked.
            cursor.close()
        logger.debug(f"Query executed. Affected rows: {affected_rows}")
        return affected_rows
    except Exception as e:
        logger.error(f"Database update error: {e}\nQuery: {query}\nParams: {params}")
        # db is still None when get_db() itself failed; nothing to undo then.
        if db is not None:
            try:
                db.rollback()
            except Exception as rollback_error:
                # Best effort only: the original error below is what
                # callers need to see, so a rollback failure is just logged.
                logger.error(f"Rollback after failed update also failed: {rollback_error}")
        raise
def init_app(app):
    """Attach the database layer to *app*.

    Creates the connection pool via ``init_db`` and registers ``close_db``
    with ``app.teardown_appcontext``.

    Args:
        app: Application object to configure; it is passed to ``init_db``
            and must provide ``teardown_appcontext``.
    """
    # Build the pool first; if this raises, no teardown hook is registered.
    init_db(app)
    app.teardown_appcontext(close_db)