diff --git a/instance/users.db b/instance/users.db new file mode 100644 index 0000000..fa85049 Binary files /dev/null and b/instance/users.db differ diff --git a/py_app/__pycache__/run.cpython-312.pyc b/py_app/__pycache__/run.cpython-312.pyc new file mode 100644 index 0000000..b3b0c89 Binary files /dev/null and b/py_app/__pycache__/run.cpython-312.pyc differ diff --git a/py_app/app/__pycache__/permissions.cpython-312.pyc b/py_app/app/__pycache__/permissions.cpython-312.pyc new file mode 100644 index 0000000..4df9285 Binary files /dev/null and b/py_app/app/__pycache__/permissions.cpython-312.pyc differ diff --git a/py_app/app/__pycache__/routes.cpython-312.pyc b/py_app/app/__pycache__/routes.cpython-312.pyc index 4cb294b..6338d3a 100644 Binary files a/py_app/app/__pycache__/routes.cpython-312.pyc and b/py_app/app/__pycache__/routes.cpython-312.pyc differ diff --git a/py_app/app/__pycache__/settings.cpython-312.pyc b/py_app/app/__pycache__/settings.cpython-312.pyc index fd127d3..b5f7fbf 100644 Binary files a/py_app/app/__pycache__/settings.cpython-312.pyc and b/py_app/app/__pycache__/settings.cpython-312.pyc differ diff --git a/py_app/app/db_create_scripts/add_email_column.py b/py_app/app/db_create_scripts/add_email_column.py new file mode 100644 index 0000000..08cbb37 --- /dev/null +++ b/py_app/app/db_create_scripts/add_email_column.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 + +import mariadb +import os +import sys + +def get_external_db_connection(): + """Reads the external_server.conf file and returns a MariaDB database connection.""" + # Get the instance folder path + current_dir = os.path.dirname(os.path.abspath(__file__)) + instance_folder = os.path.join(current_dir, '../../instance') + settings_file = os.path.join(instance_folder, 'external_server.conf') + + if not os.path.exists(settings_file): + raise FileNotFoundError(f"The external_server.conf file is missing: {settings_file}") + + # Read settings from the configuration file + settings = {} + with open(settings_file, 'r') as f: + for line in f: + line = line.strip() + if line and '=' in line: + key, value = line.split('=', 1) + settings[key] = value + + print(f"Connecting to MariaDB:") + print(f" Host: {settings.get('server_domain', 'N/A')}") + print(f" Port: {settings.get('port', 'N/A')}") + print(f" Database: {settings.get('database_name', 'N/A')}") + + return mariadb.connect( + user=settings['username'], + password=settings['password'], + host=settings['server_domain'], + port=int(settings['port']), + database=settings['database_name'] + ) + +def main(): + try: + print("=== Adding Email Column to Users Table ===") + conn = get_external_db_connection() + cursor = conn.cursor() + + # First, check the current table structure + print("\n1. Checking current table structure...") + cursor.execute("DESCRIBE users") + columns = cursor.fetchall() + + has_email = False + for column in columns: + print(f" Column: {column[0]} ({column[1]})") + if column[0] == 'email': + has_email = True + + if not has_email: + print("\n2. Adding email column...") + cursor.execute("ALTER TABLE users ADD COLUMN email VARCHAR(255)") + conn.commit() + print(" ✓ Email column added successfully") + else: + print("\n2. Email column already exists") + + # Now check and display all users + print("\n3. Current users in database:") + cursor.execute("SELECT id, username, role, email FROM users") + users = cursor.fetchall() + + if users: + print(f" Found {len(users)} users:") + for user in users: + email = user[3] if user[3] else "No email" + print(f" - ID: {user[0]}, Username: {user[1]}, Role: {user[2]}, Email: {email}") + else: + print(" No users found - creating test users...") + + # Create some test users + test_users = [ + ('admin_user', 'admin123', 'admin', 'admin@company.com'), + ('manager_user', 'manager123', 'manager', 'manager@company.com'), + ('warehouse_user', 'warehouse123', 'warehouse_manager', 'warehouse@company.com'), + ('quality_user', 'quality123', 'quality_manager', 'quality@company.com') + ] + + for username, password, role, email in test_users: + try: + cursor.execute(""" + INSERT INTO users (username, password, role, email) + VALUES (%s, %s, %s, %s) + """, (username, password, role, email)) + print(f" ✓ Created user: {username} ({role})") + except mariadb.IntegrityError as e: + print(f" ⚠ User {username} already exists: {e}") + + conn.commit() + print(" ✓ Test users created successfully") + + conn.close() + print("\n=== Database Update Complete ===") + + except Exception as e: + print(f"❌ Error: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/py_app/app/db_create_scripts/check_external_db_users.py b/py_app/app/db_create_scripts/check_external_db_users.py new file mode 100644 index 0000000..8f28e24 --- /dev/null +++ b/py_app/app/db_create_scripts/check_external_db_users.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 + +import mariadb +import os +import sys + +def get_external_db_connection(): + """Reads the external_server.conf file and returns a MariaDB database connection.""" + # Get the instance folder path + current_dir = os.path.dirname(os.path.abspath(__file__)) + instance_folder = os.path.join(current_dir, '../../instance') + settings_file = os.path.join(instance_folder, 'external_server.conf') + + if not os.path.exists(settings_file): + raise FileNotFoundError(f"The external_server.conf file is missing: {settings_file}") + + # Read settings from the configuration file + settings = {} + with open(settings_file, 'r') as f: + for line in f: + line = line.strip() + if line and '=' in line: + key, value = line.split('=', 1) + settings[key] = value + + print(f"Connecting to MariaDB with settings:") + print(f" Host: {settings.get('server_domain', 'N/A')}") + print(f" Port: {settings.get('port', 'N/A')}") + print(f" Database: {settings.get('database_name', 'N/A')}") + print(f" Username: {settings.get('username', 'N/A')}") + + # Create a database connection + return mariadb.connect( + user=settings['username'], + password=settings['password'], + host=settings['server_domain'], + port=int(settings['port']), + database=settings['database_name'] + ) + +def main(): + try: + print("=== Checking External MariaDB Database ===") + conn = get_external_db_connection() + cursor = conn.cursor() + + # Create users table if it doesn't exist + print("\n1. Creating/verifying users table...") + cursor.execute(''' + CREATE TABLE IF NOT EXISTS users ( + id INT AUTO_INCREMENT PRIMARY KEY, + username VARCHAR(50) UNIQUE NOT NULL, + password VARCHAR(255) NOT NULL, + role VARCHAR(50) NOT NULL, + email VARCHAR(255) + ) + ''') + print(" ✓ Users table created/verified") + + # Check existing users + print("\n2. Checking existing users...") + cursor.execute("SELECT id, username, role, email FROM users") + users = cursor.fetchall() + + if users: + print(f" Found {len(users)} existing users:") + for user in users: + email = user[3] if user[3] else "No email" + print(f" - ID: {user[0]}, Username: {user[1]}, Role: {user[2]}, Email: {email}") + else: + print(" No users found in external database") + + # Create some test users + print("\n3. Creating test users...") + test_users = [ + ('admin_user', 'admin123', 'admin', 'admin@company.com'), + ('manager_user', 'manager123', 'manager', 'manager@company.com'), + ('warehouse_user', 'warehouse123', 'warehouse_manager', 'warehouse@company.com'), + ('quality_user', 'quality123', 'quality_manager', 'quality@company.com') + ] + + for username, password, role, email in test_users: + try: + cursor.execute(""" + INSERT INTO users (username, password, role, email) + VALUES (%s, %s, %s, %s) + """, (username, password, role, email)) + print(f" ✓ Created user: {username} ({role})") + except mariadb.IntegrityError as e: + print(f" ⚠ User {username} already exists: {e}") + + conn.commit() + print(" ✓ Test users created successfully") + + conn.close() + print("\n=== Database Check Complete ===") + + except Exception as e: + print(f"❌ Error: {e}") + return 1 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/py_app/app/db_create_scripts/create_permissions_tables.py b/py_app/app/db_create_scripts/create_permissions_tables.py new file mode 100644 index 0000000..a807a0b --- /dev/null +++ b/py_app/app/db_create_scripts/create_permissions_tables.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 + +import mariadb +import os +import sys + +def get_external_db_connection(): + """Reads the external_server.conf file and returns a MariaDB database connection.""" + # Get the instance folder path + current_dir = os.path.dirname(os.path.abspath(__file__)) + instance_folder = os.path.join(current_dir, '../../instance') + settings_file = os.path.join(instance_folder, 'external_server.conf') + + if not os.path.exists(settings_file): + raise FileNotFoundError(f"The external_server.conf file is missing: {settings_file}") + + # Read settings from the configuration file + settings = {} + with open(settings_file, 'r') as f: + for line in f: + line = line.strip() + if line and '=' in line: + key, value = line.split('=', 1) + settings[key] = value + + return mariadb.connect( + user=settings['username'], + password=settings['password'], + host=settings['server_domain'], + port=int(settings['port']), + database=settings['database_name'] + ) + +def main(): + try: + print("=== Creating Permission Management Tables ===") + conn = get_external_db_connection() + cursor = conn.cursor() + + # 1. Create permissions table + print("\n1. Creating permissions table...") + cursor.execute(''' + CREATE TABLE IF NOT EXISTS permissions ( + id INT AUTO_INCREMENT PRIMARY KEY, + permission_key VARCHAR(255) UNIQUE NOT NULL, + page VARCHAR(100) NOT NULL, + page_name VARCHAR(255) NOT NULL, + section VARCHAR(100) NOT NULL, + section_name VARCHAR(255) NOT NULL, + action VARCHAR(50) NOT NULL, + action_name VARCHAR(255) NOT NULL, + description TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + ''') + print(" ✓ Permissions table created/verified") + + # 2. Create role_permissions table + print("\n2. Creating role_permissions table...") + cursor.execute(''' + CREATE TABLE IF NOT EXISTS role_permissions ( + id INT AUTO_INCREMENT PRIMARY KEY, + role VARCHAR(50) NOT NULL, + permission_key VARCHAR(255) NOT NULL, + granted BOOLEAN DEFAULT TRUE, + granted_by VARCHAR(50), + granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY unique_role_permission (role, permission_key), + FOREIGN KEY (permission_key) REFERENCES permissions(permission_key) ON DELETE CASCADE + ) + ''') + print(" ✓ Role permissions table created/verified") + + # 3. Create role_hierarchy table for role management + print("\n3. Creating role_hierarchy table...") + cursor.execute(''' + CREATE TABLE IF NOT EXISTS role_hierarchy ( + id INT AUTO_INCREMENT PRIMARY KEY, + role_name VARCHAR(50) UNIQUE NOT NULL, + display_name VARCHAR(255) NOT NULL, + description TEXT, + level INT DEFAULT 0, + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + ''') + print(" ✓ Role hierarchy table created/verified") + + # 4. Create permission_audit_log table for tracking changes + print("\n4. Creating permission_audit_log table...") + cursor.execute(''' + CREATE TABLE IF NOT EXISTS permission_audit_log ( + id INT AUTO_INCREMENT PRIMARY KEY, + role VARCHAR(50) NOT NULL, + permission_key VARCHAR(255) NOT NULL, + action ENUM('granted', 'revoked') NOT NULL, + changed_by VARCHAR(50) NOT NULL, + changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + reason TEXT, + ip_address VARCHAR(45) + ) + ''') + print(" ✓ Permission audit log table created/verified") + + conn.commit() + + # 5. Check if we need to populate initial data + print("\n5. Checking for existing data...") + cursor.execute("SELECT COUNT(*) FROM permissions") + permission_count = cursor.fetchone()[0] + + if permission_count == 0: + print(" No permissions found - will need to populate with default data") + print(" Run 'populate_permissions.py' to initialize the permission system") + else: + print(f" Found {permission_count} existing permissions") + + cursor.execute("SELECT COUNT(*) FROM role_hierarchy") + role_count = cursor.fetchone()[0] + + if role_count == 0: + print(" No roles found - will need to populate with default roles") + else: + print(f" Found {role_count} existing roles") + + conn.close() + print("\n=== Permission Database Schema Created Successfully ===") + + except Exception as e: + print(f"❌ Error: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/py_app/app/db_create_scripts/populate_permissions.py b/py_app/app/db_create_scripts/populate_permissions.py new file mode 100644 index 0000000..2d2d4eb --- /dev/null +++ b/py_app/app/db_create_scripts/populate_permissions.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 + +import mariadb +import os +import sys + +# Add the app directory to the path so we can import our permissions module +sys.path.append(os.path.join(os.path.dirname(__file__), '..')) + +from permissions import APP_PERMISSIONS, ROLE_HIERARCHY, ACTIONS, get_all_permissions, get_default_permissions_for_role + +def get_external_db_connection(): + """Reads the external_server.conf file and returns a MariaDB database connection.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + instance_folder = os.path.join(current_dir, '../../instance') + settings_file = os.path.join(instance_folder, 'external_server.conf') + + if not os.path.exists(settings_file): + raise FileNotFoundError(f"The external_server.conf file is missing: {settings_file}") + + settings = {} + with open(settings_file, 'r') as f: + for line in f: + line = line.strip() + if line and '=' in line: + key, value = line.split('=', 1) + settings[key] = value + + return mariadb.connect( + user=settings['username'], + password=settings['password'], + host=settings['server_domain'], + port=int(settings['port']), + database=settings['database_name'] + ) + +def main(): + try: + print("=== Populating Permission System ===") + conn = get_external_db_connection() + cursor = conn.cursor() + + # 1. Populate all permissions + print("\n1. Populating permissions...") + permissions = get_all_permissions() + + for perm in permissions: + try: + cursor.execute(''' + INSERT INTO permissions (permission_key, page, page_name, section, section_name, action, action_name) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + page_name = VALUES(page_name), + section_name = VALUES(section_name), + action_name = VALUES(action_name), + updated_at = CURRENT_TIMESTAMP + ''', ( + perm['key'], + perm['page'], + perm['page_name'], + perm['section'], + perm['section_name'], + perm['action'], + perm['action_name'] + )) + except Exception as e: + print(f" ⚠ Error inserting permission {perm['key']}: {e}") + + conn.commit() + print(f" ✓ Populated {len(permissions)} permissions") + + # 2. Populate role hierarchy + print("\n2. Populating role hierarchy...") + for role_name, role_data in ROLE_HIERARCHY.items(): + try: + cursor.execute(''' + INSERT INTO role_hierarchy (role_name, display_name, description, level) + VALUES (%s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + display_name = VALUES(display_name), + description = VALUES(description), + level = VALUES(level), + updated_at = CURRENT_TIMESTAMP + ''', ( + role_name, + role_data['name'], + role_data['description'], + role_data['level'] + )) + except Exception as e: + print(f" ⚠ Error inserting role {role_name}: {e}") + + conn.commit() + print(f" ✓ Populated {len(ROLE_HIERARCHY)} roles") + + # 3. Set default permissions for each role + print("\n3. Setting default role permissions...") + for role_name in ROLE_HIERARCHY.keys(): + default_permissions = get_default_permissions_for_role(role_name) + + print(f" Setting permissions for {role_name}: {len(default_permissions)} permissions") + + for permission_key in default_permissions: + try: + cursor.execute(''' + INSERT INTO role_permissions (role, permission_key, granted, granted_by) + VALUES (%s, %s, TRUE, 'system') + ON DUPLICATE KEY UPDATE + granted = TRUE, + updated_at = CURRENT_TIMESTAMP + ''', (role_name, permission_key)) + except Exception as e: + print(f" ⚠ Error setting permission {permission_key} for {role_name}: {e}") + + conn.commit() + + # 4. Show summary + print("\n4. Permission Summary:") + cursor.execute(''' + SELECT r.role_name, r.display_name, COUNT(rp.permission_key) as permission_count + FROM role_hierarchy r + LEFT JOIN role_permissions rp ON r.role_name = rp.role AND rp.granted = TRUE + GROUP BY r.role_name, r.display_name + ORDER BY r.level DESC + ''') + + results = cursor.fetchall() + for role_name, display_name, count in results: + print(f" {display_name} ({role_name}): {count} permissions") + + conn.close() + print("\n=== Permission System Initialization Complete ===") + + except Exception as e: + print(f"❌ Error: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/py_app/app/permissions.py b/py_app/app/permissions.py new file mode 100644 index 0000000..8ef7049 --- /dev/null +++ b/py_app/app/permissions.py @@ -0,0 +1,344 @@ +""" +Role-Based Access Control (RBAC) System +Hierarchical permission structure: Pages → Sections → Actions +""" + +# Permission Actions +ACTIONS = { + 'view': 'View/Read Access', + 'create': 'Create/Add New', + 'edit': 'Edit/Modify', + 'delete': 'Delete/Remove', + 'upload': 'Upload Files', + 'download': 'Download Files', + 'export': 'Export Data', + 'import': 'Import Data' +} + +# Application Structure with Hierarchical Permissions +APP_PERMISSIONS = { + 'dashboard': { + 'name': 'Dashboard', + 'sections': { + 'overview': { + 'name': 'Overview Statistics', + 'actions': ['view'] + }, + 'recent_activity': { + 'name': 'Recent Activity Feed', + 'actions': ['view'] + }, + 'quick_actions': { + 'name': 'Quick Action Buttons', + 'actions': ['view'] + } + } + }, + 'settings': { + 'name': 'Settings & Administration', + 'sections': { + 'user_management': { + 'name': 'User Management', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'role_permissions': { + 'name': 'Role & Permissions', + 'actions': ['view', 'edit'] + }, + 'external_database': { + 'name': 'External Database Config', + 'actions': ['view', 'edit'] + }, + 'system_settings': { + 'name': 'System Configuration', + 'actions': ['view', 'edit'] + } + } + }, + 'warehouse': { + 'name': 'Warehouse Management', + 'sections': { + 'inventory': { + 'name': 'Inventory Management', + 'actions': ['view', 'create', 'edit', 'delete', 'export'] + }, + 'stock_movements': { + 'name': 'Stock Movements', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'receiving': { + 'name': 'Goods Receiving', + 'actions': ['view', 'create', 'edit', 'upload'] + }, + 'shipping': { + 'name': 'Goods Shipping', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'locations': { + 'name': 'Storage Locations', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'reports': { + 'name': 'Warehouse Reports', + 'actions': ['view', 'export', 'download'] + } + } + }, + 'quality': { + 'name': 'Quality Control', + 'sections': { + 'inspections': { + 'name': 'Quality Inspections', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'test_results': { + 'name': 'Test Results', + 'actions': ['view', 'create', 'edit', 'upload'] + }, + 'certificates': { + 'name': 'Quality Certificates', + 'actions': ['view', 'create', 'edit', 'delete', 'upload', 'download'] + }, + 'compliance': { + 'name': 'Compliance Management', + 'actions': ['view', 'create', 'edit'] + }, + 'quality_reports': { + 'name': 'Quality Reports', + 'actions': ['view', 'export', 'download'] + } + } + }, + 'production': { + 'name': 'Production Management', + 'sections': { + 'work_orders': { + 'name': 'Work Orders', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'production_lines': { + 'name': 'Production Lines', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'scheduling': { + 'name': 'Production Scheduling', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'equipment': { + 'name': 'Equipment Management', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'maintenance': { + 'name': 'Maintenance Records', + 'actions': ['view', 'create', 'edit', 'delete', 'upload'] + } + } + }, + 'traceability': { + 'name': 'Product Traceability', + 'sections': { + 'batch_tracking': { + 'name': 'Batch Tracking', + 'actions': ['view', 'create', 'edit'] + }, + 'lot_genealogy': { + 'name': 'Lot Genealogy', + 'actions': ['view', 'export'] + }, + 'recall_management': { + 'name': 'Product Recall', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'chain_of_custody': { + 'name': 'Chain of Custody', + 'actions': ['view', 'create', 'edit'] + } + } + }, + 'reports': { + 'name': 'Reports & Analytics', + 'sections': { + 'standard_reports': { + 'name': 'Standard Reports', + 'actions': ['view', 'export', 'download'] + }, + 'custom_reports': { + 'name': 'Custom Reports', + 'actions': ['view', 'create', 'edit', 'delete', 'export'] + }, + 'dashboards': { + 'name': 'Analytics Dashboards', + 'actions': ['view', 'create', 'edit', 'delete'] + }, + 'data_export': { + 'name': 'Data Export Tools', + 'actions': ['view', 'export', 'download'] + } + } + } +} + +# Role Hierarchy and Default Permissions +ROLE_HIERARCHY = { + 'superadmin': { + 'name': 'Super Administrator', + 'description': 'Full system access - can manage all aspects including system configuration', + 'level': 100, + 'default_permissions': 'ALL' # Gets all permissions by default + }, + 'admin': { + 'name': 'Administrator', + 'description': 'Administrative access - can manage users and most system functions', + 'level': 90, + 'default_sections': [ + 'dashboard', + 'settings.user_management', + 'settings.system_settings.view', + 'warehouse', + 'quality', + 'production', + 'reports', + 'traceability' + ], + 'restrictions': [ + 'settings.role_permissions.edit', # Cannot modify role permissions + 'settings.external_database.edit', # Cannot modify external DB config + ] + }, + 'manager': { + 'name': 'Manager', + 'description': 'Management level access - can view and manage operational data', + 'level': 70, + 'default_sections': [ + 'dashboard', + 'warehouse.inventory.view', + 'warehouse.reports.view', + 'quality.inspections.view', + 'quality.quality_reports.view', + 'production.work_orders', + 'reports.standard_reports' + ] + }, + 'warehouse_manager': { + 'name': 'Warehouse Manager', + 'description': 'Full warehouse access with limited system access', + 'level': 60, + 'default_sections': [ + 'dashboard.overview.view', + 'warehouse', # Full warehouse access + 'traceability.batch_tracking', + 'reports.standard_reports.view' + ] + }, + 'warehouse_worker': { + 'name': 'Warehouse Worker', + 'description': 'Limited warehouse operations access', + 'level': 50, + 'default_sections': [ + 'dashboard.overview.view', + 'warehouse.inventory.view', + 'warehouse.stock_movements', + 'warehouse.receiving', + 'warehouse.shipping.view' + ] + }, + 'quality_manager': { + 'name': 'Quality Manager', + 'description': 'Full quality control access', + 'level': 60, + 'default_sections': [ + 'dashboard.overview.view', + 'quality', # Full quality access + 'traceability', + 'reports.standard_reports.view' + ] + }, + 'quality_worker': { + 'name': 'Quality Worker', + 'description': 'Limited quality control operations', + 'level': 50, + 'default_sections': [ + 'dashboard.overview.view', + 'quality.inspections', + 'quality.test_results', + 'quality.certificates.view' + ] + } +} + +def get_permission_key(page, section, action): + """Generate a standardized permission key""" + return f"{page}.{section}.{action}" + +def parse_permission_key(permission_key): + """Parse a permission key into its components""" + parts = permission_key.split('.') + if len(parts) == 3: + return parts[0], parts[1], parts[2] + return None, None, None + +def get_all_permissions(): + """Get a flat list of all possible permissions""" + permissions = [] + for page_key, page_data in APP_PERMISSIONS.items(): + for section_key, section_data in page_data['sections'].items(): + for action in section_data['actions']: + permissions.append({ + 'key': get_permission_key(page_key, section_key, action), + 'page': page_key, + 'page_name': page_data['name'], + 'section': section_key, + 'section_name': section_data['name'], + 'action': action, + 'action_name': ACTIONS.get(action, action) + }) + return permissions + +def get_default_permissions_for_role(role): + """Get default permissions for a specific role""" + if role not in ROLE_HIERARCHY: + return [] + + role_config = ROLE_HIERARCHY[role] + + # Superadmin gets everything + if role_config.get('default_permissions') == 'ALL': + return [p['key'] for p in get_all_permissions()] + + # Other roles get specific sections + permissions = [] + default_sections = role_config.get('default_sections', []) + + for section_pattern in default_sections: + if section_pattern == 'dashboard': + # Full dashboard access + for section_key in APP_PERMISSIONS['dashboard']['sections'].keys(): + for action in APP_PERMISSIONS['dashboard']['sections'][section_key]['actions']: + permissions.append(get_permission_key('dashboard', section_key, action)) + elif '.' in section_pattern: + # Specific page.section or page.section.action + parts = section_pattern.split('.') + if len(parts) == 2: # page.section - all actions + page, section = parts + if page in APP_PERMISSIONS and section in APP_PERMISSIONS[page]['sections']: + for action in APP_PERMISSIONS[page]['sections'][section]['actions']: + permissions.append(get_permission_key(page, section, action)) + elif len(parts) == 3: # page.section.action - specific action + page, section, action = parts + if (page in APP_PERMISSIONS and + section in APP_PERMISSIONS[page]['sections'] and + action in APP_PERMISSIONS[page]['sections'][section]['actions']): + permissions.append(get_permission_key(page, section, action)) + else: + # Full page access + page = section_pattern + if page in APP_PERMISSIONS: + for section_key, section_data in APP_PERMISSIONS[page]['sections'].items(): + for action in section_data['actions']: + permissions.append(get_permission_key(page, section_key, action)) + + # Remove any restricted permissions + restrictions = role_config.get('restrictions', []) + permissions = [p for p in permissions if p not in restrictions] + + return permissions \ No newline at end of file diff --git a/py_app/app/routes.py b/py_app/app/routes.py index 5f86e77..2c8455f 100644 --- a/py_app/app/routes.py +++ b/py_app/app/routes.py @@ -11,7 +11,9 @@ import csv from .warehouse import add_location from .settings import ( settings_handler, - edit_access_roles_handler, + role_permissions_handler, + save_role_permissions_handler, + reset_role_permissions_handler, create_user_handler, edit_user_handler, delete_user_handler, @@ -21,11 +23,6 @@ from .settings import ( bp = Blueprint('main', __name__) warehouse_bp = Blueprint('warehouse', __name__) -@bp.route('/update_role_access/', methods=['POST']) -def update_role_access(role): - from .settings import update_role_access_handler - return update_role_access_handler(role) - @bp.route('/store_articles') def store_articles(): return render_template('store_articles.html') @@ -153,11 +150,6 @@ def dashboard(): def settings(): return settings_handler() -# Route for editing access roles (superadmin only) -@bp.route('/edit_access_roles') -def edit_access_roles(): - return edit_access_roles_handler() - @bp.route('/quality') def quality(): if 'role' not in session or session['role'] not in ['superadmin', 'quality']: @@ -264,6 +256,19 @@ def delete_user(): def save_external_db(): return save_external_db_handler() +# Role Permissions Management Routes +@bp.route('/role_permissions') +def role_permissions(): + return role_permissions_handler() + +@bp.route('/settings/save_role_permissions', methods=['POST']) +def save_role_permissions(): + return save_role_permissions_handler() + +@bp.route('/settings/reset_role_permissions', methods=['POST']) +def reset_role_permissions(): + return reset_role_permissions_handler() + @bp.route('/get_report_data', methods=['GET']) def get_report_data(): report = request.args.get('report') diff --git a/py_app/app/settings.py b/py_app/app/settings.py index 7b59a67..5ade332 100644 --- a/py_app/app/settings.py +++ b/py_app/app/settings.py @@ -1,62 +1,171 @@ -from flask import render_template, request, session, redirect, url_for, flash, current_app +from flask import render_template, request, session, redirect, url_for, flash, current_app, jsonify from .models import User from . import db +from .permissions import APP_PERMISSIONS, ROLE_HIERARCHY, ACTIONS, get_all_permissions, get_default_permissions_for_role import mariadb import os +import json + +# Global permission cache to avoid repeated database queries +_permission_cache = {} + +def check_permission(permission_key, user_role=None): + """ + Check if the current user (or specified role) has a specific permission. + + Args: + permission_key (str): The permission key like 'settings.user_management.create' + user_role (str, optional): Role to check. If None, uses current session role. + + Returns: + bool: True if user has the permission, False otherwise + """ + if user_role is None: + user_role = session.get('role') + + if not user_role: + return False + + # Superadmin always has all permissions + if user_role == 'superadmin': + return True + + # Check cache first + cache_key = f"{user_role}:{permission_key}" + if cache_key in _permission_cache: + return _permission_cache[cache_key] + + try: + conn = get_external_db_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT granted FROM role_permissions + WHERE role = %s AND permission_key = %s + """, (user_role, permission_key)) + + result = cursor.fetchone() + conn.close() + + # Cache the result + has_permission = bool(result and result[0]) + _permission_cache[cache_key] = has_permission + return has_permission + + except Exception as e: + print(f"Error checking permission {permission_key} for role {user_role}: {e}") + return False + +def clear_permission_cache(): + """Clear the permission cache (call after permission updates)""" + global _permission_cache + _permission_cache = {} + +def require_permission(permission_key): + """ + Decorator to require a specific permission for a route. + + Usage: + @require_permission('settings.user_management.create') + def create_user(): + ... + """ + def decorator(f): + from functools import wraps + + @wraps(f) + def decorated_function(*args, **kwargs): + if not check_permission(permission_key): + flash(f'Access denied: You do not have permission to {permission_key}') + return redirect(url_for('main.dashboard')) + return f(*args, **kwargs) + return decorated_function + return decorator + +def get_user_permissions(user_role): + """Get all permissions for a specific role""" + if user_role == 'superadmin': + return [p['key'] for p in get_all_permissions()] + + try: + conn = get_external_db_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT permission_key FROM role_permissions + WHERE role = %s AND granted = TRUE + """, (user_role,)) + + result = cursor.fetchall() + conn.close() + + return [row[0] for row in result] + + except Exception as e: + print(f"Error getting permissions for role {user_role}: {e}") + return [] # Settings module logic -import sqlite3 -import os -def ensure_roles_table(): - instance_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../instance')) - if not os.path.exists(instance_folder): - os.makedirs(instance_folder) - db_path = os.path.join(instance_folder, 'users.db') - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - cursor.execute(""" - CREATE TABLE IF NOT EXISTS roles ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE NOT NULL, - access_level TEXT NOT NULL, - description TEXT - ) - """) - cursor.execute(""" - INSERT OR IGNORE INTO roles (name, access_level, description) - VALUES (?, ?, ?) - """, ('superadmin', 'full', 'Full access to all app areas and functions')) - conn.commit() - conn.close() -# List of roles (should match your app's roles) -ROLES = [ - 'superadmin', 'admin', 'manager', 'warehouse_manager', 'warehouse_worker', 'quality_manager', 'quality_worker' -] # Helper to check if current user is superadmin def is_superadmin(): return session.get('role') == 'superadmin' -# Route handler for editing access roles -def edit_access_roles_handler(): +# Route handler for role permissions management +def role_permissions_handler(): if not is_superadmin(): flash('Access denied: Superadmin only.') return redirect(url_for('main.dashboard')) - ensure_roles_table() - return render_template('edit_access_roles.html', roles=ROLES) + + try: + # Get roles and their current permissions + conn = get_external_db_connection() + cursor = conn.cursor() + + # Get roles from role_hierarchy table + cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC") + role_data = cursor.fetchall() + + roles = {} + for role_name, display_name, description, level in role_data: + roles[role_name] = { + 'display_name': display_name, + 'description': description, + 'level': level + } + + # Get current role permissions + cursor.execute(""" + SELECT role, permission_key + FROM role_permissions + WHERE granted = TRUE + """) + permission_data = cursor.fetchall() + + role_permissions = {} + for role, permission_key in permission_data: + if role not in role_permissions: + role_permissions[role] = [] + role_permissions[role].append(permission_key) + + conn.close() + + # Convert to JSON for JavaScript + permissions_json = json.dumps(get_all_permissions()) + role_permissions_json = json.dumps(role_permissions) + + return render_template('role_permissions.html', + roles=roles, + pages=APP_PERMISSIONS, + action_names=ACTIONS, + permissions_json=permissions_json, + role_permissions_json=role_permissions_json) + + except Exception as e: + flash(f'Error loading role permissions: {e}') + return redirect(url_for('main.settings')) + -# Handler for updating role access (stub, to be implemented) -def update_role_access_handler(role): - if not is_superadmin(): - flash('Access denied: Superadmin only.') - return redirect(url_for('main.dashboard')) - if role == 'superadmin': - flash('Superadmin access cannot be changed.') - return redirect(url_for('main.edit_access_roles')) - access_level = request.form.get('access_level') - # TODO: Save access_level for the role in the database or config - flash(f'Access for role {role} updated to {access_level}.') - return redirect(url_for('main.edit_access_roles')) def settings_handler(): if 'role' not in session or session['role'] != 'superadmin': @@ -75,12 +184,13 @@ def settings_handler(): id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, password VARCHAR(255) NOT NULL, - role VARCHAR(50) NOT NULL + role VARCHAR(50) NOT NULL, + email VARCHAR(255) ) ''') # Get all users from external database - cursor.execute("SELECT id, username, password, role FROM users") + cursor.execute("SELECT id, username, password, role, email FROM users") users_data = cursor.fetchall() # Convert to list of dictionaries for template compatibility @@ -90,7 +200,8 @@ def settings_handler(): 'id': user_data[0], 'username': user_data[1], 'password': user_data[2], - 'role': user_data[3] + 'role': user_data[3], + 'email': user_data[4] if len(user_data) > 4 else None }) conn.close() @@ -142,6 +253,7 @@ def create_user_handler(): username = request.form['username'] password = request.form['password'] role = request.form['role'] + email = request.form.get('email', '').strip() or None # Optional field try: # Connect to external MariaDB database @@ -154,7 +266,8 @@ def create_user_handler(): id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, password VARCHAR(255) NOT NULL, - role VARCHAR(50) NOT NULL + role VARCHAR(50) NOT NULL, + email VARCHAR(255) ) ''') @@ -167,9 +280,9 @@ def create_user_handler(): # Create a new user in external MariaDB cursor.execute(""" - INSERT INTO users (username, password, role) - VALUES (%s, %s, %s) - """, (username, password, role)) + INSERT INTO users (username, password, role, email) + VALUES (%s, %s, %s, %s) + """, (username, password, role, email)) conn.commit() conn.close() @@ -189,6 +302,7 @@ def edit_user_handler(): user_id = request.form.get('user_id') password = request.form.get('password', '').strip() role = request.form.get('role') + email = request.form.get('email', '').strip() or None # Optional field if not user_id or not role: flash('Missing required fields.') @@ -209,13 +323,13 @@ def edit_user_handler(): # Update the user's details in external MariaDB if password: # Only update password if provided cursor.execute(""" - UPDATE users SET password = %s, role = %s WHERE id = %s - """, (password, role, user_id)) + UPDATE users SET password = %s, role = %s, email = %s WHERE id = %s + """, (password, role, email, user_id)) flash('User updated successfully (including password).') - else: # Just update role if no password provided + else: # Just update role and email if no password provided cursor.execute(""" - UPDATE users SET role = %s WHERE id = %s - """, (role, user_id)) + UPDATE users SET role = %s, email = %s WHERE id = %s + """, (role, email, user_id)) flash('User role updated successfully.') conn.commit() @@ -283,3 +397,97 @@ def save_external_db_handler(): flash('External database settings saved/updated successfully.') return redirect(url_for('main.settings')) + +def save_role_permissions_handler(): + """Save role permissions via AJAX""" + if not is_superadmin(): + return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'}) + + try: + data = request.get_json() + role = data.get('role') + permissions = data.get('permissions', []) + + if not role: + return jsonify({'success': False, 'error': 'Role is required'}) + + conn = get_external_db_connection() + cursor = conn.cursor() + + # Clear existing permissions for this role + cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,)) + + # Add new permissions + current_user = session.get('username', 'system') + for permission_key in permissions: + cursor.execute(""" + INSERT INTO role_permissions (role, permission_key, granted, granted_by) + VALUES (%s, %s, TRUE, %s) + """, (role, permission_key, current_user)) + + # Log the change + cursor.execute(""" + INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason) + VALUES (%s, %s, 'bulk_update', %s, %s) + """, (role, f"Updated {len(permissions)} permissions", current_user, f"Bulk update via UI")) + + conn.commit() + conn.close() + + # Clear permission cache since permissions changed + clear_permission_cache() + + return jsonify({'success': True, 'message': f'Saved {len(permissions)} permissions for {role}'}) + + except Exception as e: + return jsonify({'success': False, 'error': str(e)}) + +def reset_role_permissions_handler(): + """Reset role permissions to defaults""" + if not is_superadmin(): + return jsonify({'success': False, 'error': 'Access denied: Superadmin only.'}) + + try: + data = request.get_json() + role = data.get('role') + + if not role: + return jsonify({'success': False, 'error': 'Role is required'}) + + # Get default permissions for the role + default_permissions = get_default_permissions_for_role(role) + + conn = get_external_db_connection() + cursor = conn.cursor() + + # Clear existing permissions for this role + cursor.execute("DELETE FROM role_permissions WHERE role = %s", (role,)) + + # Add default permissions + current_user = session.get('username', 'system') + for permission_key in default_permissions: + cursor.execute(""" + INSERT INTO role_permissions (role, permission_key, granted, granted_by) + VALUES (%s, %s, TRUE, %s) + """, (role, permission_key, current_user)) + + # Log the change + cursor.execute(""" + INSERT INTO permission_audit_log (role, permission_key, action, changed_by, reason) + VALUES (%s, %s, 'reset_defaults', %s, %s) + """, (role, f"Reset {len(default_permissions)} permissions", current_user, "Reset to default permissions")) + + conn.commit() + conn.close() + + # Clear permission cache since permissions changed + clear_permission_cache() + + return jsonify({ + 'success': True, + 'permissions': default_permissions, + 'message': f'Reset {len(default_permissions)} permissions for {role} to defaults' + }) + + except Exception as e: + return jsonify({'success': False, 'error': str(e)}) diff --git a/py_app/app/templates/edit_access_roles.html b/py_app/app/templates/edit_access_roles.html deleted file mode 100644 index ae53cb1..0000000 --- a/py_app/app/templates/edit_access_roles.html +++ /dev/null @@ -1,43 +0,0 @@ -{% extends "base.html" %} -{% block title %}Edit Access Roles{% endblock %} -{% block content %} -
-

Role Access Management

-

Configure which roles can view or execute functions on each app page and feature.

- - - - - - - - - - - - - - - {% for role in roles %} - {% if role != 'superadmin' %} - - - - - - {% endif %} - {% endfor %} - -
RoleAccess LevelEditable
superadminFull access to all pages and functionsNot editable
{{ role }} -
- - -
-
Editable
-

Only superadmin users can view and manage role access.

-
-{% endblock %} diff --git a/py_app/app/templates/role_permissions.html b/py_app/app/templates/role_permissions.html new file mode 100644 index 0000000..735880d --- /dev/null +++ b/py_app/app/templates/role_permissions.html @@ -0,0 +1,748 @@ +{% extends "base.html" %} + +{% block title %}Role Permissions Management{% endblock %} + +{% block head %} + +{% endblock %} + +{% block content %} +
+
+

Role Permissions Management

+

Configure granular access permissions for each role in the system

+
+ + +
+ {% for role_name, role_data in roles.items() %} +
+
{{ role_data.display_name }}
+ Level {{ role_data.level }} +
+ {% endfor %} +
+ + {% for role_name, role_data in roles.items() %} +
+ + +
+

{{ role_data.display_name }} Permissions Summary

+

{{ role_data.description }}

+
+
+
0
+
Total Permissions
+
+
+
0
+
Granted
+
+
+
0
+
Denied
+
+
+
+ + +
+ {% for page_key, page_data in pages.items() %} +
+ +
+ {% for section_key, section_data in page_data.sections.items() %} +
+
+ + + {{ section_data.name }} + + 0/{{ section_data.actions|length }} +
+
+
+ {% for action in section_data.actions %} +
+
+
+ {% if action == 'view' %}👁{% elif action == 'create' %}➕{% elif action == 'edit' %}✏️{% elif action == 'delete' %}🗑{% elif action == 'upload' %}📤{% elif action == 'download' %}📥{% elif action == 'export' %}📊{% elif action == 'import' %}📈{% endif %} +
+ {{ action_names.get(action, action) }} +
+ +
+ {% endfor %} +
+
+
+ {% endfor %} +
+
+ {% endfor %} +
+ + +
+ + + +
+
+ {% endfor %} +
+ + +{% endblock %} \ No newline at end of file diff --git a/py_app/app/templates/settings.html b/py_app/app/templates/settings.html index 5f6d7a1..d63ccfc 100644 --- a/py_app/app/templates/settings.html +++ b/py_app/app/templates/settings.html @@ -37,9 +37,9 @@
-

Edit Access Roles

-

Manage which roles can view or execute functions on each app page and feature.

- Edit Access Roles +

Role & Permissions Management

+

Configure granular permissions for each role in the system with expandable sections and detailed access control.

+ Manage Role Permissions
@@ -105,6 +105,7 @@ Array.from(document.getElementsByClassName('edit-user-btn')).forEach(function(bt document.getElementById('user-popup-title').innerText = 'Edit User'; document.getElementById('user-id').value = btn.getAttribute('data-user-id'); document.getElementById('username').value = btn.getAttribute('data-username'); + document.getElementById('email').value = btn.getAttribute('data-email') || ''; document.getElementById('role').value = btn.getAttribute('data-role'); document.getElementById('password').value = ''; document.getElementById('password').required = false;