import os import mariadb from datetime import datetime, timedelta from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify from .models import User from . import db from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas from flask import Blueprint, render_template, request, redirect, url_for, flash import csv from .warehouse import add_location from app.settings import ( settings_handler, role_permissions_handler, save_role_permissions_handler, reset_role_permissions_handler, save_all_role_permissions_handler, reset_all_role_permissions_handler, edit_user_handler, create_user_handler, delete_user_handler, save_external_db_handler ) from .print_module import get_unprinted_orders_data bp = Blueprint('main', __name__) warehouse_bp = Blueprint('warehouse', __name__) def format_cell_data(cell): """Helper function to format cell data, especially dates and times""" if isinstance(cell, datetime): # Format date as dd/mm/yyyy return cell.strftime('%d/%m/%Y') elif isinstance(cell, timedelta): # Convert timedelta to HH:MM:SS format total_seconds = int(cell.total_seconds()) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" elif hasattr(cell, 'date'): # Handle date objects # Format date as dd/mm/yyyy return cell.strftime('%d/%m/%Y') else: return cell @bp.route('/store_articles') def store_articles(): return render_template('store_articles.html') @bp.route('/warehouse_reports') def warehouse_reports(): return render_template('warehouse_reports.html') def get_db_connection(): """Reads the external_server.conf file and returns a MariaDB database connection.""" settings_file = os.path.join(current_app.instance_path, 'external_server.conf') if not os.path.exists(settings_file): raise FileNotFoundError("The external_server.conf file is missing in the instance folder.") # Read settings from the configuration file settings = {} with open(settings_file, 'r') as f: for line in f: key, value = line.strip().split('=', 1) settings[key] = value # Create a database connection return mariadb.connect( user=settings['username'], password=settings['password'], host=settings['server_domain'], port=int(settings['port']), database=settings['database_name'] ) @bp.route('/login', methods=['GET', 'POST']) def login(): import sqlite3 if request.method == 'POST': # Debug: print all form data received print("All form data received:", dict(request.form)) # Safely get username and password with fallback username = request.form.get('username', '').strip() password = request.form.get('password', '').strip() if not username or not password: print("Missing username or password") flash('Please enter both username and password.') return render_template('login.html') user = None print("Raw form input:", repr(username), repr(password)) # Logic: If username starts with #, check internal SQLite database if username.startswith('#'): username_clean = username[1:].strip() password_clean = password.strip() print(f"Checking internal database for: {username_clean}") # Check internal SQLite database (py_app/instance/users.db) internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db') try: conn = sqlite3.connect(internal_db_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") if cursor.fetchone(): cursor.execute("SELECT username, password, role FROM users WHERE username=? AND password=?", (username_clean, password_clean)) row = cursor.fetchone() print("Internal DB query result:", row) if row: user = {'username': row[0], 'password': row[1], 'role': row[2]} else: print("No users table in internal database") conn.close() except Exception as e: print("Internal DB error:", e) else: # Check external MariaDB database first try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'users'") if cursor.fetchone(): cursor.execute("SELECT username, password, role FROM users WHERE username=%s AND password=%s", (username.strip(), password.strip())) row = cursor.fetchone() print("External DB query result:", row) if row: user = {'username': row[0], 'password': row[1], 'role': row[2]} conn.close() except Exception as e: print("External DB error:", e) # Fallback to internal database if external fails print("Falling back to internal database") internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db') try: conn = sqlite3.connect(internal_db_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") if cursor.fetchone(): cursor.execute("SELECT username, password, role FROM users WHERE username=? AND password=?", (username.strip(), password.strip())) row = cursor.fetchone() print("Internal DB fallback query result:", row) if row: user = {'username': row[0], 'password': row[1], 'role': row[2]} conn.close() except Exception as e2: print("Internal DB fallback error:", e2) if user: session['user'] = user['username'] session['role'] = user['role'] print("Logged in as:", session.get('user'), session.get('role')) return redirect(url_for('main.dashboard')) else: print("Login failed for:", username, password) flash('Invalid credentials. Please try again.') return render_template('login.html') @bp.route('/dashboard') def dashboard(): print("Session user:", session.get('user'), session.get('role')) if 'user' not in session: return redirect(url_for('main.login')) return render_template('dashboard.html') @bp.route('/settings') def settings(): return settings_handler() @bp.route('/quality') def quality(): if 'role' not in session or session['role'] not in ['superadmin', 'quality']: flash('Access denied: Quality users only.') return redirect(url_for('main.dashboard')) return render_template('quality.html') @bp.route('/warehouse') def warehouse(): if 'role' not in session or session['role'] not in ['superadmin', 'warehouse']: flash('Access denied: Warehouse users only.') return redirect(url_for('main.dashboard')) return render_template('main_page_warehouse.html') @bp.route('/scan', methods=['GET', 'POST']) def scan(): if 'role' not in session or session['role'] not in ['superadmin', 'scan']: flash('Access denied: Scan users only.') return redirect(url_for('main.dashboard')) if request.method == 'POST': # Handle form submission operator_code = request.form.get('operator_code') cp_code = request.form.get('cp_code') oc1_code = request.form.get('oc1_code') oc2_code = request.form.get('oc2_code') defect_code = request.form.get('defect_code') date = request.form.get('date') time = request.form.get('time') try: # Connect to the database conn = get_db_connection() cursor = conn.cursor() # Check if the CP_full_code already exists cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,)) existing_entry = cursor.fetchone() if existing_entry: # Update the existing entry update_query = """ UPDATE scan1_orders SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ? WHERE CP_full_code = ? """ cursor.execute(update_query, (operator_code, oc1_code, oc2_code, defect_code, date, time, cp_code)) flash('Existing entry updated successfully.') else: # Insert a new entry insert_query = """ INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time) VALUES (?, ?, ?, ?, ?, ?, ?) """ cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time)) flash('New entry inserted successfully.') # Commit the transaction conn.commit() conn.close() except mariadb.Error as e: print(f"Error saving scan data: {e}") flash(f"Error saving scan data: {e}") # Fetch the latest scan data for display scan_data = [] try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders ORDER BY Id DESC LIMIT 15 """) scan_data = cursor.fetchall() conn.close() except mariadb.Error as e: print(f"Error fetching scan data: {e}") flash(f"Error fetching scan data: {e}") return render_template('scan.html', scan_data=scan_data) @bp.route('/logout') def logout(): session.pop('user', None) session.pop('role', None) return redirect(url_for('main.login')) @bp.route('/create_user', methods=['POST']) def create_user(): return create_user_handler() @bp.route('/edit_user', methods=['POST']) def edit_user(): return edit_user_handler() @bp.route('/delete_user', methods=['POST']) def delete_user(): return delete_user_handler() @bp.route('/save_external_db', methods=['POST']) def save_external_db(): return save_external_db_handler() # Role Permissions Management Routes @bp.route('/role_permissions') def role_permissions(): return role_permissions_handler() @bp.route('/test_permissions') def test_permissions(): from app.settings import role_permissions_handler from flask import render_template, session, redirect, url_for, flash from app.permissions import APP_PERMISSIONS, ACTIONS # Check if superadmin if not session.get('role') == 'superadmin': flash('Access denied: Superadmin only.') return redirect(url_for('main.dashboard')) try: # Get the same data as role_permissions_handler from app.settings import get_external_db_connection conn = get_external_db_connection() cursor = conn.cursor() # Get roles from role_hierarchy table cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC") role_data = cursor.fetchall() roles = {} for role_name, display_name, description, level in role_data: roles[role_name] = { 'display_name': display_name, 'description': description, 'level': level } conn.close() return render_template('test_permissions.html', roles=roles, pages=APP_PERMISSIONS, action_names=ACTIONS) except Exception as e: return f"Error: {e}" @bp.route('/role_permissions_simple') def role_permissions_simple(): # Use the same handler but different template from app.settings import get_external_db_connection from flask import render_template, session, redirect, url_for, flash from app.permissions import APP_PERMISSIONS, ACTIONS import json # Check if superadmin if not session.get('role') == 'superadmin': flash('Access denied: Superadmin only.') return redirect(url_for('main.dashboard')) try: # Get roles and their current permissions conn = get_external_db_connection() cursor = conn.cursor() # Get roles from role_hierarchy table cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC") role_data = cursor.fetchall() roles = {} for role_name, display_name, description, level in role_data: roles[role_name] = { 'display_name': display_name, 'description': description, 'level': level } # Get current role permissions cursor.execute(""" SELECT role, permission_key FROM role_permissions WHERE granted = TRUE """) permission_data = cursor.fetchall() role_permissions = {} for role, permission_key in permission_data: if role not in role_permissions: role_permissions[role] = [] role_permissions[role].append(permission_key) conn.close() # Convert to JSON for JavaScript permissions_json = json.dumps(APP_PERMISSIONS) role_permissions_json = json.dumps(role_permissions) return render_template('role_permissions_simple.html', roles=roles, pages=APP_PERMISSIONS, action_names=ACTIONS, permissions_json=permissions_json, role_permissions_json=role_permissions_json) except Exception as e: flash(f'Error loading role permissions: {e}') return redirect(url_for('main.dashboard')) @bp.route('/settings/save_role_permissions', methods=['POST']) def save_role_permissions(): return save_role_permissions_handler() @bp.route('/settings/reset_role_permissions', methods=['POST']) def reset_role_permissions(): return reset_role_permissions_handler() @bp.route('/settings/save_all_role_permissions', methods=['POST']) def save_all_role_permissions(): return save_all_role_permissions_handler() @bp.route('/settings/reset_all_role_permissions', methods=['POST']) def reset_all_role_permissions(): return reset_all_role_permissions_handler() @bp.route('/get_report_data', methods=['GET']) def get_report_data(): report = request.args.get('report') data = {"headers": [], "rows": []} try: conn = get_db_connection() cursor = conn.cursor() if report == "1": # Logic for the 1-day report (today's records) today = datetime.now().strftime('%Y-%m-%d') print(f"DEBUG: Daily report searching for records on date: {today}") cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date = ? ORDER BY date DESC, time DESC """, (today,)) rows = cursor.fetchall() print(f"DEBUG: Daily report found {len(rows)} rows for today ({today}):", rows) data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] elif report == "2": # Logic for the 5-day report (last 5 days including today) five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days start_date = five_days_ago.strftime('%Y-%m-%d') print(f"DEBUG: 5-day report searching for records from {start_date} onwards") cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date >= ? ORDER BY date DESC, time DESC """, (start_date,)) rows = cursor.fetchall() print(f"DEBUG: 5-day report found {len(rows)} rows from {start_date} onwards:", rows) data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] elif report == "3": # Logic for the report with non-zero quality_code (today only) today = datetime.now().strftime('%Y-%m-%d') print(f"DEBUG: Quality defects report (today) searching for records on {today} with quality issues") cursor.execute(""" SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date = ? AND quality_code != 0 ORDER BY date DESC, time DESC """, (today,)) rows = cursor.fetchall() print(f"DEBUG: Quality defects report (today) found {len(rows)} rows with quality issues for {today}:", rows) data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] elif report == "4": # Logic for the report with non-zero quality_code (last 5 days) five_days_ago = datetime.now() - timedelta(days=4) # Last 4 days + today = 5 days start_date = five_days_ago.strftime('%Y-%m-%d') print(f"DEBUG: Quality defects report (5 days) searching for records from {start_date} onwards with quality issues") cursor.execute(""" SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date >= ? AND quality_code != 0 ORDER BY date DESC, time DESC """, (start_date,)) rows = cursor.fetchall() print(f"DEBUG: Quality defects report (5 days) found {len(rows)} rows with quality issues from {start_date} onwards:", rows) data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] elif report == "5": # Logic for the 5-ft report (all rows) # First check if table exists and has any data try: cursor.execute("SELECT COUNT(*) FROM scan1_orders") total_count = cursor.fetchone()[0] print(f"DEBUG: Total records in scan1_orders table: {total_count}") if total_count == 0: print("DEBUG: No data found in scan1_orders table") data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"] data["rows"] = [] data["message"] = "No scan data available in the database. Please ensure scanning operations have been performed and data has been recorded." else: cursor.execute(""" SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders ORDER BY date DESC, time DESC """) rows = cursor.fetchall() print(f"DEBUG: Fetched {len(rows)} rows for report 5 (all rows)") data["headers"] = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] except mariadb.Error as table_error: print(f"DEBUG: Table access error: {table_error}") data["error"] = f"Database table error: {table_error}" conn.close() except mariadb.Error as e: print(f"Error fetching report data: {e}") data["error"] = "Error fetching report data." print("Data being returned:", data) return jsonify(data) @bp.route('/generate_report', methods=['GET']) def generate_report(): """Generate report for specific date (calendar-based report)""" from datetime import datetime, timedelta report = request.args.get('report') selected_date = request.args.get('date') data = {"headers": [], "rows": []} try: conn = get_db_connection() cursor = conn.cursor() if report == "6" and selected_date: # Custom date report print(f"DEBUG: Searching for date: {selected_date}") # First, let's check what dates exist in the database cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC LIMIT 10") existing_dates = cursor.fetchall() print(f"DEBUG: Available dates in database: {existing_dates}") # Try exact match first cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date = ? ORDER BY time DESC """, (selected_date,)) rows = cursor.fetchall() print(f"DEBUG: Exact match found {len(rows)} rows") # If no exact match, try with DATE() function to handle different formats if len(rows) == 0: cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE DATE(date) = ? ORDER BY time DESC """, (selected_date,)) rows = cursor.fetchall() print(f"DEBUG: DATE() function match found {len(rows)} rows") # If still no match, try LIKE pattern if len(rows) == 0: cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date LIKE ? ORDER BY time DESC """, (f"{selected_date}%",)) rows = cursor.fetchall() print(f"DEBUG: LIKE pattern match found {len(rows)} rows") print(f"DEBUG: Final result - {len(rows)} rows for date {selected_date}") if len(rows) > 0: print(f"DEBUG: Sample row: {rows[0]}") data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] # Add helpful message if no data found if len(rows) == 0: data["message"] = f"No scan data found for {selected_date}. Please select a date when scanning operations were performed." elif report == "7": # Date Range Report start_date = request.args.get('start_date') end_date = request.args.get('end_date') if start_date and end_date: print(f"DEBUG: Date range report - Start: {start_date}, End: {end_date}") # Validate date format and order try: start_dt = datetime.strptime(start_date, '%Y-%m-%d') end_dt = datetime.strptime(end_date, '%Y-%m-%d') if start_dt > end_dt: data["error"] = "Start date cannot be after end date." conn.close() return jsonify(data) except ValueError: data["error"] = "Invalid date format. Please use YYYY-MM-DD format." conn.close() return jsonify(data) # First, check what dates exist in the database for the range cursor.execute(""" SELECT DISTINCT date FROM scan1_orders WHERE date >= ? AND date <= ? ORDER BY date DESC """, (start_date, end_date)) existing_dates = cursor.fetchall() print(f"DEBUG: Available dates in range: {existing_dates}") # Query for all records in the date range cursor.execute(""" SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date >= ? AND date <= ? ORDER BY date DESC, time DESC """, (start_date, end_date)) rows = cursor.fetchall() print(f"DEBUG: Date range query found {len(rows)} rows from {start_date} to {end_date}") data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] # Add helpful message if no data found if len(rows) == 0: data["message"] = f"No scan data found between {start_date} and {end_date}. Please select dates when scanning operations were performed." else: # Add summary information total_approved = sum(row[8] for row in rows if row[8] is not None) total_rejected = sum(row[9] for row in rows if row[9] is not None) data["summary"] = { "total_records": len(rows), "date_range": f"{start_date} to {end_date}", "total_approved": total_approved, "total_rejected": total_rejected, "dates_with_data": len(existing_dates) } else: data["error"] = "Both start date and end date are required for date range report." elif report == "8" and selected_date: # Custom date quality defects report print(f"DEBUG: Quality defects report for specific date: {selected_date}") # First, let's check what dates exist in the database cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC LIMIT 10") existing_dates = cursor.fetchall() print(f"DEBUG: Available dates in database: {existing_dates}") # Try exact match first for defects (quality_code != 0) cursor.execute(""" SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date = ? AND quality_code != 0 ORDER BY quality_code DESC, time DESC """, (selected_date,)) rows = cursor.fetchall() print(f"DEBUG: Quality defects exact match found {len(rows)} rows for {selected_date}") # If no exact match, try with DATE() function to handle different formats if len(rows) == 0: cursor.execute(""" SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE DATE(date) = ? AND quality_code != 0 ORDER BY quality_code DESC, time DESC """, (selected_date,)) rows = cursor.fetchall() print(f"DEBUG: Quality defects DATE() function match found {len(rows)} rows") # If still no match, try LIKE pattern if len(rows) == 0: cursor.execute(""" SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date LIKE ? AND quality_code != 0 ORDER BY quality_code DESC, time DESC """, (f"{selected_date}%",)) rows = cursor.fetchall() print(f"DEBUG: Quality defects LIKE pattern match found {len(rows)} rows") print(f"DEBUG: Final quality defects result - {len(rows)} rows for date {selected_date}") if len(rows) > 0: print(f"DEBUG: Sample defective item: {rows[0]}") data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] # Add helpful message if no data found if len(rows) == 0: data["message"] = f"No quality defects found for {selected_date}. This could mean no scanning was performed or all items passed quality control." else: # Add summary for quality defects total_defective_items = len(rows) total_rejected_qty = sum(row[9] for row in rows if row[9] is not None) unique_quality_codes = len(set(row[5] for row in rows if row[5] != 0)) data["defects_summary"] = { "total_defective_items": total_defective_items, "total_rejected_quantity": total_rejected_qty, "unique_defect_types": unique_quality_codes, "date": selected_date } elif report == "9": # Date Range Quality Defects Report print(f"DEBUG: Processing Date Range Quality Defects Report") # Get date range from request parameters start_date = request.args.get('start_date') end_date = request.args.get('end_date') print(f"DEBUG: Date range quality defects requested - Start: {start_date}, End: {end_date}") if not start_date or not end_date: data["error"] = "Both start date and end date are required for date range quality defects report." conn.close() return jsonify(data) try: # Validate date format from datetime import datetime datetime.strptime(start_date, '%Y-%m-%d') datetime.strptime(end_date, '%Y-%m-%d') # Check what dates are available in the database within the range cursor.execute(""" SELECT DISTINCT date FROM scan1_orders WHERE date >= ? AND date <= ? AND quality_code != 0 ORDER BY date DESC """, (start_date, end_date)) existing_dates = cursor.fetchall() print(f"DEBUG: Available dates with quality defects in range: {existing_dates}") # Query for quality defects in the date range cursor.execute(""" SELECT Id, operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity FROM scan1_orders WHERE date >= ? AND date <= ? AND quality_code != 0 ORDER BY date DESC, quality_code DESC, time DESC """, (start_date, end_date)) rows = cursor.fetchall() print(f"DEBUG: Date range quality defects query found {len(rows)} rows from {start_date} to {end_date}") data["headers"] = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"] data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows] # Add helpful message if no data found if len(rows) == 0: data["message"] = f"No quality defects found between {start_date} and {end_date}. This could mean no scanning was performed in this date range or all items passed quality control." else: # Add summary for quality defects in date range total_defective_items = len(rows) total_rejected_qty = sum(row[9] for row in rows if row[9] is not None) unique_quality_codes = len(set(row[5] for row in rows if row[5] != 0)) unique_dates = len(set(row[6] for row in rows)) data["defects_summary"] = { "total_defective_items": total_defective_items, "total_rejected_quantity": total_rejected_qty, "unique_defect_types": unique_quality_codes, "date_range": f"{start_date} to {end_date}", "days_with_defects": unique_dates } except ValueError: data["error"] = "Invalid date format. Please use YYYY-MM-DD format." except Exception as e: print(f"DEBUG: Error in date range quality defects report: {e}") data["error"] = f"Error processing date range quality defects report: {e}" conn.close() except mariadb.Error as e: print(f"Error fetching custom date report: {e}") data["error"] = f"Error fetching report data for {selected_date if report == '6' or report == '8' else 'date range'}." print("Custom date report data being returned:", data) return jsonify(data) @bp.route('/debug_dates', methods=['GET']) def debug_dates(): """Debug route to check available dates in database""" try: conn = get_db_connection() cursor = conn.cursor() # Get all distinct dates cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC") dates = cursor.fetchall() # Get total count cursor.execute("SELECT COUNT(*) FROM scan1_orders") total_count = cursor.fetchone()[0] # Get sample data cursor.execute("SELECT date, time FROM scan1_orders ORDER BY date DESC LIMIT 5") sample_data = cursor.fetchall() conn.close() return jsonify({ "total_records": total_count, "available_dates": [str(date[0]) for date in dates], "sample_data": [{"date": str(row[0]), "time": str(row[1])} for row in sample_data] }) except Exception as e: return jsonify({"error": str(e)}) @bp.route('/test_database', methods=['GET']) def test_database(): """Test database connection and query the scan1_orders table""" # Check if user has superadmin permissions if 'role' not in session or session['role'] != 'superadmin': return jsonify({ "success": False, "error": "Access denied: Superadmin permissions required for database testing." }), 403 try: print("DEBUG: Testing database connection...") conn = get_db_connection() cursor = conn.cursor() print("DEBUG: Database connection successful!") # Test 1: Check if table exists try: cursor.execute("SHOW TABLES LIKE 'scan1_orders'") table_exists = cursor.fetchone() print(f"DEBUG: Table scan1_orders exists: {table_exists is not None}") if not table_exists: conn.close() return jsonify({ "success": False, "message": "Table 'scan1_orders' does not exist in the database" }) except Exception as e: print(f"DEBUG: Error checking table existence: {e}") conn.close() return jsonify({ "success": False, "message": f"Error checking table existence: {e}" }) # Test 2: Get table structure try: cursor.execute("DESCRIBE scan1_orders") table_structure = cursor.fetchall() print(f"DEBUG: Table structure: {table_structure}") except Exception as e: print(f"DEBUG: Error getting table structure: {e}") table_structure = [] # Test 3: Count total records try: cursor.execute("SELECT COUNT(*) FROM scan1_orders") total_count = cursor.fetchone()[0] print(f"DEBUG: Total records in table: {total_count}") except Exception as e: print(f"DEBUG: Error counting records: {e}") total_count = -1 # Test 4: Get sample data (if any exists) sample_data = [] try: cursor.execute("SELECT * FROM scan1_orders LIMIT 5") raw_data = cursor.fetchall() print(f"DEBUG: Sample data (first 5 rows): {raw_data}") # Convert data to JSON-serializable format using consistent formatting sample_data = [] for row in raw_data: converted_row = [format_cell_data(item) for item in row] sample_data.append(converted_row) except Exception as e: print(f"DEBUG: Error getting sample data: {e}") # Test 5: Get distinct dates (if any exist) available_dates = [] try: cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC LIMIT 10") date_rows = cursor.fetchall() available_dates = [str(row[0]) for row in date_rows] print(f"DEBUG: Available dates: {available_dates}") except Exception as e: print(f"DEBUG: Error getting dates: {e}") conn.close() # Test 6: Add a current date sample record for testing daily reports try: from datetime import datetime current_date = datetime.now().strftime('%Y-%m-%d') current_time = datetime.now().strftime('%H:%M:%S') # Check if we already have a record for today cursor.execute("SELECT COUNT(*) FROM scan1_orders WHERE date = ?", (current_date,)) today_count = cursor.fetchone()[0] if today_count == 0: print(f"DEBUG: No records found for today ({current_date}), adding sample record...") cursor.execute(""" INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ('OP01', 'CP99999999-0001', 'OC01', 'OC02', 0, current_date, current_time, 1, 0)) conn.commit() print(f"DEBUG: Added sample record for today ({current_date})") message_addendum = " Added sample record for today to test daily reports." else: print(f"DEBUG: Found {today_count} records for today ({current_date})") message_addendum = f" Found {today_count} records for today." except Exception as e: print(f"DEBUG: Error adding sample record: {e}") message_addendum = " Could not add sample record for testing." return jsonify({ "success": True, "database_connection": "OK", "table_exists": table_exists is not None, "table_structure": [{"field": row[0], "type": row[1], "null": row[2]} for row in table_structure], "total_records": total_count, "sample_data": sample_data, "available_dates": available_dates, "message": f"Database test completed. Found {total_count} records in scan1_orders table.{message_addendum}" }) except Exception as e: print(f"DEBUG: Database test failed: {e}") return jsonify({ "success": False, "message": f"Database connection failed: {e}" }) @bp.route('/etichete') def etichete(): if 'role' not in session or session['role'] not in ['superadmin', 'etichete']: flash('Access denied: Etichete users only.') return redirect(url_for('main.dashboard')) return render_template('main_page_etichete.html') @bp.route('/upload_data') def upload_data(): return render_template('upload_data.html') @bp.route('/print_module') def print_module(): return render_template('print_module.html') @bp.route('/download_extension') def download_extension(): """Route for downloading the Chrome extension""" return render_template('download_extension.html') @bp.route('/extension_files/') def extension_files(filename): """Serve Chrome extension files for download""" import os from flask import send_from_directory, current_app extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension') return send_from_directory(extension_dir, filename) @bp.route('/create_extension_package', methods=['POST']) def create_extension_package(): """Create and serve ZIP package of Chrome extension""" import os import zipfile from flask import current_app, jsonify, send_file import tempfile try: # Use correct path to chrome_extension directory (it's in py_app, not py_app/app) extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension') print(f"Looking for extension files in: {extension_dir}") if not os.path.exists(extension_dir): return jsonify({ 'success': False, 'error': f'Extension directory not found: {extension_dir}' }), 500 # List files in extension directory for debugging all_files = [] for root, dirs, files in os.walk(extension_dir): for file in files: file_path = os.path.join(root, file) all_files.append(file_path) print(f"Found files: {all_files}") # Create static directory if it doesn't exist static_dir = os.path.join(current_app.root_path, 'static') os.makedirs(static_dir, exist_ok=True) zip_filename = 'quality_recticel_print_helper.zip' zip_path = os.path.join(static_dir, zip_filename) # Create ZIP file directly in static directory with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: files_added = 0 # Add all extension files to ZIP for root, dirs, files in os.walk(extension_dir): for file in files: # Include all relevant files if file.endswith(('.json', '.js', '.html', '.css', '.png', '.md', '.txt')): file_path = os.path.join(root, file) # Create relative path for archive arcname = os.path.relpath(file_path, extension_dir) print(f"Adding file: {file_path} as {arcname}") zipf.write(file_path, arcname) files_added += 1 # Add a README file with installation instructions readme_content = """# Quality Recticel Print Helper Chrome Extension ## Installation Instructions: 1. Extract this ZIP file to a folder on your computer 2. Open Chrome and go to: chrome://extensions/ 3. Enable "Developer mode" in the top right 4. Click "Load unpacked" and select the extracted folder 5. The extension icon 🖨️ should appear in your toolbar ## Usage: 1. Go to the Print Module in the Quality Recticel application 2. Select an order from the table 3. Click the "🖨️ Print Direct" button 4. The label will print automatically to your default printer ## Troubleshooting: - Make sure your default printer is set up correctly - Click the extension icon to test printer connection - Check Chrome printer settings: chrome://settings/printing For support, contact your system administrator. """ zipf.writestr('README.txt', readme_content) files_added += 1 print(f"Total files added to ZIP: {files_added}") # Verify ZIP was created and has content if os.path.exists(zip_path): zip_size = os.path.getsize(zip_path) print(f"ZIP file created: {zip_path}, size: {zip_size} bytes") if zip_size > 0: return jsonify({ 'success': True, 'download_url': f'/static/{zip_filename}', 'files_included': files_added, 'zip_size': zip_size }) else: return jsonify({ 'success': False, 'error': 'ZIP file was created but is empty' }), 500 else: return jsonify({ 'success': False, 'error': 'Failed to create ZIP file' }), 500 except Exception as e: print(f"Error creating extension package: {e}") import traceback traceback.print_exc() return jsonify({ 'success': False, 'error': str(e) }), 500 @bp.route('/test_extension_files') def test_extension_files(): """Test route to check extension files""" import os from flask import current_app, jsonify extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension') result = { 'extension_dir': extension_dir, 'dir_exists': os.path.exists(extension_dir), 'files': [] } if os.path.exists(extension_dir): for root, dirs, files in os.walk(extension_dir): for file in files: file_path = os.path.join(root, file) file_size = os.path.getsize(file_path) result['files'].append({ 'path': file_path, 'relative_path': os.path.relpath(file_path, extension_dir), 'size': file_size }) return jsonify(result) @bp.route('/label_templates') def label_templates(): return render_template('label_templates.html') @bp.route('/create_template') def create_template(): return render_template('create_template.html') @bp.route('/get_database_tables', methods=['GET']) def get_database_tables(): """Get list of database tables for template creation""" if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']: return jsonify({'error': 'Access denied'}), 403 try: # Get database connection using the same method as other functions settings_file = current_app.instance_path + '/external_server.conf' settings = {} with open(settings_file, 'r') as f: for line in f: key, value = line.strip().split('=', 1) settings[key] = value connection = mariadb.connect( user=settings['username'], password=settings['password'], host=settings['server_domain'], port=int(settings['port']), database=settings['database_name'] ) cursor = connection.cursor() # Get all tables in the database cursor.execute("SHOW TABLES") all_tables = [table[0] for table in cursor.fetchall()] # Filter to show relevant tables (prioritize order_for_labels) relevant_tables = [] if 'order_for_labels' in all_tables: relevant_tables.append('order_for_labels') # Add other potentially relevant tables for table in all_tables: if table not in relevant_tables and any(keyword in table.lower() for keyword in ['order', 'label', 'product', 'customer']): relevant_tables.append(table) connection.close() return jsonify({ 'success': True, 'tables': relevant_tables, 'recommended': 'order_for_labels' }) except Exception as e: return jsonify({'error': f'Database error: {str(e)}'}), 500 @bp.route('/get_table_columns/', methods=['GET']) def get_table_columns(table_name): """Get column names and descriptions for a specific table""" if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']: return jsonify({'error': 'Access denied'}), 403 try: # Get database connection settings_file = current_app.instance_path + '/external_server.conf' settings = {} with open(settings_file, 'r') as f: for line in f: key, value = line.strip().split('=', 1) settings[key] = value connection = mariadb.connect( user=settings['username'], password=settings['password'], host=settings['server_domain'], port=int(settings['port']), database=settings['database_name'] ) cursor = connection.cursor() # Verify table exists cursor.execute("SHOW TABLES LIKE %s", (table_name,)) if not cursor.fetchone(): return jsonify({'error': f'Table {table_name} not found'}), 404 # Get column information cursor.execute(f"DESCRIBE {table_name}") columns_info = cursor.fetchall() columns = [] for col_info in columns_info: column_name = col_info[0] column_type = col_info[1] is_nullable = col_info[2] == 'YES' column_default = col_info[4] # Get column comment if available cursor.execute(f""" SELECT COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = %s AND COLUMN_NAME = %s AND TABLE_SCHEMA = DATABASE() """, (table_name, column_name)) comment_result = cursor.fetchone() column_comment = comment_result[0] if comment_result and comment_result[0] else '' # Create user-friendly description description = column_comment or column_name.replace('_', ' ').title() columns.append({ 'name': column_name, 'type': column_type, 'nullable': is_nullable, 'default': column_default, 'description': description, 'field_id': f"db_{column_name}" # Unique ID for template fields }) connection.close() return jsonify({ 'success': True, 'table': table_name, 'columns': columns }) except Exception as e: return jsonify({'error': f'Database error: {str(e)}'}), 500 @bp.route('/edit_template/') def edit_template(template_id): # Logic for editing a template will go here return f"Edit template with ID {template_id}" @bp.route('/delete_template/', methods=['POST']) def delete_template(template_id): # Logic for deleting a template will go here return f"Delete template with ID {template_id}" @bp.route('/get_tables') def get_tables(): # Replace with logic to fetch tables from your database tables = ['table1', 'table2', 'table3'] return jsonify({'tables': tables}) @bp.route('/get_columns') def get_columns(): table = request.args.get('table') # Replace with logic to fetch columns for the selected table columns = ['column1', 'column2', 'column3'] if table else [] return jsonify({'columns': columns}) @bp.route('/save_template', methods=['POST']) def save_template(): data = request.get_json() # Replace with logic to save the template to the database print(f"Saving template: {data}") return jsonify({'message': 'Template saved successfully!'}) @bp.route('/generate_pdf', methods=['POST']) def generate_pdf(): data = request.get_json() width = data.get('width', 100) # Default width in mm height = data.get('height', 50) # Default height in mm columns = data.get('columns', []) # Convert dimensions from mm to points (1 mm = 2.83465 points) width_points = width * 2.83465 height_points = height * 2.83465 # Ensure the /static/label_templates folder exists label_templates_folder = os.path.join(current_app.root_path, 'static', 'label_templates') os.makedirs(label_templates_folder, exist_ok=True) # Define the path for the PDF file pdf_file_path = os.path.join(label_templates_folder, 'label_template.pdf') # Create a PDF file c = canvas.Canvas(pdf_file_path, pagesize=(width_points, height_points)) # Add content to the PDF c.drawString(10, height_points - 20, "Label Template") y_position = height_points - 40 for column in columns: c.drawString(10, y_position, f"Column: {column}") y_position -= 20 # Save the PDF c.save() return jsonify({'message': 'PDF generated successfully!', 'pdf_path': f'/static/label_templates/label_template.pdf'}) # Order Labels Upload Module Routes @bp.route('/upload_orders', methods=['GET', 'POST']) def upload_orders(): """Route for uploading orders CSV files for label generation""" if 'role' not in session or session['role'] not in ['superadmin', 'warehouse', 'warehouse_manager']: flash('Access denied: Warehouse management permissions required.') return redirect(url_for('main.dashboard')) from app.order_labels import upload_orders_handler return upload_orders_handler() @bp.route('/view_orders') def view_orders(): """Route for viewing uploaded orders""" if 'role' not in session or session['role'] not in ['superadmin', 'warehouse', 'warehouse_manager', 'warehouse_worker']: flash('Access denied: Warehouse access required.') return redirect(url_for('main.dashboard')) from app.order_labels import get_orders_from_database orders = get_orders_from_database(200) # Get last 200 orders return render_template('view_orders.html', orders=orders) @bp.route('/get_unprinted_orders', methods=['GET']) def get_unprinted_orders(): """Get all rows from order_for_labels where printed != 1""" if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']: return jsonify({'error': 'Access denied'}), 403 try: data = get_unprinted_orders_data() return jsonify(data) except Exception as e: return jsonify({'error': str(e)}), 500 @warehouse_bp.route('/create_locations', methods=['GET', 'POST']) def create_locations(): from app.warehouse import create_locations_handler return create_locations_handler() @warehouse_bp.route('/import_locations_csv', methods=['GET', 'POST']) def import_locations_csv(): from app.warehouse import import_locations_csv_handler return import_locations_csv_handler()