607 lines
22 KiB
Python
Executable File
607 lines
22 KiB
Python
Executable File
from flask import render_template, request, session, redirect, url_for, flash, current_app, jsonify
|
|
from .models import User
|
|
from . import db
|
|
from .permissions import APP_PERMISSIONS, ROLE_HIERARCHY, ACTIONS, get_all_permissions, get_default_permissions_for_role
|
|
import mariadb
|
|
import os
|
|
import json
|
|
|
|
# Global permission cache to avoid repeated database queries
|
|
_permission_cache = {}
|
|
|
|
def check_permission(permission_key, user_role=None):
|
|
"""
|
|
Check if the current user (or specified role) has a specific permission.
|
|
|
|
Args:
|
|
permission_key (str): The permission key like 'settings.user_management.create'
|
|
user_role (str, optional): Role to check. If None, uses current session role.
|
|
|
|
Returns:
|
|
bool: True if user has the permission, False otherwise
|
|
"""
|
|
if user_role is None:
|
|
user_role = session.get('role')
|
|
|
|
if not user_role:
|
|
return False
|
|
|
|
# Superadmin always has all permissions
|
|
if user_role == 'superadmin':
|
|
return True
|
|
|
|
# Check cache first
|
|
cache_key = f"{user_role}:{permission_key}"
|
|
if cache_key in _permission_cache:
|
|
return _permission_cache[cache_key]
|
|
|
|
try:
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT granted FROM role_permissions
|
|
WHERE role = %s AND permission_key = %s
|
|
""", (user_role, permission_key))
|
|
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
# Cache the result
|
|
has_permission = bool(result and result[0])
|
|
_permission_cache[cache_key] = has_permission
|
|
return has_permission
|
|
|
|
except Exception as e:
|
|
print(f"Error checking permission {permission_key} for role {user_role}: {e}")
|
|
return False
|
|
|
|
def clear_permission_cache():
|
|
"""Clear the permission cache (call after permission updates)"""
|
|
global _permission_cache
|
|
_permission_cache = {}
|
|
|
|
def require_permission(permission_key):
|
|
"""
|
|
Decorator to require a specific permission for a route.
|
|
|
|
Usage:
|
|
@require_permission('settings.user_management.create')
|
|
def create_user():
|
|
...
|
|
"""
|
|
def decorator(f):
|
|
from functools import wraps
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not check_permission(permission_key):
|
|
flash(f'Access denied: You do not have permission to {permission_key}')
|
|
return redirect(url_for('main.dashboard'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
def get_user_permissions(user_role):
|
|
"""Get all permissions for a specific role"""
|
|
if user_role == 'superadmin':
|
|
return [p['key'] for p in get_all_permissions()]
|
|
|
|
try:
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT permission_key FROM role_permissions
|
|
WHERE role = %s AND granted = TRUE
|
|
""", (user_role,))
|
|
|
|
result = cursor.fetchall()
|
|
conn.close()
|
|
|
|
return [row[0] for row in result]
|
|
|
|
except Exception as e:
|
|
print(f"Error getting permissions for role {user_role}: {e}")
|
|
return []
|
|
|
|
# Settings module logic
|
|
|
|
# Helper to check if current user is superadmin
|
|
def is_superadmin():
|
|
return session.get('role') == 'superadmin'
|
|
|
|
# Route handler for role permissions management
|
|
def role_permissions_handler():
|
|
if not is_superadmin():
|
|
flash('Access denied: Superadmin only.')
|
|
return redirect(url_for('main.dashboard'))
|
|
|
|
try:
|
|
# Get roles and their current permissions
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Get roles from role_hierarchy table
|
|
cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
|
|
role_data = cursor.fetchall()
|
|
|
|
roles = {}
|
|
for role_name, display_name, description, level in role_data:
|
|
roles[role_name] = {
|
|
'display_name': display_name,
|
|
'description': description,
|
|
'level': level
|
|
}
|
|
|
|
# Get current role permissions
|
|
cursor.execute("""
|
|
SELECT role, permission_key
|
|
FROM role_permissions
|
|
WHERE granted = TRUE
|
|
""")
|
|
permission_data = cursor.fetchall()
|
|
|
|
role_permissions = {}
|
|
for role, permission_key in permission_data:
|
|
if role not in role_permissions:
|
|
role_permissions[role] = []
|
|
role_permissions[role].append(permission_key)
|
|
|
|
conn.close()
|
|
|
|
# Convert to JSON for JavaScript
|
|
permissions_json = json.dumps(get_all_permissions())
|
|
role_permissions_json = json.dumps(role_permissions)
|
|
|
|
return render_template('role_permissions.html',
|
|
roles=roles,
|
|
pages=APP_PERMISSIONS,
|
|
action_names=ACTIONS,
|
|
permissions_json=permissions_json,
|
|
role_permissions_json=role_permissions_json)
|
|
|
|
except Exception as e:
|
|
flash(f'Error loading role permissions: {e}')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
|
|
|
|
def settings_handler():
|
|
if 'role' not in session or session['role'] != 'superadmin':
|
|
flash('Access denied: Superadmin only.')
|
|
return redirect(url_for('main.dashboard'))
|
|
|
|
# Get users from external MariaDB database
|
|
users = []
|
|
try:
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Create users table if it doesn't exist
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
username VARCHAR(50) UNIQUE NOT NULL,
|
|
password VARCHAR(255) NOT NULL,
|
|
role VARCHAR(50) NOT NULL,
|
|
email VARCHAR(255)
|
|
)
|
|
''')
|
|
|
|
# Get all users from external database
|
|
cursor.execute("SELECT id, username, password, role, email FROM users")
|
|
users_data = cursor.fetchall()
|
|
|
|
# Convert to list of dictionaries for template compatibility
|
|
users = []
|
|
for user_data in users_data:
|
|
users.append({
|
|
'id': user_data[0],
|
|
'username': user_data[1],
|
|
'password': user_data[2],
|
|
'role': user_data[3],
|
|
'email': user_data[4] if len(user_data) > 4 else None
|
|
})
|
|
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
print(f"Error fetching users from external database: {e}")
|
|
flash(f'Error loading users: {e}')
|
|
|
|
# Load external database settings from the instance folder
|
|
external_settings = {}
|
|
settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
|
|
if os.path.exists(settings_file):
|
|
with open(settings_file, 'r') as f:
|
|
for line in f:
|
|
key, value = line.strip().split('=', 1)
|
|
external_settings[key] = value
|
|
|
|
return render_template('settings.html', users=users, external_settings=external_settings)
|
|
|
|
# Helper function to get external database connection
|
|
def get_external_db_connection():
|
|
"""Reads the external_server.conf file and returns a MariaDB database connection."""
|
|
settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
|
|
if not os.path.exists(settings_file):
|
|
raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")
|
|
|
|
# Read settings from the configuration file
|
|
settings = {}
|
|
with open(settings_file, 'r') as f:
|
|
for line in f:
|
|
key, value = line.strip().split('=', 1)
|
|
settings[key] = value
|
|
|
|
# Create a database connection
|
|
return mariadb.connect(
|
|
user=settings['username'],
|
|
password=settings['password'],
|
|
host=settings['server_domain'],
|
|
port=int(settings['port']),
|
|
database=settings['database_name']
|
|
)
|
|
|
|
# User management handlers
|
|
def create_user_handler():
|
|
if 'role' not in session or session['role'] != 'superadmin':
|
|
flash('Access denied: Superadmin only.')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
role = request.form['role']
|
|
email = request.form.get('email', '').strip() or None # Optional field
|
|
|
|
try:
|
|
# Connect to external MariaDB database
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Create users table if it doesn't exist
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
username VARCHAR(50) UNIQUE NOT NULL,
|
|
password VARCHAR(255) NOT NULL,
|
|
role VARCHAR(50) NOT NULL,
|
|
email VARCHAR(255)
|
|
)
|
|
''')
|
|
|
|
# Check if the username already exists
|
|
cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
|
|
if cursor.fetchone():
|
|
flash('User already exists.')
|
|
conn.close()
|
|
return redirect(url_for('main.settings'))
|
|
|
|
# Create a new user in external MariaDB
|
|
cursor.execute("""
|
|
INSERT INTO users (username, password, role, email)
|
|
VALUES (%s, %s, %s, %s)
|
|
""", (username, password, role, email))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
flash('User created successfully in external database.')
|
|
|
|
except Exception as e:
|
|
print(f"Error creating user in external database: {e}")
|
|
flash(f'Error creating user: {e}')
|
|
|
|
return redirect(url_for('main.settings'))
|
|
|
|
def edit_user_handler():
|
|
if 'role' not in session or session['role'] != 'superadmin':
|
|
flash('Access denied: Superadmin only.')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
user_id = request.form.get('user_id')
|
|
password = request.form.get('password', '').strip()
|
|
role = request.form.get('role')
|
|
email = request.form.get('email', '').strip() or None # Optional field
|
|
|
|
if not user_id or not role:
|
|
flash('Missing required fields.')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
try:
|
|
# Connect to external MariaDB database
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if the user exists
|
|
cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
|
|
if not cursor.fetchone():
|
|
flash('User not found.')
|
|
conn.close()
|
|
return redirect(url_for('main.settings'))
|
|
|
|
# Update the user's details in external MariaDB
|
|
if password: # Only update password if provided
|
|
cursor.execute("""
|
|
UPDATE users SET password = %s, role = %s, email = %s WHERE id = %s
|
|
""", (password, role, email, user_id))
|
|
flash('User updated successfully (including password).')
|
|
else: # Just update role and email if no password provided
|
|
cursor.execute("""
|
|
UPDATE users SET role = %s, email = %s WHERE id = %s
|
|
""", (role, email, user_id))
|
|
flash('User role updated successfully.')
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
print(f"Error updating user in external database: {e}")
|
|
flash(f'Error updating user: {e}')
|
|
|
|
return redirect(url_for('main.settings'))
|
|
|
|
def delete_user_handler():
|
|
if 'role' not in session or session['role'] != 'superadmin':
|
|
flash('Access denied: Superadmin only.')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
user_id = request.form['user_id']
|
|
|
|
try:
|
|
# Connect to external MariaDB database
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if the user exists
|
|
cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
|
|
if not cursor.fetchone():
|
|
flash('User not found.')
|
|
conn.close()
|
|
return redirect(url_for('main.settings'))
|
|
|
|
# Delete the user from external MariaDB
|
|
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
flash('User deleted successfully from external database.')
|
|
|
|
except Exception as e:
|
|
print(f"Error deleting user from external database: {e}")
|
|
flash(f'Error deleting user: {e}')
|
|
|
|
return redirect(url_for('main.settings'))
|
|
|
|
def save_external_db_handler():
|
|
if 'role' not in session or session['role'] != 'superadmin':
|
|
flash('Access denied: Superadmin only.')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
# Get form data
|
|
server_domain = request.form['server_domain']
|
|
port = request.form['port']
|
|
database_name = request.form['database_name']
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
|
|
# Save data to a file in the instance folder
|
|
settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
|
|
os.makedirs(os.path.dirname(settings_file), exist_ok=True)
|
|
with open(settings_file, 'w') as f:
|
|
f.write(f"server_domain={server_domain}\n")
|
|
f.write(f"port={port}\n")
|
|
f.write(f"database_name={database_name}\n")
|
|
f.write(f"username={username}\n")
|
|
f.write(f"password={password}\n")
|
|
|
|
flash('External database settings saved/updated successfully.')
|
|
return redirect(url_for('main.settings'))
|
|
|
|
def save_role_permissions_handler():
|
|
"""Save role permissions via AJAX"""
|
|
if not is_superadmin():
|
|
return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
role = data.get('role')
|
|
permissions = data.get('permissions', [])
|
|
|
|
if not role:
|
|
return jsonify({'success': False, 'error': 'Role is required'})
|
|
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Clear existing permissions for this role
|
|
cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))
|
|
|
|
# Add new permissions
|
|
current_user = session.get('username', 'system')
|
|
for permission_key in permissions:
|
|
cursor.execute("""
|
|
INSERT INTO role_permissions (role, permission_key, granted, granted_by)
|
|
VALUES (%s, %s, TRUE, %s)
|
|
""", (role, permission_key, current_user))
|
|
|
|
# Log the change
|
|
cursor.execute("""
|
|
INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
|
|
VALUES (%s, %s, 'bulk_update', %s, %s)
|
|
""", (role, f"Updated {len(permissions)} permissions", current_user, f"Bulk update via UI"))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Clear permission cache since permissions changed
|
|
clear_permission_cache()
|
|
|
|
return jsonify({'success': True, 'message': f'Saved {len(permissions)} permissions for {role}'})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
def reset_role_permissions_handler():
|
|
"""Reset role permissions to defaults"""
|
|
if not is_superadmin():
|
|
return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
role = data.get('role')
|
|
|
|
if not role:
|
|
return jsonify({'success': False, 'error': 'Role is required'})
|
|
|
|
# Get default permissions for the role
|
|
default_permissions = get_default_permissions_for_role(role)
|
|
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Clear existing permissions for this role
|
|
cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))
|
|
|
|
# Add default permissions
|
|
current_user = session.get('username', 'system')
|
|
for permission_key in default_permissions:
|
|
cursor.execute("""
|
|
INSERT INTO role_permissions (role, permission_key, granted, granted_by)
|
|
VALUES (%s, %s, TRUE, %s)
|
|
""", (role, permission_key, current_user))
|
|
|
|
# Log the change
|
|
cursor.execute("""
|
|
INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
|
|
VALUES (%s, %s, 'reset_defaults', %s, %s)
|
|
""", (role, f"Reset {len(default_permissions)} permissions", current_user, "Reset to default permissions"))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Clear permission cache since permissions changed
|
|
clear_permission_cache()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'permissions': default_permissions,
|
|
'message': f'Reset {len(default_permissions)} permissions for {role} to defaults'
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
def save_all_role_permissions_handler():
|
|
"""Save all role permissions at once"""
|
|
if not is_superadmin():
|
|
return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
permissions_data = data.get('permissions', {})
|
|
|
|
if not permissions_data:
|
|
return jsonify({'success': False, 'error': 'No permissions data provided'})
|
|
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
current_user = session.get('username', 'system')
|
|
total_updated = 0
|
|
|
|
# Process each role's permissions
|
|
for role, role_permissions in permissions_data.items():
|
|
# Clear existing permissions for this role
|
|
cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))
|
|
|
|
# Convert nested permissions to flat permission keys
|
|
permission_keys = []
|
|
for page_key, page_perms in role_permissions.items():
|
|
for section_key, actions in page_perms.items():
|
|
for action in actions:
|
|
permission_key = f"{page_key}.{section_key}.{action}"
|
|
permission_keys.append(permission_key)
|
|
|
|
# Insert new permissions
|
|
for permission_key in permission_keys:
|
|
cursor.execute("""
|
|
INSERT INTO role_permissions (role, permission_key, granted, granted_by)
|
|
VALUES (%s, %s, TRUE, %s)
|
|
""", (role, permission_key, current_user))
|
|
total_updated += 1
|
|
|
|
# Log the change
|
|
cursor.execute("""
|
|
INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
|
|
VALUES (%s, %s, 'bulk_update', %s, %s)
|
|
""", (role, f"Updated {len(permission_keys)} permissions", current_user, "Bulk permission update"))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Clear permission cache since permissions changed
|
|
clear_permission_cache()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Successfully updated {total_updated} permissions across {len(permissions_data)} roles'
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
def reset_all_role_permissions_handler():
|
|
"""Reset all role permissions to defaults"""
|
|
if not is_superadmin():
|
|
return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})
|
|
|
|
try:
|
|
# Get all roles
|
|
conn = get_external_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT role_name FROM role_hierarchy")
|
|
roles = [row[0] for row in cursor.fetchall()]
|
|
|
|
current_user = session.get('username', 'system')
|
|
total_reset = 0
|
|
|
|
# Reset each role to defaults
|
|
for role in roles:
|
|
# Clear existing permissions
|
|
cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))
|
|
|
|
# Get default permissions for the role
|
|
default_permissions = get_default_permissions_for_role(role)
|
|
|
|
# Add default permissions
|
|
for permission_key in default_permissions:
|
|
cursor.execute("""
|
|
INSERT INTO role_permissions (role, permission_key, granted, granted_by)
|
|
VALUES (%s, %s, TRUE, %s)
|
|
""", (role, permission_key, current_user))
|
|
total_reset += 1
|
|
|
|
# Log the change
|
|
cursor.execute("""
|
|
INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
|
|
VALUES (%s, %s, 'reset_all_defaults', %s, %s)
|
|
""", (role, f"Reset {len(default_permissions)} permissions", current_user, "Reset all to default permissions"))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Clear permission cache since permissions changed
|
|
clear_permission_cache()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Successfully reset {total_reset} permissions across {len(roles)} roles to defaults'
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)})
|