Files
quality_recticel/py_app/app/settings.py
2025-09-14 21:26:29 +03:00

607 lines
22 KiB
Python

from flask import render_template, request, session, redirect, url_for, flash, current_app, jsonify
from .models import User
from . import db
from .permissions import APP_PERMISSIONS, ROLE_HIERARCHY, ACTIONS, get_all_permissions, get_default_permissions_for_role
import mariadb
import os
import json
# Global permission cache to avoid repeated database queries.
# Keys are "<role>:<permission_key>" strings and values are the boolean result
# of the last successful lookup made by check_permission(). The dict is
# replaced with an empty one by clear_permission_cache().
_permission_cache = {}
def check_permission(permission_key, user_role=None):
    """
    Check if the current user (or specified role) has a specific permission.

    Args:
        permission_key (str): The permission key like 'settings.user_management.create'
        user_role (str, optional): Role to check. If None, uses current session role.

    Returns:
        bool: True if the role has the permission, False otherwise. False is
        also returned when no role is available or the database lookup fails.
    """
    if user_role is None:
        user_role = session.get('role')
    if not user_role:
        return False

    # Superadmin always has all permissions
    if user_role == 'superadmin':
        return True

    # Check cache first
    cache_key = f"{user_role}:{permission_key}"
    if cache_key in _permission_cache:
        return _permission_cache[cache_key]

    try:
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT granted FROM role_permissions
                WHERE role = %s AND permission_key = %s
            """, (user_role, permission_key))
            result = cursor.fetchone()
        finally:
            # Release the connection even when the query raises; previously
            # a failing query skipped close() and leaked the connection.
            conn.close()
    except Exception as e:
        # Fail closed. The failure is deliberately not cached so that a
        # transient database outage does not stick as a denied permission.
        print(f"Error checking permission {permission_key} for role {user_role}: {e}")
        return False

    # Cache the result (a missing row counts as "not granted")
    has_permission = bool(result and result[0])
    _permission_cache[cache_key] = has_permission
    return has_permission
def clear_permission_cache():
    """Forget every cached permission lookup (call after permission updates)."""
    global _permission_cache
    # Rebind the module-level name to a brand-new empty mapping.
    _permission_cache = dict()
def require_permission(permission_key):
    """
    Build a decorator that guards a route behind a permission check.

    The wrapped view runs only when check_permission(permission_key) is true
    for the current session; otherwise an "Access denied" message is flashed
    and the caller is redirected to 'main.dashboard'.

    Usage:
        @require_permission('settings.user_management.create')
        def create_user():
            ...
    """
    from functools import wraps

    def decorator(view):
        @wraps(view)
        def guarded_view(*args, **kwargs):
            # Happy path first: permitted callers go straight to the view.
            if check_permission(permission_key):
                return view(*args, **kwargs)
            flash(f'Access denied: You do not have permission to {permission_key}')
            return redirect(url_for('main.dashboard'))

        return guarded_view

    return decorator
def get_user_permissions(user_role):
    """
    Get all permissions for a specific role.

    Args:
        user_role (str): Role name to look up.

    Returns:
        list: Permission keys granted to the role. 'superadmin' receives the
        key of every entry returned by get_all_permissions(). An empty list
        is returned when the database lookup fails.
    """
    if user_role == 'superadmin':
        return [p['key'] for p in get_all_permissions()]

    try:
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT permission_key FROM role_permissions
                WHERE role = %s AND granted = TRUE
            """, (user_role,))
            rows = cursor.fetchall()
        finally:
            # Release the connection even when the query raises; previously
            # a failing query skipped close() and leaked the connection.
            conn.close()
        return [row[0] for row in rows]
    except Exception as e:
        print(f"Error getting permissions for role {user_role}: {e}")
        return []
# Settings module logic
# Helper: tells whether the session belongs to a superadmin
def is_superadmin():
    """Return True when the role stored in the session is 'superadmin'."""
    current_role = session.get('role')
    return current_role == 'superadmin'
# Route handler for role permissions management
def role_permissions_handler():
    """
    Render the role-permissions management page (superadmin only).

    Reads every role from the role_hierarchy table and every granted entry
    from role_permissions, then renders 'role_permissions.html' with the
    roles, the permission catalogue and JSON copies for the page scripts.

    Returns:
        The rendered template, or a redirect to 'main.dashboard' when the
        caller is not a superadmin, or a redirect to 'main.settings' (with a
        flashed error) when loading fails.
    """
    if not is_superadmin():
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    try:
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Get roles from role_hierarchy table
            cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
            role_rows = cursor.fetchall()

            # Get current role permissions
            cursor.execute("""
                SELECT role, permission_key
                FROM role_permissions
                WHERE granted = TRUE
            """)
            permission_rows = cursor.fetchall()
        finally:
            # Release the connection even when a query raises; previously a
            # failing query skipped close() and leaked the connection.
            conn.close()

        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in role_rows
        }

        # Group granted permission keys by role
        role_permissions = {}
        for role, permission_key in permission_rows:
            role_permissions.setdefault(role, []).append(permission_key)

        # Convert to JSON for JavaScript
        permissions_json = json.dumps(get_all_permissions())
        role_permissions_json = json.dumps(role_permissions)

        return render_template('role_permissions.html',
                               roles=roles,
                               pages=APP_PERMISSIONS,
                               action_names=ACTIONS,
                               permissions_json=permissions_json,
                               role_permissions_json=role_permissions_json)
    except Exception as e:
        flash(f'Error loading role permissions: {e}')
        return redirect(url_for('main.settings'))
def settings_handler():
    """
    Render the settings page (superadmin only).

    Loads all users from the external MariaDB database (creating the users
    table first if it does not exist) and the saved external-database
    settings from 'external_server.conf' in the instance folder.

    Returns:
        The rendered 'settings.html' template, or a redirect to
        'main.dashboard' when the caller is not a superadmin. A database
        failure is flashed and the page is rendered with an empty user list.
    """
    if not is_superadmin():
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    # Get users from external MariaDB database
    users = []
    try:
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Create users table if it doesn't exist
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    password VARCHAR(255) NOT NULL,
                    role VARCHAR(50) NOT NULL,
                    email VARCHAR(255)
                )
            ''')

            # Get all users from external database
            cursor.execute("SELECT id, username, password, role, email FROM users")

            # Convert to list of dictionaries for template compatibility.
            # NOTE(review): the stored password value is passed to the
            # template as-is; confirm the template really needs it.
            users = [
                {
                    'id': row[0],
                    'username': row[1],
                    'password': row[2],
                    'role': row[3],
                    'email': row[4] if len(row) > 4 else None,
                }
                for row in cursor.fetchall()
            ]
        finally:
            # Release the connection even when a query raises; previously a
            # failing query skipped close() and leaked the connection.
            conn.close()
    except Exception as e:
        print(f"Error fetching users from external database: {e}")
        flash(f'Error loading users: {e}')

    # Load external database settings from the instance folder
    external_settings = {}
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if os.path.exists(settings_file):
        with open(settings_file, 'r') as f:
            for line in f:
                key, separator, value = line.strip().partition('=')
                if not separator:
                    # Blank or malformed line: skip it instead of raising
                    # ValueError and failing the whole page.
                    continue
                external_settings[key] = value

    return render_template('settings.html', users=users, external_settings=external_settings)
# Helper function to get external database connection
def get_external_db_connection():
    """
    Read external_server.conf and return a MariaDB database connection.

    The file lives in the Flask instance folder and holds one 'key=value'
    pair per line. Lines without an '=' (for example blank lines) are
    ignored. The keys used are 'username', 'password', 'server_domain',
    'port' and 'database_name'.

    Returns:
        The connection object returned by mariadb.connect(). The caller is
        responsible for closing it.

    Raises:
        FileNotFoundError: If external_server.conf does not exist.
        KeyError: If a required setting is missing from the file.
        ValueError: If the 'port' setting is not an integer.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            key, separator, value = line.strip().partition('=')
            if not separator:
                # Blank or malformed line: previously this raised ValueError
                # on tuple unpacking and made every connection attempt fail.
                continue
            settings[key] = value

    # Create a database connection
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name']
    )
# User management handlers
def create_user_handler():
    """
    Create a user in the external MariaDB database (superadmin only).

    Reads 'username', 'password', 'role' and the optional 'email' from the
    submitted form, creates the users table if needed, refuses duplicate
    usernames and inserts the new row.

    Returns:
        A redirect to 'main.settings'; the outcome is reported via flash().
    """
    if not is_superadmin():
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    username = request.form.get('username')
    password = request.form.get('password')
    role = request.form.get('role')
    email = request.form.get('email', '').strip() or None  # Optional field

    # Same validation style as edit_user_handler: reject absent/empty values
    # instead of inserting an empty username, password or role.
    if not username or not password or not role:
        flash('Missing required fields.')
        return redirect(url_for('main.settings'))

    try:
        # Connect to external MariaDB database
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Create users table if it doesn't exist
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    password VARCHAR(255) NOT NULL,
                    role VARCHAR(50) NOT NULL,
                    email VARCHAR(255)
                )
            ''')

            # Check if the username already exists
            cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
            if cursor.fetchone():
                flash('User already exists.')
                return redirect(url_for('main.settings'))

            # Create a new user in external MariaDB.
            # NOTE(review): the password is stored exactly as submitted
            # (no hashing here). Hash it once the login code, which is not
            # visible from this function, verifies hashes.
            cursor.execute("""
                INSERT INTO users (username, password, role, email)
                VALUES (%s, %s, %s, %s)
            """, (username, password, role, email))
            conn.commit()
        finally:
            # Release the connection on every path, including failures;
            # previously an exception skipped close() and leaked it.
            conn.close()
        flash('User created successfully in external database.')
    except Exception as e:
        print(f"Error creating user in external database: {e}")
        flash(f'Error creating user: {e}')

    return redirect(url_for('main.settings'))
def edit_user_handler():
    """
    Update a user's role, email and optionally password (superadmin only).

    Reads 'user_id', 'role', optional 'password' and optional 'email' from
    the submitted form. The password column is only touched when a non-blank
    password was submitted.

    Returns:
        A redirect to 'main.settings'; the outcome is reported via flash().
    """
    if not is_superadmin():
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    user_id = request.form.get('user_id')
    password = request.form.get('password', '').strip()
    role = request.form.get('role')
    email = request.form.get('email', '').strip() or None  # Optional field

    if not user_id or not role:
        flash('Missing required fields.')
        return redirect(url_for('main.settings'))

    try:
        # Connect to external MariaDB database
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Check if the user exists
            cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
            if not cursor.fetchone():
                flash('User not found.')
                return redirect(url_for('main.settings'))

            # Update the user's details in external MariaDB
            if password:  # Only update password if provided
                cursor.execute("""
                    UPDATE users SET password = %s, role = %s, email = %s WHERE id = %s
                """, (password, role, email, user_id))
                success_message = 'User updated successfully (including password).'
            else:  # Just update role and email if no password provided
                cursor.execute("""
                    UPDATE users SET role = %s, email = %s WHERE id = %s
                """, (role, email, user_id))
                success_message = 'User role updated successfully.'
            conn.commit()
        finally:
            # Release the connection on every path, including failures;
            # previously an exception skipped close() and leaked it.
            conn.close()
        # Report success only after the commit went through; flashing before
        # commit() showed both a success and an error when the commit failed.
        flash(success_message)
    except Exception as e:
        print(f"Error updating user in external database: {e}")
        flash(f'Error updating user: {e}')

    return redirect(url_for('main.settings'))
def delete_user_handler():
    """
    Delete a user from the external MariaDB database (superadmin only).

    Reads 'user_id' from the submitted form, verifies the row exists and
    deletes it.

    Returns:
        A redirect to 'main.settings'; the outcome is reported via flash().
    """
    if not is_superadmin():
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    user_id = request.form.get('user_id')
    # Same handling as edit_user_handler for an absent/empty id.
    if not user_id:
        flash('Missing required fields.')
        return redirect(url_for('main.settings'))

    try:
        # Connect to external MariaDB database
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Check if the user exists
            cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
            if not cursor.fetchone():
                flash('User not found.')
                return redirect(url_for('main.settings'))

            # Delete the user from external MariaDB
            cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
            conn.commit()
        finally:
            # Release the connection on every path, including failures;
            # previously an exception skipped close() and leaked it.
            conn.close()
        flash('User deleted successfully from external database.')
    except Exception as e:
        print(f"Error deleting user from external database: {e}")
        flash(f'Error deleting user: {e}')

    return redirect(url_for('main.settings'))
def save_external_db_handler():
    """
    Save the external database settings to the instance folder (superadmin only).

    Reads 'server_domain', 'port', 'database_name', 'username' and
    'password' from the submitted form and writes them, one 'key=value' pair
    per line, to 'external_server.conf' in the Flask instance folder.

    Returns:
        A redirect to 'main.settings'; the outcome is reported via flash().
    """
    if not is_superadmin():
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    # Get form data (field order is also the order written to the file)
    field_names = ('server_domain', 'port', 'database_name', 'username', 'password')
    values = {name: request.form[name] for name in field_names}

    # The file format is one key=value pair per line, so a line break inside
    # a value would corrupt the file or inject extra settings.
    if any('\n' in value or '\r' in value for value in values.values()):
        flash('Invalid settings: values must not contain line breaks.')
        return redirect(url_for('main.settings'))

    # The port is converted with int() when a connection is opened; reject a
    # non-numeric port now instead of breaking every later connection.
    try:
        int(values['port'])
    except ValueError:
        flash('Invalid settings: port must be a number.')
        return redirect(url_for('main.settings'))

    # Save data to a file in the instance folder
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    os.makedirs(os.path.dirname(settings_file), exist_ok=True)
    with open(settings_file, 'w') as f:
        f.writelines(f"{name}={values[name]}\n" for name in field_names)

    # Cached permission lookups came from the previously configured database.
    clear_permission_cache()

    flash('External database settings saved/updated successfully.')
    return redirect(url_for('main.settings'))
def save_role_permissions_handler():
    """
    Save role permissions via AJAX (superadmin only).

    Expects a JSON object with 'role' (str) and 'permissions' (list of
    permission keys). All existing role_permissions rows for the role are
    replaced by the submitted keys and one row is added to
    permission_audit_log, within a single commit.

    Returns:
        A JSON response with 'success' and either 'message' or 'error'.
    """
    if not is_superadmin():
        return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})

    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Invalid JSON payload'})

        role = data.get('role')
        permissions = data.get('permissions', [])
        if not role:
            return jsonify({'success': False, 'error': 'Role is required'})
        # A string would be iterated character by character below.
        if not isinstance(permissions, list):
            return jsonify({'success': False, 'error': 'Permissions must be a list'})

        current_user = session.get('username', 'system')

        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Clear existing permissions for this role
            cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))

            # Add new permissions
            for permission_key in permissions:
                cursor.execute("""
                    INSERT INTO role_permissions (role, permission_key, granted, granted_by)
                    VALUES (%s, %s, TRUE, %s)
                """, (role, permission_key, current_user))

            # Log the change
            cursor.execute("""
                INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
                VALUES (%s, %s, 'bulk_update', %s, %s)
            """, (role, f"Updated {len(permissions)} permissions", current_user, "Bulk update via UI"))

            conn.commit()
        finally:
            # Release the connection on every path; previously a failure
            # after the DELETE skipped close() and left the connection (and
            # its uncommitted work) open.
            conn.close()

        # Clear permission cache since permissions changed
        clear_permission_cache()
        return jsonify({'success': True, 'message': f'Saved {len(permissions)} permissions for {role}'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def reset_role_permissions_handler():
    """
    Reset one role's permissions to its defaults (superadmin only).

    Expects a JSON object with 'role'. All existing role_permissions rows for
    the role are replaced by the keys from
    get_default_permissions_for_role(role) and one row is added to
    permission_audit_log, within a single commit.

    Returns:
        A JSON response with 'success' and, on success, the applied
        'permissions' and a 'message'; otherwise an 'error'.
    """
    if not is_superadmin():
        return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})

    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Invalid JSON payload'})

        role = data.get('role')
        if not role:
            return jsonify({'success': False, 'error': 'Role is required'})

        # Get default permissions for the role
        default_permissions = get_default_permissions_for_role(role)
        current_user = session.get('username', 'system')

        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Clear existing permissions for this role
            cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))

            # Add default permissions
            for permission_key in default_permissions:
                cursor.execute("""
                    INSERT INTO role_permissions (role, permission_key, granted, granted_by)
                    VALUES (%s, %s, TRUE, %s)
                """, (role, permission_key, current_user))

            # Log the change
            cursor.execute("""
                INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
                VALUES (%s, %s, 'reset_defaults', %s, %s)
            """, (role, f"Reset {len(default_permissions)} permissions", current_user, "Reset to default permissions"))

            conn.commit()
        finally:
            # Release the connection on every path; previously a failure
            # after the DELETE skipped close() and left the connection open.
            conn.close()

        # Clear permission cache since permissions changed
        clear_permission_cache()
        return jsonify({
            'success': True,
            'permissions': default_permissions,
            'message': f'Reset {len(default_permissions)} permissions for {role} to defaults'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def save_all_role_permissions_handler():
    """
    Save the permissions of several roles at once (superadmin only).

    Expects a JSON object whose 'permissions' entry maps
    role -> page key -> section key -> list of actions. For each role the
    existing role_permissions rows are replaced by the flattened
    '<page>.<section>.<action>' keys and one row is added to
    permission_audit_log. Everything is committed together.

    Returns:
        A JSON response with 'success' and either 'message' or 'error'.
    """
    if not is_superadmin():
        return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})

    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Invalid JSON payload'})

        permissions_data = data.get('permissions', {})
        if not permissions_data:
            return jsonify({'success': False, 'error': 'No permissions data provided'})
        if not isinstance(permissions_data, dict):
            return jsonify({'success': False, 'error': 'Permissions must be an object keyed by role'})

        current_user = session.get('username', 'system')
        total_updated = 0

        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Process each role's permissions
            for role, role_permissions in permissions_data.items():
                # Clear existing permissions for this role
                cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))

                # Convert nested permissions to flat permission keys
                permission_keys = []
                for page_key, page_perms in role_permissions.items():
                    for section_key, actions in page_perms.items():
                        for action in actions:
                            permission_keys.append(f"{page_key}.{section_key}.{action}")

                # Insert new permissions
                for permission_key in permission_keys:
                    cursor.execute("""
                        INSERT INTO role_permissions (role, permission_key, granted, granted_by)
                        VALUES (%s, %s, TRUE, %s)
                    """, (role, permission_key, current_user))
                    total_updated += 1

                # Log the change
                cursor.execute("""
                    INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
                    VALUES (%s, %s, 'bulk_update', %s, %s)
                """, (role, f"Updated {len(permission_keys)} permissions", current_user, "Bulk permission update"))

            conn.commit()
        finally:
            # Release the connection on every path; previously a failure
            # part-way through skipped close() and left the connection open.
            conn.close()

        # Clear permission cache since permissions changed
        clear_permission_cache()
        return jsonify({
            'success': True,
            'message': f'Successfully updated {total_updated} permissions across {len(permissions_data)} roles'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def reset_all_role_permissions_handler():
    """
    Reset every role's permissions to its defaults (superadmin only).

    Reads all role names from role_hierarchy. For each role the existing
    role_permissions rows are replaced by the keys from
    get_default_permissions_for_role(role) and one row is added to
    permission_audit_log. Everything is committed together.

    Returns:
        A JSON response with 'success' and either 'message' or 'error'.
    """
    if not is_superadmin():
        return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})

    try:
        current_user = session.get('username', 'system')
        total_reset = 0

        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Get all roles
            cursor.execute("SELECT role_name FROM role_hierarchy")
            roles = [row[0] for row in cursor.fetchall()]

            # Reset each role to defaults
            for role in roles:
                # Clear existing permissions
                cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))

                # Get default permissions for the role
                default_permissions = get_default_permissions_for_role(role)

                # Add default permissions
                for permission_key in default_permissions:
                    cursor.execute("""
                        INSERT INTO role_permissions (role, permission_key, granted, granted_by)
                        VALUES (%s, %s, TRUE, %s)
                    """, (role, permission_key, current_user))
                    total_reset += 1

                # Log the change
                cursor.execute("""
                    INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
                    VALUES (%s, %s, 'reset_all_defaults', %s, %s)
                """, (role, f"Reset {len(default_permissions)} permissions", current_user, "Reset all to default permissions"))

            conn.commit()
        finally:
            # Release the connection on every path; previously a failure
            # part-way through skipped close() and left the connection open.
            conn.close()

        # Clear permission cache since permissions changed
        clear_permission_cache()
        return jsonify({
            'success': True,
            'message': f'Successfully reset {total_reset} permissions across {len(roles)} roles to defaults'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})