feat: Implement warehouse module roles with auto-schema repair and remove module access section

- Add SchemaVerifier class for automatic database schema verification and repair
- Implement warehouse_manager (Level 75) and warehouse_worker (Level 35) roles
- Add zone-based access control for warehouse workers
- Implement worker-manager binding system with zone filtering
- Add comprehensive database auto-repair on Docker initialization
- Remove Module Access section from user form (role-based access only)
- Add autocomplete attributes to password fields for better UX
- Include detailed documentation for warehouse implementation
- Update initialize_db.py with schema verification as Step 0
This commit is contained in:
Quality App Developer
2026-01-28 00:46:59 +02:00
parent e6ff40184a
commit 8de85ca87f
18 changed files with 4194 additions and 167 deletions

View File

@@ -63,15 +63,11 @@ def fg_scan():
operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time
)
# Flash appropriate message based on defect code
if int(defect_code) == 0 or defect_code == '000':
flash(f'✅ APPROVED scan recorded for {cp_code}. Total approved: {approved_count}')
else:
flash(f'❌ REJECTED scan recorded for {cp_code} (defect: {defect_code}). Total rejected: {rejected_count}')
# Note: Flash messages are NOT used here - JS notifications handle this on the frontend
# This prevents wide flash message alerts from appearing
except Exception as e:
logger.error(f"Error saving finish goods scan data: {e}")
flash(f"Error saving scan data: {str(e)}", 'error')
# Check if this is an AJAX request (for scan-to-boxes feature)
if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.accept_mimetypes.best == 'application/json':
@@ -86,7 +82,6 @@ def fg_scan():
scan_groups = get_latest_scans(limit=25)
except Exception as e:
logger.error(f"Error fetching latest scans: {e}")
flash(f"Error fetching scan data: {str(e)}", 'error')
scan_groups = []
return render_template('modules/quality/fg_scan.html', scan_groups=scan_groups)

View File

@@ -185,7 +185,6 @@ def create_user():
password = request.form.get('password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
is_active = request.form.get('is_active') == 'on'
modules = request.form.getlist('modules')
# Validation
errors = []
@@ -202,8 +201,6 @@ def create_user():
errors.append("Passwords do not match")
if len(password) < 8:
errors.append("Password must be at least 8 characters")
if not modules:
errors.append("Select at least one module")
if errors:
conn = get_db()
@@ -251,13 +248,6 @@ def create_user():
(user_id, password_hash)
)
# Grant module access
for module in modules:
cursor.execute(
"INSERT IGNORE INTO user_modules (user_id, module_name) VALUES (%s, %s)",
(user_id, module)
)
conn.commit()
cursor.close()
@@ -311,11 +301,10 @@ def edit_user(user_id):
if request.method == 'GET':
cursor.close()
available_modules = ['quality', 'settings']
return render_template('modules/settings/user_form.html',
user=user,
roles=roles,
available_modules=available_modules,
available_modules=['quality', 'settings'],
user_modules=user_modules)
# Handle POST - Update user
@@ -326,7 +315,6 @@ def edit_user(user_id):
password = request.form.get('password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
is_active = request.form.get('is_active') == 'on'
modules = request.form.getlist('modules')
# Validation
errors = []
@@ -339,16 +327,13 @@ def edit_user(user_id):
errors.append("Passwords do not match")
if password and len(password) < 8:
errors.append("Password must be at least 8 characters")
if not modules:
errors.append("Select at least one module")
if errors:
cursor.close()
available_modules = ['quality', 'settings']
return render_template('modules/settings/user_form.html',
user=user,
roles=roles,
available_modules=available_modules,
available_modules=['quality', 'settings'],
user_modules=user_modules,
error="; ".join(errors))
@@ -366,14 +351,6 @@ def edit_user(user_id):
(password_hash, user_id)
)
# Update module access
cursor.execute("DELETE FROM user_modules WHERE user_id = %s", (user_id,))
for module in modules:
cursor.execute(
"INSERT INTO user_modules (user_id, module_name) VALUES (%s, %s)",
(user_id, module)
)
conn.commit()
cursor.close()
@@ -381,11 +358,10 @@ def edit_user(user_id):
except Exception as e:
cursor.close()
available_modules = ['quality', 'settings']
return render_template('modules/settings/user_form.html',
user=user,
roles=roles,
available_modules=available_modules,
available_modules=['quality', 'settings'],
user_modules=user_modules,
error=f"Error updating user: {str(e)}")
@@ -862,6 +838,40 @@ def restore_database():
return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/database/tables', methods=['GET'])
def get_database_tables():
"""Get list of all database tables (dynamically fetched)"""
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
try:
conn = get_db()
cursor = conn.cursor()
# Get list of all tables with their row counts
cursor.execute("""
SELECT TABLE_NAME, TABLE_ROWS
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_NAME
""")
tables = [{'name': row[0], 'rows': row[1] or 0} for row in cursor.fetchall()]
cursor.close()
return jsonify({
'success': True,
'tables': tables,
'count': len(tables)
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@settings_bp.route('/api/database/truncate', methods=['POST'])
def truncate_table():
"""Truncate (clear) a database table"""

View File

@@ -0,0 +1,329 @@
"""
Warehouse Worker Management Functions
Handles worker-manager bindings and zone assignments for the warehouse module
"""
from app.database import get_db
import logging
logger = logging.getLogger(__name__)
def assign_worker_to_manager(manager_id, worker_id, warehouse_zone=None):
"""
Assign a warehouse worker to a manager with optional zone restriction
Args:
manager_id (int): Manager user ID
worker_id (int): Worker user ID
warehouse_zone (str, optional): Zone restriction (e.g., "Cold Storage", "High Shelf")
Returns:
tuple: (success: bool, message: str)
"""
try:
if manager_id == worker_id:
return False, "Cannot assign a worker to themselves"
db = get_db()
cursor = db.cursor()
# Check if binding already exists
cursor.execute("""
SELECT id FROM worker_manager_bindings
WHERE manager_id = %s AND worker_id = %s
""", (manager_id, worker_id))
existing = cursor.fetchone()
if existing:
# Update existing binding
cursor.execute("""
UPDATE worker_manager_bindings
SET warehouse_zone = %s, is_active = 1, updated_at = NOW()
WHERE manager_id = %s AND worker_id = %s
""", (warehouse_zone, manager_id, worker_id))
message = f"Worker assignment updated (zone: {warehouse_zone or 'all zones'})"
else:
# Create new binding
cursor.execute("""
INSERT INTO worker_manager_bindings (manager_id, worker_id, warehouse_zone)
VALUES (%s, %s, %s)
""", (manager_id, worker_id, warehouse_zone))
message = f"Worker assigned to manager (zone: {warehouse_zone or 'all zones'})"
db.commit()
logger.info(f"{message} - Manager ID: {manager_id}, Worker ID: {worker_id}")
return True, message
except Exception as e:
logger.error(f"Error assigning worker to manager: {e}")
return False, f"Error: {str(e)}"
def unassign_worker_from_manager(manager_id, worker_id, soft_delete=True):
"""
Remove a worker from a manager's supervision
Args:
manager_id (int): Manager user ID
worker_id (int): Worker user ID
soft_delete (bool): If True, set is_active=0; if False, delete record
Returns:
tuple: (success: bool, message: str)
"""
try:
db = get_db()
cursor = db.cursor()
if soft_delete:
cursor.execute("""
UPDATE worker_manager_bindings
SET is_active = 0, updated_at = NOW()
WHERE manager_id = %s AND worker_id = %s
""", (manager_id, worker_id))
else:
cursor.execute("""
DELETE FROM worker_manager_bindings
WHERE manager_id = %s AND worker_id = %s
""", (manager_id, worker_id))
db.commit()
logger.info(f"✓ Worker unassigned - Manager ID: {manager_id}, Worker ID: {worker_id}")
return True, "Worker unassigned successfully"
except Exception as e:
logger.error(f"Error unassigning worker: {e}")
return False, f"Error: {str(e)}"
def get_worker_binding_info(worker_id):
"""
Get binding information for a worker (who manages them, what zone)
Args:
worker_id (int): Worker user ID
Returns:
dict: {manager_id, manager_username, manager_name, zone, is_active} or None
"""
try:
db = get_db()
cursor = db.cursor()
cursor.execute("""
SELECT m.id, m.username, m.full_name, wmb.warehouse_zone, wmb.is_active
FROM worker_manager_bindings wmb
JOIN users m ON wmb.manager_id = m.id
WHERE wmb.worker_id = %s
LIMIT 1
""", (worker_id,))
result = cursor.fetchone()
if result:
return {
'manager_id': result[0],
'manager_username': result[1],
'manager_name': result[2],
'zone': result[3],
'is_active': result[4]
}
return None
except Exception as e:
logger.error(f"Error getting worker binding info: {e}")
return None
def get_available_workers_for_assignment(exclude_manager_id=None):
"""
Get all warehouse_worker users available for assignment
Args:
exclude_manager_id (int, optional): Manager ID to exclude from results
Returns:
list: List of dicts {id, username, full_name, current_manager}
"""
try:
db = get_db()
cursor = db.cursor()
query = """
SELECT u.id, u.username, u.full_name,
(SELECT m.full_name FROM worker_manager_bindings wmb
JOIN users m ON wmb.manager_id = m.id
WHERE wmb.worker_id = u.id AND wmb.is_active = 1 LIMIT 1) as current_manager
FROM users u
WHERE u.role = 'warehouse_worker'
ORDER BY u.full_name
"""
cursor.execute(query)
results = []
for row in cursor.fetchall():
results.append({
'id': row[0],
'username': row[1],
'full_name': row[2],
'current_manager': row[3]
})
return results
except Exception as e:
logger.error(f"Error getting available workers: {e}")
return []
def get_manager_assigned_workers(manager_id, include_inactive=False):
"""
Get all workers assigned to a specific manager
Args:
manager_id (int): Manager user ID
include_inactive (bool): Whether to include inactive bindings
Returns:
list: List of dicts {id, username, full_name, zone, is_active}
"""
try:
db = get_db()
cursor = db.cursor()
query = """
SELECT u.id, u.username, u.full_name, wmb.warehouse_zone, wmb.is_active
FROM worker_manager_bindings wmb
JOIN users u ON wmb.worker_id = u.id
WHERE wmb.manager_id = %s
"""
params = [manager_id]
if not include_inactive:
query += " AND wmb.is_active = 1"
query += " ORDER BY u.full_name"
cursor.execute(query, params)
results = []
for row in cursor.fetchall():
results.append({
'id': row[0],
'username': row[1],
'full_name': row[2],
'zone': row[3],
'is_active': row[4]
})
return results
except Exception as e:
logger.error(f"Error getting manager's workers: {e}")
return []
def validate_zone_name(zone_name):
"""
Validate zone name format (alphanumeric, spaces, hyphens, underscores allowed)
Args:
zone_name (str): Zone name to validate
Returns:
tuple: (is_valid: bool, message: str)
"""
if not zone_name:
return True, "No zone restriction" # NULL zone is valid (all zones)
if not isinstance(zone_name, str):
return False, "Zone must be a string"
zone_name = zone_name.strip()
if len(zone_name) > 100:
return False, "Zone name too long (max 100 characters)"
if len(zone_name) < 2:
return False, "Zone name too short (min 2 characters)"
# Allow alphanumeric, spaces, hyphens, underscores
import re
if not re.match(r'^[a-zA-Z0-9\s\-_]+$', zone_name):
return False, "Zone name contains invalid characters"
return True, "Valid zone name"
def get_warehouse_zones():
"""
Get list of all warehouse zones in use
Returns:
list: List of zone names currently assigned to workers
"""
try:
db = get_db()
cursor = db.cursor()
cursor.execute("""
SELECT DISTINCT warehouse_zone
FROM worker_manager_bindings
WHERE is_active = 1 AND warehouse_zone IS NOT NULL
ORDER BY warehouse_zone
""")
zones = [row[0] for row in cursor.fetchall()]
return zones
except Exception as e:
logger.error(f"Error getting warehouse zones: {e}")
return []
def reassign_worker(worker_id, new_manager_id, warehouse_zone=None):
"""
Reassign a worker from current manager to a new manager
Args:
worker_id (int): Worker user ID
new_manager_id (int): New manager user ID
warehouse_zone (str, optional): New zone restriction
Returns:
tuple: (success: bool, message: str)
"""
try:
if worker_id == new_manager_id:
return False, "Cannot assign a worker to themselves"
db = get_db()
cursor = db.cursor()
# Deactivate old binding
cursor.execute("""
UPDATE worker_manager_bindings
SET is_active = 0
WHERE worker_id = %s AND is_active = 1
""", (worker_id,))
# Create new binding
cursor.execute("""
INSERT INTO worker_manager_bindings (manager_id, worker_id, warehouse_zone)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
is_active = 1,
warehouse_zone = %s,
updated_at = NOW()
""", (new_manager_id, worker_id, warehouse_zone, warehouse_zone))
db.commit()
logger.info(f"✓ Worker reassigned - Worker ID: {worker_id}, New Manager ID: {new_manager_id}")
return True, "Worker reassigned successfully"
except Exception as e:
logger.error(f"Error reassigning worker: {e}")
return False, f"Error: {str(e)}"