Files
quality_recticel/py_app/app/routes.py
2025-09-26 22:17:37 +03:00

2143 lines
87 KiB
Python

import os
import mariadb
from datetime import datetime, timedelta
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify
from .models import User
from . import db
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from flask import Blueprint, render_template, request, redirect, url_for, flash
import csv
from .warehouse import add_location
from app.settings import (
settings_handler,
role_permissions_handler,
save_role_permissions_handler,
reset_role_permissions_handler,
save_all_role_permissions_handler,
reset_all_role_permissions_handler,
edit_user_handler,
create_user_handler,
delete_user_handler,
save_external_db_handler
)
from .print_module import get_unprinted_orders_data
bp = Blueprint('main', __name__)
warehouse_bp = Blueprint('warehouse', __name__)
def format_cell_data(cell):
    """Convert a database cell into a display/JSON-friendly value.

    ``datetime`` and ``date`` values become ``dd/mm/yyyy`` strings,
    ``timedelta`` values become ``HH:MM:SS`` strings, and every other value
    is returned unchanged.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import date

    # datetime is a subclass of date, so a single isinstance check covers
    # both. Plain date objects have no ``date`` attribute, which is why the
    # attribute-based check alone never formatted them.
    if isinstance(cell, date):
        return cell.strftime('%d/%m/%Y')
    if isinstance(cell, timedelta):
        # Whole seconds are split into hours / minutes / seconds.
        total_seconds = int(cell.total_seconds())
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    if hasattr(cell, 'date'):
        # Kept for other date-like objects that expose date()/strftime().
        return cell.strftime('%d/%m/%Y')
    return cell
@bp.route('/store_articles')
def store_articles():
    """Render the store articles page."""
    page = render_template('store_articles.html')
    return page
@bp.route('/warehouse_reports')
def warehouse_reports():
    """Render the warehouse reports page."""
    page = render_template('warehouse_reports.html')
    return page
def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The file is looked up in the Flask instance folder and read as
    ``key=value`` lines. The keys ``username``, ``password``,
    ``server_domain``, ``port`` and ``database_name`` are passed to
    ``mariadb.connect``.

    Raises:
        FileNotFoundError: if the configuration file does not exist.
        KeyError: if one of the required keys is missing from the file.
        ValueError: if ``port`` is not an integer.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file
    settings = {}
    with open(settings_file, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            # A blank line (e.g. a trailing newline) or a line without '='
            # would otherwise break the two-value unpack below.
            if not line or '=' not in line:
                continue
            key, value = line.split('=', 1)
            settings[key] = value

    # Create a database connection
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
def _user_from_row(row):
    """Turn a (username, password, role) row into a user dict, or None."""
    if not row:
        return None
    return {'username': row[0], 'password': row[1], 'role': row[2]}


def _lookup_internal_user(username, password):
    """Look up matching credentials in the internal SQLite ``users.db``.

    Returns a user dict, or None when the ``users`` table is missing or no
    row matches. The connection is always closed, even if a query fails.
    """
    import sqlite3

    # Internal SQLite database (py_app/instance/users.db)
    internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db')
    conn = sqlite3.connect(internal_db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
        if not cursor.fetchone():
            print("No users table in internal database")
            return None
        cursor.execute(
            "SELECT username, password, role FROM users WHERE username=? AND password=?",
            (username, password),
        )
        return _user_from_row(cursor.fetchone())
    finally:
        conn.close()


def _lookup_external_user(username, password):
    """Look up matching credentials in the external MariaDB ``users`` table.

    Returns a user dict, or None when the table is missing or no row
    matches. Connection and query errors propagate to the caller.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'users'")
        if not cursor.fetchone():
            return None
        cursor.execute(
            "SELECT username, password, role FROM users WHERE username=%s AND password=%s",
            (username, password),
        )
        return _user_from_row(cursor.fetchone())
    finally:
        conn.close()


@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials.

    A username starting with ``#`` is checked against the internal SQLite
    database (with the ``#`` removed). Any other username is checked against
    the external MariaDB database, falling back to the internal database if
    the external lookup raises. On success the username and role are stored
    in the session and the user is redirected to the dashboard; otherwise a
    flash message is set and the login page is rendered again.
    """
    # NOTE(review): both lookups compare the submitted password directly to
    # the stored column, i.e. passwords appear to be stored unhashed.
    if request.method == 'POST':
        # Never log the password or the raw form: stdout usually ends up in
        # log files.
        print("Login form fields received:", list(request.form.keys()))
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '').strip()
        if not username or not password:
            print("Missing username or password")
            flash('Please enter both username and password.')
            return render_template('login.html')

        user = None
        if username.startswith('#'):
            username_clean = username[1:].strip()
            print(f"Checking internal database for: {username_clean}")
            try:
                user = _lookup_internal_user(username_clean, password)
            except Exception as e:
                print("Internal DB error:", e)
        else:
            # Check external MariaDB database first
            try:
                user = _lookup_external_user(username, password)
            except Exception as e:
                print("External DB error:", e)
                # Fallback to internal database if external fails
                print("Falling back to internal database")
                try:
                    user = _lookup_internal_user(username, password)
                except Exception as e2:
                    print("Internal DB fallback error:", e2)

        if user:
            session['user'] = user['username']
            session['role'] = user['role']
            print("Logged in as:", session.get('user'), session.get('role'))
            return redirect(url_for('main.dashboard'))

        print("Login failed for:", username)
        flash('Invalid credentials. Please try again.')
    return render_template('login.html')
@bp.route('/dashboard')
def dashboard():
    """Render the dashboard for a logged-in user, else redirect to login."""
    current_user = session.get('user')
    current_role = session.get('role')
    print("Session user:", current_user, current_role)
    if 'user' in session:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
@bp.route('/settings')
def settings():
    """Delegate the settings page to ``settings_handler``."""
    response = settings_handler()
    return response
@bp.route('/quality')
def quality():
    """Render the quality page for the ``superadmin`` and ``quality`` roles."""
    permitted_roles = ('superadmin', 'quality')
    if session.get('role') in permitted_roles:
        return render_template('quality.html')
    # Missing or non-matching role: send the user back to the dashboard.
    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/warehouse')
def warehouse():
    """Render the warehouse main page for ``superadmin``/``warehouse`` roles."""
    permitted_roles = ('superadmin', 'warehouse')
    if session.get('role') in permitted_roles:
        return render_template('main_page_warehouse.html')
    # Missing or non-matching role: send the user back to the dashboard.
    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan (POST) and show the 15 most recent scan rows.

    Only the ``superadmin`` and ``scan`` roles may use this page. On POST the
    row whose ``CP_full_code`` equals the submitted ``cp_code`` is updated if
    it exists, otherwise a new row is inserted. Database errors are reported
    through ``flash`` and the page is still rendered.
    """
    if session.get('role') not in ('superadmin', 'scan'):
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        scan_date = request.form.get('date')
        scan_time = request.form.get('time')

        conn = None
        try:
            conn = get_db_connection()
            cursor = conn.cursor()
            # Check if the CP_full_code already exists
            cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,))
            existing_entry = cursor.fetchone()
            if existing_entry:
                # Update the existing entry
                update_query = """
                    UPDATE scan1_orders
                    SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
                    WHERE CP_full_code = ?
                """
                cursor.execute(update_query, (operator_code, oc1_code, oc2_code, defect_code, scan_date, scan_time, cp_code))
                flash('Existing entry updated successfully.')
            else:
                # Insert a new entry
                insert_query = """
                    INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """
                cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, scan_date, scan_time))
                flash('New entry inserted successfully.')
            # Commit the transaction
            conn.commit()
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")
        finally:
            # Close even when a query failed, so the connection is not leaked.
            if conn is not None:
                conn.close()

    # Fetch the latest scan data for display
    scan_data = []
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
            FROM scan1_orders
            ORDER BY Id DESC
            LIMIT 15
        """)
        scan_data = cursor.fetchall()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")
    finally:
        if conn is not None:
            conn.close()

    return render_template('scan.html', scan_data=scan_data)
@bp.route('/logout')
def logout():
    """Drop the login keys from the session and go back to the login page."""
    for session_key in ('user', 'role'):
        session.pop(session_key, None)
    return redirect(url_for('main.login'))
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Delegate user creation to ``create_user_handler``."""
    response = create_user_handler()
    return response
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Delegate user editing to ``edit_user_handler``."""
    response = edit_user_handler()
    return response
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delegate user deletion to ``delete_user_handler``."""
    response = delete_user_handler()
    return response
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Delegate saving the external DB settings to ``save_external_db_handler``."""
    response = save_external_db_handler()
    return response
# Role Permissions Management Routes
@bp.route('/role_permissions')
def role_permissions():
    """Delegate the role permissions page to ``role_permissions_handler``."""
    response = role_permissions_handler()
    return response
@bp.route('/test_permissions')
def test_permissions():
    """Render ``test_permissions.html`` with the roles from ``role_hierarchy``.

    Superadmin only; other users are redirected to the dashboard. Any error
    while loading or rendering is returned as a plain ``"Error: ..."`` string.
    """
    from app.permissions import APP_PERMISSIONS, ACTIONS

    # Check if superadmin
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    conn = None
    try:
        # Imported inside the try so an import failure is reported like any
        # other error of this test page.
        from app.settings import get_external_db_connection
        conn = get_external_db_connection()
        cursor = conn.cursor()
        # Get roles from role_hierarchy table
        cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in cursor.fetchall()
        }
        return render_template('test_permissions.html',
                               roles=roles,
                               pages=APP_PERMISSIONS,
                               action_names=ACTIONS)
    except Exception as e:
        return f"Error: {e}"
    finally:
        # Close on every path so a failed query does not leak the connection.
        if conn is not None:
            conn.close()
@bp.route('/role_permissions_simple')
def role_permissions_simple():
    """Render ``role_permissions_simple.html`` with roles and granted permissions.

    Superadmin only. Roles come from ``role_hierarchy``; granted permission
    keys come from ``role_permissions`` and are grouped per role. On any
    error a flash message is set and the user is redirected to the dashboard.
    """
    from app.settings import get_external_db_connection
    from app.permissions import APP_PERMISSIONS, ACTIONS
    import json

    # Check if superadmin
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    conn = None
    try:
        conn = get_external_db_connection()
        cursor = conn.cursor()

        # Get roles from role_hierarchy table
        cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in cursor.fetchall()
        }

        # Get current role permissions
        cursor.execute("""
            SELECT role, permission_key
            FROM role_permissions
            WHERE granted = TRUE
        """)
        role_permissions = {}
        for role, permission_key in cursor.fetchall():
            role_permissions.setdefault(role, []).append(permission_key)

        # Convert to JSON for JavaScript
        permissions_json = json.dumps(APP_PERMISSIONS)
        role_permissions_json = json.dumps(role_permissions)
        return render_template('role_permissions_simple.html',
                               roles=roles,
                               pages=APP_PERMISSIONS,
                               action_names=ACTIONS,
                               permissions_json=permissions_json,
                               role_permissions_json=role_permissions_json)
    except Exception as e:
        flash(f'Error loading role permissions: {e}')
        return redirect(url_for('main.dashboard'))
    finally:
        # Close on every path so a failed query does not leak the connection.
        if conn is not None:
            conn.close()
@bp.route('/settings/save_role_permissions', methods=['POST'])
def save_role_permissions():
    """Delegate to ``save_role_permissions_handler``."""
    response = save_role_permissions_handler()
    return response
@bp.route('/settings/reset_role_permissions', methods=['POST'])
def reset_role_permissions():
    """Delegate to ``reset_role_permissions_handler``."""
    response = reset_role_permissions_handler()
    return response
@bp.route('/settings/save_all_role_permissions', methods=['POST'])
def save_all_role_permissions():
    """Delegate to ``save_all_role_permissions_handler``."""
    response = save_all_role_permissions_handler()
    return response
@bp.route('/settings/reset_all_role_permissions', methods=['POST'])
def reset_all_role_permissions():
    """Delegate to ``reset_all_role_permissions_handler``."""
    response = reset_all_role_permissions_handler()
    return response
def _run_recent_scan_report(cursor, code_column, where_clause, params):
    """Run one fixed 'recent scans' query and return ``(headers, rows)``.

    ``code_column`` and ``where_clause`` are hard-coded by the caller; only
    ``params`` carries values and those are passed as bound parameters.
    """
    cursor.execute(f"""
        SELECT Id, operator_code, {code_column}, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        WHERE {where_clause}
        ORDER BY date DESC, time DESC
    """, params)
    rows = cursor.fetchall()
    code_header = "CP Base Code" if code_column == "CP_base_code" else "CP Full Code"
    headers = ["Id", "Operator Code", code_header, "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
    return headers, [[format_cell_data(cell) for cell in row] for row in rows]


@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return scan report data as JSON for the ``report`` query parameter.

    Supported values:
        "1": today's records.
        "2": records of the last 5 days including today.
        "3": today's records with a non-zero quality code.
        "4": last-5-days records with a non-zero quality code.
        "5": every record, including both CP code columns.

    The response always has ``headers`` and ``rows``; ``message`` is added
    when report 5 finds an empty table and ``error`` when a database error
    occurs. Unknown report values return empty headers and rows.
    """
    report = request.args.get('report')
    data = {"headers": [], "rows": []}

    today = datetime.now().strftime('%Y-%m-%d')
    # Last 4 days + today = 5 days
    start_date = (datetime.now() - timedelta(days=4)).strftime('%Y-%m-%d')
    # report id -> (CP code column, WHERE clause, bound date value)
    recent_reports = {
        "1": ("CP_base_code", "date = ?", today),
        "2": ("CP_base_code", "date >= ?", start_date),
        "3": ("CP_full_code", "date = ? AND quality_code != 0", today),
        "4": ("CP_full_code", "date >= ? AND quality_code != 0", start_date),
    }

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        if report in recent_reports:
            code_column, where_clause, date_value = recent_reports[report]
            data["headers"], data["rows"] = _run_recent_scan_report(
                cursor, code_column, where_clause, (date_value,)
            )
            print(f"DEBUG: Report {report} found {len(data['rows'])} rows (date parameter: {date_value})")
        elif report == "5":  # All rows
            all_rows_headers = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"]
            # First check if table exists and has any data
            try:
                cursor.execute("SELECT COUNT(*) FROM scan1_orders")
                total_count = cursor.fetchone()[0]
                print(f"DEBUG: Total records in scan1_orders table: {total_count}")
                data["headers"] = all_rows_headers
                if total_count == 0:
                    data["rows"] = []
                    data["message"] = "No scan data available in the database. Please ensure scanning operations have been performed and data has been recorded."
                else:
                    cursor.execute("""
                        SELECT Id, operator_code, CP_base_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
                        FROM scan1_orders
                        ORDER BY date DESC, time DESC
                    """)
                    rows = cursor.fetchall()
                    print(f"DEBUG: Fetched {len(rows)} rows for report 5 (all rows)")
                    data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
            except mariadb.Error as table_error:
                print(f"DEBUG: Table access error: {table_error}")
                data["error"] = f"Database table error: {table_error}"
    except mariadb.Error as e:
        print(f"Error fetching report data: {e}")
        data["error"] = "Error fetching report data."
    finally:
        # Close on every path so a failed query does not leak the connection.
        if conn is not None:
            conn.close()

    # Log only the size of the payload, not every row.
    print(f"Report {report}: returning {len(data['rows'])} rows")
    return jsonify(data)
def _dated_report_headers(code_header):
    """Return the ten report column headers with the given CP code header."""
    return ["Id", "Operator Code", code_header, "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]


def _select_dated_scans(cursor, code_column, where_clause, order_by, params):
    """Select the ten report columns from ``scan1_orders``.

    ``code_column``, ``where_clause`` and ``order_by`` are hard-coded by the
    callers in this module; request values only travel through ``params`` as
    bound parameters.
    """
    cursor.execute(f"""
        SELECT Id, operator_code, {code_column}, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        WHERE {where_clause}
        ORDER BY {order_by}
    """, params)
    return cursor.fetchall()


def _select_scans_for_day(cursor, code_column, selected_date, defects_only):
    """Fetch the rows of one day, trying progressively looser date matches.

    Tries an exact ``date = ?`` match, then ``DATE(date) = ?``, then a
    ``LIKE 'selected_date%'`` prefix match, and returns the first non-empty
    result (or an empty list).
    """
    defect_filter = " AND quality_code != 0" if defects_only else ""
    order_by = "quality_code DESC, time DESC" if defects_only else "time DESC"
    attempts = (
        ("date = ?", selected_date),
        ("DATE(date) = ?", selected_date),
        ("date LIKE ?", f"{selected_date}%"),
    )
    rows = []
    for predicate, value in attempts:
        rows = _select_dated_scans(cursor, code_column, predicate + defect_filter, order_by, (value,))
        print(f"DEBUG: '{predicate}' match found {len(rows)} rows for {selected_date}")
        if rows:
            break
    return rows


@bp.route('/generate_report', methods=['GET'])
def generate_report():
    """Generate a calendar-based scan report as JSON.

    Query parameters: ``report`` plus ``date`` (reports 6 and 8) or
    ``start_date``/``end_date`` (reports 7 and 9), all in YYYY-MM-DD form.

        "6": all scans of one date.
        "7": all scans in a date range, with a ``summary``.
        "8": scans with non-zero quality code on one date, with a
             ``defects_summary``.
        "9": scans with non-zero quality code in a date range, with a
             ``defects_summary``.

    The response always has ``headers`` and ``rows``; ``message`` is set when
    nothing was found and ``error`` on invalid input or database errors.
    """
    report = request.args.get('report')
    selected_date = request.args.get('date')
    data = {"headers": [], "rows": []}

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if report == "6" and selected_date:  # Custom date report
            rows = _select_scans_for_day(cursor, "CP_base_code", selected_date, defects_only=False)
            data["headers"] = _dated_report_headers("CP Base Code")
            data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
            # Add helpful message if no data found
            if not rows:
                data["message"] = f"No scan data found for {selected_date}. Please select a date when scanning operations were performed."

        elif report == "7":  # Date Range Report
            start_date = request.args.get('start_date')
            end_date = request.args.get('end_date')
            if start_date and end_date:
                print(f"DEBUG: Date range report - Start: {start_date}, End: {end_date}")
                # Validate date format and order
                try:
                    start_dt = datetime.strptime(start_date, '%Y-%m-%d')
                    end_dt = datetime.strptime(end_date, '%Y-%m-%d')
                except ValueError:
                    data["error"] = "Invalid date format. Please use YYYY-MM-DD format."
                    return jsonify(data)
                if start_dt > end_dt:
                    data["error"] = "Start date cannot be after end date."
                    return jsonify(data)

                # Distinct dates in the range feed the summary's dates_with_data.
                cursor.execute("""
                    SELECT DISTINCT date FROM scan1_orders
                    WHERE date >= ? AND date <= ?
                    ORDER BY date DESC
                """, (start_date, end_date))
                existing_dates = cursor.fetchall()

                # Query for all records in the date range
                rows = _select_dated_scans(
                    cursor, "CP_base_code", "date >= ? AND date <= ?",
                    "date DESC, time DESC", (start_date, end_date),
                )
                print(f"DEBUG: Date range query found {len(rows)} rows from {start_date} to {end_date}")
                data["headers"] = _dated_report_headers("CP Base Code")
                data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
                if not rows:
                    data["message"] = f"No scan data found between {start_date} and {end_date}. Please select dates when scanning operations were performed."
                else:
                    # Add summary information (columns 8/9 are the approved
                    # and rejected quantities of the SELECT above).
                    data["summary"] = {
                        "total_records": len(rows),
                        "date_range": f"{start_date} to {end_date}",
                        "total_approved": sum(row[8] for row in rows if row[8] is not None),
                        "total_rejected": sum(row[9] for row in rows if row[9] is not None),
                        "dates_with_data": len(existing_dates),
                    }
            else:
                data["error"] = "Both start date and end date are required for date range report."

        elif report == "8" and selected_date:  # Custom date quality defects report
            rows = _select_scans_for_day(cursor, "CP_full_code", selected_date, defects_only=True)
            data["headers"] = _dated_report_headers("CP Full Code")
            data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
            if not rows:
                data["message"] = f"No quality defects found for {selected_date}. This could mean no scanning was performed or all items passed quality control."
            else:
                # Add summary for quality defects
                data["defects_summary"] = {
                    "total_defective_items": len(rows),
                    "total_rejected_quantity": sum(row[9] for row in rows if row[9] is not None),
                    "unique_defect_types": len(set(row[5] for row in rows if row[5] != 0)),
                    "date": selected_date,
                }

        elif report == "9":  # Date Range Quality Defects Report
            start_date = request.args.get('start_date')
            end_date = request.args.get('end_date')
            print(f"DEBUG: Date range quality defects requested - Start: {start_date}, End: {end_date}")
            if not start_date or not end_date:
                data["error"] = "Both start date and end date are required for date range quality defects report."
                return jsonify(data)
            try:
                # Validate date format
                datetime.strptime(start_date, '%Y-%m-%d')
                datetime.strptime(end_date, '%Y-%m-%d')

                # Query for quality defects in the date range
                rows = _select_dated_scans(
                    cursor, "CP_full_code",
                    "date >= ? AND date <= ? AND quality_code != 0",
                    "date DESC, quality_code DESC, time DESC",
                    (start_date, end_date),
                )
                print(f"DEBUG: Date range quality defects query found {len(rows)} rows from {start_date} to {end_date}")
                data["headers"] = _dated_report_headers("CP Full Code")
                data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
                if not rows:
                    data["message"] = f"No quality defects found between {start_date} and {end_date}. This could mean no scanning was performed in this date range or all items passed quality control."
                else:
                    # Add summary for quality defects in date range
                    data["defects_summary"] = {
                        "total_defective_items": len(rows),
                        "total_rejected_quantity": sum(row[9] for row in rows if row[9] is not None),
                        "unique_defect_types": len(set(row[5] for row in rows if row[5] != 0)),
                        "date_range": f"{start_date} to {end_date}",
                        "days_with_defects": len(set(row[6] for row in rows)),
                    }
            except ValueError:
                data["error"] = "Invalid date format. Please use YYYY-MM-DD format."
            except Exception as e:
                print(f"DEBUG: Error in date range quality defects report: {e}")
                data["error"] = f"Error processing date range quality defects report: {e}"

    except mariadb.Error as e:
        print(f"Error fetching custom date report: {e}")
        data["error"] = f"Error fetching report data for {selected_date if report in ('6', '8') else 'date range'}."
    finally:
        # Runs for the early returns above as well, so the connection is
        # released on every path.
        if conn is not None:
            conn.close()

    # Log only the size of the payload, not every row.
    print(f"Custom date report {report}: returning {len(data['rows'])} rows")
    return jsonify(data)
@bp.route('/debug_dates', methods=['GET'])
def debug_dates():
    """Debug route: report the dates present in ``scan1_orders`` as JSON.

    Returns the total row count, every distinct date and up to five sample
    date/time pairs. Any error is returned as ``{"error": "..."}``.
    """
    # NOTE(review): unlike /test_database, this route performs no role check
    # before exposing table contents - confirm whether that is intended.
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Get all distinct dates
        cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC")
        dates = cursor.fetchall()

        # Get total count
        cursor.execute("SELECT COUNT(*) FROM scan1_orders")
        total_count = cursor.fetchone()[0]

        # Get sample data
        cursor.execute("SELECT date, time FROM scan1_orders ORDER BY date DESC LIMIT 5")
        sample_data = cursor.fetchall()

        return jsonify({
            "total_records": total_count,
            "available_dates": [str(date_row[0]) for date_row in dates],
            "sample_data": [{"date": str(row[0]), "time": str(row[1])} for row in sample_data],
        })
    except Exception as e:
        return jsonify({"error": str(e)})
    finally:
        # Close on every path so a failed query does not leak the connection.
        if conn is not None:
            conn.close()
@bp.route('/test_database', methods=['GET'])
def test_database():
    """Test the database connection and inspect the ``scan1_orders`` table.

    Superadmin only (403 JSON otherwise). Runs a series of checks - table
    existence, structure, row count, sample rows, distinct dates - and then
    makes sure a record exists for today, inserting a sample one if needed.
    The outcome is returned as JSON; individual check failures are logged and
    replaced by defaults rather than aborting the whole test.
    """
    # Check if user has superadmin permissions
    if session.get('role') != 'superadmin':
        return jsonify({
            "success": False,
            "error": "Access denied: Superadmin permissions required for database testing."
        }), 403

    conn = None
    try:
        print("DEBUG: Testing database connection...")
        conn = get_db_connection()
        cursor = conn.cursor()
        print("DEBUG: Database connection successful!")

        # Test 1: Check if table exists
        try:
            cursor.execute("SHOW TABLES LIKE 'scan1_orders'")
            table_exists = cursor.fetchone()
            print(f"DEBUG: Table scan1_orders exists: {table_exists is not None}")
            if not table_exists:
                return jsonify({
                    "success": False,
                    "message": "Table 'scan1_orders' does not exist in the database"
                })
        except Exception as e:
            print(f"DEBUG: Error checking table existence: {e}")
            return jsonify({
                "success": False,
                "message": f"Error checking table existence: {e}"
            })

        # Test 2: Get table structure
        try:
            cursor.execute("DESCRIBE scan1_orders")
            table_structure = cursor.fetchall()
            print(f"DEBUG: Table structure: {table_structure}")
        except Exception as e:
            print(f"DEBUG: Error getting table structure: {e}")
            table_structure = []

        # Test 3: Count total records
        try:
            cursor.execute("SELECT COUNT(*) FROM scan1_orders")
            total_count = cursor.fetchone()[0]
            print(f"DEBUG: Total records in table: {total_count}")
        except Exception as e:
            print(f"DEBUG: Error counting records: {e}")
            total_count = -1

        # Test 4: Get sample data (if any exists)
        sample_data = []
        try:
            cursor.execute("SELECT * FROM scan1_orders LIMIT 5")
            raw_data = cursor.fetchall()
            print(f"DEBUG: Sample data (first 5 rows): {raw_data}")
            # Convert data to JSON-serializable format using consistent formatting
            sample_data = [[format_cell_data(item) for item in row] for row in raw_data]
        except Exception as e:
            print(f"DEBUG: Error getting sample data: {e}")

        # Test 5: Get distinct dates (if any exist)
        available_dates = []
        try:
            cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC LIMIT 10")
            available_dates = [str(row[0]) for row in cursor.fetchall()]
            print(f"DEBUG: Available dates: {available_dates}")
        except Exception as e:
            print(f"DEBUG: Error getting dates: {e}")

        # Test 6: Add a current date sample record for testing daily reports.
        # The connection must still be open here; closing it before this step
        # made the step fail every time.
        # NOTE(review): this writes a sample row into scan1_orders whenever
        # today has no records - confirm that is acceptable on this database.
        try:
            now = datetime.now()
            current_date = now.strftime('%Y-%m-%d')
            current_time = now.strftime('%H:%M:%S')
            # Check if we already have a record for today
            cursor.execute("SELECT COUNT(*) FROM scan1_orders WHERE date = ?", (current_date,))
            today_count = cursor.fetchone()[0]
            if today_count == 0:
                print(f"DEBUG: No records found for today ({current_date}), adding sample record...")
                cursor.execute("""
                    INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, ('OP01', 'CP99999999-0001', 'OC01', 'OC02', 0, current_date, current_time, 1, 0))
                conn.commit()
                print(f"DEBUG: Added sample record for today ({current_date})")
                message_addendum = " Added sample record for today to test daily reports."
            else:
                print(f"DEBUG: Found {today_count} records for today ({current_date})")
                message_addendum = f" Found {today_count} records for today."
        except Exception as e:
            print(f"DEBUG: Error adding sample record: {e}")
            message_addendum = " Could not add sample record for testing."

        return jsonify({
            "success": True,
            "database_connection": "OK",
            "table_exists": table_exists is not None,
            "table_structure": [{"field": row[0], "type": row[1], "null": row[2]} for row in table_structure],
            "total_records": total_count,
            "sample_data": sample_data,
            "available_dates": available_dates,
            "message": f"Database test completed. Found {total_count} records in scan1_orders table.{message_addendum}"
        })
    except Exception as e:
        print(f"DEBUG: Database test failed: {e}")
        return jsonify({
            "success": False,
            "message": f"Database connection failed: {e}"
        })
    finally:
        # Single close point for every return path above.
        if conn is not None:
            conn.close()
@bp.route('/etichete')
def etichete():
    """Render the labels ("etichete") main page for authorised roles.

    Sessions without a role, or whose role is neither ``superadmin`` nor
    ``etichete``, get an access-denied flash message and are redirected to
    the dashboard.
    """
    permitted_roles = ['superadmin', 'etichete']
    has_access = 'role' in session and session['role'] in permitted_roles
    if has_access:
        return render_template('main_page_etichete.html')
    flash('Access denied: Etichete users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/upload_data')
def upload_data():
    """Render the data upload page."""
    template_name = 'upload_data.html'
    return render_template(template_name)
@bp.route('/print_module')
def print_module():
    """Render the print module page."""
    template_name = 'print_module.html'
    return render_template(template_name)
@bp.route('/download_extension')
def download_extension():
    """Render the page from which the Chrome extension can be downloaded."""
    template_name = 'download_extension.html'
    return render_template(template_name)
@bp.route('/extension_files/<path:filename>')
def extension_files(filename):
    """Send one file from the ``chrome_extension`` directory.

    The directory is resolved as ``chrome_extension`` inside the parent of
    the Flask application root.
    """
    import os
    from flask import send_from_directory, current_app

    app_parent = os.path.dirname(current_app.root_path)
    chrome_extension_dir = os.path.join(app_parent, 'chrome_extension')
    return send_from_directory(chrome_extension_dir, filename)
@bp.route('/create_extension_package', methods=['POST'])
def create_extension_package():
    """Build the Chrome extension ZIP in the static folder and report its URL.

    Every file under ``chrome_extension`` (located next to the app package)
    whose name ends in one of the packaged suffixes is added to
    ``static/quality_recticel_print_helper.zip``, followed by a generated
    ``README.txt`` with installation instructions.

    Returns:
        A JSON response. On success it carries ``success``, ``download_url``,
        ``files_included`` and ``zip_size``. On any failure it carries
        ``success: False`` and ``error`` with HTTP status 500.
    """
    import os
    import traceback
    import zipfile
    from flask import current_app, jsonify

    packaged_suffixes = ('.json', '.js', '.html', '.css', '.png', '.md', '.txt')
    zip_filename = 'quality_recticel_print_helper.zip'
    readme_name = 'README.txt'
    readme_content = """# Quality Label Printing Helper Chrome Extension
## Installation Instructions:
1. Extract this ZIP file to a folder on your computer
2. Open Chrome and go to: chrome://extensions/
3. Enable "Developer mode" in the top right
4. Click "Load unpacked" and select the extracted folder
5. The extension icon 🖨️ should appear in your toolbar
## Usage:
1. Go to the Print Module in the Quality Label Printing application
2. Select an order from the table
3. Click the "🖨️ Print Direct" button
4. The label will print automatically to your default printer
## Troubleshooting:
- Make sure your default printer is set up correctly
- Click the extension icon to test printer connection
- Check Chrome printer settings: chrome://settings/printing
For support, contact your system administrator.
"""
    try:
        # chrome_extension lives in py_app, not py_app/app, hence the dirname().
        extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension')
        print(f"Looking for extension files in: {extension_dir}")
        if not os.path.exists(extension_dir):
            return jsonify({
                'success': False,
                'error': f'Extension directory not found: {extension_dir}'
            }), 500

        # Walk the tree once: the listing is both the debug output and the
        # input for packaging.
        all_files = [
            os.path.join(root, file)
            for root, _dirs, files in os.walk(extension_dir)
            for file in files
        ]
        print(f"Found files: {all_files}")

        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)
        zip_path = os.path.join(static_dir, zip_filename)

        files_added = 0
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in all_files:
                if not file_path.endswith(packaged_suffixes):
                    continue
                arcname = os.path.relpath(file_path, extension_dir)
                if arcname == readme_name:
                    # The generated README below uses this archive name;
                    # adding the on-disk one too would create a duplicate
                    # member in the ZIP.
                    continue
                print(f"Adding file: {file_path} as {arcname}")
                zipf.write(file_path, arcname)
                files_added += 1
            zipf.writestr(readme_name, readme_content)
            files_added += 1
        print(f"Total files added to ZIP: {files_added}")

        # Verify ZIP was created and has content
        if not os.path.exists(zip_path):
            return jsonify({
                'success': False,
                'error': 'Failed to create ZIP file'
            }), 500
        zip_size = os.path.getsize(zip_path)
        print(f"ZIP file created: {zip_path}, size: {zip_size} bytes")
        if zip_size <= 0:
            return jsonify({
                'success': False,
                'error': 'ZIP file was created but is empty'
            }), 500
        return jsonify({
            'success': True,
            'download_url': f'/static/{zip_filename}',
            'files_included': files_added,
            'zip_size': zip_size
        })
    except Exception as e:
        # Route boundary: report every failure to the client as JSON.
        print(f"Error creating extension package: {e}")
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@bp.route('/create_service_package', methods=['POST'])
def create_service_package():
    """Publish a Windows print service ZIP in the static folder and report its URL.

    Two paths exist:

    1. If ``QualityPrintService_COMPLETE_ZERO_DEPENDENCIES.zip`` is present in
       the ``windows_print_service`` directory, it is copied into the static
       folder under a different file name and described in the response.
    2. Otherwise a ZIP is assembled from the files in that directory whose
       names end in one of the listed suffixes, plus a generated
       ``INSTALLATION_README.txt``.

    Returns:
        A JSON response. Failures are reported as ``success: False`` with an
        ``error`` message and HTTP status 500.
    """
    import os
    import zipfile
    # NOTE(review): send_file is imported but not referenced in this function.
    from flask import current_app, jsonify, send_file
    try:
        # Path to the windows_print_service directory (two levels above the app root)
        service_dir = os.path.join(os.path.dirname(os.path.dirname(current_app.root_path)), 'windows_print_service')
        print(f"Looking for service files in: {service_dir}")
        # Check if the enhanced package already exists
        enhanced_package_path = os.path.join(service_dir, 'QualityPrintService_COMPLETE_ZERO_DEPENDENCIES.zip')
        if os.path.exists(enhanced_package_path):
            # Serve the pre-built enhanced package with Error 1053 fixes
            print(f"Serving pre-built enhanced package: {enhanced_package_path}")
            # Copy to static directory for download
            static_dir = os.path.join(current_app.root_path, 'static')
            os.makedirs(static_dir, exist_ok=True)
            zip_filename = 'quality_print_service_enhanced_with_error_1053_fixes.zip'
            static_zip_path = os.path.join(static_dir, zip_filename)
            # Copy the enhanced package to static directory
            import shutil
            shutil.copy2(enhanced_package_path, static_zip_path)
            zip_size = os.path.getsize(static_zip_path)
            # The feature list and method count below are fixed strings; they
            # are not derived from the contents of the copied archive.
            return jsonify({
                'success': True,
                'download_url': f'/static/{zip_filename}',
                'package_type': 'Enhanced with Error 1053 fixes',
                'features': [
                    'Embedded Python 3.11.9 (zero dependencies)',
                    'Multiple installation methods with automatic fallback',
                    'Windows Service Error 1053 comprehensive fixes',
                    'Enhanced service wrapper with SCM communication',
                    'Task Scheduler and Startup Script fallbacks',
                    'Diagnostic and troubleshooting tools',
                    'Complete Chrome extension integration'
                ],
                'zip_size': zip_size,
                'installation_methods': 4
            })
        # Fallback: create basic package if enhanced one not available
        if not os.path.exists(service_dir):
            return jsonify({
                'success': False,
                'error': f'Windows service directory not found: {service_dir}'
            }), 500
        # Create static directory if it doesn't exist
        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)
        zip_filename = 'quality_print_service_complete_package.zip'
        zip_path = os.path.join(static_dir, zip_filename)
        # Create ZIP file with Complete Windows service package (all dependencies included)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            files_added = 0
            # Add all service files to ZIP (complete self-contained version)
            for root, dirs, files in os.walk(service_dir):
                for file in files:
                    # Only files with these suffixes are packaged; anything else is skipped
                    if file.endswith(('.py', '.bat', '.md', '.txt', '.json', '.js', '.html', '.css', '.png')):
                        file_path = os.path.join(root, file)
                        # Create relative path for archive
                        arcname = os.path.relpath(file_path, service_dir)
                        print(f"Adding service file: {file_path} as {arcname}")
                        zipf.write(file_path, arcname)
                        files_added += 1
            # Add main installation instructions for complete self-contained solution.
            # NOTE(review): the text is static; the files it lists are not
            # checked against what was actually added above.
            installation_readme = """# Quality Label Printing Service - Complete Self-Contained Package
## 🎯 ZERO DEPENDENCIES INSTALLATION - WORKS ON ANY WINDOWS SYSTEM!
### What's Included:
✅ Complete Python-based print service (uses only standard library)
✅ Windows Service installer with automatic recovery
✅ Chrome extension for web integration
✅ Multiple printing method fallbacks
✅ Comprehensive logging and error handling
✅ No external dependencies required (works with any Python 3.7+)
### Prerequisites:
- Windows 10/11 or Windows Server 2016+
- Administrator privileges
- Google Chrome browser
- Python 3.7+ (system or portable - installer detects automatically)
### 🚀 Quick Installation (5 Minutes):
#### Step 1: Extract Package
Extract this ZIP to any temporary location (Desktop, Downloads, etc.)
No permanent installation directory needed - files are copied automatically
#### Step 2: Install Windows Service
Right-click `install_service_complete.bat` and select "Run as administrator"
The installer will:
- ✅ Check for Python (system or use included portable version)
- ✅ Create service directory: C:\\QualityPrintService\\
- ✅ Install Windows service with auto-restart
- ✅ Configure logging: %USERPROFILE%\\PrintService\\logs\\
- ✅ Start service on port 8765
#### Step 3: Install Chrome Extension
- Open Chrome → chrome://extensions/
- Enable "Developer mode" (top right toggle)
- Click "Load unpacked" → Select the 'chrome_extension' folder
#### Step 4: Verify Installation
Visit: http://localhost:8765/health
Expected: {"status": "healthy", "service": "Windows Print Service"}
### 🔧 Files Included:
#### Core Service:
- `print_service_complete.py` - Complete service (zero external dependencies)
- `install_service_complete.bat` - Full installer (detects Python automatically)
- `uninstall_service_complete.bat` - Complete removal script
- `requirements_complete.txt` - Dependencies list (all standard library)
#### Chrome Integration:
- `chrome_extension/` - Complete Chrome extension
- `chrome_extension/manifest.json` - Extension configuration
- `chrome_extension/background.js` - Service communication
#### Documentation:
- `README_COMPLETE.md` - Comprehensive documentation
- `INSTALLATION_COMPLETE.md` - Detailed installation guide
- `PORTABLE_PYTHON_INSTRUCTIONS.txt` - Python distribution options
#### Build Tools:
- `build_package.py` - Package builder
- `build_executable.bat` - Creates standalone .exe (optional)
### 🛠️ Technical Features:
#### Printing Methods (Automatic Fallback):
1. Adobe Reader command line
2. SumatraPDF automation
3. PowerShell printing
4. Microsoft Edge integration
5. Windows system default
#### Service Architecture:
```
Quality Web App → Chrome Extension → Windows Service → Physical Printer
(localhost only) (port 8765) (any printer)
```
#### Service Endpoints:
- `GET /health` - Service health check
- `GET /printers` - List available printers
- `GET /status` - Service statistics
- `POST /print_pdf` - Print PDF (page-by-page supported)
### 🔒 Security & Performance:
- Runs on localhost only (127.0.0.1:8765)
- Memory usage: ~15-30 MB
- CPU usage: <1% (idle)
- Automatic temp file cleanup
- Secure PDF handling
### 🚨 Troubleshooting:
#### Service Won't Start:
```cmd
# Check service status
sc query QualityPrintService
# Check port availability
netstat -an | find "8765"
# View service logs
type "%USERPROFILE%\\PrintService\\logs\\print_service_*.log"
```
#### Python Issues:
The installer automatically detects Python or uses portable version.
If you see Python errors, ensure Python 3.7+ is installed.
#### Chrome Extension Issues:
1. Reload extension in chrome://extensions/
2. Check "Developer mode" is enabled
3. Verify service responds at http://localhost:8765/health
### 📞 Support:
1. Check README_COMPLETE.md for detailed documentation
2. Review INSTALLATION_COMPLETE.md for step-by-step guide
3. Check service logs for specific error messages
### 🎯 Why This Package is Better:
✅ Zero external dependencies (pure Python standard library)
✅ Works on any Windows system with Python
✅ Automatic service recovery and restart
✅ Multiple printing method fallbacks
✅ Complete documentation and support
✅ Chrome extension included
✅ Professional logging and error handling
Installation Time: ~5 minutes
Maintenance Required: Zero (auto-starts with Windows)
Ready to use immediately after installation!"""
            zipf.writestr('INSTALLATION_README.txt', installation_readme)
            files_added += 1
        print(f"Total service files added to ZIP: {files_added}")
        # Verify ZIP was created
        if os.path.exists(zip_path):
            zip_size = os.path.getsize(zip_path)
            print(f"Complete service ZIP file created: {zip_path}, size: {zip_size} bytes")
            if zip_size > 0:
                return jsonify({
                    'success': True,
                    'download_url': f'/static/{zip_filename}',
                    'files_included': files_added,
                    'zip_size': zip_size,
                    'package_type': 'Complete Self-Contained Package',
                    'dependencies': 'All included (Python standard library only)'
                })
            else:
                return jsonify({
                    'success': False,
                    'error': 'ZIP file was created but is empty'
                }), 500
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to create service ZIP file'
            }), 500
    except Exception as e:
        # Route boundary: every failure is logged and returned to the client as JSON.
        print(f"Error creating complete service package: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@bp.route('/create_zero_dependency_service_package', methods=['POST'])
def create_zero_dependency_service_package():
    """Build a print service ZIP that bundles an embedded Python runtime.

    Steps performed on every request:

    1. Download the Python embeddable ZIP for ``PYTHON_VERSION`` from
       python.org into a temporary directory and extract it.
    2. Append ``import site`` to the first ``._pth`` file found in the
       extracted runtime.
    3. Write ``static/QualityPrintService_ZERO_DEPENDENCIES.zip`` containing
       the runtime, the listed service files that exist in
       ``windows_print_service``, its ``chrome_extension`` folder (if
       present), a generated batch installer and a generated README.

    Returns:
        A JSON response. Failures are reported as ``success: False`` with an
        ``error`` message and HTTP status 500.
    """
    import os
    import tempfile
    import zipfile
    import urllib.request
    # NOTE(review): subprocess is imported but not referenced in this function.
    import subprocess
    from flask import current_app, jsonify
    try:
        print("🚀 Creating Zero-Dependency Service Package...")
        # Path to the windows_print_service directory (two levels above the app root)
        service_dir = os.path.join(os.path.dirname(os.path.dirname(current_app.root_path)), 'windows_print_service')
        print(f"Looking for service files in: {service_dir}")
        if not os.path.exists(service_dir):
            return jsonify({
                'success': False,
                'error': f'Windows service directory not found: {service_dir}'
            }), 500
        # Create static directory if it doesn't exist
        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)
        zip_filename = 'QualityPrintService_ZERO_DEPENDENCIES.zip'
        zip_path = os.path.join(static_dir, zip_filename)
        # Python embeddable version details; PYTHON_VERSION and PYTHON_DIR_NAME
        # are also interpolated into the generated installer and README below.
        PYTHON_VERSION = "3.11.9"
        PYTHON_DOWNLOAD_URL = f"https://www.python.org/ftp/python/{PYTHON_VERSION}/python-{PYTHON_VERSION}-embed-amd64.zip"
        PYTHON_DIR_NAME = "python_embedded"
        print(f"📥 Will download Python {PYTHON_VERSION} embedded...")
        # Create the complete zero-dependency package
        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"🔧 Using temporary directory: {temp_dir}")
            # Download Python embedded
            python_zip_path = os.path.join(temp_dir, "python-embed.zip")
            python_extract_dir = os.path.join(temp_dir, PYTHON_DIR_NAME)
            try:
                print(f"📥 Downloading Python embedded from: {PYTHON_DOWNLOAD_URL}")
                # No timeout is passed here, so the request blocks until the download finishes or fails.
                urllib.request.urlretrieve(PYTHON_DOWNLOAD_URL, python_zip_path)
                print(f"✅ Downloaded Python to: {python_zip_path}")
                # Extract Python
                os.makedirs(python_extract_dir, exist_ok=True)
                with zipfile.ZipFile(python_zip_path, 'r') as zip_ref:
                    zip_ref.extractall(python_extract_dir)
                print(f"✅ Extracted Python to: {python_extract_dir}")
                # Enable site-packages by modifying pth file
                pth_files = [f for f in os.listdir(python_extract_dir) if f.endswith('._pth')]
                if pth_files:
                    pth_file = os.path.join(python_extract_dir, pth_files[0])
                    with open(pth_file, 'a') as f:
                        f.write('\nimport site\n')
                    print("✅ Enabled site-packages in embedded Python")
            except Exception as e:
                # Download, extraction and ._pth errors are all reported with this one message.
                return jsonify({
                    'success': False,
                    'error': f'Failed to download Python embedded: {str(e)}'
                }), 500
            # Create the complete package ZIP
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                files_added = 0
                # Add Python embedded distribution
                print("📁 Adding Python embedded distribution...")
                for root, dirs, files in os.walk(python_extract_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Runtime files are stored under PYTHON_DIR_NAME inside the archive
                        arcname = os.path.join(PYTHON_DIR_NAME, os.path.relpath(file_path, python_extract_dir))
                        zipf.write(file_path, arcname)
                        files_added += 1
                        if files_added % 10 == 0:  # Progress indicator
                            print(f" 📄 Added {files_added} Python files...")
                # Add service files
                print("📁 Adding service files...")
                service_files = [
                    "print_service_complete.py",
                    "service_wrapper.py",
                    "install_service_complete.bat",
                    "uninstall_service_complete.bat",
                    "test_service.bat",
                    "TROUBLESHOOTING_1053.md",
                    "INSTALLATION_COMPLETE.md",
                    "PACKAGE_SUMMARY.md",
                    "README_COMPLETE.md"
                ]
                for file_name in service_files:
                    file_path = os.path.join(service_dir, file_name)
                    # Listed files that are missing on disk are skipped silently
                    if os.path.exists(file_path):
                        zipf.write(file_path, file_name)
                        files_added += 1
                        print(f" ✅ Added: {file_name}")
                # Add Chrome extension
                chrome_ext_dir = os.path.join(service_dir, "chrome_extension")
                if os.path.exists(chrome_ext_dir):
                    print("📁 Adding Chrome extension...")
                    for root, dirs, files in os.walk(chrome_ext_dir):
                        for file in files:
                            # Skip dot-files
                            if not file.startswith('.'):
                                file_path = os.path.join(root, file)
                                # Relative to service_dir, so entries keep the chrome_extension/ prefix
                                arcname = os.path.relpath(file_path, service_dir)
                                zipf.write(file_path, arcname)
                                files_added += 1
                # Create updated zero-dependency installer.
                # This is an f-string: only {PYTHON_VERSION} and {PYTHON_DIR_NAME}
                # are interpolated; %VAR% references are left for cmd.exe.
                installer_content = f'''@echo off
setlocal enabledelayedexpansion
echo ========================================
echo Quality Label Print Service Installer
echo ZERO DEPENDENCIES - COMPLETE PACKAGE
echo ========================================
echo.
echo This package includes EVERYTHING needed:
echo ✅ Embedded Python {PYTHON_VERSION}
echo ✅ Complete Print Service
echo ✅ Windows Service Installer
echo ✅ Chrome Extension
echo ✅ Auto-recovery System
echo.
REM Check for administrator privileges
net session >nul 2>&1
if %errorLevel% neq 0 (
echo ❌ ERROR: Administrator privileges required
echo Please right-click this file and select "Run as administrator"
echo.
pause
exit /b 1
)
echo [1/7] Administrator privileges confirmed ✅
echo.
REM Set variables
set CURRENT_DIR=%~dp0
set SERVICE_NAME=QualityPrintService
set SERVICE_DISPLAY_NAME=Quality Label Print Service
set INSTALL_DIR=C:\\QualityPrintService
set PYTHON_EXE=%INSTALL_DIR%\\{PYTHON_DIR_NAME}\\python.exe
set PYTHON_SCRIPT=%INSTALL_DIR%\\print_service_complete.py
set LOG_DIR=%USERPROFILE%\\PrintService\\logs
echo [2/7] Using embedded Python distribution ✅
echo Python location: %PYTHON_EXE%
echo.
REM Stop existing service if running
echo [3/7] Stopping existing service (if any)...
sc query "%SERVICE_NAME%" >nul 2>&1
if %errorLevel% equ 0 (
echo Stopping existing service...
net stop "%SERVICE_NAME%" >nul 2>&1
sc delete "%SERVICE_NAME%" >nul 2>&1
timeout /t 2 >nul
)
echo Service cleanup completed ✅
echo.
REM Create installation directory
echo [4/7] Creating installation directory...
if exist "%INSTALL_DIR%" (
echo Removing old installation...
rmdir /s /q "%INSTALL_DIR%" >nul 2>&1
)
mkdir "%INSTALL_DIR%" >nul 2>&1
echo Installation directory: %INSTALL_DIR% ✅
echo.
REM Copy all files to installation directory
echo [5/7] Installing service files...
echo Copying embedded Python...
xcopy "%CURRENT_DIR%{PYTHON_DIR_NAME}" "%INSTALL_DIR%\\{PYTHON_DIR_NAME}\\" /E /I /Y >nul
echo Copying service script...
copy "%CURRENT_DIR%print_service_complete.py" "%INSTALL_DIR%\\" >nul
echo Service files installed ✅
echo.
REM Create log directory
echo [6/7] Setting up logging...
mkdir "%LOG_DIR%" >nul 2>&1
echo Log directory: %LOG_DIR% ✅
echo.
REM Install and start Windows service
echo [7/7] Installing Windows service...
sc create "%SERVICE_NAME%" binPath= "\\"%PYTHON_EXE%\\" \\"%PYTHON_SCRIPT%\\"" DisplayName= "%SERVICE_DISPLAY_NAME%" start= auto
if %errorLevel% neq 0 (
echo ❌ Failed to create Windows service
pause
exit /b 1
)
REM Configure service recovery
sc failure "%SERVICE_NAME%" reset= 60 actions= restart/10000/restart/30000/restart/60000
sc config "%SERVICE_NAME%" depend= ""
REM Start the service
echo Starting service...
net start "%SERVICE_NAME%"
if %errorLevel% neq 0 (
echo ⚠️ Service created but failed to start immediately
echo This is normal - the service will start automatically on reboot
) else (
echo Service started successfully ✅
)
echo.
echo ========================================
echo INSTALLATION COMPLETED! 🎉
echo ========================================
echo.
echo ✅ Python embedded distribution installed
echo ✅ Windows Print Service installed and configured
echo ✅ Auto-recovery enabled (restarts on failure)
echo ✅ Service will start automatically on boot
echo.
echo 🌐 Service URL: http://localhost:8765
echo 📊 Health check: http://localhost:8765/health
echo 📁 Logs location: %LOG_DIR%
echo.
echo 📋 NEXT STEPS:
echo 1. Install Chrome extension from 'chrome_extension' folder
echo 2. Test service: http://localhost:8765/health
echo 3. Configure web application to use service
echo.
echo Press any key to test service connection...
pause >nul
REM Test service
echo Testing service connection...
timeout /t 3 >nul
curl -s http://localhost:8765/health >nul 2>&1
if %errorLevel% equ 0 (
echo ✅ Service is responding correctly!
) else (
echo ⚠️ Service test failed - may need a moment to start
echo Check logs in: %LOG_DIR%
)
echo.
echo Installation complete! Service is ready to use.
pause
'''
                zipf.writestr("INSTALL_ZERO_DEPENDENCIES.bat", installer_content)
                files_added += 1
                # Add comprehensive README (f-string: literal braces are doubled)
                readme_content = f'''# Quality Print Service - ZERO DEPENDENCIES Package
## 🎯 COMPLETE SELF-CONTAINED INSTALLATION
This package contains EVERYTHING needed to run the Quality Print Service:
### ✅ What's Included:
- **Embedded Python {PYTHON_VERSION}** - No system Python required!
- **Complete Print Service** - Zero external dependencies
- **Windows Service Installer** - Automatic installation and recovery
- **Chrome Extension** - Web browser integration
- **Comprehensive Documentation** - Installation and usage guides
### 🚀 INSTALLATION (5 Minutes):
#### Requirements:
- Windows 10/11 or Windows Server 2016+
- Administrator privileges (for service installation)
- Google Chrome browser
#### Step 1: Extract Package
- Extract this ZIP file to any location (Desktop, Downloads, etc.)
- No permanent location needed - installer copies files automatically
#### Step 2: Install Service
- Right-click `INSTALL_ZERO_DEPENDENCIES.bat`
- Select "Run as administrator"
- Follow the installation prompts
#### Step 3: Install Chrome Extension
- Open Chrome browser
- Navigate to `chrome://extensions/`
- Enable "Developer mode" (toggle in top-right)
- Click "Load unpacked"
- Select the `chrome_extension` folder from extracted package
#### Step 4: Test Installation
- Visit: http://localhost:8765/health
- Should return: {{"status": "healthy", "service": "Windows Print Service"}}
### 🔧 Technical Details:
**Service Architecture:**
```
Quality Web App → Chrome Extension → Windows Service → Printer
```
**Printing Methods (automatic fallback):**
1. Adobe Reader (silent printing)
2. SumatraPDF (if Adobe unavailable)
3. PowerShell Print-Document
4. Microsoft Edge (fallback)
5. Windows default printer
**Service Management:**
- Automatic startup on Windows boot
- Auto-recovery on failure (3 restart attempts)
- Comprehensive logging in: `%USERPROFILE%\\PrintService\\logs\\`
**Network Configuration:**
- Service runs on: http://localhost:8765
- Chrome extension communicates via this local endpoint
- No external network access required
### 📦 Package Size: ~15MB (includes full Python runtime)
### ⏱️ Installation Time: ~5 minutes
### 🔧 Maintenance Required: Zero (auto-starts with Windows)
Ready to use immediately after installation!
'''
                zipf.writestr("README_ZERO_DEPENDENCIES.md", readme_content)
                files_added += 1
        # Verify ZIP was created successfully
        if os.path.exists(zip_path):
            zip_size = os.path.getsize(zip_path)
            print(f"✅ Zero-dependency package created: {zip_path}")
            print(f"📄 Total files: {files_added}")
            print(f"📏 Size: {zip_size / 1024 / 1024:.1f} MB")
            if zip_size > 0:
                return jsonify({
                    'success': True,
                    'download_url': f'/static/{zip_filename}',
                    'files_included': files_added,
                    'zip_size': zip_size,
                    'package_type': 'Zero Dependencies - Complete Package',
                    'python_version': PYTHON_VERSION,
                    'dependencies': 'None - Everything included!',
                    'estimated_size_mb': round(zip_size / 1024 / 1024, 1)
                })
            else:
                return jsonify({
                    'success': False,
                    'error': 'ZIP file was created but is empty'
                }), 500
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to create zero-dependency ZIP file'
            }), 500
    except Exception as e:
        # Route boundary: every failure is logged and returned to the client as JSON.
        print(f"❌ Error creating zero-dependency service package: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@bp.route('/test_extension_files')
def test_extension_files():
    """Return a JSON listing of the files under the ``chrome_extension`` directory.

    The response always contains ``extension_dir`` and ``dir_exists``;
    ``files`` holds one entry (absolute path, path relative to the extension
    directory, size in bytes) per file found, and is empty when the directory
    does not exist.
    """
    import os
    from flask import current_app, jsonify

    extension_dir = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension')
    dir_found = os.path.exists(extension_dir)
    listing = []
    if dir_found:
        for folder, _subdirs, names in os.walk(extension_dir):
            for name in names:
                full_path = os.path.join(folder, name)
                size_in_bytes = os.path.getsize(full_path)
                listing.append({
                    'path': full_path,
                    'relative_path': os.path.relpath(full_path, extension_dir),
                    'size': size_in_bytes
                })
    return jsonify({
        'extension_dir': extension_dir,
        'dir_exists': dir_found,
        'files': listing
    })
@bp.route('/label_templates')
def label_templates():
    """Render the label templates page."""
    template_name = 'label_templates.html'
    return render_template(template_name)
@bp.route('/create_template')
def create_template():
    """Render the template creation page."""
    template_name = 'create_template.html'
    return render_template(template_name)
@bp.route('/get_database_tables', methods=['GET'])
def get_database_tables():
    """Return the database tables that are relevant for template creation.

    Only the ``superadmin``, ``warehouse_manager`` and ``etichete`` roles may
    call this route; everyone else gets a 403 JSON error.

    The result lists ``order_for_labels`` first (when it exists), followed by
    every other table whose name contains ``order``, ``label``, ``product``
    or ``customer`` (case-insensitive).

    Returns:
        JSON with ``success``, ``tables`` and ``recommended`` on success, or
        ``{'error': ...}`` with HTTP status 500 on any failure.
    """
    if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']:
        return jsonify({'error': 'Access denied'}), 403

    connection = None
    try:
        # Read the external database settings from the instance folder.
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        settings = {}
        with open(settings_file, 'r') as f:
            for line in f:
                line = line.strip()
                # Blank or malformed lines (no '=') are ignored instead of
                # aborting the whole request with an unpacking error.
                if '=' not in line:
                    continue
                key, value = line.split('=', 1)
                settings[key] = value

        connection = mariadb.connect(
            user=settings['username'],
            password=settings['password'],
            host=settings['server_domain'],
            port=int(settings['port']),
            database=settings['database_name']
        )
        cursor = connection.cursor()

        # Get all tables in the database
        cursor.execute("SHOW TABLES")
        all_tables = [row[0] for row in cursor.fetchall()]

        # Prioritise order_for_labels, then add the other keyword matches.
        keywords = ('order', 'label', 'product', 'customer')
        relevant_tables = []
        if 'order_for_labels' in all_tables:
            relevant_tables.append('order_for_labels')
        for table in all_tables:
            if table in relevant_tables:
                continue
            lowered = table.lower()
            if any(keyword in lowered for keyword in keywords):
                relevant_tables.append(table)

        return jsonify({
            'success': True,
            'tables': relevant_tables,
            'recommended': 'order_for_labels'
        })
    except Exception as e:
        return jsonify({'error': f'Database error: {str(e)}'}), 500
    finally:
        # Release the connection on every path, including errors raised
        # after connect() succeeded.
        if connection is not None:
            connection.close()
@bp.route('/get_table_columns/<table_name>', methods=['GET'])
def get_table_columns(table_name):
    """Return column metadata for one table of the external database.

    Only the ``superadmin``, ``warehouse_manager`` and ``etichete`` roles may
    call this route; everyone else gets a 403 JSON error.

    Args:
        table_name: Table name taken from the URL. It must match an existing
            table name exactly; otherwise a 404 JSON error is returned.

    Returns:
        JSON with ``success``, ``table`` and ``columns``. Each column entry
        has ``name``, ``type``, ``nullable``, ``default``, ``description``
        (the column comment, or a title-cased version of the name) and
        ``field_id``. Any failure yields ``{'error': ...}`` with status 500.
    """
    if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']:
        return jsonify({'error': 'Access denied'}), 403

    connection = None
    try:
        # Read the external database settings from the instance folder.
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        settings = {}
        with open(settings_file, 'r') as f:
            for line in f:
                line = line.strip()
                # Blank or malformed lines (no '=') are ignored instead of
                # aborting the whole request with an unpacking error.
                if '=' not in line:
                    continue
                key, value = line.split('=', 1)
                settings[key] = value

        connection = mariadb.connect(
            user=settings['username'],
            password=settings['password'],
            host=settings['server_domain'],
            port=int(settings['port']),
            database=settings['database_name']
        )
        cursor = connection.cursor()

        # Verify the table exists by exact name. A LIKE pattern would treat
        # '%' and '_' in the URL value as wildcards and let names through
        # that are not real tables.
        cursor.execute("SHOW TABLES")
        existing_tables = {row[0] for row in cursor.fetchall()}
        if table_name not in existing_tables:
            return jsonify({'error': f'Table {table_name} not found'}), 404

        # Identifiers cannot be bound as parameters, so quote the (already
        # verified) name with backticks, doubling any embedded backtick.
        quoted_table = '`' + table_name.replace('`', '``') + '`'
        cursor.execute(f"DESCRIBE {quoted_table}")
        columns_info = cursor.fetchall()

        # Fetch all column comments in one query instead of one per column.
        cursor.execute("""
            SELECT COLUMN_NAME, COLUMN_COMMENT
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = %s AND TABLE_SCHEMA = DATABASE()
        """, (table_name,))
        comments = {name: comment for name, comment in cursor.fetchall()}

        columns = []
        for col_info in columns_info:
            column_name = col_info[0]
            # Fall back to a readable form of the column name when no comment is set.
            description = comments.get(column_name) or column_name.replace('_', ' ').title()
            columns.append({
                'name': column_name,
                'type': col_info[1],
                'nullable': col_info[2] == 'YES',
                'default': col_info[4],
                'description': description,
                'field_id': f"db_{column_name}"  # Unique ID for template fields
            })

        return jsonify({
            'success': True,
            'table': table_name,
            'columns': columns
        })
    except Exception as e:
        return jsonify({'error': f'Database error: {str(e)}'}), 500
    finally:
        # Release the connection on every path, including the 404 return
        # and errors raised after connect() succeeded.
        if connection is not None:
            connection.close()
@bp.route('/edit_template/<int:template_id>')
def edit_template(template_id):
    """Placeholder for template editing; currently only echoes the template ID."""
    response_text = f"Edit template with ID {template_id}"
    return response_text
@bp.route('/delete_template/<int:template_id>', methods=['POST'])
def delete_template(template_id):
    """Placeholder for template deletion; currently only echoes the template ID."""
    response_text = f"Delete template with ID {template_id}"
    return response_text
@bp.route('/get_tables')
def get_tables():
    """Return a fixed placeholder list of table names as JSON."""
    # Placeholder data; to be replaced with a real database lookup.
    return jsonify({'tables': ['table1', 'table2', 'table3']})
@bp.route('/get_columns')
def get_columns():
    """Return placeholder column names for the table given in the query string.

    An absent or empty ``table`` query parameter yields an empty list.
    """
    selected_table = request.args.get('table')
    # Placeholder data; to be replaced with a real per-table lookup.
    if selected_table:
        column_names = ['column1', 'column2', 'column3']
    else:
        column_names = []
    return jsonify({'columns': column_names})
@bp.route('/save_template', methods=['POST'])
def save_template():
    """Accept a template as JSON, log it, and acknowledge.

    Nothing is persisted yet; the payload is only printed.
    """
    payload = request.get_json()
    # Placeholder: the template is not stored anywhere at this point.
    print(f"Saving template: {payload}")
    acknowledgement = {'message': 'Template saved successfully!'}
    return jsonify(acknowledgement)
@bp.route('/generate_pdf', methods=['POST'])
def generate_pdf():
    """Generate a simple label-template PDF from a JSON description.

    Expected JSON body (all keys optional):
        width: label width in millimetres (default 100).
        height: label height in millimetres (default 50).
        columns: list of column names, each drawn on its own line.

    The PDF is written to ``static/label_templates/label_template.pdf``,
    overwriting any previous file.

    Returns:
        JSON with ``message`` and ``pdf_path`` on success, or
        ``{'error': ...}`` with HTTP status 400 when the body is not a JSON
        object or contains invalid values.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    try:
        width = float(data.get('width', 100))  # Default width in mm
        height = float(data.get('height', 50))  # Default height in mm
    except (TypeError, ValueError):
        return jsonify({'error': 'width and height must be numbers'}), 400
    if width <= 0 or height <= 0:
        return jsonify({'error': 'width and height must be positive'}), 400

    columns = data.get('columns') or []
    if not isinstance(columns, list):
        return jsonify({'error': 'columns must be a list'}), 400

    # Convert dimensions from mm to points (1 mm = 2.83465 points)
    mm_to_points = 2.83465
    width_points = width * mm_to_points
    height_points = height * mm_to_points

    # Ensure the /static/label_templates folder exists
    label_templates_folder = os.path.join(current_app.root_path, 'static', 'label_templates')
    os.makedirs(label_templates_folder, exist_ok=True)
    pdf_file_path = os.path.join(label_templates_folder, 'label_template.pdf')

    # Draw the title, then one line per column, moving down 20 points each time.
    c = canvas.Canvas(pdf_file_path, pagesize=(width_points, height_points))
    c.drawString(10, height_points - 20, "Label Template")
    y_position = height_points - 40
    for column in columns:
        c.drawString(10, y_position, f"Column: {column}")
        y_position -= 20
    c.save()

    return jsonify({
        'message': 'PDF generated successfully!',
        'pdf_path': '/static/label_templates/label_template.pdf'
    })
# Order Labels Upload Module Routes
@bp.route('/upload_orders', methods=['GET', 'POST'])
def upload_orders():
    """Delegate order CSV uploads to the order-labels upload handler.

    Only the ``superadmin``, ``warehouse`` and ``warehouse_manager`` roles are
    let through; other sessions are flashed a message and redirected to the
    dashboard.
    """
    permitted_roles = ['superadmin', 'warehouse', 'warehouse_manager']
    has_access = 'role' in session and session['role'] in permitted_roles
    if not has_access:
        flash('Access denied: Warehouse management permissions required.')
        return redirect(url_for('main.dashboard'))

    # Imported only after the role check has passed, as in the original flow.
    from app.order_labels import upload_orders_handler
    return upload_orders_handler()
@bp.route('/view_orders')
def view_orders():
    """Render the uploaded-orders page for warehouse roles.

    Allowed roles are ``superadmin``, ``warehouse``, ``warehouse_manager`` and
    ``warehouse_worker``; other sessions are flashed a message and redirected
    to the dashboard.
    """
    permitted_roles = ['superadmin', 'warehouse', 'warehouse_manager', 'warehouse_worker']
    has_access = 'role' in session and session['role'] in permitted_roles
    if not has_access:
        flash('Access denied: Warehouse access required.')
        return redirect(url_for('main.dashboard'))

    # Imported only after the role check has passed, as in the original flow.
    from app.order_labels import get_orders_from_database
    max_orders = 200  # argument passed to get_orders_from_database
    recent_orders = get_orders_from_database(max_orders)
    return render_template('view_orders.html', orders=recent_orders)
@bp.route('/get_unprinted_orders', methods=['GET'])
def get_unprinted_orders():
    """Return the result of ``get_unprinted_orders_data()`` as JSON.

    Only the ``superadmin``, ``warehouse_manager`` and ``etichete`` roles are
    allowed; other sessions get a 403 JSON error. Any exception raised while
    loading the data is logged and returned as a 500 JSON error.
    """
    print(f"DEBUG: get_unprinted_orders called. Session role: {session.get('role')}")
    permitted_roles = ['superadmin', 'warehouse_manager', 'etichete']
    has_access = 'role' in session and session['role'] in permitted_roles
    if not has_access:
        print(f"DEBUG: Access denied for role: {session.get('role')}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    try:
        print("DEBUG: Calling get_unprinted_orders_data()")
        unprinted = get_unprinted_orders_data()
        print(f"DEBUG: Retrieved {len(unprinted)} orders")
        return jsonify(unprinted)
    except Exception as err:
        print(f"DEBUG: Error in get_unprinted_orders: {err}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(err)}), 500
@bp.route('/generate_labels_pdf/<int:order_id>', methods=['POST'])
@bp.route('/generate_labels_pdf/<int:order_id>/<paper_saving_mode>', methods=['POST'])
def generate_labels_pdf(order_id, paper_saving_mode='true'):
    """Generate the label PDF for one order and return it inline.

    Args:
        order_id: ``id`` of the row in ``order_for_labels`` to print.
        paper_saving_mode: Optional URL segment. Paper saving is enabled only
            when the value equals 'true' (case-insensitive); defaults to 'true'.

    Returns:
        On success, a response carrying the PDF bytes with
        ``Content-Type: application/pdf``. Otherwise a JSON error with
        HTTP 403 (role not allowed), 404 (order not found) or 500 (any
        exception raised while building the PDF).
    """
    print(f"DEBUG: generate_labels_pdf called for order_id: {order_id}")

    if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']:
        print(f"DEBUG: Access denied for role: {session.get('role')}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    try:
        from .pdf_generator import generate_order_labels_pdf, update_order_printed_status
        from .print_module import get_db_connection
        from flask import make_response

        # Column names in the exact order of the SELECT list below; they become
        # the keys of the order dictionary handed to the PDF generator.
        columns = (
            'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
            'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client',
            'customer_name', 'customer_article_number', 'open_for_order', 'line_number',
            'printed_labels', 'created_at', 'updated_at',
        )

        # Get order data from database
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
                       data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
                       customer_article_number, open_for_order, line_number,
                       printed_labels, created_at, updated_at
                FROM order_for_labels
                WHERE id = %s
            """, (order_id,))
            row = cursor.fetchone()
        finally:
            # Always release the connection, even when the query raises;
            # previously a failing execute()/fetchone() leaked it.
            conn.close()

        if not row:
            return jsonify({'error': 'Order not found'}), 404

        # Create order data dictionary
        order_data = dict(zip(columns, row))
        if order_data['printed_labels'] is None:
            # A NULL counter is treated as "no labels printed yet".
            order_data['printed_labels'] = 0

        print(f"DEBUG: Generating PDF for order {order_id} with quantity {order_data['cantitate']}")

        # Check if paper-saving mode is enabled (default: true)
        use_paper_saving = paper_saving_mode.lower() == 'true'
        print(f"DEBUG: Paper-saving mode: {use_paper_saving}")

        # Generate PDF with paper-saving option
        pdf_buffer = generate_order_labels_pdf(order_id, order_data, paper_saving_mode=use_paper_saving)

        # Update printed status in database; a failure here is only logged so
        # the already generated PDF is still delivered.
        update_success = update_order_printed_status(order_id)
        if not update_success:
            print(f"Warning: Could not update printed status for order {order_id}")

        # Create response with PDF
        response = make_response(pdf_buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename="labels_order_{order_id}_{order_data["comanda_productie"]}.pdf"'

        print(f"DEBUG: PDF generated successfully for order {order_id}")
        return response

    except Exception as e:
        print(f"DEBUG: Error generating PDF for order {order_id}: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@bp.route('/update_printed_status/<int:order_id>', methods=['POST'])
def update_printed_status(order_id):
    """Mark an order as printed without generating a PDF (direct printing).

    Restricted to the superadmin, warehouse_manager and etichete roles
    (HTTP 403 otherwise). Returns a JSON success payload when
    ``update_order_printed_status`` reports success, and a JSON error with
    HTTP 500 when it reports failure or raises.
    """
    print(f"DEBUG: update_printed_status called for order_id: {order_id}")

    role = session.get('role')
    if role not in ('superadmin', 'warehouse_manager', 'etichete'):
        print(f"DEBUG: Access denied for role: {role}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    try:
        from .pdf_generator import update_order_printed_status

        # Handle the failure outcome first so the success path reads straight through.
        if not update_order_printed_status(order_id):
            print(f"WARNING: Could not update printed status for order {order_id}")
            return jsonify({'error': 'Could not update printed status'}), 500

        print(f"DEBUG: Successfully updated printed status for order {order_id}")
        return jsonify({'success': True, 'message': f'Order {order_id} marked as printed'})
    except Exception as exc:
        print(f"DEBUG: Error in update_printed_status: {exc}")
        return jsonify({'error': 'Internal server error'}), 500
@bp.route('/get_order_data/<int:order_id>', methods=['GET'])
def get_order_data(order_id):
    """Return one ``order_for_labels`` row as JSON for the label preview.

    Args:
        order_id: ``id`` of the row to fetch.

    Returns:
        JSON object keyed by column name. ``data_livrare``, ``created_at`` and
        ``updated_at`` are stringified (or 'N/A' when empty) and a NULL
        ``printed_labels`` is reported as 0. Errors are returned as JSON with
        HTTP 403 (role not allowed), 404 (order not found) or 500 (exception).
    """
    print(f"DEBUG: get_order_data called for order_id: {order_id}")

    if 'role' not in session or session['role'] not in ['superadmin', 'warehouse_manager', 'etichete']:
        return jsonify({'error': 'Access denied'}), 403

    try:
        from .print_module import get_db_connection

        # Column names in the exact order of the SELECT list below.
        columns = (
            'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
            'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client',
            'customer_name', 'customer_article_number', 'open_for_order', 'line_number',
            'printed_labels', 'created_at', 'updated_at',
        )

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
                       data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
                       customer_article_number, open_for_order, line_number,
                       printed_labels, created_at, updated_at
                FROM order_for_labels
                WHERE id = %s
            """, (order_id,))
            row = cursor.fetchone()
        finally:
            # Always release the connection, even when the query raises;
            # previously a failing execute()/fetchone() leaked it.
            conn.close()

        if not row:
            return jsonify({'error': 'Order not found'}), 404

        order_data = dict(zip(columns, row))
        if order_data['printed_labels'] is None:
            order_data['printed_labels'] = 0
        # Date/time values are sent as plain strings; empty ones become 'N/A'.
        for key in ('data_livrare', 'created_at', 'updated_at'):
            order_data[key] = str(order_data[key]) if order_data[key] else 'N/A'

        return jsonify(order_data)

    except Exception as e:
        print(f"DEBUG: Error getting order data for {order_id}: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@warehouse_bp.route('/create_locations', methods=['GET', 'POST'])
def create_locations():
    """Delegate GET/POST requests for /create_locations to the warehouse module."""
    # Imported at call time, as the original does, rather than at module load.
    from app.warehouse import create_locations_handler as handler

    result = handler()
    return result
@warehouse_bp.route('/import_locations_csv', methods=['GET', 'POST'])
def import_locations_csv():
    """Delegate GET/POST requests for /import_locations_csv to the warehouse module."""
    # Imported at call time, as the original does, rather than at module load.
    from app.warehouse import import_locations_csv_handler as handler

    result = handler()
    return result
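# NOTE for frontend/extension developers:
# To print labels, call the Chrome extension and pass it the PDF URL:
#   /generate_labels_pdf/<order_id>
# (an optional /<paper_saving_mode> segment may follow; the route accepts POST
# requests only and requires a session whose role is superadmin,
# warehouse_manager or etichete).
# The extension should POST to http://localhost:8765/print/silent with:
# {
#   "pdf_url": "https://your-linux-server/generate_labels_pdf/15",
#   "printer_name": "default",
#   "copies": 1
# }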