# File: quality_recticel/py_app/app/routes.py
# Size: 1617 lines, 68 KiB
# Language: Python

import os
import mariadb
from datetime import datetime, timedelta
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify
from .models import User
from . import db
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from flask import Blueprint, render_template, request, redirect, url_for, flash
import csv
from .warehouse import add_location
from app.settings import (
settings_handler,
role_permissions_handler,
save_role_permissions_handler,
reset_role_permissions_handler,
save_all_role_permissions_handler,
reset_all_role_permissions_handler,
edit_user_handler,
create_user_handler,
delete_user_handler,
save_external_db_handler
)
from .print_module import get_unprinted_orders_data
# Blueprint that every route in the visible part of this module is registered on.
bp = Blueprint('main', __name__)
# Separate blueprint named 'warehouse'; no routes are attached to it in the
# visible part of this file.
warehouse_bp = Blueprint('warehouse', __name__)
def format_cell_data(cell):
    """Format a single result cell for display.

    * ``datetime`` and ``date`` values become ``dd/mm/yyyy`` strings.
    * ``timedelta`` values become ``HH:MM:SS`` strings.
    * Any other object exposing a ``date`` attribute is formatted through its
      own ``strftime`` (kept from the original implementation).
    * Everything else is returned unchanged.

    The original "date objects" branch tested ``hasattr(cell, 'date')``, but a
    plain ``datetime.date`` has no ``date`` attribute, so such values were
    returned unformatted. They are now matched with ``isinstance``.
    """
    # Local import: the module header only imports ``datetime`` and ``timedelta``.
    from datetime import date

    if isinstance(cell, timedelta):
        # Convert the duration to HH:MM:SS.
        total_seconds = int(cell.total_seconds())
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    # ``datetime`` subclasses ``date``, so one isinstance check covers both.
    if isinstance(cell, date) or hasattr(cell, 'date'):
        return cell.strftime('%d/%m/%Y')

    return cell
@bp.route('/store_articles')
def store_articles():
    """Render the ``store_articles.html`` template."""
    page = render_template('store_articles.html')
    return page
@bp.route('/warehouse_reports')
def warehouse_reports():
    """Render the ``warehouse_reports.html`` template."""
    page = render_template('warehouse_reports.html')
    return page
def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The file is read from the Flask instance folder and must contain
    ``key=value`` lines providing ``username``, ``password``,
    ``server_domain``, ``port`` and ``database_name``.

    Blank lines and lines starting with ``#`` are ignored, and whitespace
    around keys is stripped. Values are used as written (the line as a whole
    is stripped, so trailing whitespace never reaches the value).

    Returns:
        The connection object returned by ``mariadb.connect``.

    Raises:
        FileNotFoundError: if the configuration file does not exist.
        ValueError: if a non-blank, non-comment line has no ``=`` separator,
            or if ``port`` is not an integer.
        KeyError: if one of the required keys is missing from the file.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file.
    conf = {}
    with open(settings_file, 'r') as f:
        for line_number, line in enumerate(f, start=1):
            entry = line.strip()
            # A trailing newline or a comment must not break the parser.
            if not entry or entry.startswith('#'):
                continue
            key, separator, value = entry.partition('=')
            if not separator:
                raise ValueError(
                    f"Malformed line {line_number} in external_server.conf: expected key=value."
                )
            conf[key.strip()] = value

    # Create a database connection.
    return mariadb.connect(
        user=conf['username'],
        password=conf['password'],
        host=conf['server_domain'],
        port=int(conf['port']),
        database=conf['database_name']
    )
def _find_internal_user(username, password):
    """Look up a user in the internal SQLite database (``../instance/users.db``).

    Returns a ``{'username', 'role'}`` dict when a row matches both username
    and password, otherwise ``None`` (also when the ``users`` table is absent).
    The connection is always closed, even if a query raises.
    """
    import sqlite3

    internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db')
    conn = sqlite3.connect(internal_db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
        if not cursor.fetchone():
            print("No users table in internal database")
            return None
        cursor.execute(
            "SELECT username, password, role FROM users WHERE username=? AND password=?",
            (username, password),
        )
        row = cursor.fetchone()
    finally:
        conn.close()
    print("Internal DB match found:", row is not None)
    return {'username': row[0], 'role': row[2]} if row else None


def _find_external_user(username, password):
    """Look up a user in the external MariaDB database.

    Returns a ``{'username', 'role'}`` dict when a row matches both username
    and password, otherwise ``None`` (also when the ``users`` table is absent).
    The connection is always closed, even if a query raises.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'users'")
        if not cursor.fetchone():
            return None
        cursor.execute(
            "SELECT username, password, role FROM users WHERE username=%s AND password=%s",
            (username, password),
        )
        row = cursor.fetchone()
    finally:
        conn.close()
    print("External DB match found:", row is not None)
    return {'username': row[0], 'role': row[2]} if row else None


@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    A username starting with ``#`` is checked only against the internal SQLite
    database (with the ``#`` removed). Any other username is checked against
    the external MariaDB database; the internal database is consulted only if
    the external lookup raises. On success ``session['user']`` and
    ``session['role']`` are set and the client is redirected to the dashboard;
    otherwise a message is flashed and the form is rendered again.

    Passwords are never written to the log; the original implementation
    printed them in clear text.

    NOTE(review): the queries compare the submitted password directly with the
    stored ``password`` column, which suggests passwords are stored unhashed.
    Changing that requires a data migration and is out of scope here.
    """
    if request.method != 'POST':
        return render_template('login.html')

    # Safely get username and password with fallback.
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    if not username or not password:
        print("Missing username or password")
        flash('Please enter both username and password.')
        return render_template('login.html')

    user = None
    if username.startswith('#'):
        # '#'-prefixed usernames are internal accounts.
        username_clean = username[1:].strip()
        print(f"Checking internal database for: {username_clean}")
        try:
            user = _find_internal_user(username_clean, password)
        except Exception as e:
            print("Internal DB error:", e)
    else:
        # Check the external MariaDB database first.
        try:
            user = _find_external_user(username, password)
        except Exception as e:
            print("External DB error:", e)
            # Fall back to the internal database only if the external one fails.
            print("Falling back to internal database")
            try:
                user = _find_internal_user(username, password)
            except Exception as e2:
                print("Internal DB fallback error:", e2)

    if user:
        session['user'] = user['username']
        session['role'] = user['role']
        print("Logged in as:", session.get('user'), session.get('role'))
        return redirect(url_for('main.dashboard'))

    print("Login failed for:", username)
    flash('Invalid credentials. Please try again.')
    return render_template('login.html')
@bp.route('/dashboard')
def dashboard():
    """Render the dashboard for a logged-in user, otherwise redirect to login."""
    current_user = session.get('user')
    current_role = session.get('role')
    print("Session user:", current_user, current_role)
    if 'user' in session:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
@bp.route('/settings')
def settings():
    """Delegate the settings page to ``settings_handler``."""
    response = settings_handler()
    return response
@bp.route('/quality')
def quality():
    """Render the quality page for ``superadmin`` and ``quality`` roles only."""
    allowed_roles = ('superadmin', 'quality')
    if session.get('role') in allowed_roles:
        return render_template('quality.html')
    # Missing or unauthorised role: bounce back to the dashboard.
    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/warehouse')
def warehouse():
    """Render the warehouse main page for ``superadmin`` and ``warehouse`` roles only."""
    allowed_roles = ('superadmin', 'warehouse')
    if session.get('role') in allowed_roles:
        return render_template('main_page_warehouse.html')
    # Missing or unauthorised role: bounce back to the dashboard.
    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan (POST) and show the 15 most recent scans.

    Only ``superadmin`` and ``scan`` roles may use the page. On POST the form
    values are written to ``scan1_orders``: a row whose ``CP_full_code`` equals
    the submitted ``cp_code`` is updated, otherwise a new row is inserted.
    The latest 15 rows (by ``Id``) are then rendered in ``scan.html``.

    Database connections are closed even when a query fails, and the success
    message is flashed only after the commit has succeeded.
    """
    if session.get('role') not in ('superadmin', 'scan'):
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission.
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        scan_date = request.form.get('date')
        scan_time = request.form.get('time')
        try:
            conn = get_db_connection()
            try:
                cursor = conn.cursor()
                # Check if the CP_full_code already exists.
                cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,))
                if cursor.fetchone():
                    # Update the existing entry.
                    cursor.execute(
                        """
                        UPDATE scan1_orders
                        SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
                        WHERE CP_full_code = ?
                        """,
                        (operator_code, oc1_code, oc2_code, defect_code, scan_date, scan_time, cp_code),
                    )
                    success_message = 'Existing entry updated successfully.'
                else:
                    # Insert a new entry.
                    cursor.execute(
                        """
                        INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                        """,
                        (operator_code, cp_code, oc1_code, oc2_code, defect_code, scan_date, scan_time),
                    )
                    success_message = 'New entry inserted successfully.'
                # Commit the transaction.
                conn.commit()
            finally:
                conn.close()
            # Report success only once the data is durably stored.
            flash(success_message)
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")

    # Fetch the latest scan data for display.
    scan_data = []
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
                FROM scan1_orders
                ORDER BY Id DESC
                LIMIT 15
            """)
            scan_data = cursor.fetchall()
        finally:
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")

    return render_template('scan.html', scan_data=scan_data)
@bp.route('/logout')
def logout():
    """Drop the ``user`` and ``role`` session entries and redirect to the login page."""
    for session_key in ('user', 'role'):
        session.pop(session_key, None)
    return redirect(url_for('main.login'))
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Delegate user creation to ``create_user_handler``."""
    response = create_user_handler()
    return response
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Delegate user editing to ``edit_user_handler``."""
    response = edit_user_handler()
    return response
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delegate user deletion to ``delete_user_handler``."""
    response = delete_user_handler()
    return response
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Delegate saving the external database settings to ``save_external_db_handler``."""
    response = save_external_db_handler()
    return response
# Role Permissions Management Routes
@bp.route('/role_permissions')
def role_permissions():
    """Delegate the role-permissions page to ``role_permissions_handler``."""
    response = role_permissions_handler()
    return response
@bp.route('/test_permissions')
def test_permissions():
    """Render ``test_permissions.html`` with the role hierarchy (superadmin only).

    Roles are read from the ``role_hierarchy`` table through
    ``get_external_db_connection`` and passed to the template together with
    ``APP_PERMISSIONS`` and ``ACTIONS``. Any failure is returned to the client
    as the plain text ``Error: <exception>``.

    The connection is closed even if the query raises.
    """
    from app.permissions import APP_PERMISSIONS, ACTIONS

    # Check if superadmin.
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    try:
        # Imported inside the try so an import failure is reported like any other error.
        from app.settings import get_external_db_connection

        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()
            # Get roles from role_hierarchy table.
            cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
            role_data = cursor.fetchall()
        finally:
            conn.close()

        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in role_data
        }
        return render_template('test_permissions.html',
                               roles=roles,
                               pages=APP_PERMISSIONS,
                               action_names=ACTIONS)
    except Exception as e:
        return f"Error: {e}"
@bp.route('/role_permissions_simple')
def role_permissions_simple():
    """Render ``role_permissions_simple.html`` (superadmin only).

    Loads the role hierarchy and the granted permission keys per role from the
    external database and passes them to the template, both as Python objects
    and as JSON strings for client-side use. On any failure a message is
    flashed and the client is redirected to the dashboard.

    The connection is closed even if a query raises.
    """
    from app.settings import get_external_db_connection
    from app.permissions import APP_PERMISSIONS, ACTIONS
    import json

    # Check if superadmin.
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    try:
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()
            # Get roles from role_hierarchy table.
            cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
            role_data = cursor.fetchall()
            # Get current role permissions.
            cursor.execute("""
                SELECT role, permission_key
                FROM role_permissions
                WHERE granted = TRUE
            """)
            permission_data = cursor.fetchall()
        finally:
            conn.close()

        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in role_data
        }

        # Group permission keys by role. The local is not called
        # ``role_permissions`` so it does not shadow the route function of that name.
        granted_by_role = {}
        for role, permission_key in permission_data:
            granted_by_role.setdefault(role, []).append(permission_key)

        # Convert to JSON for JavaScript.
        permissions_json = json.dumps(APP_PERMISSIONS)
        role_permissions_json = json.dumps(granted_by_role)
        return render_template('role_permissions_simple.html',
                               roles=roles,
                               pages=APP_PERMISSIONS,
                               action_names=ACTIONS,
                               permissions_json=permissions_json,
                               role_permissions_json=role_permissions_json)
    except Exception as e:
        flash(f'Error loading role permissions: {e}')
        return redirect(url_for('main.dashboard'))
@bp.route('/settings/save_role_permissions', methods=['POST'])
def save_role_permissions():
    """Delegate to ``save_role_permissions_handler``."""
    response = save_role_permissions_handler()
    return response
@bp.route('/settings/reset_role_permissions', methods=['POST'])
def reset_role_permissions():
    """Delegate to ``reset_role_permissions_handler``."""
    response = reset_role_permissions_handler()
    return response
@bp.route('/settings/save_all_role_permissions', methods=['POST'])
def save_all_role_permissions():
    """Delegate to ``save_all_role_permissions_handler``."""
    response = save_all_role_permissions_handler()
    return response
@bp.route('/settings/reset_all_role_permissions', methods=['POST'])
def reset_all_role_permissions():
    """Delegate to ``reset_all_role_permissions_handler``."""
    response = reset_all_role_permissions_handler()
    return response
# Quick reports keyed by the ``report`` query parameter. Each entry is
# (code column, code column title, fixed WHERE clause, days to look back).
# A look-back of 0 means "today"; 4 means "today plus the previous four days".
_QUICK_SCAN_REPORTS = {
    "1": ("CP_base_code", "CP Base Code", "date = ?", 0),
    "2": ("CP_base_code", "CP Base Code", "date >= ?", 4),
    "3": ("CP_full_code", "CP Full Code", "date = ? AND quality_code != 0", 0),
    "4": ("CP_full_code", "CP Full Code", "date >= ? AND quality_code != 0", 4),
}

# Column titles for report "5" (all rows, both CP code columns).
_ALL_SCAN_ROWS_HEADERS = [
    "Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code",
    "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order",
]


def _fill_quick_report(cursor, report, data):
    """Run quick report ``report`` ("1".."4") and store headers and rows in ``data``."""
    code_column, code_header, where_clause, days_back = _QUICK_SCAN_REPORTS[report]
    since = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')
    print(f"DEBUG: Report {report} querying scan1_orders where {where_clause} (date parameter: {since})")
    # Only the fixed fragments from _QUICK_SCAN_REPORTS are interpolated;
    # the date always travels as a bound parameter.
    cursor.execute(f"""
        SELECT Id, operator_code, {code_column}, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        WHERE {where_clause}
        ORDER BY date DESC, time DESC
    """, (since,))
    rows = cursor.fetchall()
    print(f"DEBUG: Report {report} found {len(rows)} rows")
    data["headers"] = [
        "Id", "Operator Code", code_header, "OC1 Code", "OC2 Code", "Quality Code",
        "Date", "Time", "Approved Quantity", "Rejected Quantity",
    ]
    data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]


def _fill_all_rows_report(cursor, data):
    """Run report "5" (every row of ``scan1_orders``) and store the result in ``data``.

    A MariaDB error here is reported in ``data["error"]`` with the driver's
    message instead of propagating.
    """
    try:
        # First check whether the table holds any data at all.
        cursor.execute("SELECT COUNT(*) FROM scan1_orders")
        total_count = cursor.fetchone()[0]
        print(f"DEBUG: Total records in scan1_orders table: {total_count}")
        if total_count == 0:
            print("DEBUG: No data found in scan1_orders table")
            data["headers"] = list(_ALL_SCAN_ROWS_HEADERS)
            data["rows"] = []
            data["message"] = "No scan data available in the database. Please ensure scanning operations have been performed and data has been recorded."
            return
        cursor.execute("""
            SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
            FROM scan1_orders
            ORDER BY date DESC, time DESC
        """)
        rows = cursor.fetchall()
        print(f"DEBUG: Fetched {len(rows)} rows for report 5 (all rows)")
        data["headers"] = list(_ALL_SCAN_ROWS_HEADERS)
        data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
    except mariadb.Error as table_error:
        print(f"DEBUG: Table access error: {table_error}")
        data["error"] = f"Database table error: {table_error}"


@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return one of the predefined scan reports as JSON.

    The ``report`` query parameter selects the report:

    * ``1`` - today's records
    * ``2`` - records from the last 5 days (today included)
    * ``3`` - today's records with a non-zero ``quality_code``
    * ``4`` - last 5 days' records with a non-zero ``quality_code``
    * ``5`` - every record

    The JSON object always has ``headers`` and ``rows``; ``message`` or
    ``error`` is added where applicable. Any other ``report`` value yields
    empty ``headers`` and ``rows``.

    The connection is closed even if a query raises, and the debug output
    logs row counts instead of dumping every fetched row.
    """
    report = request.args.get('report')
    data = {"headers": [], "rows": []}
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            if report in _QUICK_SCAN_REPORTS:
                _fill_quick_report(cursor, report, data)
            elif report == "5":
                _fill_all_rows_report(cursor, data)
        finally:
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching report data: {e}")
        data["error"] = "Error fetching report data."
    print(f"Report {report}: returning {len(data['rows'])} rows")
    return jsonify(data)
def _dated_report_headers(code_header):
    """Return the ten column titles of a dated report; ``code_header`` names the CP code column."""
    return [
        "Id", "Operator Code", code_header, "OC1 Code", "OC2 Code", "Quality Code",
        "Date", "Time", "Approved Quantity", "Rejected Quantity",
    ]


def _select_scan_rows(cursor, code_column, where_clause, order_by, params):
    """Fetch report rows from ``scan1_orders`` for a fixed filter and ordering.

    ``code_column``, ``where_clause`` and ``order_by`` must be literals from
    this module; request values must be passed only through ``params``.
    """
    cursor.execute(f"""
        SELECT Id, operator_code, {code_column}, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        WHERE {where_clause}
        ORDER BY {order_by}
    """, params)
    return cursor.fetchall()


def _select_scan_rows_for_day(cursor, code_column, selected_date, defects_only):
    """Fetch the rows of one day, trying progressively looser date matches.

    Tries an exact match, then ``DATE(date)``, then a ``LIKE 'date%'`` prefix
    match, and stops at the first attempt that returns rows.
    """
    defect_filter = " AND quality_code != 0" if defects_only else ""
    order_by = "quality_code DESC, time DESC" if defects_only else "time DESC"
    attempts = (
        ("date = ?", selected_date),
        ("DATE(date) = ?", selected_date),
        ("date LIKE ?", f"{selected_date}%"),
    )
    rows = []
    for date_filter, value in attempts:
        rows = _select_scan_rows(cursor, code_column, date_filter + defect_filter, order_by, (value,))
        print(f"DEBUG: Filter '{date_filter}' matched {len(rows)} rows for {selected_date}")
        if rows:
            break
    return rows


def _sum_present(rows, index):
    """Sum column ``index`` over ``rows``, skipping ``None`` values."""
    return sum(row[index] for row in rows if row[index] is not None)


def _defects_summary(rows):
    """Build the part of ``defects_summary`` shared by reports 8 and 9."""
    return {
        "total_defective_items": len(rows),
        "total_rejected_quantity": _sum_present(rows, 9),
        "unique_defect_types": len({row[5] for row in rows if row[5] != 0}),
    }


@bp.route('/generate_report', methods=['GET'])
def generate_report():
    """Generate a calendar-based scan report as JSON.

    Query parameters: ``report`` selects the report, ``date`` is used by
    reports 6 and 8, ``start_date`` and ``end_date`` by reports 7 and 9.

    * ``6`` - all records of ``date``
    * ``7`` - all records between ``start_date`` and ``end_date`` (with ``summary``)
    * ``8`` - records of ``date`` with a non-zero ``quality_code`` (with ``defects_summary``)
    * ``9`` - records in the range with a non-zero ``quality_code`` (with ``defects_summary``)

    The JSON object always has ``headers`` and ``rows``; ``message``,
    ``error``, ``summary`` or ``defects_summary`` are added where applicable.

    The connection is closed on every path, including exceptions, and the
    queries that only fed debug output have been removed.
    """
    report = request.args.get('report')
    selected_date = request.args.get('date')
    data = {"headers": [], "rows": []}
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            if report == "6" and selected_date:  # Custom date report
                print(f"DEBUG: Searching for date: {selected_date}")
                rows = _select_scan_rows_for_day(cursor, "CP_base_code", selected_date, defects_only=False)
                print(f"DEBUG: Final result - {len(rows)} rows for date {selected_date}")
                data["headers"] = _dated_report_headers("CP Base Code")
                data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
                if not rows:
                    data["message"] = f"No scan data found for {selected_date}. Please select a date when scanning operations were performed."

            elif report == "7":  # Date range report
                start_date = request.args.get('start_date')
                end_date = request.args.get('end_date')
                if not (start_date and end_date):
                    data["error"] = "Both start date and end date are required for date range report."
                else:
                    print(f"DEBUG: Date range report - Start: {start_date}, End: {end_date}")
                    # Validate date format and order before touching the database.
                    range_error = None
                    try:
                        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
                        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
                        if start_dt > end_dt:
                            range_error = "Start date cannot be after end date."
                    except ValueError:
                        range_error = "Invalid date format. Please use YYYY-MM-DD format."

                    if range_error is not None:
                        data["error"] = range_error
                    else:
                        # Distinct dates feed the "dates_with_data" summary field.
                        cursor.execute("""
                            SELECT DISTINCT date FROM scan1_orders
                            WHERE date >= ? AND date <= ?
                            ORDER BY date DESC
                        """, (start_date, end_date))
                        existing_dates = cursor.fetchall()
                        rows = _select_scan_rows(
                            cursor, "CP_base_code", "date >= ? AND date <= ?",
                            "date DESC, time DESC", (start_date, end_date),
                        )
                        print(f"DEBUG: Date range query found {len(rows)} rows from {start_date} to {end_date}")
                        data["headers"] = _dated_report_headers("CP Base Code")
                        data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
                        if not rows:
                            data["message"] = f"No scan data found between {start_date} and {end_date}. Please select dates when scanning operations were performed."
                        else:
                            data["summary"] = {
                                "total_records": len(rows),
                                "date_range": f"{start_date} to {end_date}",
                                "total_approved": _sum_present(rows, 8),
                                "total_rejected": _sum_present(rows, 9),
                                "dates_with_data": len(existing_dates)
                            }

            elif report == "8" and selected_date:  # Custom date quality defects report
                print(f"DEBUG: Quality defects report for specific date: {selected_date}")
                rows = _select_scan_rows_for_day(cursor, "CP_full_code", selected_date, defects_only=True)
                print(f"DEBUG: Final quality defects result - {len(rows)} rows for date {selected_date}")
                data["headers"] = _dated_report_headers("CP Full Code")
                data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
                if not rows:
                    data["message"] = f"No quality defects found for {selected_date}. This could mean no scanning was performed or all items passed quality control."
                else:
                    summary = _defects_summary(rows)
                    summary["date"] = selected_date
                    data["defects_summary"] = summary

            elif report == "9":  # Date range quality defects report
                start_date = request.args.get('start_date')
                end_date = request.args.get('end_date')
                print(f"DEBUG: Date range quality defects requested - Start: {start_date}, End: {end_date}")
                if not start_date or not end_date:
                    data["error"] = "Both start date and end date are required for date range quality defects report."
                else:
                    try:
                        # Validate date format.
                        datetime.strptime(start_date, '%Y-%m-%d')
                        datetime.strptime(end_date, '%Y-%m-%d')
                        rows = _select_scan_rows(
                            cursor, "CP_full_code",
                            "date >= ? AND date <= ? AND quality_code != 0",
                            "date DESC, quality_code DESC, time DESC",
                            (start_date, end_date),
                        )
                        print(f"DEBUG: Date range quality defects query found {len(rows)} rows from {start_date} to {end_date}")
                        data["headers"] = _dated_report_headers("CP Full Code")
                        data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
                        if not rows:
                            data["message"] = f"No quality defects found between {start_date} and {end_date}. This could mean no scanning was performed in this date range or all items passed quality control."
                        else:
                            summary = _defects_summary(rows)
                            summary["date_range"] = f"{start_date} to {end_date}"
                            summary["days_with_defects"] = len({row[6] for row in rows})
                            data["defects_summary"] = summary
                    except ValueError:
                        data["error"] = "Invalid date format. Please use YYYY-MM-DD format."
                    except Exception as e:
                        # As in the original, this report returns any failure
                        # (including database errors) to the client with its detail.
                        print(f"DEBUG: Error in date range quality defects report: {e}")
                        data["error"] = f"Error processing date range quality defects report: {e}"
        finally:
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching custom date report: {e}")
        data["error"] = f"Error fetching report data for {selected_date if report in ('6', '8') else 'date range'}."
    print(f"Custom date report {report}: returning {len(data['rows'])} rows")
    return jsonify(data)
@bp.route('/debug_dates', methods=['GET'])
def debug_dates():
    """Debug route reporting which dates exist in ``scan1_orders``.

    Returns JSON with ``total_records``, ``available_dates`` (all distinct
    dates, newest first, as strings) and ``sample_data`` (date/time of up to
    five rows). On any failure the JSON is ``{"error": <message>}``.

    The connection is closed even if a query raises.

    NOTE(review): unlike ``test_database`` this route performs no role check -
    confirm whether it should be restricted to superadmin.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            # Get all distinct dates.
            cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC")
            dates = cursor.fetchall()
            # Get total count.
            cursor.execute("SELECT COUNT(*) FROM scan1_orders")
            total_count = cursor.fetchone()[0]
            # Get sample data.
            cursor.execute("SELECT date, time FROM scan1_orders ORDER BY date DESC LIMIT 5")
            sample_data = cursor.fetchall()
        finally:
            conn.close()
        return jsonify({
            "total_records": total_count,
            "available_dates": [str(date_row[0]) for date_row in dates],
            "sample_data": [{"date": str(row[0]), "time": str(row[1])} for row in sample_data]
        })
    except Exception as e:
        return jsonify({"error": str(e)})
@bp.route('/test_database', methods=['GET'])
def test_database():
    """Test the database connection and inspect the ``scan1_orders`` table.

    Superadmin only (HTTP 403 otherwise). Runs six steps: table existence,
    table structure, record count, a five-row sample, the ten most recent
    distinct dates, and finally a check for records dated today, inserting one
    sample record when none exist. The outcome is returned as JSON.

    The connection now stays open until all six steps have run. Previously it
    was closed before step 6, so that step always failed with
    "Could not add sample record for testing.". The connection is closed on
    every exit path.

    NOTE(review): with step 6 working, calling this route on a day without
    scans writes a synthetic row (``CP99999999-0001``) into ``scan1_orders``.
    Confirm that this is acceptable on production data.
    """
    # Check if user has superadmin permissions.
    if session.get('role') != 'superadmin':
        return jsonify({
            "success": False,
            "error": "Access denied: Superadmin permissions required for database testing."
        }), 403

    try:
        print("DEBUG: Testing database connection...")
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            print("DEBUG: Database connection successful!")

            # Test 1: Check if table exists.
            try:
                cursor.execute("SHOW TABLES LIKE 'scan1_orders'")
                table_exists = cursor.fetchone()
                print(f"DEBUG: Table scan1_orders exists: {table_exists is not None}")
            except Exception as e:
                print(f"DEBUG: Error checking table existence: {e}")
                return jsonify({
                    "success": False,
                    "message": f"Error checking table existence: {e}"
                })
            if not table_exists:
                return jsonify({
                    "success": False,
                    "message": "Table 'scan1_orders' does not exist in the database"
                })

            # Test 2: Get table structure.
            try:
                cursor.execute("DESCRIBE scan1_orders")
                table_structure = cursor.fetchall()
                print(f"DEBUG: Table structure: {table_structure}")
            except Exception as e:
                print(f"DEBUG: Error getting table structure: {e}")
                table_structure = []

            # Test 3: Count total records (-1 signals that counting failed).
            try:
                cursor.execute("SELECT COUNT(*) FROM scan1_orders")
                total_count = cursor.fetchone()[0]
                print(f"DEBUG: Total records in table: {total_count}")
            except Exception as e:
                print(f"DEBUG: Error counting records: {e}")
                total_count = -1

            # Test 4: Get sample data (if any exists).
            sample_data = []
            try:
                cursor.execute("SELECT * FROM scan1_orders LIMIT 5")
                raw_data = cursor.fetchall()
                print(f"DEBUG: Sample data (first 5 rows): {raw_data}")
                # Convert data to a JSON-serializable format using consistent formatting.
                sample_data = [[format_cell_data(item) for item in row] for row in raw_data]
            except Exception as e:
                print(f"DEBUG: Error getting sample data: {e}")

            # Test 5: Get distinct dates (if any exist).
            available_dates = []
            try:
                cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC LIMIT 10")
                available_dates = [str(row[0]) for row in cursor.fetchall()]
                print(f"DEBUG: Available dates: {available_dates}")
            except Exception as e:
                print(f"DEBUG: Error getting dates: {e}")

            # Test 6: Add a current-date sample record for testing daily reports.
            try:
                now = datetime.now()
                current_date = now.strftime('%Y-%m-%d')
                current_time = now.strftime('%H:%M:%S')
                # Check if we already have a record for today.
                cursor.execute("SELECT COUNT(*) FROM scan1_orders WHERE date = ?", (current_date,))
                today_count = cursor.fetchone()[0]
                if today_count == 0:
                    print(f"DEBUG: No records found for today ({current_date}), adding sample record...")
                    cursor.execute("""
                        INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, ('OP01', 'CP99999999-0001', 'OC01', 'OC02', 0, current_date, current_time, 1, 0))
                    conn.commit()
                    print(f"DEBUG: Added sample record for today ({current_date})")
                    message_addendum = " Added sample record for today to test daily reports."
                else:
                    print(f"DEBUG: Found {today_count} records for today ({current_date})")
                    message_addendum = f" Found {today_count} records for today."
            except Exception as e:
                print(f"DEBUG: Error adding sample record: {e}")
                message_addendum = " Could not add sample record for testing."
        finally:
            conn.close()

        return jsonify({
            "success": True,
            "database_connection": "OK",
            "table_exists": table_exists is not None,
            "table_structure": [{"field": row[0], "type": row[1], "null": row[2]} for row in table_structure],
            "total_records": total_count,
            "sample_data": sample_data,
            "available_dates": available_dates,
            "message": f"Database test completed. Found {total_count} records in scan1_orders table.{message_addendum}"
        })
    except Exception as e:
        print(f"DEBUG: Database test failed: {e}")
        return jsonify({
            "success": False,
            "message": f"Database connection failed: {e}"
        })
@bp.route('/etichete')
def etichete():
    """Render the labels ("etichete") landing page for permitted roles.

    Sessions whose role is ``superadmin`` or ``etichete`` get the page; any
    other session is flashed an access-denied message and redirected to the
    main dashboard.
    """
    permitted_roles = ('superadmin', 'etichete')
    if session.get('role') in permitted_roles:
        return render_template('main_page_etichete.html')

    flash('Access denied: Etichete users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/upload_data')
def upload_data():
    """Render the upload-data page."""
    template_name = 'upload_data.html'
    return render_template(template_name)
@bp.route('/print_module')
def print_module():
    """Render the print-module page."""
    template_name = 'print_module.html'
    return render_template(template_name)
@bp.route('/download_extension')
def download_extension():
    """Render the page from which the Chrome extension can be downloaded."""
    template_name = 'download_extension.html'
    return render_template(template_name)
@bp.route('/extension_files/<path:filename>')
def extension_files(filename):
    """Send a single file out of the ``chrome_extension`` directory.

    The directory is resolved as a sibling of the Flask application root,
    i.e. ``<parent of current_app.root_path>/chrome_extension``.
    """
    import os
    from flask import current_app, send_from_directory

    app_parent = os.path.dirname(current_app.root_path)
    return send_from_directory(os.path.join(app_parent, 'chrome_extension'), filename)
@bp.route('/create_extension_package', methods=['POST'])
def create_extension_package():
    """Build the Chrome-extension ZIP in the app's ``static`` folder.

    The ``chrome_extension`` directory (a sibling of the application root) is
    walked once; files with a whitelisted suffix are stored in the archive
    under their path relative to that directory, and a generated
    ``README.txt`` with installation instructions is appended.

    Returns:
        A JSON response. On success it carries ``success``, ``download_url``,
        ``files_included`` and ``zip_size``. On any failure it carries
        ``success: False`` and ``error`` with HTTP status 500.
    """
    import traceback
    import zipfile

    packaged_suffixes = ('.json', '.js', '.html', '.css', '.png', '.md', '.txt')
    readme_content = """# Quality Recticel Print Helper Chrome Extension
## Installation Instructions:
1. Extract this ZIP file to a folder on your computer
2. Open Chrome and go to: chrome://extensions/
3. Enable "Developer mode" in the top right
4. Click "Load unpacked" and select the extracted folder
5. The extension icon 🖨️ should appear in your toolbar
## Usage:
1. Go to the Print Module in the Quality Recticel application
2. Select an order from the table
3. Click the "🖨️ Print Direct" button
4. The label will print automatically to your default printer
## Troubleshooting:
- Make sure your default printer is set up correctly
- Click the extension icon to test printer connection
- Check Chrome printer settings: chrome://settings/printing
For support, contact your system administrator.
"""

    try:
        # chrome_extension lives in py_app, next to (not inside) py_app/app.
        extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension')
        print(f"Looking for extension files in: {extension_dir}")
        if not os.path.exists(extension_dir):
            return jsonify({
                'success': False,
                'error': f'Extension directory not found: {extension_dir}'
            }), 500

        # Walk the tree a single time; the same listing feeds both the debug
        # output and the archive loop below.
        all_files = []
        for root, _dirs, files in os.walk(extension_dir):
            all_files.extend(os.path.join(root, name) for name in files)
        print(f"Found files: {all_files}")

        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)
        zip_filename = 'quality_recticel_print_helper.zip'
        zip_path = os.path.join(static_dir, zip_filename)

        files_added = 0
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in all_files:
                if not file_path.endswith(packaged_suffixes):
                    continue
                # Archive entries are stored relative to the extension root.
                arcname = os.path.relpath(file_path, extension_dir)
                print(f"Adding file: {file_path} as {arcname}")
                zipf.write(file_path, arcname)
                files_added += 1
            zipf.writestr('README.txt', readme_content)
            files_added += 1
        print(f"Total files added to ZIP: {files_added}")

        # Size is read after the archive is closed so it reflects the final file.
        if not os.path.exists(zip_path):
            return jsonify({
                'success': False,
                'error': 'Failed to create ZIP file'
            }), 500

        zip_size = os.path.getsize(zip_path)
        print(f"ZIP file created: {zip_path}, size: {zip_size} bytes")
        if zip_size > 0:
            return jsonify({
                'success': True,
                'download_url': f'/static/{zip_filename}',
                'files_included': files_added,
                'zip_size': zip_size
            })
        return jsonify({
            'success': False,
            'error': 'ZIP file was created but is empty'
        }), 500
    except Exception as e:
        # Route boundary: report the failure as JSON instead of an HTML 500.
        print(f"Error creating extension package: {e}")
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@bp.route('/create_service_package', methods=['POST'])
def create_service_package():
    """Build the Windows print-service ZIP in the app's ``static`` folder.

    The ``windows_print_service`` directory (two levels above the application
    root) is walked; files with a whitelisted suffix are stored in the archive
    under their path relative to that directory, and a generated
    ``INSTALLATION_README.txt`` is appended.

    ``.py`` is not in the whitelist, so Python files are never packaged. The
    previous in-loop ".py skip" check could therefore never run and has been
    removed without changing which files end up in the archive.

    Returns:
        A JSON response. On success it carries ``success``, ``download_url``,
        ``files_included`` and ``zip_size``. On any failure it carries
        ``success: False`` and ``error`` with HTTP status 500.
    """
    import traceback
    import zipfile

    packaged_suffixes = ('.ps1', '.bat', '.md', '.json', '.js', '.html', '.css', '.png', '.txt')
    # NOTE(review): the troubleshooting section below mentions
    # 'install_service.bat' while the rest of the text refers to
    # 'install_native_service.bat', and the quoted install times differ
    # (3 vs ~5 minutes). Left byte-identical; confirm the intended wording.
    installation_readme = """# Quality Recticel Windows Print Service - Native Edition
## INSTALLATION INSTRUCTIONS (Native PowerShell - Zero Dependencies!)
### Prerequisites:
- Windows 10/11 or Windows Server 2016+
- Administrator privileges
- Google Chrome browser
- PowerShell (included with Windows)
### Quick Installation (Under 3 Minutes):
1. **Extract Files**: Extract this ZIP to any temporary location
✅ No permanent installation directory needed - files are copied during installation
2. **Install Native Service**: Right-click on 'install_native_service.bat' and select "Run as administrator"
✅ Pure PowerShell implementation - no Python or external dependencies required
3. **Install Chrome Extension**:
- Open Chrome → chrome://extensions/
- Enable "Developer mode"
- Click "Load unpacked" → Select the 'chrome_extension' folder
4. **Verify Installation**: Visit http://localhost:8765/health in your browser
Expected response: {"status": "healthy", "platform": "Windows PowerShell"}
### What Gets Installed:
- ✅ Native Windows Print Service (PowerShell-based, zero dependencies)
- ✅ Auto-start service configuration
- ✅ Service recovery options (automatic restart)
- ✅ Comprehensive logging system
### Files Included:
- 🔧 install_native_service.bat - Native PowerShell installer (RUN AS ADMIN)
- 🖥️ print_service.ps1 - Main PowerShell service (native Windows)
- 🗑️ uninstall_service.bat - Complete removal script
- 🌐 chrome_extension/ - Complete Chrome extension
- 📚 Documentation files (QUICK_SETUP_NATIVE.md, INSTALLATION_GUIDE.md, README.md)
### Native Advantages:
- 🚀 No Python dependencies - pure PowerShell
- ⚡ Faster startup and lower memory usage
- 🛡️ Enterprise-ready with Microsoft components only
- 📦 Tiny footprint - minimal system impact
### Support:
- 📖 Read QUICK_SETUP_NATIVE.md for 3-minute setup guide
- 📋 Read INSTALLATION_GUIDE.md for complete documentation
- 🛠️ Read README.md for technical details
### Service URLs:
- Health Check: http://localhost:8765/health
- Printer List: http://localhost:8765/printers
- API Documentation: See README.md
### Troubleshooting:
1. Service not starting? Run install_service.bat as Administrator
2. Can't connect? Check Windows Firewall (port 8765)
3. Chrome extension not working? Reload extension in chrome://extensions/
Installation takes ~5 minutes • Zero maintenance required
"""

    try:
        service_dir = os.path.join(
            os.path.dirname(os.path.dirname(current_app.root_path)),
            'windows_print_service',
        )
        print(f"Looking for service files in: {service_dir}")
        if not os.path.exists(service_dir):
            return jsonify({
                'success': False,
                'error': f'Windows service directory not found: {service_dir}'
            }), 500

        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)
        zip_filename = 'quality_recticel_print_service.zip'
        zip_path = os.path.join(static_dir, zip_filename)

        files_added = 0
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _dirs, files in os.walk(service_dir):
                for file in files:
                    if not file.endswith(packaged_suffixes):
                        continue
                    file_path = os.path.join(root, file)
                    # Archive entries are stored relative to the service root.
                    arcname = os.path.relpath(file_path, service_dir)
                    print(f"Adding service file: {file_path} as {arcname}")
                    zipf.write(file_path, arcname)
                    files_added += 1
            zipf.writestr('INSTALLATION_README.txt', installation_readme)
            files_added += 1
        print(f"Total service files added to ZIP: {files_added}")

        # Size is read after the archive is closed so it reflects the final file.
        if not os.path.exists(zip_path):
            return jsonify({
                'success': False,
                'error': 'Failed to create service ZIP file'
            }), 500

        zip_size = os.path.getsize(zip_path)
        print(f"Service ZIP file created: {zip_path}, size: {zip_size} bytes")
        if zip_size > 0:
            return jsonify({
                'success': True,
                'download_url': f'/static/{zip_filename}',
                'files_included': files_added,
                'zip_size': zip_size
            })
        return jsonify({
            'success': False,
            'error': 'ZIP file was created but is empty'
        }), 500
    except Exception as e:
        # Route boundary: report the failure as JSON instead of an HTML 500.
        print(f"Error creating service package: {e}")
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@bp.route('/test_extension_files')
def test_extension_files():
    """Diagnostic endpoint: describe the ``chrome_extension`` directory as JSON.

    The response holds the resolved directory path, whether it exists, and
    for every file found beneath it the absolute path, the path relative to
    the directory, and the size in bytes.
    """
    import os
    from flask import current_app, jsonify

    extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension')
    dir_present = os.path.exists(extension_dir)

    entries = []
    if dir_present:
        for folder, _subdirs, names in os.walk(extension_dir):
            for name in names:
                full_path = os.path.join(folder, name)
                size_bytes = os.path.getsize(full_path)
                entries.append({
                    'path': full_path,
                    'relative_path': os.path.relpath(full_path, extension_dir),
                    'size': size_bytes,
                })

    return jsonify({
        'extension_dir': extension_dir,
        'dir_exists': dir_present,
        'files': entries,
    })
@bp.route('/label_templates')
def label_templates():
    """Render the label-templates overview page."""
    template_name = 'label_templates.html'
    return render_template(template_name)
@bp.route('/create_template')
def create_template():
    """Render the page used to create a new label template."""
    template_name = 'create_template.html'
    return render_template(template_name)
@bp.route('/get_database_tables', methods=['GET'])
def get_database_tables():
    """Return the database tables that are relevant for template creation.

    Access is limited to the ``superadmin``, ``warehouse_manager`` and
    ``etichete`` roles (403 otherwise). Connection settings are read from
    ``external_server.conf`` in the instance folder as ``key=value`` lines.

    Returns:
        JSON with ``success``, ``tables`` (``order_for_labels`` first when it
        exists, then every table whose name contains ``order``, ``label``,
        ``product`` or ``customer``) and ``recommended``. Any failure yields
        ``{'error': ...}`` with status 500.
    """
    if session.get('role') not in ('superadmin', 'warehouse_manager', 'etichete'):
        return jsonify({'error': 'Access denied'}), 403

    keywords = ('order', 'label', 'product', 'customer')
    connection = None
    try:
        settings_file = current_app.instance_path + '/external_server.conf'
        settings = {}
        with open(settings_file, 'r') as f:
            for line in f:
                key, separator, value = line.strip().partition('=')
                if not separator:
                    # Blank or malformed line: ignore it instead of failing
                    # the whole request on tuple unpacking.
                    continue
                settings[key] = value

        connection = mariadb.connect(
            user=settings['username'],
            password=settings['password'],
            host=settings['server_domain'],
            port=int(settings['port']),
            database=settings['database_name']
        )
        cursor = connection.cursor()
        cursor.execute("SHOW TABLES")
        all_tables = [row[0] for row in cursor.fetchall()]

        # The recommended table goes first so the UI can preselect it.
        relevant_tables = []
        if 'order_for_labels' in all_tables:
            relevant_tables.append('order_for_labels')
        for table in all_tables:
            if table in relevant_tables:
                continue
            lowered = table.lower()
            if any(keyword in lowered for keyword in keywords):
                relevant_tables.append(table)

        return jsonify({
            'success': True,
            'tables': relevant_tables,
            'recommended': 'order_for_labels'
        })
    except Exception as e:
        return jsonify({'error': f'Database error: {str(e)}'}), 500
    finally:
        # Release the connection on every path, including query failures.
        if connection is not None:
            connection.close()
@bp.route('/get_table_columns/<table_name>', methods=['GET'])
def get_table_columns(table_name):
    """Return column metadata for one table of the external database.

    Access is limited to the ``superadmin``, ``warehouse_manager`` and
    ``etichete`` roles (403 otherwise). Connection settings are read from
    ``external_server.conf`` in the instance folder as ``key=value`` lines.

    Args:
        table_name: Table to describe, taken from the URL. It must exactly
            equal a name reported by ``SHOW TABLES``; wildcard characters are
            not interpreted.

    Returns:
        JSON with ``success``, ``table`` and ``columns``; each column has
        ``name``, ``type``, ``nullable``, ``default``, ``description`` (the
        column comment, or a title-cased version of the name) and
        ``field_id`` (``db_<name>``). Unknown tables yield 404; any other
        failure yields ``{'error': ...}`` with status 500.
    """
    if session.get('role') not in ('superadmin', 'warehouse_manager', 'etichete'):
        return jsonify({'error': 'Access denied'}), 403

    connection = None
    try:
        settings_file = current_app.instance_path + '/external_server.conf'
        settings = {}
        with open(settings_file, 'r') as f:
            for line in f:
                key, separator, value = line.strip().partition('=')
                if not separator:
                    # Blank or malformed line: ignore it instead of failing
                    # the whole request on tuple unpacking.
                    continue
                settings[key] = value

        connection = mariadb.connect(
            user=settings['username'],
            password=settings['password'],
            host=settings['server_domain'],
            port=int(settings['port']),
            database=settings['database_name']
        )
        cursor = connection.cursor()

        # Exact membership test: the URL value is untrusted, and a LIKE
        # pattern would let '%' or '_' match tables other than the one named.
        cursor.execute("SHOW TABLES")
        existing_tables = {row[0] for row in cursor.fetchall()}
        if table_name not in existing_tables:
            return jsonify({'error': f'Table {table_name} not found'}), 404

        # Identifiers cannot be bound as parameters, so quote the (already
        # verified) name with backticks, doubling any embedded backtick.
        quoted_table = '`' + table_name.replace('`', '``') + '`'
        cursor.execute(f"DESCRIBE {quoted_table}")
        columns_info = cursor.fetchall()

        # Fetch all column comments in one query instead of one per column.
        cursor.execute("""
            SELECT COLUMN_NAME, COLUMN_COMMENT
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = %s AND TABLE_SCHEMA = DATABASE()
        """, (table_name,))
        comments = {name: comment for name, comment in cursor.fetchall() if comment}

        columns = []
        for col_info in columns_info:
            # DESCRIBE rows: Field, Type, Null, Key, Default, Extra.
            column_name = col_info[0]
            description = comments.get(column_name) or column_name.replace('_', ' ').title()
            columns.append({
                'name': column_name,
                'type': col_info[1],
                'nullable': col_info[2] == 'YES',
                'default': col_info[4],
                'description': description,
                'field_id': f"db_{column_name}"  # Unique ID for template fields
            })

        return jsonify({
            'success': True,
            'table': table_name,
            'columns': columns
        })
    except Exception as e:
        return jsonify({'error': f'Database error: {str(e)}'}), 500
    finally:
        # Release the connection on every path, including the 404 return.
        if connection is not None:
            connection.close()
@bp.route('/edit_template/<int:template_id>')
def edit_template(template_id):
    """Placeholder for template editing; echoes the requested template ID."""
    # Real editing logic is still to be written.
    placeholder_body = f"Edit template with ID {template_id}"
    return placeholder_body
@bp.route('/delete_template/<int:template_id>', methods=['POST'])
def delete_template(template_id):
    """Placeholder for template deletion; echoes the requested template ID."""
    # Real deletion logic is still to be written.
    placeholder_body = f"Delete template with ID {template_id}"
    return placeholder_body
@bp.route('/get_tables')
def get_tables():
    """Return a hard-coded list of table names as JSON (stub data)."""
    # Stub: replace with a real database lookup.
    return jsonify({'tables': ['table1', 'table2', 'table3']})
@bp.route('/get_columns')
def get_columns():
    """Return hard-coded column names as JSON (stub data).

    The list is empty unless a non-empty ``table`` query argument is given.
    """
    # Stub: replace with a real per-table column lookup.
    if request.args.get('table'):
        column_names = ['column1', 'column2', 'column3']
    else:
        column_names = []
    return jsonify({'columns': column_names})
@bp.route('/save_template', methods=['POST'])
def save_template():
    """Accept a template as JSON, log it, and acknowledge (nothing is stored)."""
    payload = request.get_json()
    # Stub: persistence is not implemented; the payload is only printed.
    print(f"Saving template: {payload}")
    acknowledgement = {'message': 'Template saved successfully!'}
    return jsonify(acknowledgement)
@bp.route('/generate_pdf', methods=['POST'])
def generate_pdf():
    """Generate a one-page label-template PDF from a JSON description.

    Expected JSON body (all keys optional):
        width:   label width in millimetres (default 100)
        height:  label height in millimetres (default 50)
        columns: list of column names, drawn one per line

    The file is always written to ``static/label_templates/label_template.pdf``
    under the application root, replacing any previous file at that path.

    Returns:
        JSON with ``message`` and ``pdf_path`` on success. A body that is not
        a JSON object, non-numeric or non-positive dimensions, or a
        ``columns`` value that is not a list yields ``{'error': ...}`` with
        status 400 instead of an unhandled exception.
    """
    import math

    mm_to_points = 2.83465  # 1 mm = 2.83465 points

    # silent=True: a missing or invalid JSON body becomes None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    try:
        width = float(data.get('width', 100))  # Default width in mm
        height = float(data.get('height', 50))  # Default height in mm
    except (TypeError, ValueError):
        return jsonify({'error': 'width and height must be numbers'}), 400
    if not (math.isfinite(width) and math.isfinite(height) and width > 0 and height > 0):
        return jsonify({'error': 'width and height must be positive numbers'}), 400

    columns = data.get('columns', [])
    if columns is None:
        columns = []
    if not isinstance(columns, (list, tuple)):
        return jsonify({'error': 'columns must be a list'}), 400

    width_points = width * mm_to_points
    height_points = height * mm_to_points

    # Ensure the /static/label_templates folder exists
    label_templates_folder = os.path.join(current_app.root_path, 'static', 'label_templates')
    os.makedirs(label_templates_folder, exist_ok=True)
    pdf_file_path = os.path.join(label_templates_folder, 'label_template.pdf')

    c = canvas.Canvas(pdf_file_path, pagesize=(width_points, height_points))
    # Title line near the top, then one line per column, 20 points apart.
    c.drawString(10, height_points - 20, "Label Template")
    y_position = height_points - 40
    for column in columns:
        c.drawString(10, y_position, f"Column: {column}")
        y_position -= 20
    c.save()

    return jsonify({
        'message': 'PDF generated successfully!',
        'pdf_path': '/static/label_templates/label_template.pdf'
    })
# Order Labels Upload Module Routes
@bp.route('/upload_orders', methods=['GET', 'POST'])
def upload_orders():
    """Entry point for uploading order CSV files used for label generation.

    Only the ``superadmin``, ``warehouse`` and ``warehouse_manager`` roles may
    proceed; other sessions are flashed a message and sent to the dashboard.
    The actual work is delegated to ``app.order_labels.upload_orders_handler``.
    """
    permitted_roles = ('superadmin', 'warehouse', 'warehouse_manager')
    if session.get('role') not in permitted_roles:
        flash('Access denied: Warehouse management permissions required.')
        return redirect(url_for('main.dashboard'))

    # Imported only once access has been granted, as before.
    from app.order_labels import upload_orders_handler
    return upload_orders_handler()
@bp.route('/view_orders')
def view_orders():
    """Render the list of uploaded orders.

    Only the ``superadmin``, ``warehouse``, ``warehouse_manager`` and
    ``warehouse_worker`` roles may view it; other sessions are flashed a
    message and sent to the dashboard. The orders come from
    ``get_orders_from_database(200)``.
    """
    permitted_roles = ('superadmin', 'warehouse', 'warehouse_manager', 'warehouse_worker')
    if session.get('role') not in permitted_roles:
        flash('Access denied: Warehouse access required.')
        return redirect(url_for('main.dashboard'))

    # Imported only once access has been granted, as before.
    from app.order_labels import get_orders_from_database
    return render_template('view_orders.html', orders=get_orders_from_database(200))
@bp.route('/get_unprinted_orders', methods=['GET'])
def get_unprinted_orders():
    """Return the result of ``get_unprinted_orders_data()`` as JSON.

    Restricted to the ``superadmin``, ``warehouse_manager`` and ``etichete``
    roles (403 otherwise). Any exception raised while fetching the data is
    logged with its traceback and reported as ``{'error': ...}`` with 500.
    """
    current_role = session.get('role')
    print(f"DEBUG: get_unprinted_orders called. Session role: {current_role}")

    if current_role not in ('superadmin', 'warehouse_manager', 'etichete'):
        print(f"DEBUG: Access denied for role: {current_role}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    try:
        print("DEBUG: Calling get_unprinted_orders_data()")
        orders = get_unprinted_orders_data()
        print(f"DEBUG: Retrieved {len(orders)} orders")
        return jsonify(orders)
    except Exception as e:
        import traceback
        print(f"DEBUG: Error in get_unprinted_orders: {e}")
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@bp.route('/generate_labels_pdf/<int:order_id>', methods=['POST'])
def generate_labels_pdf(order_id):
    """Generate the label PDF for one order and return it inline.

    Restricted to the ``superadmin``, ``warehouse_manager`` and ``etichete``
    roles (403 otherwise). The order row is read from ``order_for_labels``,
    passed to ``generate_order_labels_pdf``, and the order is then flagged
    through ``update_order_printed_status``; a failed status update is only
    logged and does not prevent the PDF from being returned.

    Args:
        order_id: Primary key of the row in ``order_for_labels``.

    Returns:
        An ``application/pdf`` response on success, ``{'error': ...}`` with
        404 when the order does not exist, or with 500 on any other failure.
    """
    print(f"DEBUG: generate_labels_pdf called for order_id: {order_id}")
    if session.get('role') not in ('superadmin', 'warehouse_manager', 'etichete'):
        print(f"DEBUG: Access denied for role: {session.get('role')}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    # Column order must match the SELECT list below.
    order_columns = (
        'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
        'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client',
        'customer_name', 'customer_article_number', 'open_for_order',
        'line_number', 'printed_labels', 'created_at', 'updated_at',
    )
    try:
        from .pdf_generator import generate_order_labels_pdf, update_order_printed_status
        from .print_module import get_db_connection
        from flask import make_response

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
                       data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
                       customer_article_number, open_for_order, line_number,
                       printed_labels, created_at, updated_at
                FROM order_for_labels
                WHERE id = %s
            """, (order_id,))
            row = cursor.fetchone()
        finally:
            # Close even when the query raises, so the connection is not leaked.
            conn.close()

        if not row:
            return jsonify({'error': 'Order not found'}), 404

        order_data = dict(zip(order_columns, row))
        if order_data['printed_labels'] is None:
            order_data['printed_labels'] = 0
        print(f"DEBUG: Generating PDF for order {order_id} with quantity {order_data['cantitate']}")

        pdf_buffer = generate_order_labels_pdf(order_id, order_data)

        update_success = update_order_printed_status(order_id)
        if not update_success:
            print(f"Warning: Could not update printed status for order {order_id}")

        response = make_response(pdf_buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename="labels_order_{order_id}_{order_data["comanda_productie"]}.pdf"'
        print(f"DEBUG: PDF generated successfully for order {order_id}")
        return response
    except Exception as e:
        # Route boundary: report the failure as JSON instead of an HTML 500.
        print(f"DEBUG: Error generating PDF for order {order_id}: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@bp.route('/get_order_data/<int:order_id>', methods=['GET'])
def get_order_data(order_id):
    """Return one ``order_for_labels`` row as JSON for previewing.

    Restricted to the ``superadmin``, ``warehouse_manager`` and ``etichete``
    roles (403 otherwise). ``data_livrare``, ``created_at`` and ``updated_at``
    are converted with ``str()`` (``'N/A'`` when empty), and a NULL
    ``printed_labels`` is reported as 0.

    Args:
        order_id: Primary key of the row in ``order_for_labels``.

    Returns:
        The order as a JSON object, ``{'error': ...}`` with 404 when the
        order does not exist, or with 500 on any other failure.
    """
    print(f"DEBUG: get_order_data called for order_id: {order_id}")
    if session.get('role') not in ('superadmin', 'warehouse_manager', 'etichete'):
        return jsonify({'error': 'Access denied'}), 403

    # Column order must match the SELECT list below.
    order_columns = (
        'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
        'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client',
        'customer_name', 'customer_article_number', 'open_for_order',
        'line_number', 'printed_labels', 'created_at', 'updated_at',
    )
    try:
        from .print_module import get_db_connection

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
                       data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
                       customer_article_number, open_for_order, line_number,
                       printed_labels, created_at, updated_at
                FROM order_for_labels
                WHERE id = %s
            """, (order_id,))
            row = cursor.fetchone()
        finally:
            # Close even when the query raises, so the connection is not leaked.
            conn.close()

        if not row:
            return jsonify({'error': 'Order not found'}), 404

        order_data = dict(zip(order_columns, row))
        if order_data['printed_labels'] is None:
            order_data['printed_labels'] = 0
        # Date/time values are stringified so the response has a stable text form.
        for date_key in ('data_livrare', 'created_at', 'updated_at'):
            value = order_data[date_key]
            order_data[date_key] = str(value) if value else 'N/A'
        return jsonify(order_data)
    except Exception as e:
        # Route boundary: report the failure as JSON instead of an HTML 500.
        print(f"DEBUG: Error getting order data for {order_id}: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@warehouse_bp.route('/create_locations', methods=['GET', 'POST'])
def create_locations():
    """Delegate the request to ``app.warehouse.create_locations_handler``."""
    from app.warehouse import create_locations_handler as handler
    return handler()
@warehouse_bp.route('/import_locations_csv', methods=['GET', 'POST'])
def import_locations_csv():
    """Delegate the request to ``app.warehouse.import_locations_csv_handler``."""
    from app.warehouse import import_locations_csv_handler as handler
    return handler()