updated control access
This commit is contained in:
172
old code/migrate_permissions.py
Executable file
172
old code/migrate_permissions.py
Executable file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migration script to convert from complex permission system to simplified 4-tier system
|
||||
This script will:
|
||||
1. Add 'modules' column to users table
|
||||
2. Convert existing roles to new 4-tier system
|
||||
3. Assign appropriate modules based on old roles
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Add the app directory to Python path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
def get_db_connections():
|
||||
"""Get both internal SQLite and external database connections"""
|
||||
connections = {}
|
||||
|
||||
# Internal SQLite database
|
||||
internal_db_path = os.path.join(os.path.dirname(__file__), 'instance/users.db')
|
||||
if os.path.exists(internal_db_path):
|
||||
connections['internal'] = sqlite3.connect(internal_db_path)
|
||||
print(f"Connected to internal SQLite database: {internal_db_path}")
|
||||
|
||||
# External database (try to connect using existing method)
|
||||
try:
|
||||
import mariadb
|
||||
|
||||
# Read external database configuration
|
||||
config_file = os.path.join(os.path.dirname(__file__), '../external_database_settings')
|
||||
if os.path.exists(config_file):
|
||||
with open(config_file, 'r') as f:
|
||||
lines = f.read().strip().split('\n')
|
||||
if len(lines) >= 5:
|
||||
host = lines[0].strip()
|
||||
port = int(lines[1].strip())
|
||||
database = lines[2].strip()
|
||||
user = lines[3].strip()
|
||||
password = lines[4].strip()
|
||||
|
||||
conn = mariadb.connect(
|
||||
user=user,
|
||||
password=password,
|
||||
host=host,
|
||||
port=port,
|
||||
database=database
|
||||
)
|
||||
connections['external'] = conn
|
||||
print(f"Connected to external MariaDB database: {host}:{port}/{database}")
|
||||
except Exception as e:
|
||||
print(f"Could not connect to external database: {e}")
|
||||
|
||||
return connections
|
||||
|
||||
def role_mapping():
|
||||
"""Map old roles to new 4-tier system"""
|
||||
return {
|
||||
# Old role -> (new_role, modules)
|
||||
'superadmin': ('superadmin', []), # All modules by default
|
||||
'administrator': ('admin', []), # All modules by default
|
||||
'admin': ('admin', []), # All modules by default
|
||||
'quality': ('manager', ['quality']),
|
||||
'warehouse': ('manager', ['warehouse']),
|
||||
'warehouse_manager': ('manager', ['warehouse']),
|
||||
'scan': ('worker', ['quality']), # Assume scan users are quality workers
|
||||
'etichete': ('manager', ['labels']),
|
||||
'quality_manager': ('manager', ['quality']),
|
||||
'quality_worker': ('worker', ['quality']),
|
||||
}
|
||||
|
||||
def migrate_database(conn, db_type):
|
||||
"""Migrate a specific database"""
|
||||
cursor = conn.cursor()
|
||||
|
||||
print(f"Migrating {db_type} database...")
|
||||
|
||||
# Check if users table exists
|
||||
if db_type == 'internal':
|
||||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
|
||||
else: # external/MariaDB
|
||||
cursor.execute("SHOW TABLES LIKE 'users'")
|
||||
|
||||
if not cursor.fetchone():
|
||||
print(f"No users table found in {db_type} database")
|
||||
return
|
||||
|
||||
# Check if modules column already exists
|
||||
try:
|
||||
if db_type == 'internal':
|
||||
cursor.execute("PRAGMA table_info(users)")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
else: # external/MariaDB
|
||||
cursor.execute("DESCRIBE users")
|
||||
columns = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
if 'modules' not in columns:
|
||||
print(f"Adding modules column to {db_type} database...")
|
||||
if db_type == 'internal':
|
||||
cursor.execute("ALTER TABLE users ADD COLUMN modules TEXT")
|
||||
else: # external/MariaDB
|
||||
cursor.execute("ALTER TABLE users ADD COLUMN modules TEXT")
|
||||
else:
|
||||
print(f"Modules column already exists in {db_type} database")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error checking/adding modules column in {db_type}: {e}")
|
||||
return
|
||||
|
||||
# Get current users
|
||||
cursor.execute("SELECT id, username, role FROM users")
|
||||
users = cursor.fetchall()
|
||||
|
||||
print(f"Found {len(users)} users in {db_type} database")
|
||||
|
||||
# Convert roles and assign modules
|
||||
mapping = role_mapping()
|
||||
updates = []
|
||||
|
||||
for user_id, username, old_role in users:
|
||||
if old_role in mapping:
|
||||
new_role, modules = mapping[old_role]
|
||||
modules_json = json.dumps(modules) if modules else None
|
||||
updates.append((new_role, modules_json, user_id, username))
|
||||
print(f" {username}: {old_role} -> {new_role} with modules {modules}")
|
||||
else:
|
||||
print(f" {username}: Unknown role '{old_role}', keeping as-is")
|
||||
|
||||
# Apply updates
|
||||
for new_role, modules_json, user_id, username in updates:
|
||||
try:
|
||||
cursor.execute("UPDATE users SET role = ?, modules = ? WHERE id = ?",
|
||||
(new_role, modules_json, user_id))
|
||||
print(f" Updated {username} successfully")
|
||||
except Exception as e:
|
||||
print(f" Error updating {username}: {e}")
|
||||
|
||||
conn.commit()
|
||||
print(f"Migration completed for {db_type} database")
|
||||
|
||||
def main():
|
||||
"""Main migration function"""
|
||||
print("Starting migration to simplified 4-tier permission system...")
|
||||
print("="*60)
|
||||
|
||||
connections = get_db_connections()
|
||||
|
||||
if not connections:
|
||||
print("No database connections available. Please check your configuration.")
|
||||
return
|
||||
|
||||
for db_type, conn in connections.items():
|
||||
try:
|
||||
migrate_database(conn, db_type)
|
||||
print()
|
||||
except Exception as e:
|
||||
print(f"Error migrating {db_type} database: {e}")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
print("Migration completed!")
|
||||
print("\nNew role structure:")
|
||||
print("- superadmin: Full system access")
|
||||
print("- admin: Full app access (except role_permissions and download_extension)")
|
||||
print("- manager: Module-based access (can have multiple modules)")
|
||||
print("- worker: Limited module access (one module only)")
|
||||
print("\nAvailable modules: quality, warehouse, labels")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user