"""
Database connection management and initialization.

Uses connection pooling to manage database connections efficiently.
"""

import pymysql
import logging
from dbutils.pooled_db import PooledDB
from flask import g

logger = logging.getLogger(__name__)

# Global database pool; created by init_db() and shared by every request.
db_pool = None


def init_db(app):
    """Initialize the global database connection pool.

    Args:
        app: Flask application whose ``config`` provides ``DB_USER``,
            ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and ``DB_NAME``
            (all required) and optionally ``DB_POOL_SIZE`` (default 10).

    Raises:
        Exception: Whatever the pool construction raised (for example a
            ``KeyError`` for a missing config key); it is logged first and
            then re-raised unchanged.
    """
    global db_pool
    try:
        db_pool = PooledDB(
            creator=pymysql,
            maxconnections=app.config.get('DB_POOL_SIZE', 10),
            mincached=2,
            maxcached=5,
            maxshared=3,
            blocking=True,
            maxusage=None,
            setsession=[],
            ping=1,
            # PyMySQL connection parameters
            user=app.config['DB_USER'],
            password=app.config['DB_PASSWORD'],
            host=app.config['DB_HOST'],
            port=app.config['DB_PORT'],
            database=app.config['DB_NAME'],
        )
        logger.info(
            "Database pool initialized: %s:%s/%s",
            app.config['DB_HOST'],
            app.config['DB_PORT'],
            app.config['DB_NAME'],
        )
    except Exception as exc:
        logger.error("Failed to initialize database pool: %s", exc)
        raise


def get_db():
    """Get the database connection for the current application context.

    The connection is taken from the pool on first use and cached on
    ``flask.g`` so later calls in the same context reuse it.

    Returns:
        The pooled connection stored in ``g.db``.

    Raises:
        RuntimeError: If init_db() has not created the pool yet.
        Exception: Whatever the pool raised while handing out a connection;
            it is logged first and then re-raised unchanged.
    """
    if db_pool is None:
        raise RuntimeError("Database pool not initialized")

    if 'db' in g:
        return g.db

    try:
        g.db = db_pool.connection()
        logger.debug("Database connection obtained from pool")
    except Exception as exc:
        logger.error("Failed to get database connection: %s", exc)
        raise
    return g.db


def close_db(e=None):
    """Close the connection cached on ``flask.g``, if there is one.

    Registered as an app-context teardown handler by init_app().

    Args:
        e: Exception (or None) supplied by Flask's teardown machinery; unused.
    """
    db = g.pop('db', None)
    if db is None:
        return

    try:
        db.close()
        logger.debug("Database connection closed")
    except Exception as exc:
        # Best effort: a failure while closing must not break teardown.
        # A separate name is used so the ``e`` parameter is not shadowed.
        logger.error("Error closing database connection: %s", exc)


def _run_statement(cursor, query, params):
    """Execute *query* on *cursor*, binding *params* only when non-empty."""
    if params:
        cursor.execute(query, params)
    else:
        # No parameters: skip the driver's parameter substitution entirely.
        cursor.execute(query)


def _rollback_quietly(db):
    """Roll back *db*, logging instead of raising if the rollback fails."""
    try:
        db.rollback()
    except Exception as exc:
        # Never mask the error that triggered the rollback.
        logger.error("Error rolling back database transaction: %s", exc)


def execute_query(query, params=None, fetch_one=False, fetch_all=True):
    """
    Execute a database query

    Args:
        query: SQL query string
        params: Query parameters (tuple or list)
        fetch_one: Fetch only one row
        fetch_all: Fetch all rows (if fetch_one is False)

    Returns:
        A single row when fetch_one is true, all rows when fetch_all is
        true, otherwise None

    Raises:
        Exception: Any error from obtaining the connection or running the
            query; it is logged first and then re-raised unchanged.
    """
    try:
        db = get_db()
        cursor = db.cursor()
        try:
            _run_statement(cursor, query, params)
            if fetch_one:
                result = cursor.fetchone()
            elif fetch_all:
                result = cursor.fetchall()
            else:
                result = None
        finally:
            # Always release the cursor, including when execute/fetch raises.
            cursor.close()
        return result
    except Exception as exc:
        logger.error(
            "Database query error: %s\nQuery: %s\nParams: %s",
            exc, query, params,
        )
        raise


def execute_update(query, params=None):
    """
    Execute an UPDATE, INSERT, or DELETE query

    The statement is committed on success. On failure the transaction is
    rolled back (best effort) so the connection cached for this request is
    not left in a half-applied transaction.

    Args:
        query: SQL query string
        params: Query parameters (tuple or list)

    Returns:
        Number of affected rows

    Raises:
        Exception: Any error from obtaining the connection, executing the
            statement or committing; it is logged first and then re-raised
            unchanged.
    """
    db = None
    try:
        db = get_db()
        cursor = db.cursor()
        try:
            _run_statement(cursor, query, params)
            affected_rows = cursor.rowcount
            db.commit()
        finally:
            # Always release the cursor, including when execute/commit raises.
            cursor.close()
        logger.debug("Query executed. Affected rows: %s", affected_rows)
        return affected_rows
    except Exception as exc:
        logger.error(
            "Database update error: %s\nQuery: %s\nParams: %s",
            exc, query, params,
        )
        # db stays None when get_db() itself failed; nothing to roll back then.
        if db is not None:
            _rollback_quietly(db)
        raise


def init_app(app):
    """Initialize database with Flask app.

    Creates the connection pool and registers close_db() to run when each
    application context is torn down.

    Args:
        app: Flask application to configure.
    """
    init_db(app)
    app.teardown_appcontext(close_db)