- Add SchemaVerifier class for automatic database schema verification and repair - Implement warehouse_manager (Level 75) and warehouse_worker (Level 35) roles - Add zone-based access control for warehouse workers - Implement worker-manager binding system with zone filtering - Add comprehensive database auto-repair on Docker initialization - Remove Module Access section from user form (role-based access only) - Add autocomplete attributes to password fields for better UX - Include detailed documentation for warehouse implementation - Update initialize_db.py with schema verification as Step 0
546 lines
17 KiB
Python
546 lines
17 KiB
Python
"""
|
|
Role-Based Access Control (RBAC) System
|
|
Defines roles, permissions, and access control decorators
|
|
"""
|
|
from functools import wraps
|
|
from flask import session, redirect, url_for, flash
|
|
|
|
# Role Definitions
|
|
ROLES = {
|
|
'superadmin': {
|
|
'name': 'Super Administrator',
|
|
'description': 'Full system access to all modules and features',
|
|
'level': 100,
|
|
'modules': ['quality', 'settings', 'warehouse']
|
|
},
|
|
'admin': {
|
|
'name': 'Administrator',
|
|
'description': 'Administrative access - can manage users and system configuration',
|
|
'level': 90,
|
|
'modules': ['quality', 'settings', 'warehouse']
|
|
},
|
|
'manager': {
|
|
'name': 'Manager - Quality',
|
|
'description': 'Full access to quality module and quality control',
|
|
'level': 70,
|
|
'modules': ['quality']
|
|
},
|
|
'warehouse_manager': {
|
|
'name': 'Manager - Warehouse',
|
|
'description': 'Full access to warehouse module - input and reports',
|
|
'level': 75,
|
|
'modules': ['warehouse']
|
|
},
|
|
'worker': {
|
|
'name': 'Worker - Quality',
|
|
'description': 'Limited access to quality inspections - input only',
|
|
'level': 50,
|
|
'modules': ['quality']
|
|
},
|
|
'warehouse_worker': {
|
|
'name': 'Worker - Warehouse',
|
|
'description': 'Limited access to warehouse - input pages only, no reports',
|
|
'level': 35,
|
|
'modules': ['warehouse']
|
|
}
|
|
}
|
|
|
|
# Module Permissions Structure
|
|
MODULE_PERMISSIONS = {
|
|
'quality': {
|
|
'name': 'Quality Control Module',
|
|
'sections': {
|
|
'inspections': {
|
|
'name': 'Quality Inspections',
|
|
'actions': {
|
|
'view': 'View inspections',
|
|
'create': 'Create new inspection',
|
|
'edit': 'Edit inspections',
|
|
'delete': 'Delete inspections'
|
|
},
|
|
'superadmin': ['view', 'create', 'edit', 'delete'],
|
|
'admin': ['view', 'create', 'edit', 'delete'],
|
|
'manager': ['view', 'create', 'edit', 'delete'],
|
|
'worker': ['view', 'create']
|
|
},
|
|
'reports': {
|
|
'name': 'Quality Reports',
|
|
'actions': {
|
|
'view': 'View reports',
|
|
'export': 'Export reports',
|
|
'download': 'Download reports'
|
|
},
|
|
'superadmin': ['view', 'export', 'download'],
|
|
'admin': ['view', 'export', 'download'],
|
|
'manager': ['view', 'export', 'download'],
|
|
'worker': ['view']
|
|
}
|
|
}
|
|
},
|
|
'settings': {
|
|
'name': 'Settings Module',
|
|
'sections': {
|
|
'general': {
|
|
'name': 'General Settings',
|
|
'actions': {
|
|
'view': 'View settings',
|
|
'edit': 'Edit settings'
|
|
},
|
|
'superadmin': ['view', 'edit'],
|
|
'admin': ['view', 'edit'],
|
|
'manager': [],
|
|
'worker': []
|
|
},
|
|
'users': {
|
|
'name': 'User Management',
|
|
'actions': {
|
|
'view': 'View users',
|
|
'create': 'Create users',
|
|
'edit': 'Edit users',
|
|
'delete': 'Delete users'
|
|
},
|
|
'superadmin': ['view', 'create', 'edit', 'delete'],
|
|
'admin': ['view', 'create', 'edit', 'delete'],
|
|
'manager': [],
|
|
'worker': []
|
|
},
|
|
'database': {
|
|
'name': 'Database Settings',
|
|
'actions': {
|
|
'view': 'View database settings',
|
|
'edit': 'Edit database settings'
|
|
},
|
|
'superadmin': ['view', 'edit'],
|
|
'admin': ['view', 'edit'],
|
|
'manager': [],
|
|
'worker': [],
|
|
'warehouse_manager': [],
|
|
'warehouse_worker': []
|
|
}
|
|
}
|
|
},
|
|
'warehouse': {
|
|
'name': 'Warehouse Module',
|
|
'sections': {
|
|
'input': {
|
|
'name': 'Warehouse Data Input',
|
|
'actions': {
|
|
'view': 'View warehouse input pages',
|
|
'create': 'Create warehouse entries',
|
|
'edit': 'Edit warehouse entries',
|
|
'delete': 'Delete warehouse entries'
|
|
},
|
|
'superadmin': ['view', 'create', 'edit', 'delete'],
|
|
'admin': ['view', 'create', 'edit', 'delete'],
|
|
'manager': [],
|
|
'worker': [],
|
|
'warehouse_manager': ['view', 'create', 'edit', 'delete'],
|
|
'warehouse_worker': ['view', 'create', 'edit']
|
|
},
|
|
'reports': {
|
|
'name': 'Warehouse Reports & Analytics',
|
|
'actions': {
|
|
'view': 'View warehouse reports',
|
|
'export': 'Export warehouse data',
|
|
'download': 'Download reports',
|
|
'analytics': 'View analytics'
|
|
},
|
|
'superadmin': ['view', 'export', 'download', 'analytics'],
|
|
'admin': ['view', 'export', 'download', 'analytics'],
|
|
'manager': [],
|
|
'worker': [],
|
|
'warehouse_manager': ['view', 'export', 'download', 'analytics'],
|
|
'warehouse_worker': []
|
|
},
|
|
'locations': {
|
|
'name': 'Location Management',
|
|
'actions': {
|
|
'view': 'View locations',
|
|
'create': 'Create locations',
|
|
'edit': 'Edit locations',
|
|
'delete': 'Delete locations'
|
|
},
|
|
'superadmin': ['view', 'create', 'edit', 'delete'],
|
|
'admin': ['view', 'create', 'edit', 'delete'],
|
|
'manager': [],
|
|
'worker': [],
|
|
'warehouse_manager': ['view', 'create', 'edit', 'delete'],
|
|
'warehouse_worker': ['view']
|
|
},
|
|
'management': {
|
|
'name': 'Warehouse User Management',
|
|
'actions': {
|
|
'manage_workers': 'Manage assigned workers',
|
|
'manage_zones': 'Manage warehouse zones'
|
|
},
|
|
'superadmin': ['manage_workers', 'manage_zones'],
|
|
'admin': ['manage_workers', 'manage_zones'],
|
|
'manager': [],
|
|
'worker': [],
|
|
'warehouse_manager': ['manage_workers'],
|
|
'warehouse_worker': []
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
def check_permission(user_role, module, section, action):
|
|
"""
|
|
Check if a user has permission to perform an action
|
|
|
|
Args:
|
|
user_role (str): User's role
|
|
module (str): Module name
|
|
section (str): Section within module
|
|
action (str): Action to perform
|
|
|
|
Returns:
|
|
bool: True if user has permission, False otherwise
|
|
"""
|
|
if not user_role or user_role not in ROLES:
|
|
return False
|
|
|
|
# Superadmin has all permissions
|
|
if user_role == 'superadmin':
|
|
return True
|
|
|
|
# Check if module exists
|
|
if module not in MODULE_PERMISSIONS:
|
|
return False
|
|
|
|
# Check if section exists
|
|
if section not in MODULE_PERMISSIONS[module]['sections']:
|
|
return False
|
|
|
|
# Get allowed actions for this role in this section
|
|
section_config = MODULE_PERMISSIONS[module]['sections'][section]
|
|
allowed_actions = section_config.get(user_role, [])
|
|
|
|
return action in allowed_actions
|
|
|
|
|
|
def has_module_access(user_role, module):
|
|
"""
|
|
Check if user has access to a module
|
|
|
|
Args:
|
|
user_role (str): User's role
|
|
module (str): Module name
|
|
|
|
Returns:
|
|
bool: True if user can access module, False otherwise
|
|
"""
|
|
if not user_role or user_role not in ROLES:
|
|
return False
|
|
|
|
if user_role == 'superadmin':
|
|
return True
|
|
|
|
return module in ROLES[user_role].get('modules', [])
|
|
|
|
|
|
def requires_role(*allowed_roles):
|
|
"""
|
|
Decorator to require specific roles for a route
|
|
|
|
Usage:
|
|
@requires_role('superadmin', 'admin')
|
|
def admin_page():
|
|
pass
|
|
"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page.', 'error')
|
|
return redirect(url_for('main.login'))
|
|
|
|
user_role = session.get('role', 'worker')
|
|
|
|
if user_role not in allowed_roles:
|
|
flash('Access denied: You do not have permission to access this page.', 'error')
|
|
return redirect(url_for('main.dashboard'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
|
|
def requires_module_permission(module, section, action):
|
|
"""
|
|
Decorator to require specific module/section/action permission
|
|
|
|
Usage:
|
|
@requires_module_permission('quality', 'inspections', 'edit')
|
|
def edit_inspection():
|
|
pass
|
|
"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page.', 'error')
|
|
return redirect(url_for('main.login'))
|
|
|
|
user_role = session.get('role', 'worker')
|
|
|
|
if not check_permission(user_role, module, section, action):
|
|
flash('Access denied: You do not have permission to perform this action.', 'error')
|
|
return redirect(url_for('main.dashboard'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
|
|
def requires_module_access(module):
|
|
"""
|
|
Decorator to require access to a specific module
|
|
|
|
Usage:
|
|
@requires_module_access('quality')
|
|
def quality_page():
|
|
pass
|
|
"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page.', 'error')
|
|
return redirect(url_for('main.login'))
|
|
|
|
user_role = session.get('role', 'worker')
|
|
|
|
if not has_module_access(user_role, module):
|
|
flash(f'Access denied: You do not have access to the {module} module.', 'error')
|
|
return redirect(url_for('main.dashboard'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
|
|
def get_user_permissions(user_role):
|
|
"""
|
|
Get all permissions for a user role
|
|
|
|
Args:
|
|
user_role (str): User's role
|
|
|
|
Returns:
|
|
dict: Dictionary of all permissions for the role
|
|
"""
|
|
permissions = {}
|
|
|
|
if not user_role or user_role not in ROLES:
|
|
return permissions
|
|
|
|
# Superadmin gets all permissions
|
|
if user_role == 'superadmin':
|
|
for module, module_data in MODULE_PERMISSIONS.items():
|
|
permissions[module] = {}
|
|
for section, section_data in module_data['sections'].items():
|
|
permissions[module][section] = list(section_data['actions'].keys())
|
|
return permissions
|
|
|
|
# Get specific role permissions
|
|
for module, module_data in MODULE_PERMISSIONS.items():
|
|
if module in ROLES[user_role].get('modules', []):
|
|
permissions[module] = {}
|
|
for section, section_data in module_data['sections'].items():
|
|
allowed_actions = section_data.get(user_role, [])
|
|
if allowed_actions:
|
|
permissions[module][section] = allowed_actions
|
|
|
|
return permissions
|
|
|
|
|
|
# Warehouse-specific access control helpers
|
|
|
|
def can_access_warehouse_input(user_role):
|
|
"""
|
|
Check if user can access warehouse INPUT pages
|
|
|
|
Args:
|
|
user_role (str): User's role
|
|
|
|
Returns:
|
|
bool: True if user can access input pages (managers and workers)
|
|
"""
|
|
return check_permission(user_role, 'warehouse', 'input', 'view')
|
|
|
|
|
|
def can_access_warehouse_reports(user_role):
|
|
"""
|
|
Check if user can access warehouse REPORT/ANALYTICS pages
|
|
|
|
Args:
|
|
user_role (str): User's role
|
|
|
|
Returns:
|
|
bool: True if user can access reports (managers only)
|
|
"""
|
|
return check_permission(user_role, 'warehouse', 'reports', 'view')
|
|
|
|
|
|
def can_manage_warehouse_workers(user_role):
|
|
"""
|
|
Check if user can manage warehouse workers (assign/unassign)
|
|
|
|
Args:
|
|
user_role (str): User's role
|
|
|
|
Returns:
|
|
bool: True if user can manage workers (managers only)
|
|
"""
|
|
return check_permission(user_role, 'warehouse', 'management', 'manage_workers')
|
|
|
|
|
|
# Zone-restricted access functions
|
|
|
|
def get_worker_warehouse_zone(user_id):
|
|
"""
|
|
Get the warehouse zone restriction for a warehouse_worker
|
|
|
|
Args:
|
|
user_id (int): Worker user ID
|
|
|
|
Returns:
|
|
str or None: Zone name if restricted, None if all zones allowed
|
|
"""
|
|
try:
|
|
from app.database import get_db
|
|
db = get_db()
|
|
cursor = db.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT warehouse_zone FROM worker_manager_bindings
|
|
WHERE worker_id = %s AND is_active = 1
|
|
LIMIT 1
|
|
""", (user_id,))
|
|
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return result[0] # Return zone name or None
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting worker zone: {e}")
|
|
return None
|
|
|
|
|
|
def get_manager_workers(manager_id):
|
|
"""
|
|
Get all active workers assigned to a manager
|
|
|
|
Args:
|
|
manager_id (int): Manager user ID
|
|
|
|
Returns:
|
|
list: List of dicts with worker info {id, username, full_name, zone}
|
|
"""
|
|
try:
|
|
from app.database import get_db
|
|
db = get_db()
|
|
cursor = db.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT u.id, u.username, u.full_name, wmb.warehouse_zone
|
|
FROM worker_manager_bindings wmb
|
|
JOIN users u ON wmb.worker_id = u.id
|
|
WHERE wmb.manager_id = %s AND wmb.is_active = 1
|
|
ORDER BY u.full_name
|
|
""", (manager_id,))
|
|
|
|
results = []
|
|
for row in cursor.fetchall():
|
|
results.append({
|
|
'id': row[0],
|
|
'username': row[1],
|
|
'full_name': row[2],
|
|
'zone': row[3]
|
|
})
|
|
return results
|
|
except Exception as e:
|
|
logger.error(f"Error getting manager workers: {e}")
|
|
return []
|
|
|
|
|
|
def validate_worker_zone_access(worker_id, manager_id, zone):
|
|
"""
|
|
Validate that a worker can access a specific zone
|
|
|
|
Args:
|
|
worker_id (int): Worker user ID
|
|
manager_id (int): Expected manager ID
|
|
zone (str): Zone name to validate
|
|
|
|
Returns:
|
|
bool: True if worker can access zone
|
|
"""
|
|
try:
|
|
from app.database import get_db
|
|
db = get_db()
|
|
cursor = db.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT warehouse_zone FROM worker_manager_bindings
|
|
WHERE worker_id = %s AND manager_id = %s AND is_active = 1
|
|
LIMIT 1
|
|
""", (worker_id, manager_id))
|
|
|
|
result = cursor.fetchone()
|
|
if not result:
|
|
return False # Binding doesn't exist
|
|
|
|
assigned_zone = result[0]
|
|
|
|
# If no zone restriction (NULL), worker can access all zones
|
|
if assigned_zone is None:
|
|
return True
|
|
|
|
# If zone is restricted, must match the requested zone
|
|
return assigned_zone == zone
|
|
except Exception as e:
|
|
logger.error(f"Error validating worker zone access: {e}")
|
|
return False
|
|
|
|
|
|
def build_zone_filter_sql(user_id, user_role):
|
|
"""
|
|
Build WHERE clause SQL fragment for zone-filtered queries
|
|
|
|
For managers: returns all data from assigned workers
|
|
For workers: returns only data from their assigned zone
|
|
For others: returns no data or all data depending on role
|
|
|
|
Args:
|
|
user_id (int): User ID
|
|
user_role (str): User's role
|
|
|
|
Returns:
|
|
str: SQL WHERE fragment (empty string if no filter needed)
|
|
"""
|
|
if user_role in ['superadmin', 'admin']:
|
|
return "" # No filter - see everything
|
|
|
|
if user_role == 'warehouse_manager':
|
|
# Manager sees data from all assigned workers
|
|
try:
|
|
workers = get_manager_workers(user_id)
|
|
if not workers:
|
|
# Manager has no workers assigned - see own data only
|
|
return f"AND created_by_user_id = {user_id}"
|
|
worker_ids = [w['id'] for w in workers]
|
|
return f"AND (created_by_user_id IN ({','.join(map(str, worker_ids))}) OR created_by_user_id = {user_id})"
|
|
except Exception as e:
|
|
logger.error(f"Error building manager zone filter: {e}")
|
|
return f"AND created_by_user_id = {user_id}"
|
|
|
|
if user_role == 'warehouse_worker':
|
|
# Worker sees only their own data in their assigned zone
|
|
return f"AND created_by_user_id = {user_id}"
|
|
|
|
return "" # Default - no filtering
|
|
|