docs: Add comprehensive settings page analysis and improvements

- Add detailed settings page analysis report (settings.md)
- Document identified security vulnerabilities and code quality issues
- Provide prioritized improvement recommendations
- Document permission and access control issues
- Add testing checklist for validation
- Track modifications to settings.py, routes.py, and settings.html templates
This commit is contained in:
Quality App System
2026-01-23 22:54:11 +02:00
parent 64b67b2979
commit d45dc1dab1
8 changed files with 1969 additions and 600 deletions

View File

@@ -22,11 +22,7 @@ from app.settings import (
save_role_permissions_handler,
reset_role_permissions_handler,
save_all_role_permissions_handler,
reset_all_role_permissions_handler,
edit_user_handler,
create_user_handler,
delete_user_handler,
save_external_db_handler
reset_all_role_permissions_handler
)
from .print_module import get_unprinted_orders_data, get_printed_orders_data
from .access_control import (
@@ -398,18 +394,17 @@ def create_user_simple():
# Add to external database
with db_connection_context() as conn:
cursor = conn.cursor()
# Check if user already exists
cursor.execute("SELECT username FROM users WHERE username=%s", (username,))
if cursor.fetchone():
flash(f'User "{username}" already exists.')
conn.close()
return redirect(url_for('main.user_management_simple'))
# Insert new user
cursor.execute("INSERT INTO users (username, password, role, modules) VALUES (%s, %s, %s, %s)",
(username, password, role, modules_json))
conn.commit()
# Check if user already exists
cursor.execute("SELECT username FROM users WHERE username=%s", (username,))
if cursor.fetchone():
flash(f'User "{username}" already exists.')
return redirect(url_for('main.user_management_simple'))
# Insert new user
cursor.execute("INSERT INTO users (username, password, role, modules) VALUES (%s, %s, %s, %s)",
(username, password, role, modules_json))
conn.commit()
flash(f'User "{username}" created successfully as {role}.')
return redirect(url_for('main.user_management_simple'))
@@ -450,23 +445,22 @@ def edit_user_simple():
# Update in external database
with db_connection_context() as conn:
cursor = conn.cursor()
# Check if username is taken by another user
cursor.execute("SELECT id FROM users WHERE username=%s AND id!=%s", (username, user_id))
if cursor.fetchone():
flash(f'Username "{username}" is already taken.')
conn.close()
return redirect(url_for('main.user_management_simple'))
# Update user
if password:
cursor.execute("UPDATE users SET username=%s, password=%s, role=%s, modules=%s WHERE id=%s",
(username, password, role, modules_json, user_id))
else:
cursor.execute("UPDATE users SET username=%s, role=%s, modules=%s WHERE id=%s",
(username, role, modules_json, user_id))
conn.commit()
# Check if username is taken by another user
cursor.execute("SELECT id FROM users WHERE username=%s AND id!=%s", (username, user_id))
if cursor.fetchone():
flash(f'Username "{username}" is already taken.')
return redirect(url_for('main.user_management_simple'))
# Update user
if password:
cursor.execute("UPDATE users SET username=%s, password=%s, role=%s, modules=%s WHERE id=%s",
(username, password, role, modules_json, user_id))
else:
cursor.execute("UPDATE users SET username=%s, role=%s, modules=%s WHERE id=%s",
(username, role, modules_json, user_id))
conn.commit()
flash(f'User "{username}" updated successfully.')
return redirect(url_for('main.user_management_simple'))
@@ -490,15 +484,15 @@ def delete_user_simple():
# Delete from external database
with db_connection_context() as conn:
cursor = conn.cursor()
# Get username before deleting
cursor.execute("SELECT username FROM users WHERE id=%s", (user_id,))
row = cursor.fetchone()
username = row[0] if row else 'Unknown'
# Delete user
cursor.execute("DELETE FROM users WHERE id=%s", (user_id,))
conn.commit()
# Get username before deleting
cursor.execute("SELECT username FROM users WHERE id=%s", (user_id,))
row = cursor.fetchone()
username = row[0] if row else 'Unknown'
# Delete user
cursor.execute("DELETE FROM users WHERE id=%s", (user_id,))
conn.commit()
flash(f'User "{username}" deleted successfully.')
return redirect(url_for('main.user_management_simple'))
@@ -523,38 +517,36 @@ def quick_update_modules():
# Get current user to validate role
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("SELECT username, role, modules FROM users WHERE id=%s", (user_id,))
user_row = cursor.fetchone()
if not user_row:
flash('User not found.')
conn.close()
return redirect(url_for('main.user_management_simple'))
username, role, current_modules = user_row
# Validate modules for the role
from app.permissions_simple import validate_user_modules
is_valid, error_msg = validate_user_modules(role, modules)
if not is_valid:
flash(f'Invalid module assignment: {error_msg}')
conn.close()
return redirect(url_for('main.user_management_simple'))
# Prepare modules JSON
modules_json = None
if modules and role in ['manager', 'worker']:
import json
modules_json = json.dumps(modules)
elif not modules and role in ['manager', 'worker']:
# Empty modules list for manager/worker
import json
modules_json = json.dumps([])
# Update modules only
cursor.execute("UPDATE users SET modules=%s WHERE id=%s", (modules_json, user_id))
conn.commit()
cursor.execute("SELECT username, role, modules FROM users WHERE id=%s", (user_id,))
user_row = cursor.fetchone()
if not user_row:
flash('User not found.')
return redirect(url_for('main.user_management_simple'))
username, role, current_modules = user_row
# Validate modules for the role
from app.permissions_simple import validate_user_modules
is_valid, error_msg = validate_user_modules(role, modules)
if not is_valid:
flash(f'Invalid module assignment: {error_msg}')
return redirect(url_for('main.user_management_simple'))
# Prepare modules JSON
modules_json = None
if modules and role in ['manager', 'worker']:
import json
modules_json = json.dumps(modules)
elif not modules and role in ['manager', 'worker']:
# Empty modules list for manager/worker
import json
modules_json = json.dumps([])
# Update modules only
cursor.execute("UPDATE users SET modules=%s WHERE id=%s", (modules_json, user_id))
conn.commit()
flash(f'Modules updated successfully for user "{username}". New modules: {", ".join(modules) if modules else "None"}', 'success')
@@ -606,31 +598,31 @@ def scan():
with db_connection_context() as conn:
cursor = conn.cursor()
# Insert new entry - the BEFORE INSERT trigger 'set_quantities_scan1' will automatically
# calculate and set approved_quantity and rejected_quantity for this new record
insert_query = """
INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
conn.commit()
# Get the quantities from the newly inserted row for the flash message
cp_base_code = cp_code[:10]
cursor.execute("""
SELECT approved_quantity, rejected_quantity
FROM scan1_orders
WHERE CP_full_code = %s
""", (cp_code,))
result = cursor.fetchone()
approved_count = result[0] if result else 0
rejected_count = result[1] if result else 0
# Flash appropriate message
if int(defect_code) == 0:
flash(f'✅ APPROVED scan recorded for {cp_code}. Total approved: {approved_count}')
else:
flash(f'❌ REJECTED scan recorded for {cp_code} (defect: {defect_code}). Total rejected: {rejected_count}')
# Insert new entry - the BEFORE INSERT trigger 'set_quantities_scan1' will automatically
# calculate and set approved_quantity and rejected_quantity for this new record
insert_query = """
INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
conn.commit()
# Get the quantities from the newly inserted row for the flash message
cp_base_code = cp_code[:10]
cursor.execute("""
SELECT approved_quantity, rejected_quantity
FROM scan1_orders
WHERE CP_full_code = %s
""", (cp_code,))
result = cursor.fetchone()
approved_count = result[0] if result else 0
rejected_count = result[1] if result else 0
# Flash appropriate message
if int(defect_code) == 0:
flash(f'✅ APPROVED scan recorded for {cp_code}. Total approved: {approved_count}')
else:
flash(f'❌ REJECTED scan recorded for {cp_code} (defect: {defect_code}). Total rejected: {rejected_count}')
except mariadb.Error as e:
@@ -642,15 +634,15 @@ def scan():
try:
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
ORDER BY Id DESC
LIMIT 15
""")
raw_scan_data = cursor.fetchall()
# Apply formatting to scan data for consistent date display
scan_data = [[format_cell_data(cell) for cell in row] for row in raw_scan_data]
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
ORDER BY Id DESC
LIMIT 15
""")
raw_scan_data = cursor.fetchall()
# Apply formatting to scan data for consistent date display
scan_data = [[format_cell_data(cell) for cell in row] for row in raw_scan_data]
except mariadb.Error as e:
print(f"Error fetching scan data: {e}")
flash(f"Error fetching scan data: {e}")
@@ -685,32 +677,32 @@ def fg_scan():
with db_connection_context() as conn:
cursor = conn.cursor()
# Always insert a new entry - each scan is a separate record
# Note: The trigger 'increment_approved_quantity_fg' will automatically
# update approved_quantity or rejected_quantity for all records with same CP_base_code
insert_query = """
INSERT INTO scanfg_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
conn.commit()
# Get the quantities from the newly inserted row for the flash message
cp_base_code = cp_code[:10]
cursor.execute("""
SELECT approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE CP_full_code = %s
""", (cp_code,))
result = cursor.fetchone()
approved_count = result[0] if result else 0
rejected_count = result[1] if result else 0
# Flash appropriate message
if int(defect_code) == 0:
flash(f'✅ APPROVED scan recorded for {cp_code}. Total approved: {approved_count}')
else:
flash(f'❌ REJECTED scan recorded for {cp_code} (defect: {defect_code}). Total rejected: {rejected_count}')
# Always insert a new entry - each scan is a separate record
# Note: The trigger 'increment_approved_quantity_fg' will automatically
# update approved_quantity or rejected_quantity for all records with same CP_base_code
insert_query = """
INSERT INTO scanfg_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
conn.commit()
# Get the quantities from the newly inserted row for the flash message
cp_base_code = cp_code[:10]
cursor.execute("""
SELECT approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE CP_full_code = %s
""", (cp_code,))
result = cursor.fetchone()
approved_count = result[0] if result else 0
rejected_count = result[1] if result else 0
# Flash appropriate message
if int(defect_code) == 0:
flash(f'✅ APPROVED scan recorded for {cp_code}. Total approved: {approved_count}')
else:
flash(f'❌ REJECTED scan recorded for {cp_code} (defect: {defect_code}). Total rejected: {rejected_count}')
except mariadb.Error as e:
@@ -730,37 +722,21 @@ def fg_scan():
try:
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
ORDER BY Id DESC
LIMIT 15
""")
raw_scan_data = cursor.fetchall()
# Apply formatting to scan data for consistent date display
scan_data = [[format_cell_data(cell) for cell in row] for row in raw_scan_data]
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
ORDER BY Id DESC
LIMIT 15
""")
raw_scan_data = cursor.fetchall()
# Apply formatting to scan data for consistent date display
scan_data = [[format_cell_data(cell) for cell in row] for row in raw_scan_data]
except mariadb.Error as e:
print(f"Error fetching finish goods scan data: {e}")
flash(f"Error fetching scan data: {e}")
return render_template('fg_scan.html', scan_data=scan_data)
@bp.route('/create_user', methods=['POST'])
def create_user():
return create_user_handler()
@bp.route('/edit_user', methods=['POST'])
def edit_user():
return edit_user_handler()
@bp.route('/delete_user', methods=['POST'])
def delete_user():
return delete_user_handler()
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
return save_external_db_handler()
# Role Permissions Management Routes
@bp.route('/role_permissions')
@superadmin_only
@@ -917,90 +893,90 @@ def get_report_data():
with db_connection_context() as conn:
cursor = conn.cursor()
if report == "1": # Logic for the 1-day report (today's records)
today = datetime.now().strftime('%Y-%m-%d')
print(f"DEBUG: Daily report searching for records on date: {today}")
cursor.execute("""
SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date = ?
ORDER BY date DESC, time DESC
""", (today,))
rows = cursor.fetchall()
print(f"DEBUG: Daily report found {len(rows)} rows for today ({today}):", rows)
data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
if report == "1": # Logic for the 1-day report (today's records)
today = datetime.now().strftime('%Y-%m-%d')
print(f"DEBUG: Daily report searching for records on date: {today}")
cursor.execute("""
SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date = ?
ORDER BY date DESC, time DESC
""", (today,))
rows = cursor.fetchall()
print(f"DEBUG: Daily report found {len(rows)} rows for today ({today}):", rows)
data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "2": # Logic for the 5-day report (last 5 days including today)
five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days
start_date = five_days_ago.strftime('%Y-%m-%d')
print(f"DEBUG: 5-day report searching for records from {start_date} onwards")
cursor.execute("""
SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date >= ?
ORDER BY date DESC, time DESC
""", (start_date,))
rows = cursor.fetchall()
print(f"DEBUG: 5-day report found {len(rows)} rows from {start_date} onwards:", rows)
data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "2": # Logic for the 5-day report (last 5 days including today)
five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days
start_date = five_days_ago.strftime('%Y-%m-%d')
print(f"DEBUG: 5-day report searching for records from {start_date} onwards")
cursor.execute("""
SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date >= ?
ORDER BY date DESC, time DESC
""", (start_date,))
rows = cursor.fetchall()
print(f"DEBUG: 5-day report found {len(rows)} rows from {start_date} onwards:", rows)
data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "3": # Logic for the report with non-zero quality_code (today only)
today = datetime.now().strftime('%Y-%m-%d')
print(f"DEBUG: Quality defects report (today) searching for records on {today} with quality issues")
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date = ? AND quality_code != 0
ORDER BY date DESC, time DESC
""", (today,))
rows = cursor.fetchall()
print(f"DEBUG: Quality defects report (today) found {len(rows)} rows with quality issues for {today}:", rows)
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "3": # Logic for the report with non-zero quality_code (today only)
today = datetime.now().strftime('%Y-%m-%d')
print(f"DEBUG: Quality defects report (today) searching for records on {today} with quality issues")
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date = ? AND quality_code != 0
ORDER BY date DESC, time DESC
""", (today,))
rows = cursor.fetchall()
print(f"DEBUG: Quality defects report (today) found {len(rows)} rows with quality issues for {today}:", rows)
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "4": # Logic for the report with non-zero quality_code (last 5 days)
five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days
start_date = five_days_ago.strftime('%Y-%m-%d')
print(f"DEBUG: Quality defects report (5 days) searching for records from {start_date} onwards with quality issues")
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date >= ? AND quality_code != 0
ORDER BY date DESC, time DESC
""", (start_date,))
rows = cursor.fetchall()
print(f"DEBUG: Quality defects report (5 days) found {len(rows)} rows with quality issues from {start_date} onwards:", rows)
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "4": # Logic for the report with non-zero quality_code (last 5 days)
five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days
start_date = five_days_ago.strftime('%Y-%m-%d')
print(f"DEBUG: Quality defects report (5 days) searching for records from {start_date} onwards with quality issues")
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
WHERE date >= ? AND quality_code != 0
ORDER BY date DESC, time DESC
""", (start_date,))
rows = cursor.fetchall()
print(f"DEBUG: Quality defects report (5 days) found {len(rows)} rows with quality issues from {start_date} onwards:", rows)
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "5": # Logic for the 5-ft report (all rows)
# First check if table exists and has any data
try:
cursor.execute("SELECT COUNT(*) FROM scan1_orders")
total_count = cursor.fetchone()[0]
print(f"DEBUG: Total records in scan1_orders table: {total_count}")
if total_count == 0:
print("DEBUG: No data found in scan1_orders table")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"]
data["rows"] = []
data["message"] = "No scan data available in the database. Please ensure scanning operations have been performed and data has been recorded."
else:
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
ORDER BY date DESC, time DESC
""")
rows = cursor.fetchall()
print(f"DEBUG: Fetched {len(rows)} rows for report 5 (all rows)")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "5": # Logic for the 5-ft report (all rows)
# First check if table exists and has any data
try:
cursor.execute("SELECT COUNT(*) FROM scan1_orders")
total_count = cursor.fetchone()[0]
print(f"DEBUG: Total records in scan1_orders table: {total_count}")
except mariadb.Error as table_error:
print(f"DEBUG: Table access error: {table_error}")
data["error"] = f"Database table error: {table_error}"
if total_count == 0:
print("DEBUG: No data found in scan1_orders table")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"]
data["rows"] = []
data["message"] = "No scan data available in the database. Please ensure scanning operations have been performed and data has been recorded."
else:
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scan1_orders
ORDER BY date DESC, time DESC
""")
rows = cursor.fetchall()
print(f"DEBUG: Fetched {len(rows)} rows for report 5 (all rows)")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
except mariadb.Error as table_error:
print(f"DEBUG: Table access error: {table_error}")
data["error"] = f"Database table error: {table_error}"
except mariadb.Error as e:
print(f"Error fetching report data: {e}")
@@ -1277,19 +1253,18 @@ def debug_dates():
try:
with db_connection_context() as conn:
cursor = conn.cursor()
# Get all distinct dates
cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC")
dates = cursor.fetchall()
# Get total count
cursor.execute("SELECT COUNT(*) FROM scan1_orders")
total_count = cursor.fetchone()[0]
# Get sample data
cursor.execute("SELECT date, time FROM scan1_orders ORDER BY date DESC LIMIT 5")
sample_data = cursor.fetchall()
# Get all distinct dates
cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC")
dates = cursor.fetchall()
# Get total count
cursor.execute("SELECT COUNT(*) FROM scan1_orders")
total_count = cursor.fetchone()[0]
# Get sample data
cursor.execute("SELECT date, time FROM scan1_orders ORDER BY date DESC LIMIT 5")
sample_data = cursor.fetchall()
return jsonify({
"total_records": total_count,
@@ -2408,21 +2383,21 @@ def view_orders():
# Get all orders data (not just unprinted)
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
created_at, updated_at, printed_labels, data_livrare, dimensiune
FROM order_for_labels
ORDER BY created_at DESC
LIMIT 500
""")
orders_data = []
for row in cursor.fetchall():
orders_data.append({
'id': row[0],
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
created_at, updated_at, printed_labels, data_livrare, dimensiune
FROM order_for_labels
ORDER BY created_at DESC
LIMIT 500
""")
orders_data = []
for row in cursor.fetchall():
orders_data.append({
'id': row[0],
'comanda_productie': row[1],
'cod_articol': row[2],
'descr_com_prod': row[3],
@@ -3658,17 +3633,17 @@ def generate_labels_pdf(order_id, paper_saving_mode='true'):
# Get order data from database
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
printed_labels, created_at, updated_at
FROM order_for_labels
WHERE id = %s
""", (order_id,))
row = cursor.fetchone()
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
printed_labels, created_at, updated_at
FROM order_for_labels
WHERE id = %s
""", (order_id,))
row = cursor.fetchone()
if not row:
return jsonify({'error': 'Order not found'}), 404
@@ -4025,17 +4000,17 @@ def get_order_data(order_id):
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
printed_labels, created_at, updated_at
FROM order_for_labels
WHERE id = %s
""", (order_id,))
row = cursor.fetchone()
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
printed_labels, created_at, updated_at
FROM order_for_labels
WHERE id = %s
""", (order_id,))
row = cursor.fetchone()
if not row:
return jsonify({'error': 'Order not found'}), 404
@@ -4080,22 +4055,21 @@ def mark_printed():
# Connect to the database and update the printed status
with db_connection_context() as conn:
cursor = conn.cursor()
# Update the order to mark it as printed
update_query = """
UPDATE orders_for_labels
SET printed_labels = printed_labels + 1,
updated_at = NOW()
WHERE id = %s
"""
cursor.execute(update_query, (order_id,))
if cursor.rowcount == 0:
conn.close()
return jsonify({'error': 'Order not found'}), 404
conn.commit()
# Update the order to mark it as printed
update_query = """
UPDATE orders_for_labels
SET printed_labels = printed_labels + 1,
updated_at = NOW()
WHERE id = %s
"""
cursor.execute(update_query, (order_id,))
if cursor.rowcount == 0:
return jsonify({'error': 'Order not found'}), 404
conn.commit()
return jsonify({'success': True, 'message': 'Order marked as printed'})
@@ -5072,6 +5046,119 @@ def get_storage_info():
}), 500
@bp.route('/log_explorer')
@admin_plus
def log_explorer():
"""Display log explorer page"""
return render_template('log_explorer.html')
@bp.route('/api/logs/list', methods=['GET'])
@admin_plus
def get_logs_list():
"""Get list of all log files"""
import os
import glob
logs_dir = '/srv/quality_app/logs'
if not os.path.exists(logs_dir):
return jsonify({'success': True, 'logs': []})
log_files = []
for log_file in sorted(glob.glob(os.path.join(logs_dir, '*.log*')), reverse=True):
try:
stat = os.stat(log_file)
log_files.append({
'name': os.path.basename(log_file),
'size': stat.st_size,
'size_formatted': format_size_for_json(stat.st_size),
'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'path': log_file
})
except:
continue
return jsonify({'success': True, 'logs': log_files})
@bp.route('/api/logs/view/<filename>', methods=['GET'])
@admin_plus
def view_log_file(filename):
"""View contents of a specific log file with pagination"""
import os
# Security: prevent directory traversal
if '..' in filename or '/' in filename:
return jsonify({'success': False, 'message': 'Invalid filename'}), 400
logs_dir = '/srv/quality_app/logs'
log_path = os.path.join(logs_dir, filename)
# Verify the file is in the logs directory
if not os.path.abspath(log_path).startswith(os.path.abspath(logs_dir)):
return jsonify({'success': False, 'message': 'Invalid file path'}), 400
if not os.path.exists(log_path):
return jsonify({'success': False, 'message': 'Log file not found'}), 404
try:
lines_per_page = request.args.get('lines', 100, type=int)
page = request.args.get('page', 1, type=int)
# Limit lines per page
if lines_per_page < 10:
lines_per_page = 10
if lines_per_page > 1000:
lines_per_page = 1000
with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
all_lines = f.readlines()
total_lines = len(all_lines)
total_pages = (total_lines + lines_per_page - 1) // lines_per_page
# Ensure page is valid
if page < 1:
page = 1
if page > total_pages and total_pages > 0:
page = total_pages
# Get lines for current page (show from end, latest lines first)
start_idx = total_lines - (page * lines_per_page)
end_idx = total_lines - ((page - 1) * lines_per_page)
if start_idx < 0:
start_idx = 0
current_lines = all_lines[start_idx:end_idx]
current_lines.reverse() # Show latest first
return jsonify({
'success': True,
'filename': filename,
'lines': current_lines,
'current_page': page,
'total_pages': total_pages,
'total_lines': total_lines,
'lines_per_page': lines_per_page
})
except Exception as e:
return jsonify({'success': False, 'message': f'Error reading log: {str(e)}'}), 500
def format_size_for_json(size_bytes):
"""Format bytes to human readable size for JSON responses"""
if size_bytes >= 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
elif size_bytes >= 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.2f} MB"
elif size_bytes >= 1024:
return f"{size_bytes / 1024:.2f} KB"
else:
return f"{size_bytes} bytes"
@bp.route('/api/maintenance/database-tables', methods=['GET'])
@admin_plus
def get_all_database_tables():
@@ -5594,8 +5681,8 @@ def api_assign_box_to_location():
try:
with db_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("SELECT status FROM boxes_crates WHERE id = %s", (box_id,))
result = cursor.fetchone()
cursor.execute("SELECT status FROM boxes_crates WHERE id = %s", (box_id,))
result = cursor.fetchone()
if result and result[0] == 'open':
return jsonify({