""" Authentication utilities for login and session management """ import hashlib import logging from app.database import execute_query, execute_update logger = logging.getLogger(__name__) def hash_password(password): """Hash a password using SHA256""" return hashlib.sha256(password.encode()).hexdigest() def verify_password(plain_password, hashed_password): """Verify a plain password against a hashed password""" return hash_password(plain_password) == hashed_password def authenticate_user(username, password): """ Authenticate a user by username and password Args: username: User's username password: User's password (plain text) Returns: User dict if authentication successful, None otherwise """ try: query = """ SELECT id, username, email, role, is_active, full_name FROM users WHERE username = %s AND is_active = 1 """ result = execute_query(query, (username,), fetch_one=True) if not result: logger.warning(f"Login attempt for non-existent user: {username}") return None user_id, user_username, email, role, is_active, full_name = result # Get stored password hash password_query = "SELECT password_hash FROM user_credentials WHERE user_id = %s" password_result = execute_query(password_query, (user_id,), fetch_one=True) if not password_result: logger.warning(f"No password hash found for user: {username}") return None password_hash = password_result[0] if not verify_password(password, password_hash): logger.warning(f"Invalid password for user: {username}") return None logger.info(f"User authenticated successfully: {username}") return { 'id': user_id, 'username': user_username, 'email': email, 'role': role, 'full_name': full_name } except Exception as e: logger.error(f"Authentication error: {e}") return None def get_user_by_id(user_id): """Get user information by user ID""" try: query = """ SELECT id, username, email, role, is_active, full_name FROM users WHERE id = %s """ result = execute_query(query, (user_id,), fetch_one=True) if result: user_id, username, email, role, is_active, full_name = result return { 'id': user_id, 'username': username, 'email': email, 'role': role, 'full_name': full_name } return None except Exception as e: logger.error(f"Error getting user by ID: {e}") return None def create_user(username, email, password, full_name, role='user'): """Create a new user""" try: password_hash = hash_password(password) # Insert into users table user_query = """ INSERT INTO users (username, email, full_name, role, is_active) VALUES (%s, %s, %s, %s, 1) """ execute_update(user_query, (username, email, full_name, role)) # Get the inserted user ID get_id_query = "SELECT id FROM users WHERE username = %s" result = execute_query(get_id_query, (username,), fetch_one=True) user_id = result[0] # Insert password hash cred_query = """ INSERT INTO user_credentials (user_id, password_hash) VALUES (%s, %s) """ execute_update(cred_query, (user_id, password_hash)) logger.info(f"User created successfully: {username}") return user_id except Exception as e: logger.error(f"Error creating user: {e}") return None