130 lines
3.9 KiB
Python
130 lines
3.9 KiB
Python
"""
|
|
Authentication utilities for login and session management
|
|
"""
|
|
import hashlib
|
|
import logging
|
|
from app.database import execute_query, execute_update
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def hash_password(password):
|
|
"""Hash a password using SHA256"""
|
|
return hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
|
|
def verify_password(plain_password, hashed_password):
|
|
"""Verify a plain password against a hashed password"""
|
|
return hash_password(plain_password) == hashed_password
|
|
|
|
|
|
def authenticate_user(username, password):
|
|
"""
|
|
Authenticate a user by username and password
|
|
|
|
Args:
|
|
username: User's username
|
|
password: User's password (plain text)
|
|
|
|
Returns:
|
|
User dict if authentication successful, None otherwise
|
|
"""
|
|
try:
|
|
query = """
|
|
SELECT id, username, email, role, is_active, full_name
|
|
FROM users
|
|
WHERE username = %s AND is_active = 1
|
|
"""
|
|
|
|
result = execute_query(query, (username,), fetch_one=True)
|
|
|
|
if not result:
|
|
logger.warning(f"Login attempt for non-existent user: {username}")
|
|
return None
|
|
|
|
user_id, user_username, email, role, is_active, full_name = result
|
|
|
|
# Get stored password hash
|
|
password_query = "SELECT password_hash FROM user_credentials WHERE user_id = %s"
|
|
password_result = execute_query(password_query, (user_id,), fetch_one=True)
|
|
|
|
if not password_result:
|
|
logger.warning(f"No password hash found for user: {username}")
|
|
return None
|
|
|
|
password_hash = password_result[0]
|
|
|
|
if not verify_password(password, password_hash):
|
|
logger.warning(f"Invalid password for user: {username}")
|
|
return None
|
|
|
|
logger.info(f"User authenticated successfully: {username}")
|
|
|
|
return {
|
|
'id': user_id,
|
|
'username': user_username,
|
|
'email': email,
|
|
'role': role,
|
|
'full_name': full_name
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Authentication error: {e}")
|
|
return None
|
|
|
|
|
|
def get_user_by_id(user_id):
|
|
"""Get user information by user ID"""
|
|
try:
|
|
query = """
|
|
SELECT id, username, email, role, is_active, full_name
|
|
FROM users
|
|
WHERE id = %s
|
|
"""
|
|
result = execute_query(query, (user_id,), fetch_one=True)
|
|
|
|
if result:
|
|
user_id, username, email, role, is_active, full_name = result
|
|
return {
|
|
'id': user_id,
|
|
'username': username,
|
|
'email': email,
|
|
'role': role,
|
|
'full_name': full_name
|
|
}
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting user by ID: {e}")
|
|
return None
|
|
|
|
|
|
def create_user(username, email, password, full_name, role='user'):
|
|
"""Create a new user"""
|
|
try:
|
|
password_hash = hash_password(password)
|
|
|
|
# Insert into users table
|
|
user_query = """
|
|
INSERT INTO users (username, email, full_name, role, is_active)
|
|
VALUES (%s, %s, %s, %s, 1)
|
|
"""
|
|
execute_update(user_query, (username, email, full_name, role))
|
|
|
|
# Get the inserted user ID
|
|
get_id_query = "SELECT id FROM users WHERE username = %s"
|
|
result = execute_query(get_id_query, (username,), fetch_one=True)
|
|
user_id = result[0]
|
|
|
|
# Insert password hash
|
|
cred_query = """
|
|
INSERT INTO user_credentials (user_id, password_hash)
|
|
VALUES (%s, %s)
|
|
"""
|
|
execute_update(cred_query, (user_id, password_hash))
|
|
|
|
logger.info(f"User created successfully: {username}")
|
|
return user_id
|
|
except Exception as e:
|
|
logger.error(f"Error creating user: {e}")
|
|
return None
|