updated roles ant permissions

This commit is contained in:
2025-09-12 22:14:51 +03:00
parent 4597595db4
commit 9d80252c14
15 changed files with 1875 additions and 113 deletions

View File

@@ -1,62 +1,171 @@
from flask import render_template, request, session, redirect, url_for, flash, current_app
from flask import render_template, request, session, redirect, url_for, flash, current_app, jsonify
from .models import User
from . import db
from .permissions import APP_PERMISSIONS, ROLE_HIERARCHY, ACTIONS, get_all_permissions, get_default_permissions_for_role
import mariadb
import os
import json
# Global permission cache to avoid repeated database queries
_permission_cache = {}
def check_permission(permission_key, user_role=None):
"""
Check if the current user (or specified role) has a specific permission.
Args:
permission_key (str): The permission key like 'settings.user_management.create'
user_role (str, optional): Role to check. If None, uses current session role.
Returns:
bool: True if user has the permission, False otherwise
"""
if user_role is None:
user_role = session.get('role')
if not user_role:
return False
# Superadmin always has all permissions
if user_role == 'superadmin':
return True
# Check cache first
cache_key = f"{user_role}:{permission_key}"
if cache_key in _permission_cache:
return _permission_cache[cache_key]
try:
conn = get_external_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT granted FROM role_permissions
WHERE role = %s AND permission_key = %s
""", (user_role, permission_key))
result = cursor.fetchone()
conn.close()
# Cache the result
has_permission = bool(result and result[0])
_permission_cache[cache_key] = has_permission
return has_permission
except Exception as e:
print(f"Error checking permission {permission_key} for role {user_role}: {e}")
return False
def clear_permission_cache():
"""Clear the permission cache (call after permission updates)"""
global _permission_cache
_permission_cache = {}
def require_permission(permission_key):
"""
Decorator to require a specific permission for a route.
Usage:
@require_permission('settings.user_management.create')
def create_user():
...
"""
def decorator(f):
from functools import wraps
@wraps(f)
def decorated_function(*args, **kwargs):
if not check_permission(permission_key):
flash(f'Access denied: You do not have permission to {permission_key}')
return redirect(url_for('main.dashboard'))
return f(*args, **kwargs)
return decorated_function
return decorator
def get_user_permissions(user_role):
"""Get all permissions for a specific role"""
if user_role == 'superadmin':
return [p['key'] for p in get_all_permissions()]
try:
conn = get_external_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT permission_key FROM role_permissions
WHERE role = %s AND granted = TRUE
""", (user_role,))
result = cursor.fetchall()
conn.close()
return [row[0] for row in result]
except Exception as e:
print(f"Error getting permissions for role {user_role}: {e}")
return []
# Settings module logic
import sqlite3
import os
def ensure_roles_table():
instance_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../instance'))
if not os.path.exists(instance_folder):
os.makedirs(instance_folder)
db_path = os.path.join(instance_folder, 'users.db')
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS roles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
access_level TEXT NOT NULL,
description TEXT
)
""")
cursor.execute("""
INSERT OR IGNORE INTO roles (name, access_level, description)
VALUES (?, ?, ?)
""", ('superadmin', 'full', 'Full access to all app areas and functions'))
conn.commit()
conn.close()
# List of roles (should match your app's roles)
ROLES = [
'superadmin', 'admin', 'manager', 'warehouse_manager', 'warehouse_worker', 'quality_manager', 'quality_worker'
]
# Helper to check if current user is superadmin
def is_superadmin():
return session.get('role') == 'superadmin'
# Route handler for editing access roles
def edit_access_roles_handler():
# Route handler for role permissions management
def role_permissions_handler():
if not is_superadmin():
flash('Access denied: Superadmin only.')
return redirect(url_for('main.dashboard'))
ensure_roles_table()
return render_template('edit_access_roles.html', roles=ROLES)
try:
# Get roles and their current permissions
conn = get_external_db_connection()
cursor = conn.cursor()
# Get roles from role_hierarchy table
cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
role_data = cursor.fetchall()
roles = {}
for role_name, display_name, description, level in role_data:
roles[role_name] = {
'display_name': display_name,
'description': description,
'level': level
}
# Get current role permissions
cursor.execute("""
SELECT role, permission_key
FROM role_permissions
WHERE granted = TRUE
""")
permission_data = cursor.fetchall()
role_permissions = {}
for role, permission_key in permission_data:
if role not in role_permissions:
role_permissions[role] = []
role_permissions[role].append(permission_key)
conn.close()
# Convert to JSON for JavaScript
permissions_json = json.dumps(get_all_permissions())
role_permissions_json = json.dumps(role_permissions)
return render_template('role_permissions.html',
roles=roles,
pages=APP_PERMISSIONS,
action_names=ACTIONS,
permissions_json=permissions_json,
role_permissions_json=role_permissions_json)
except Exception as e:
flash(f'Error loading role permissions: {e}')
return redirect(url_for('main.settings'))
# Handler for updating role access (stub, to be implemented)
def update_role_access_handler(role):
if not is_superadmin():
flash('Access denied: Superadmin only.')
return redirect(url_for('main.dashboard'))
if role == 'superadmin':
flash('Superadmin access cannot be changed.')
return redirect(url_for('main.edit_access_roles'))
access_level = request.form.get('access_level')
# TODO: Save access_level for the role in the database or config
flash(f'Access for role {role} updated to {access_level}.')
return redirect(url_for('main.edit_access_roles'))
def settings_handler():
if 'role' not in session or session['role'] != 'superadmin':
@@ -75,12 +184,13 @@ def settings_handler():
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL
role VARCHAR(50) NOT NULL,
email VARCHAR(255)
)
''')
# Get all users from external database
cursor.execute("SELECT id, username, password, role FROM users")
cursor.execute("SELECT id, username, password, role, email FROM users")
users_data = cursor.fetchall()
# Convert to list of dictionaries for template compatibility
@@ -90,7 +200,8 @@ def settings_handler():
'id': user_data[0],
'username': user_data[1],
'password': user_data[2],
'role': user_data[3]
'role': user_data[3],
'email': user_data[4] if len(user_data) > 4 else None
})
conn.close()
@@ -142,6 +253,7 @@ def create_user_handler():
username = request.form['username']
password = request.form['password']
role = request.form['role']
email = request.form.get('email', '').strip() or None # Optional field
try:
# Connect to external MariaDB database
@@ -154,7 +266,8 @@ def create_user_handler():
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL
role VARCHAR(50) NOT NULL,
email VARCHAR(255)
)
''')
@@ -167,9 +280,9 @@ def create_user_handler():
# Create a new user in external MariaDB
cursor.execute("""
INSERT INTO users (username, password, role)
VALUES (%s, %s, %s)
""", (username, password, role))
INSERT INTO users (username, password, role, email)
VALUES (%s, %s, %s, %s)
""", (username, password, role, email))
conn.commit()
conn.close()
@@ -189,6 +302,7 @@ def edit_user_handler():
user_id = request.form.get('user_id')
password = request.form.get('password', '').strip()
role = request.form.get('role')
email = request.form.get('email', '').strip() or None # Optional field
if not user_id or not role:
flash('Missing required fields.')
@@ -209,13 +323,13 @@ def edit_user_handler():
# Update the user's details in external MariaDB
if password: # Only update password if provided
cursor.execute("""
UPDATE users SET password = %s, role = %s WHERE id = %s
""", (password, role, user_id))
UPDATE users SET password = %s, role = %s, email = %s WHERE id = %s
""", (password, role, email, user_id))
flash('User updated successfully (including password).')
else: # Just update role if no password provided
else: # Just update role and email if no password provided
cursor.execute("""
UPDATE users SET role = %s WHERE id = %s
""", (role, user_id))
UPDATE users SET role = %s, email = %s WHERE id = %s
""", (role, email, user_id))
flash('User role updated successfully.')
conn.commit()
@@ -283,3 +397,97 @@ def save_external_db_handler():
flash('External database settings saved/updated successfully.')
return redirect(url_for('main.settings'))
def save_role_permissions_handler():
"""Save role permissions via AJAX"""
if not is_superadmin():
return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})
try:
data = request.get_json()
role = data.get('role')
permissions = data.get('permissions', [])
if not role:
return jsonify({'success': False, 'error': 'Role is required'})
conn = get_external_db_connection()
cursor = conn.cursor()
# Clear existing permissions for this role
cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))
# Add new permissions
current_user = session.get('username', 'system')
for permission_key in permissions:
cursor.execute("""
INSERT INTO role_permissions (role, permission_key, granted, granted_by)
VALUES (%s, %s, TRUE, %s)
""", (role, permission_key, current_user))
# Log the change
cursor.execute("""
INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
VALUES (%s, %s, 'bulk_update', %s, %s)
""", (role, f"Updated {len(permissions)} permissions", current_user, f"Bulk update via UI"))
conn.commit()
conn.close()
# Clear permission cache since permissions changed
clear_permission_cache()
return jsonify({'success': True, 'message': f'Saved {len(permissions)} permissions for {role}'})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
def reset_role_permissions_handler():
"""Reset role permissions to defaults"""
if not is_superadmin():
return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'})
try:
data = request.get_json()
role = data.get('role')
if not role:
return jsonify({'success': False, 'error': 'Role is required'})
# Get default permissions for the role
default_permissions = get_default_permissions_for_role(role)
conn = get_external_db_connection()
cursor = conn.cursor()
# Clear existing permissions for this role
cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,))
# Add default permissions
current_user = session.get('username', 'system')
for permission_key in default_permissions:
cursor.execute("""
INSERT INTO role_permissions (role, permission_key, granted, granted_by)
VALUES (%s, %s, TRUE, %s)
""", (role, permission_key, current_user))
# Log the change
cursor.execute("""
INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason)
VALUES (%s, %s, 'reset_defaults', %s, %s)
""", (role, f"Reset {len(default_permissions)} permissions", current_user, "Reset to default permissions"))
conn.commit()
conn.close()
# Clear permission cache since permissions changed
clear_permission_cache()
return jsonify({
'success': True,
'permissions': default_permissions,
'message': f'Reset {len(default_permissions)} permissions for {role} to defaults'
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})