cleaning structure

This commit is contained in:
Quality System Admin
2025-10-16 01:42:59 +03:00
parent e0ba349862
commit 50c791e242
469 changed files with 1016 additions and 29776 deletions

View File

@@ -22,6 +22,11 @@ from app.settings import (
save_external_db_handler
)
from .print_module import get_unprinted_orders_data
from .access_control import (
requires_role, superadmin_only, admin_plus, manager_plus,
requires_quality_module, requires_warehouse_module, requires_labels_module,
quality_manager_plus, warehouse_manager_plus, labels_manager_plus
)
bp = Blueprint('main', __name__)
warehouse_bp = Blueprint('warehouse', __name__)
@@ -35,7 +40,6 @@ def main_scan():
@bp.route('/', methods=['GET', 'POST'])
def login():
import sqlite3
if request.method == 'POST':
# Debug: print all form data received
print("All form data received:", dict(request.form))
@@ -52,67 +56,52 @@ def login():
user = None
print("Raw form input:", repr(username), repr(password))
# Logic: If username starts with #, check internal SQLite database
if username.startswith('#'):
username_clean = username[1:].strip()
password_clean = password.strip()
print(f"Checking internal database for: {username_clean}")
# Check internal SQLite database (py_app/instance/users.db)
internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db')
try:
conn = sqlite3.connect(internal_db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
if cursor.fetchone():
cursor.execute("SELECT username, password, role FROM users WHERE username=? AND password=?", (username_clean, password_clean))
row = cursor.fetchone()
print("Internal DB query result:", row)
if row:
user = {'username': row[0], 'password': row[1], 'role': row[2]}
else:
print("No users table in internal database")
conn.close()
except Exception as e:
print("Internal DB error:", e)
else:
# Check external MariaDB database first
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SHOW TABLES LIKE 'users'")
if cursor.fetchone():
cursor.execute("SELECT username, password, role FROM users WHERE username=%s AND password=%s", (username.strip(), password.strip()))
row = cursor.fetchone()
print("External DB query result:", row)
if row:
user = {'username': row[0], 'password': row[1], 'role': row[2]}
conn.close()
except Exception as e:
print("External DB error:", e)
# Fallback to internal database if external fails
print("Falling back to internal database")
internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db')
# Check external MariaDB database
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SHOW TABLES LIKE 'users'")
if cursor.fetchone():
# First try with modules column
try:
conn = sqlite3.connect(internal_db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
if cursor.fetchone():
cursor.execute("SELECT username, password, role FROM users WHERE username=? AND password=?", (username.strip(), password.strip()))
row = cursor.fetchone()
print("Internal DB fallback query result:", row)
if row:
user = {'username': row[0], 'password': row[1], 'role': row[2]}
conn.close()
except Exception as e2:
print("Internal DB fallback error:", e2)
cursor.execute("SELECT username, password, role, modules FROM users WHERE username=%s AND password=%s", (username, password))
row = cursor.fetchone()
print("External DB query result (with modules):", row)
if row:
user = {'username': row[0], 'password': row[1], 'role': row[2], 'modules': row[3] if len(row) > 3 else None}
except Exception as e:
print(f"Modules column not found, trying without: {e}")
# Fallback to query without modules column
cursor.execute("SELECT username, password, role FROM users WHERE username=%s AND password=%s", (username, password))
row = cursor.fetchone()
print("External DB query result (without modules):", row)
if row:
user = {'username': row[0], 'password': row[1], 'role': row[2], 'modules': None}
conn.close()
except Exception as e:
print("External DB error:", e)
flash('Database connection error. Please try again.')
return render_template('login.html')
if user:
session['user'] = user['username']
session['role'] = user['role']
print("Logged in as:", session.get('user'), session.get('role'))
# Load user's modules into session
user_modules = []
if 'modules' in user and user['modules']:
try:
import json
user_modules = json.loads(user['modules'])
except:
user_modules = []
# Superadmin and admin have access to all modules
if user['role'] in ['superadmin', 'admin']:
user_modules = ['quality', 'warehouse', 'labels']
session['modules'] = user_modules
print("Logged in as:", session.get('user'), session.get('role'), "modules:", user_modules)
return redirect(url_for('main.dashboard'))
else:
print("Login failed for:", username, password)
@@ -204,28 +193,222 @@ def dashboard():
return render_template('dashboard.html')
@bp.route('/settings')
@admin_plus
def settings():
return settings_handler()
@bp.route('/quality')
def quality():
if 'role' not in session or session['role'] not in ['superadmin', 'administrator', 'quality']:
flash('Access denied: Quality users only.')
# Simplified User Management Routes
@bp.route('/user_management_simple')
@admin_plus
def user_management_simple():
"""Simplified user management interface with 4-tier system"""
try:
# Get users from external database
users = []
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SHOW TABLES LIKE 'users'")
if cursor.fetchone():
cursor.execute("SELECT id, username, role, modules FROM users")
for row in cursor.fetchall():
user_data = {
'id': row[0],
'username': row[1],
'role': row[2],
'modules': row[3] if len(row) > 3 else None
}
# Create a mock user object with get_modules method
class MockUser:
def __init__(self, data):
self.id = data['id']
self.username = data['username']
self.role = data['role']
self.modules = data['modules']
def get_modules(self):
if not self.modules:
return []
try:
import json
return json.loads(self.modules)
except:
return []
users.append(MockUser(user_data))
conn.close()
return render_template('user_management_simple.html', users=users)
except Exception as e:
print(f"Error in user_management_simple: {e}")
flash('Error loading user management page.')
return redirect(url_for('main.dashboard'))
@bp.route('/create_user_simple', methods=['POST'])
@admin_plus
def create_user_simple():
"""Create a new user with the simplified 4-tier system"""
try:
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
role = request.form.get('role', '').strip()
modules = request.form.getlist('modules')
if not username or not password or not role:
flash('Username, password, and role are required.')
return redirect(url_for('main.user_management_simple'))
# Validate modules for the role
from app.permissions_simple import validate_user_modules
is_valid, error_msg = validate_user_modules(role, modules)
if not is_valid:
flash(f'Invalid module assignment: {error_msg}')
return redirect(url_for('main.user_management_simple'))
# Prepare modules JSON
modules_json = None
if modules and role in ['manager', 'worker']:
import json
modules_json = json.dumps(modules)
# Add to external database
conn = get_db_connection()
cursor = conn.cursor()
# Check if user already exists
cursor.execute("SELECT username FROM users WHERE username=%s", (username,))
if cursor.fetchone():
flash(f'User "{username}" already exists.')
conn.close()
return redirect(url_for('main.user_management_simple'))
# Insert new user
cursor.execute("INSERT INTO users (username, password, role, modules) VALUES (%s, %s, %s, %s)",
(username, password, role, modules_json))
conn.commit()
conn.close()
flash(f'User "{username}" created successfully as {role}.')
return redirect(url_for('main.user_management_simple'))
except Exception as e:
print(f"Error creating user: {e}")
flash('Error creating user.')
return redirect(url_for('main.user_management_simple'))
@bp.route('/edit_user_simple', methods=['POST'])
@admin_plus
def edit_user_simple():
"""Edit an existing user with the simplified 4-tier system"""
try:
user_id = request.form.get('user_id')
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
role = request.form.get('role', '').strip()
modules = request.form.getlist('modules')
if not user_id or not username or not role:
flash('User ID, username, and role are required.')
return redirect(url_for('main.user_management_simple'))
# Validate modules for the role
from app.permissions_simple import validate_user_modules
is_valid, error_msg = validate_user_modules(role, modules)
if not is_valid:
flash(f'Invalid module assignment: {error_msg}')
return redirect(url_for('main.user_management_simple'))
# Prepare modules JSON
modules_json = None
if modules and role in ['manager', 'worker']:
import json
modules_json = json.dumps(modules)
# Update in external database
conn = get_db_connection()
cursor = conn.cursor()
# Check if username is taken by another user
cursor.execute("SELECT id FROM users WHERE username=%s AND id!=%s", (username, user_id))
if cursor.fetchone():
flash(f'Username "{username}" is already taken.')
conn.close()
return redirect(url_for('main.user_management_simple'))
# Update user
if password:
cursor.execute("UPDATE users SET username=%s, password=%s, role=%s, modules=%s WHERE id=%s",
(username, password, role, modules_json, user_id))
else:
cursor.execute("UPDATE users SET username=%s, role=%s, modules=%s WHERE id=%s",
(username, role, modules_json, user_id))
conn.commit()
conn.close()
flash(f'User "{username}" updated successfully.')
return redirect(url_for('main.user_management_simple'))
except Exception as e:
print(f"Error editing user: {e}")
flash('Error editing user.')
return redirect(url_for('main.user_management_simple'))
@bp.route('/delete_user_simple', methods=['POST'])
@admin_plus
def delete_user_simple():
"""Delete a user from the simplified system"""
try:
user_id = request.form.get('user_id')
if not user_id:
flash('User ID is required.')
return redirect(url_for('main.user_management_simple'))
# Delete from external database
conn = get_db_connection()
cursor = conn.cursor()
# Get username before deleting
cursor.execute("SELECT username FROM users WHERE id=%s", (user_id,))
row = cursor.fetchone()
username = row[0] if row else 'Unknown'
# Delete user
cursor.execute("DELETE FROM users WHERE id=%s", (user_id,))
conn.commit()
conn.close()
flash(f'User "{username}" deleted successfully.')
return redirect(url_for('main.user_management_simple'))
except Exception as e:
print(f"Error deleting user: {e}")
flash('Error deleting user.')
return redirect(url_for('main.user_management_simple'))
@bp.route('/reports')
@requires_quality_module
def reports():
return render_template('main_page_reports.html')
@bp.route('/quality')
@requires_quality_module
def quality():
return render_template('quality.html')
@bp.route('/fg_quality')
@requires_quality_module
def fg_quality():
return render_template('fg_quality.html')
@bp.route('/warehouse')
@requires_warehouse_module
def warehouse():
if 'role' not in session or session['role'] not in ['superadmin', 'administrator', 'admin', 'warehouse']:
flash('Access denied: Warehouse users only.')
return redirect(url_for('main.dashboard'))
return render_template('main_page_warehouse.html')
@bp.route('/scan', methods=['GET', 'POST'])
@requires_quality_module
def scan():
if 'role' not in session or session['role'] not in ['superadmin', 'administrator', 'admin', 'scan']:
flash('Access denied: Scan users only.')
return redirect(url_for('main.dashboard'))
if request.method == 'POST':
# Handle form submission
@@ -420,6 +603,7 @@ def save_external_db():
# Role Permissions Management Routes
@bp.route('/role_permissions')
@superadmin_only
def role_permissions():
return role_permissions_handler()
@@ -541,6 +725,7 @@ def reset_all_role_permissions():
return reset_all_role_permissions_handler()
@bp.route('/get_report_data', methods=['GET'])
@quality_manager_plus
def get_report_data():
report = request.args.get('report')
data = {"headers": [], "rows": []}
@@ -1059,6 +1244,444 @@ def test_database():
"message": f"Database connection failed: {e}"
})
# FG Quality Routes - Report functions for scanfg_orders table
@bp.route('/get_fg_report_data', methods=['GET'])
@quality_manager_plus
def get_fg_report_data():
"""Get report data from scanfg_orders table"""
report = request.args.get('report')
data = {"headers": [], "rows": []}
try:
conn = get_db_connection()
cursor = conn.cursor()
if report == "1": # Daily FG report (today's records)
today = datetime.now().strftime('%Y-%m-%d')
print(f"DEBUG: Daily FG report searching for records on date: {today}")
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date = ?
ORDER BY date DESC, time DESC
""", (today,))
rows = cursor.fetchall()
print(f"DEBUG: Daily FG report found {len(rows)} rows for today ({today}):", rows)
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "2": # 5-day FG report (last 5 days including today)
five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days
start_date = five_days_ago.strftime('%Y-%m-%d')
print(f"DEBUG: 5-day FG report searching for records from {start_date} onwards")
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date >= ?
ORDER BY date DESC, time DESC
""", (start_date,))
rows = cursor.fetchall()
print(f"DEBUG: 5-day FG report found {len(rows)} rows from {start_date} onwards:", rows)
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "3": # FG quality defects report (today only)
today = datetime.now().strftime('%Y-%m-%d')
print(f"DEBUG: FG quality defects report (today) searching for records on {today} with quality issues")
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date = ? AND quality_code != 0
ORDER BY date DESC, time DESC
""", (today,))
rows = cursor.fetchall()
print(f"DEBUG: FG quality defects report (today) found {len(rows)} rows with quality issues for {today}:", rows)
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "4": # FG quality defects report (last 5 days)
five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days
start_date = five_days_ago.strftime('%Y-%m-%d')
print(f"DEBUG: FG quality defects report (5 days) searching for records from {start_date} onwards with quality issues")
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date >= ? AND quality_code != 0
ORDER BY date DESC, time DESC
""", (start_date,))
rows = cursor.fetchall()
print(f"DEBUG: FG quality defects report (5 days) found {len(rows)} rows with quality issues from {start_date} onwards:", rows)
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
elif report == "5": # All FG records report
try:
cursor.execute("SELECT COUNT(*) FROM scanfg_orders")
total_count = cursor.fetchone()[0]
print(f"DEBUG: Total FG records in scanfg_orders table: {total_count}")
if total_count == 0:
print("DEBUG: No data found in scanfg_orders table")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = []
data["message"] = "No FG scan data available in the database. Please ensure FG scanning operations have been performed and data has been recorded."
else:
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
ORDER BY date DESC, time DESC
""")
rows = cursor.fetchall()
print(f"DEBUG: Fetched {len(rows)} FG rows for report 5 (all rows)")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
except mariadb.Error as table_error:
print(f"DEBUG: FG table access error: {table_error}")
data["error"] = f"Database table error: {table_error}"
conn.close()
except mariadb.Error as e:
print(f"Error fetching FG report data: {e}")
data["error"] = "Error fetching FG report data."
print("FG Data being returned:", data)
return jsonify(data)
@bp.route('/test_fg_database', methods=['GET'])
def test_fg_database():
"""Test database connection and query the scanfg_orders table"""
# Check if user has superadmin permissions
if 'role' not in session or session['role'] != 'superadmin':
return jsonify({
"success": False,
"message": "Access denied: Superadmin users only."
})
try:
print("DEBUG: Testing FG database connection...")
conn = get_db_connection()
cursor = conn.cursor()
print("DEBUG: FG Database connection successful!")
# Test 1: Check if scanfg_orders table exists
try:
cursor.execute("SHOW TABLES LIKE 'scanfg_orders'")
table_exists = cursor.fetchone()
print(f"DEBUG: Table scanfg_orders exists: {table_exists is not None}")
if not table_exists:
conn.close()
return jsonify({
"success": False,
"message": "Table 'scanfg_orders' does not exist in the database"
})
except Exception as e:
print(f"DEBUG: Error checking FG table existence: {e}")
conn.close()
return jsonify({
"success": False,
"message": f"Error checking FG table existence: {e}"
})
# Test 2: Get FG table structure
try:
cursor.execute("DESCRIBE scanfg_orders")
table_structure = cursor.fetchall()
print(f"DEBUG: FG Table structure: {table_structure}")
except Exception as e:
print(f"DEBUG: Error getting FG table structure: {e}")
table_structure = []
# Test 3: Count total records
try:
cursor.execute("SELECT COUNT(*) FROM scanfg_orders")
total_count = cursor.fetchone()[0]
print(f"DEBUG: Total FG records: {total_count}")
except Exception as e:
print(f"DEBUG: Error counting FG records: {e}")
total_count = 0
# Test 4: Get available dates
try:
cursor.execute("SELECT DISTINCT date FROM scanfg_orders ORDER BY date DESC LIMIT 10")
date_results = cursor.fetchall()
available_dates = [str(row[0]) for row in date_results]
print(f"DEBUG: Available FG dates: {available_dates}")
except Exception as e:
print(f"DEBUG: Error getting FG dates: {e}")
available_dates = []
# Test 5: Get sample data
try:
cursor.execute("SELECT * FROM scanfg_orders ORDER BY date DESC, time DESC LIMIT 3")
sample_results = cursor.fetchall()
sample_data = []
for row in sample_results:
sample_data.append({
"id": row[0],
"operator_code": row[1],
"cp_full_code": row[2],
"oc1_code": row[3],
"oc2_code": row[4],
"cp_base_code": row[5] if len(row) > 5 else None,
"quality_code": row[6] if len(row) > 6 else row[5],
"date": str(row[7] if len(row) > 7 else row[6]),
"time": str(row[8] if len(row) > 8 else row[7]),
"approved_quantity": row[9] if len(row) > 9 and row[9] is not None else 0,
"rejected_quantity": row[10] if len(row) > 10 and row[10] is not None else 0
})
print(f"DEBUG: FG Sample data: {sample_data}")
except Exception as e:
print(f"DEBUG: Error getting FG sample data: {e}")
sample_data = []
conn.close()
# Add message about data availability
message_addendum = ""
if total_count == 0:
message_addendum = " No FG scan data has been recorded yet."
elif len(available_dates) == 0:
message_addendum = " FG records exist but no valid dates found."
else:
message_addendum = f" Latest FG data from: {available_dates[0] if available_dates else 'N/A'}"
return jsonify({
"success": True,
"database_connection": "OK",
"table_exists": table_exists is not None,
"table_structure": [{"field": row[0], "type": row[1], "null": row[2]} for row in table_structure],
"total_records": total_count,
"sample_data": sample_data,
"available_dates": available_dates,
"message": f"FG Database test completed. Found {total_count} records in scanfg_orders table.{message_addendum}"
})
except Exception as e:
print(f"DEBUG: FG Database test failed: {e}")
return jsonify({
"success": False,
"message": f"FG Database connection failed: {e}"
})
@bp.route('/generate_fg_report', methods=['GET'])
def generate_fg_report():
"""Generate FG report for specific date (calendar-based report)"""
from datetime import datetime, timedelta
report = request.args.get('report')
selected_date = request.args.get('date')
data = {"headers": [], "rows": []}
try:
conn = get_db_connection()
cursor = conn.cursor()
if report == "6" and selected_date: # Custom date FG report
print(f"DEBUG: FG report searching for date: {selected_date}")
# First, let's check what dates exist in the FG database
cursor.execute("SELECT DISTINCT date FROM scanfg_orders ORDER BY date DESC LIMIT 10")
existing_dates = cursor.fetchall()
print(f"DEBUG: Available FG dates in database: {existing_dates}")
# Try multiple date formats since FG data might use DD/MM/YYYY format
date_formats_to_try = [
selected_date, # YYYY-MM-DD format
datetime.strptime(selected_date, '%Y-%m-%d').strftime('%d/%m/%Y'), # DD/MM/YYYY format
datetime.strptime(selected_date, '%Y-%m-%d').strftime('%Y-%m-%d'), # Original format
]
rows = []
for date_format in date_formats_to_try:
print(f"DEBUG: Trying FG date format: {date_format}")
# Try exact match first
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date = ?
ORDER BY time DESC
""", (date_format,))
rows = cursor.fetchall()
print(f"DEBUG: FG exact match found {len(rows)} rows for {date_format}")
if len(rows) > 0:
break
# Try with DATE() function to handle different formats
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE DATE(STR_TO_DATE(date, '%d/%m/%Y')) = ?
ORDER BY time DESC
""", (selected_date,))
rows = cursor.fetchall()
print(f"DEBUG: FG STR_TO_DATE match found {len(rows)} rows")
if len(rows) > 0:
break
# Try LIKE pattern
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date LIKE ?
ORDER BY time DESC
""", (f"%{date_format}%",))
rows = cursor.fetchall()
print(f"DEBUG: FG LIKE pattern match found {len(rows)} rows")
if len(rows) > 0:
break
print(f"DEBUG: Final FG result - {len(rows)} rows for date {selected_date}")
if len(rows) > 0:
print(f"DEBUG: Sample FG row: {rows[0]}")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
# Add helpful message if no data found
if len(rows) == 0:
data["message"] = f"No FG scan data found for {selected_date}. Please select a date when FG scanning operations were performed."
elif report == "7": # Date Range FG Report
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
if start_date and end_date:
print(f"DEBUG: FG Date range report - Start: {start_date}, End: {end_date}")
# Validate date format and order
try:
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
if start_dt > end_dt:
data["error"] = "Start date cannot be after end date."
conn.close()
return jsonify(data)
except ValueError:
data["error"] = "Invalid date format. Please use YYYY-MM-DD format."
conn.close()
return jsonify(data)
# Convert to DD/MM/YYYY format for FG database
start_date_fg = start_dt.strftime('%d/%m/%Y')
end_date_fg = end_dt.strftime('%d/%m/%Y')
# First, check what dates exist in the FG database for the range
cursor.execute("""
SELECT DISTINCT date FROM scanfg_orders
WHERE STR_TO_DATE(date, '%d/%m/%Y') >= ? AND STR_TO_DATE(date, '%d/%m/%Y') <= ?
ORDER BY STR_TO_DATE(date, '%d/%m/%Y') DESC
""", (start_date, end_date))
existing_dates = cursor.fetchall()
print(f"DEBUG: Available FG dates in range: {existing_dates}")
# Query for all FG records in the date range
cursor.execute("""
SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE STR_TO_DATE(date, '%d/%m/%Y') >= ? AND STR_TO_DATE(date, '%d/%m/%Y') <= ?
ORDER BY STR_TO_DATE(date, '%d/%m/%Y') DESC, time DESC
""", (start_date, end_date))
rows = cursor.fetchall()
print(f"DEBUG: FG Date range query found {len(rows)} rows from {start_date} to {end_date}")
data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
# Add helpful message if no data found
if len(rows) == 0:
data["message"] = f"No FG scan data found between {start_date} and {end_date}. Please select dates when FG scanning operations were performed."
else:
# Add summary information
total_approved = sum(row[9] for row in rows if row[9] is not None)
total_rejected = sum(row[10] for row in rows if row[10] is not None)
data["summary"] = {
"total_records": len(rows),
"date_range": f"{start_date} to {end_date}",
"total_approved": total_approved,
"total_rejected": total_rejected,
"dates_with_data": len(existing_dates)
}
else:
data["error"] = "Both start date and end date are required for FG date range report."
elif report == "8" and selected_date: # Custom date FG quality defects report
print(f"DEBUG: FG quality defects report for specific date: {selected_date}")
# Convert date format for FG database
try:
date_obj = datetime.strptime(selected_date, '%Y-%m-%d')
fg_date = date_obj.strftime('%d/%m/%Y')
except ValueError:
fg_date = selected_date
# Try multiple date formats for defects (quality_code != 0)
date_formats_to_try = [selected_date, fg_date]
rows = []
for date_format in date_formats_to_try:
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE date = ? AND quality_code != 0
ORDER BY quality_code DESC, time DESC
""", (date_format,))
rows = cursor.fetchall()
print(f"DEBUG: FG quality defects found {len(rows)} rows for {date_format}")
if len(rows) > 0:
break
# Also try with STR_TO_DATE conversion
cursor.execute("""
SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
FROM scanfg_orders
WHERE STR_TO_DATE(date, '%d/%m/%Y') = ? AND quality_code != 0
ORDER BY quality_code DESC, time DESC
""", (selected_date,))
rows = cursor.fetchall()
print(f"DEBUG: FG quality defects STR_TO_DATE found {len(rows)} rows")
if len(rows) > 0:
break
print(f"DEBUG: Final FG quality defects result - {len(rows)} rows for date {selected_date}")
if len(rows) > 0:
print(f"DEBUG: Sample FG defective item: {rows[0]}")
data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
# Add helpful message if no data found
if len(rows) == 0:
data["message"] = f"No FG quality defects found for {selected_date}. This could mean no FG scanning was performed or all items passed quality control."
else:
# Add summary for FG quality defects
total_defective_items = len(rows)
total_rejected_qty = sum(row[9] for row in rows if row[9] is not None)
unique_quality_codes = len(set(row[5] for row in rows if row[5] != 0))
data["defects_summary"] = {
"total_defective_items": total_defective_items,
"total_rejected_quantity": total_rejected_qty,
"unique_defect_types": unique_quality_codes,
"date": selected_date
}
conn.close()
except mariadb.Error as e:
print(f"Error fetching custom FG date report: {e}")
data["error"] = f"Error fetching FG report data for {selected_date if report == '6' or report == '8' else 'date range'}."
print("Custom FG date report data being returned:", data)
return jsonify(data)
@bp.route('/etichete')
def etichete():
if 'role' not in session or session['role'] not in ['superadmin', 'admin', 'administrator', 'etichete']:
@@ -1380,6 +2003,7 @@ def generate_pairing_key():
pairing_keys=keys)
@bp.route('/download_extension')
@superadmin_only
def download_extension():
"""Route for downloading the Chrome extension"""
# Load all pairing keys for display