# Listing metadata: 2208 lines, 90 KiB, Python
import os
|
|
import mariadb
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify, send_from_directory
|
|
from .models import User
|
|
from . import db
|
|
from reportlab.lib.pagesizes import letter
|
|
from reportlab.pdfgen import canvas
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
|
import csv
|
|
from .warehouse import add_location
|
|
from app.settings import (
|
|
settings_handler,
|
|
role_permissions_handler,
|
|
save_role_permissions_handler,
|
|
reset_role_permissions_handler,
|
|
save_all_role_permissions_handler,
|
|
reset_all_role_permissions_handler,
|
|
edit_user_handler,
|
|
create_user_handler,
|
|
delete_user_handler,
|
|
save_external_db_handler
|
|
)
|
|
from .print_module import get_unprinted_orders_data
|
|
|
|
# Blueprint named 'main'; the routes in this module are registered on it.
bp = Blueprint(name='main', import_name=__name__)

# Blueprint named 'warehouse'.
warehouse_bp = Blueprint(name='warehouse', import_name=__name__)
|
|
|
|
def format_cell_data(cell):
    """Format a result-row cell for display, especially dates and times.

    Args:
        cell: Any value taken from a database result row.

    Returns:
        ``dd/mm/yyyy`` text for ``datetime`` and ``date`` values,
        ``HH:MM:SS`` text for ``timedelta`` values, and the value itself,
        unchanged, for everything else.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import date

    if isinstance(cell, timedelta):
        # Convert the duration to whole seconds, then split into H/M/S.
        total_seconds = int(cell.total_seconds())
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    # datetime is a subclass of date, so one check covers both. Plain date
    # objects have no ``date`` attribute, which is why the former
    # hasattr(cell, 'date') test never matched them.
    if isinstance(cell, date):
        return cell.strftime('%d/%m/%Y')

    # Duck-typed fallback kept for other objects exposing ``date``.
    if hasattr(cell, 'date'):
        return cell.strftime('%d/%m/%Y')

    return cell
|
|
|
|
@bp.route('/store_articles')
def store_articles():
    """Render the ``store_articles.html`` template."""
    page = render_template('store_articles.html')
    return page
|
|
|
|
@bp.route('/warehouse_reports')
def warehouse_reports():
    """Render the ``warehouse_reports.html`` template."""
    page = render_template('warehouse_reports.html')
    return page
|
|
|
|
def get_db_connection():
    """Read external_server.conf and return a MariaDB database connection.

    The file is looked up in the Flask instance folder and parsed as
    ``key=value`` lines. Lines without an ``=`` (blank lines, stray text)
    are skipped instead of aborting the parse.

    Returns:
        The connection object returned by ``mariadb.connect``.

    Raises:
        FileNotFoundError: If external_server.conf is not in the instance folder.
        KeyError: If one of ``username``, ``password``, ``server_domain``,
            ``port`` or ``database_name`` is absent from the file.
        ValueError: If the ``port`` value is not an integer.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file.
    settings = {}
    with open(settings_file, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            # A line with no '=' cannot be split into key and value; the
            # unconditional unpacking used to raise ValueError on it.
            if '=' not in line:
                continue
            key, value = line.split('=', 1)
            settings[key] = value

    # Create a database connection.
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
|
|
|
|
def _lookup_internal_user(username, password):
    """Return the matching user from the internal SQLite database, or None.

    Opens ``../instance/users.db`` relative to this module and closes the
    connection even when a query raises.
    """
    import sqlite3

    internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db')
    conn = sqlite3.connect(internal_db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
        if not cursor.fetchone():
            print("No users table in internal database")
            return None
        cursor.execute(
            "SELECT username, password, role FROM users WHERE username=? AND password=?",
            (username, password),
        )
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return None
    return {'username': row[0], 'role': row[2]}


def _lookup_external_user(username, password):
    """Return the matching user from the external MariaDB database, or None.

    The connection from ``get_db_connection`` is closed even when a query raises.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'users'")
        if not cursor.fetchone():
            return None
        cursor.execute(
            "SELECT username, password, role FROM users WHERE username=%s AND password=%s",
            (username, password),
        )
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return None
    return {'username': row[0], 'role': row[2]}


@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials.

    GET renders ``login.html``. On POST:

    * a username starting with ``#`` is checked (without the ``#``) against
      the internal SQLite database only;
    * any other username is checked against the external MariaDB database,
      falling back to the internal database only if the external lookup raises.

    On success ``session['user']`` and ``session['role']`` are set and the
    client is redirected to the dashboard; otherwise a message is flashed
    and the form is rendered again.

    NOTE(review): passwords are compared as stored, in plaintext; hashing
    would need a schema/data migration outside this block.
    """
    if request.method != 'POST':
        return render_template('login.html')

    # Safely get username and password with fallback.
    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()

    if not username or not password:
        print("Missing username or password")
        flash('Please enter both username and password.')
        return render_template('login.html')

    # Passwords are deliberately never written to the log output.
    user = None
    if username.startswith('#'):
        username_clean = username[1:].strip()
        print(f"Checking internal database for: {username_clean}")
        try:
            user = _lookup_internal_user(username_clean, password)
        except Exception as e:  # best effort: a DB failure means "not logged in"
            print("Internal DB error:", e)
    else:
        try:
            user = _lookup_external_user(username, password)
        except Exception as e:  # best effort: fall back to the internal DB
            print("External DB error:", e)
            print("Falling back to internal database")
            try:
                user = _lookup_internal_user(username, password)
            except Exception as e2:
                print("Internal DB fallback error:", e2)

    if user:
        session['user'] = user['username']
        session['role'] = user['role']
        print("Logged in as:", session.get('user'), session.get('role'))
        return redirect(url_for('main.dashboard'))

    print("Login failed for:", username)
    flash('Invalid credentials. Please try again.')
    return render_template('login.html')
|
|
|
|
@bp.route('/dashboard')
def dashboard():
    """Render ``dashboard.html`` when the session has a ``user`` key.

    Sessions without a ``user`` key are redirected to the login route.
    """
    print("Session user:", session.get('user'), session.get('role'))
    is_logged_in = 'user' in session
    if is_logged_in:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
|
|
|
|
@bp.route('/settings')
def settings():
    """Delegate the /settings request to ``settings_handler`` and return its result."""
    response = settings_handler()
    return response
|
|
|
|
@bp.route('/quality')
def quality():
    """Render ``quality.html`` for sessions whose role is superadmin or quality.

    Any other session gets a flashed message and a redirect to the dashboard.
    """
    permitted_roles = ['superadmin', 'quality']
    if session.get('role') in permitted_roles:
        return render_template('quality.html')

    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/warehouse')
def warehouse():
    """Render ``main_page_warehouse.html`` for superadmin or warehouse roles.

    Any other session gets a flashed message and a redirect to the dashboard.
    """
    permitted_roles = ['superadmin', 'warehouse']
    if session.get('role') in permitted_roles:
        return render_template('main_page_warehouse.html')

    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan on POST and render the scan page with recent rows.

    Only sessions whose role is ``superadmin`` or ``scan`` pass the guard;
    any other session gets a flashed message and a redirect to the dashboard.

    On POST the submitted form fields are written to ``scan1_orders``: the
    row whose ``CP_full_code`` equals the submitted ``cp_code`` is updated if
    one exists, otherwise a new row is inserted. For every request the 15
    rows with the highest ``Id`` are then loaded and passed to ``scan.html``
    as ``scan_data``.
    """
    if 'role' not in session or session['role'] not in ['superadmin', 'scan']:
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission.
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        date = request.form.get('date')
        time = request.form.get('time')

        conn = None
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            # Check if the CP_full_code already exists.
            cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,))
            existing_entry = cursor.fetchone()

            if existing_entry:
                update_query = """
                    UPDATE scan1_orders
                    SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
                    WHERE CP_full_code = ?
                """
                cursor.execute(update_query, (operator_code, oc1_code, oc2_code, defect_code, date, time, cp_code))
                success_message = 'Existing entry updated successfully.'
            else:
                insert_query = """
                    INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """
                cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
                success_message = 'New entry inserted successfully.'

            conn.commit()
            # Report success only once the transaction is actually committed,
            # so a failed commit cannot show both a success and an error.
            flash(success_message)
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")
        finally:
            # Close the connection on the error path too.
            if conn is not None:
                conn.close()

    # Fetch the latest scan data for display.
    scan_data = []
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
            FROM scan1_orders
            ORDER BY Id DESC
            LIMIT 15
        """)
        scan_data = cursor.fetchall()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")
    finally:
        if conn is not None:
            conn.close()

    return render_template('scan.html', scan_data=scan_data)
|
|
|
|
@bp.route('/logout')
def logout():
    """Remove ``user`` and ``role`` from the session and redirect to login."""
    for session_key in ('user', 'role'):
        # A default of None makes the pop a no-op when the key is absent.
        session.pop(session_key, None)
    login_url = url_for('main.login')
    return redirect(login_url)
|
|
|
|
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Delegate the POST /create_user request to ``create_user_handler``."""
    response = create_user_handler()
    return response
|
|
|
|
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Delegate the POST /edit_user request to ``edit_user_handler``."""
    response = edit_user_handler()
    return response
|
|
|
|
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delegate the POST /delete_user request to ``delete_user_handler``."""
    response = delete_user_handler()
    return response
|
|
|
|
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Delegate the POST /save_external_db request to ``save_external_db_handler``."""
    response = save_external_db_handler()
    return response
|
|
|
|
# Role Permissions Management Routes
|
|
@bp.route('/role_permissions')
def role_permissions():
    """Delegate the /role_permissions request to ``role_permissions_handler``."""
    response = role_permissions_handler()
    return response
|
|
|
|
@bp.route('/test_permissions')
def test_permissions():
    """Render ``test_permissions.html`` with the roles from ``role_hierarchy``.

    Only sessions whose role is ``superadmin`` may view the page; any other
    session gets a flashed message and a redirect to the dashboard. If
    loading the roles or rendering fails, the text ``Error: <exception>``
    is returned instead of a page.
    """
    from app.permissions import APP_PERMISSIONS, ACTIONS

    # Check if superadmin.
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    try:
        from app.settings import get_external_db_connection

        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()
            # Get roles from role_hierarchy table, highest level first.
            cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
            role_data = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in role_data
        }

        return render_template(
            'test_permissions.html',
            roles=roles,
            pages=APP_PERMISSIONS,
            action_names=ACTIONS,
        )
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
@bp.route('/role_permissions_simple')
def role_permissions_simple():
    """Render ``role_permissions_simple.html`` with roles and granted permissions.

    Only sessions whose role is ``superadmin`` may view the page; any other
    session gets a flashed message and a redirect to the dashboard. Roles
    are read from ``role_hierarchy`` and granted permission keys from
    ``role_permissions``; ``APP_PERMISSIONS`` and the per-role permission
    lists are also passed to the template as JSON strings. On any error a
    message is flashed and the client is redirected to the dashboard.
    """
    from app.settings import get_external_db_connection
    from app.permissions import APP_PERMISSIONS, ACTIONS
    import json

    # Check if superadmin.
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    try:
        conn = get_external_db_connection()
        try:
            cursor = conn.cursor()

            # Get roles from role_hierarchy table, highest level first.
            cursor.execute("SELECT role_name, display_name, description, level FROM role_hierarchy ORDER BY level DESC")
            role_data = cursor.fetchall()

            # Get current role permissions.
            cursor.execute("""
                SELECT role, permission_key
                FROM role_permissions
                WHERE granted = TRUE
            """)
            permission_data = cursor.fetchall()
        finally:
            # Close the connection even when a query raises.
            conn.close()

        roles = {
            role_name: {
                'display_name': display_name,
                'description': description,
                'level': level,
            }
            for role_name, display_name, description, level in role_data
        }

        # Group the granted permission keys by role.
        role_permissions = {}
        for role, permission_key in permission_data:
            role_permissions.setdefault(role, []).append(permission_key)

        # Convert to JSON for JavaScript.
        permissions_json = json.dumps(APP_PERMISSIONS)
        role_permissions_json = json.dumps(role_permissions)

        return render_template(
            'role_permissions_simple.html',
            roles=roles,
            pages=APP_PERMISSIONS,
            action_names=ACTIONS,
            permissions_json=permissions_json,
            role_permissions_json=role_permissions_json,
        )
    except Exception as e:
        flash(f'Error loading role permissions: {e}')
        return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/settings/save_role_permissions', methods=['POST'])
def save_role_permissions():
    """Delegate the POST request to ``save_role_permissions_handler``."""
    response = save_role_permissions_handler()
    return response
|
|
|
|
@bp.route('/settings/reset_role_permissions', methods=['POST'])
def reset_role_permissions():
    """Delegate the POST request to ``reset_role_permissions_handler``."""
    response = reset_role_permissions_handler()
    return response
|
|
|
|
@bp.route('/settings/save_all_role_permissions', methods=['POST'])
def save_all_role_permissions():
    """Delegate the POST request to ``save_all_role_permissions_handler``."""
    response = save_all_role_permissions_handler()
    return response
|
|
|
|
@bp.route('/settings/reset_all_role_permissions', methods=['POST'])
def reset_all_role_permissions():
    """Delegate the POST request to ``reset_all_role_permissions_handler``."""
    response = reset_all_role_permissions_handler()
    return response
|
|
|
|
def _fetch_fixed_report_rows(cursor, code_columns, condition, params):
    """Fetch scan1_orders rows for a predefined report, newest first.

    ``code_columns`` and ``condition`` are fixed fragments supplied by
    ``get_report_data`` (never request data); user-independent values are
    still bound through ``params``. An empty ``condition`` selects all rows.
    """
    where_clause = f"WHERE {condition}" if condition else ""
    query = f"""
        SELECT Id, operator_code, {code_columns}, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        {where_clause}
        ORDER BY date DESC, time DESC
    """
    if params:
        cursor.execute(query, params)
    else:
        cursor.execute(query)
    return cursor.fetchall()


@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return one of the predefined scan reports as JSON.

    The ``report`` query parameter selects the report:

    * ``1`` - rows dated today
    * ``2`` - rows from the last 5 days (today and the 4 days before)
    * ``3`` - rows dated today with ``quality_code != 0``
    * ``4`` - rows from the last 5 days with ``quality_code != 0``
    * ``5`` - all rows, with both CP base and CP full codes

    The JSON object always has ``headers`` and ``rows``; it gains ``message``
    when report 5 finds an empty table and ``error`` when a database error
    occurs. Any other ``report`` value yields empty headers and rows.
    """
    report = request.args.get('report')
    data = {"headers": [], "rows": []}

    base_headers = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
    full_headers = ["Id", "Operator Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]
    all_rows_headers = ["Id", "Operator Code", "CP Base Code", "CP Full Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity of order", "Rejected Quantity of order"]

    today = datetime.now().strftime('%Y-%m-%d')
    # Last 4 days + today = 5 days.
    start_date = (datetime.now() - timedelta(days=4)).strftime('%Y-%m-%d')

    # report id -> (code column, WHERE condition, bound parameters, headers)
    dated_reports = {
        "1": ("CP_base_code", "date = ?", (today,), base_headers),
        "2": ("CP_base_code", "date >= ?", (start_date,), base_headers),
        "3": ("CP_full_code", "date = ? AND quality_code != 0", (today,), full_headers),
        "4": ("CP_full_code", "date >= ? AND quality_code != 0", (start_date,), full_headers),
    }

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if report in dated_reports:
            code_column, condition, params, headers = dated_reports[report]
            rows = _fetch_fixed_report_rows(cursor, code_column, condition, params)
            print(f"DEBUG: Report {report} found {len(rows)} rows")
            data["headers"] = headers
            data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]

        elif report == "5":
            # Table-level failures get their own, more detailed error text.
            try:
                cursor.execute("SELECT COUNT(*) FROM scan1_orders")
                total_count = cursor.fetchone()[0]
                print(f"DEBUG: Total records in scan1_orders table: {total_count}")

                if total_count == 0:
                    data["headers"] = all_rows_headers
                    data["rows"] = []
                    data["message"] = "No scan data available in the database. Please ensure scanning operations have been performed and data has been recorded."
                else:
                    rows = _fetch_fixed_report_rows(cursor, "CP_base_code, CP_full_code", "", ())
                    print(f"DEBUG: Fetched {len(rows)} rows for report 5 (all rows)")
                    data["headers"] = all_rows_headers
                    data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]
            except mariadb.Error as table_error:
                print(f"DEBUG: Table access error: {table_error}")
                data["error"] = f"Database table error: {table_error}"

    except mariadb.Error as e:
        print(f"Error fetching report data: {e}")
        data["error"] = "Error fetching report data."
    finally:
        # Close the connection on the error path too.
        if conn is not None:
            conn.close()

    # Log only the size of the payload; dumping every row floods the log.
    print(f"Report data being returned: {len(data['rows'])} rows")
    return jsonify(data)
|
|
|
|
def _calendar_report_headers(code_label):
    """Return the ten column headers of a calendar report.

    ``code_label`` is the header of the CP code column
    (``"CP Base Code"`` or ``"CP Full Code"``).
    """
    return ["Id", "Operator Code", code_label, "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]


def _select_calendar_rows(cursor, code_column, condition, params, order_by):
    """Fetch scan1_orders rows for a calendar report.

    ``code_column``, ``condition`` and ``order_by`` are fixed fragments
    supplied by this module (never request data); request values are bound
    through ``params``.
    """
    cursor.execute(
        f"""
        SELECT Id, operator_code, {code_column}, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        WHERE {condition}
        ORDER BY {order_by}
        """,
        params,
    )
    return cursor.fetchall()


def _rows_for_selected_date(cursor, code_column, selected_date, defects_only):
    """Fetch the rows of one date, trying progressively looser matches.

    Tries an exact ``date = ?`` match, then ``DATE(date) = ?``, then a
    ``date LIKE '<selected_date>%'`` prefix match, and returns the first
    non-empty result (or an empty result if none match). With
    ``defects_only`` the rows are restricted to ``quality_code != 0`` and
    ordered by quality code first.
    """
    defect_filter = " AND quality_code != 0" if defects_only else ""
    order_by = "quality_code DESC, time DESC" if defects_only else "time DESC"
    attempts = (
        ("date = ?", selected_date),
        ("DATE(date) = ?", selected_date),
        ("date LIKE ?", f"{selected_date}%"),
    )

    rows = []
    for date_condition, value in attempts:
        rows = _select_calendar_rows(cursor, code_column, date_condition + defect_filter, (value,), order_by)
        print(f"DEBUG: '{date_condition}' match found {len(rows)} rows")
        if rows:
            break
    return rows


@bp.route('/generate_report', methods=['GET'])
def generate_report():
    """Generate a calendar-based scan report and return it as JSON.

    The ``report`` query parameter selects the report:

    * ``6`` - all rows for ``date``
    * ``7`` - all rows between ``start_date`` and ``end_date`` (inclusive),
      with a ``summary`` of totals
    * ``8`` - rows with ``quality_code != 0`` for ``date``, with a
      ``defects_summary``
    * ``9`` - rows with ``quality_code != 0`` between ``start_date`` and
      ``end_date`` (inclusive), with a ``defects_summary``

    The JSON object always has ``headers`` and ``rows``; ``message`` is added
    when no rows match and ``error`` when the input is invalid or a database
    error occurs. Reports 6 and 8 without ``date``, and unknown report
    values, yield empty headers and rows.
    """
    report = request.args.get('report')
    selected_date = request.args.get('date')
    data = {"headers": [], "rows": []}

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if report == "6" and selected_date:  # Custom date report
            print(f"DEBUG: Searching for date: {selected_date}")
            rows = _rows_for_selected_date(cursor, "CP_base_code", selected_date, defects_only=False)

            data["headers"] = _calendar_report_headers("CP Base Code")
            data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]

            if not rows:
                data["message"] = f"No scan data found for {selected_date}. Please select a date when scanning operations were performed."

        elif report == "7":  # Date range report
            start_date = request.args.get('start_date')
            end_date = request.args.get('end_date')

            if start_date and end_date:
                print(f"DEBUG: Date range report - Start: {start_date}, End: {end_date}")

                # Validate date format and order.
                try:
                    start_dt = datetime.strptime(start_date, '%Y-%m-%d')
                    end_dt = datetime.strptime(end_date, '%Y-%m-%d')
                except ValueError:
                    data["error"] = "Invalid date format. Please use YYYY-MM-DD format."
                    return jsonify(data)

                if start_dt > end_dt:
                    data["error"] = "Start date cannot be after end date."
                    return jsonify(data)

                # Distinct dates in the range; their count goes into the summary.
                cursor.execute("""
                    SELECT DISTINCT date FROM scan1_orders
                    WHERE date >= ? AND date <= ?
                    ORDER BY date DESC
                """, (start_date, end_date))
                existing_dates = cursor.fetchall()

                rows = _select_calendar_rows(
                    cursor,
                    "CP_base_code",
                    "date >= ? AND date <= ?",
                    (start_date, end_date),
                    "date DESC, time DESC",
                )
                print(f"DEBUG: Date range query found {len(rows)} rows from {start_date} to {end_date}")

                data["headers"] = _calendar_report_headers("CP Base Code")
                data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]

                if not rows:
                    data["message"] = f"No scan data found between {start_date} and {end_date}. Please select dates when scanning operations were performed."
                else:
                    # Columns 8 and 9 are approved_quantity and rejected_quantity.
                    total_approved = sum(row[8] for row in rows if row[8] is not None)
                    total_rejected = sum(row[9] for row in rows if row[9] is not None)
                    data["summary"] = {
                        "total_records": len(rows),
                        "date_range": f"{start_date} to {end_date}",
                        "total_approved": total_approved,
                        "total_rejected": total_rejected,
                        "dates_with_data": len(existing_dates),
                    }
            else:
                data["error"] = "Both start date and end date are required for date range report."

        elif report == "8" and selected_date:  # Custom date quality defects report
            print(f"DEBUG: Quality defects report for specific date: {selected_date}")
            rows = _rows_for_selected_date(cursor, "CP_full_code", selected_date, defects_only=True)

            data["headers"] = _calendar_report_headers("CP Full Code")
            data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]

            if not rows:
                data["message"] = f"No quality defects found for {selected_date}. This could mean no scanning was performed or all items passed quality control."
            else:
                # Column 5 is quality_code, column 9 is rejected_quantity.
                data["defects_summary"] = {
                    "total_defective_items": len(rows),
                    "total_rejected_quantity": sum(row[9] for row in rows if row[9] is not None),
                    "unique_defect_types": len({row[5] for row in rows if row[5] != 0}),
                    "date": selected_date,
                }

        elif report == "9":  # Date range quality defects report
            start_date = request.args.get('start_date')
            end_date = request.args.get('end_date')
            print(f"DEBUG: Date range quality defects requested - Start: {start_date}, End: {end_date}")

            if not start_date or not end_date:
                data["error"] = "Both start date and end date are required for date range quality defects report."
                return jsonify(data)

            try:
                # Validate date format (strptime raises ValueError on mismatch).
                datetime.strptime(start_date, '%Y-%m-%d')
                datetime.strptime(end_date, '%Y-%m-%d')

                rows = _select_calendar_rows(
                    cursor,
                    "CP_full_code",
                    "date >= ? AND date <= ? AND quality_code != 0",
                    (start_date, end_date),
                    "date DESC, quality_code DESC, time DESC",
                )
                print(f"DEBUG: Date range quality defects query found {len(rows)} rows from {start_date} to {end_date}")

                data["headers"] = _calendar_report_headers("CP Full Code")
                data["rows"] = [[format_cell_data(cell) for cell in row] for row in rows]

                if not rows:
                    data["message"] = f"No quality defects found between {start_date} and {end_date}. This could mean no scanning was performed in this date range or all items passed quality control."
                else:
                    # Column 5 is quality_code, 6 is date, 9 is rejected_quantity.
                    data["defects_summary"] = {
                        "total_defective_items": len(rows),
                        "total_rejected_quantity": sum(row[9] for row in rows if row[9] is not None),
                        "unique_defect_types": len({row[5] for row in rows if row[5] != 0}),
                        "date_range": f"{start_date} to {end_date}",
                        "days_with_defects": len({row[6] for row in rows}),
                    }
            except ValueError:
                data["error"] = "Invalid date format. Please use YYYY-MM-DD format."
            except Exception as e:
                # Includes database errors raised while running this report.
                print(f"DEBUG: Error in date range quality defects report: {e}")
                data["error"] = f"Error processing date range quality defects report: {e}"

    except mariadb.Error as e:
        print(f"Error fetching custom date report: {e}")
        data["error"] = f"Error fetching report data for {selected_date if report in ('6', '8') else 'date range'}."
    finally:
        # Single cleanup point: covers the early returns and the error paths.
        if conn is not None:
            conn.close()

    # Log only the size of the payload; dumping every row floods the log.
    print(f"Custom date report data being returned: {len(data['rows'])} rows")
    return jsonify(data)
|
|
|
|
@bp.route('/debug_dates', methods=['GET'])
def debug_dates():
    """Debug route reporting which dates exist in ``scan1_orders``.

    Returns JSON with ``total_records`` (row count), ``available_dates``
    (all distinct dates, newest first, as strings) and ``sample_data`` (the
    date and time of up to 5 rows, newest date first). On any failure the
    JSON is ``{"error": <message>}`` instead.

    NOTE(review): this route performs no session/role check - confirm it is
    not exposed in production.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Get all distinct dates.
            cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC")
            dates = cursor.fetchall()

            # Get total count.
            cursor.execute("SELECT COUNT(*) FROM scan1_orders")
            total_count = cursor.fetchone()[0]

            # Get sample data.
            cursor.execute("SELECT date, time FROM scan1_orders ORDER BY date DESC LIMIT 5")
            sample_data = cursor.fetchall()
        finally:
            # Close the connection even when a query raises.
            conn.close()

        return jsonify({
            "total_records": total_count,
            "available_dates": [str(row[0]) for row in dates],
            "sample_data": [{"date": str(row[0]), "time": str(row[1])} for row in sample_data],
        })
    except Exception as e:
        return jsonify({"error": str(e)})
|
|
|
|
@bp.route('/test_database', methods=['GET'])
def test_database():
    """Run a series of diagnostic checks against the scan1_orders table.

    Only sessions whose role is 'superadmin' may call this; any other caller
    receives a 403 JSON response.

    Checks performed, in order:
      1. the table exists (the route stops with success=False if it does not
         or if the check itself fails),
      2. DESCRIBE output for the table (empty list on failure),
      3. total row count (-1 on failure),
      4. the first five rows, each cell passed through format_cell_data,
      5. up to ten distinct dates, newest first,
      6. if no row exists for today's date, one sample row is inserted and
         committed so daily reports have data to show.

    Returns:
        A JSON response summarising the checks. Individual check failures are
        logged and reported through default values; a failure to connect (or
        any other unexpected error) yields success=False with the error text.
    """
    # Check if user has superadmin permissions
    if 'role' not in session or session['role'] != 'superadmin':
        return jsonify({
            "success": False,
            "error": "Access denied: Superadmin permissions required for database testing."
        }), 403

    try:
        print("DEBUG: Testing database connection...")
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            print("DEBUG: Database connection successful!")

            # Test 1: Check if table exists
            try:
                cursor.execute("SHOW TABLES LIKE 'scan1_orders'")
                table_exists = cursor.fetchone()
                print(f"DEBUG: Table scan1_orders exists: {table_exists is not None}")

                if not table_exists:
                    return jsonify({
                        "success": False,
                        "message": "Table 'scan1_orders' does not exist in the database"
                    })
            except Exception as e:
                print(f"DEBUG: Error checking table existence: {e}")
                return jsonify({
                    "success": False,
                    "message": f"Error checking table existence: {e}"
                })

            # Test 2: Get table structure
            try:
                cursor.execute("DESCRIBE scan1_orders")
                table_structure = cursor.fetchall()
                print(f"DEBUG: Table structure: {table_structure}")
            except Exception as e:
                print(f"DEBUG: Error getting table structure: {e}")
                table_structure = []

            # Test 3: Count total records
            try:
                cursor.execute("SELECT COUNT(*) FROM scan1_orders")
                total_count = cursor.fetchone()[0]
                print(f"DEBUG: Total records in table: {total_count}")
            except Exception as e:
                print(f"DEBUG: Error counting records: {e}")
                total_count = -1

            # Test 4: Get sample data (if any exists)
            sample_data = []
            try:
                cursor.execute("SELECT * FROM scan1_orders LIMIT 5")
                raw_data = cursor.fetchall()
                print(f"DEBUG: Sample data (first 5 rows): {raw_data}")

                # Convert data to JSON-serializable format using consistent formatting
                sample_data = [
                    [format_cell_data(item) for item in row]
                    for row in raw_data
                ]
            except Exception as e:
                print(f"DEBUG: Error getting sample data: {e}")

            # Test 5: Get distinct dates (if any exist)
            available_dates = []
            try:
                cursor.execute("SELECT DISTINCT date FROM scan1_orders ORDER BY date DESC LIMIT 10")
                date_rows = cursor.fetchall()
                available_dates = [str(row[0]) for row in date_rows]
                print(f"DEBUG: Available dates: {available_dates}")
            except Exception as e:
                print(f"DEBUG: Error getting dates: {e}")

            # Test 6: Add a current date sample record for testing daily reports.
            # This must run while the connection is still open: the connection
            # used to be closed just before this step, so the queries below
            # always failed and the sample row was never inserted.
            try:
                # Take one timestamp so date and time cannot straddle midnight.
                now = datetime.now()
                current_date = now.strftime('%Y-%m-%d')
                current_time = now.strftime('%H:%M:%S')

                # Check if we already have a record for today
                cursor.execute("SELECT COUNT(*) FROM scan1_orders WHERE date = ?", (current_date,))
                today_count = cursor.fetchone()[0]

                if today_count == 0:
                    print(f"DEBUG: No records found for today ({current_date}), adding sample record...")
                    cursor.execute("""
INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", ('OP01', 'CP99999999-0001', 'OC01', 'OC02', 0, current_date, current_time, 1, 0))
                    conn.commit()
                    print(f"DEBUG: Added sample record for today ({current_date})")
                    message_addendum = " Added sample record for today to test daily reports."
                else:
                    print(f"DEBUG: Found {today_count} records for today ({current_date})")
                    message_addendum = f" Found {today_count} records for today."
            except Exception as e:
                print(f"DEBUG: Error adding sample record: {e}")
                message_addendum = " Could not add sample record for testing."
        finally:
            # Single close point for every exit path, including early returns
            # from Test 1 and unexpected exceptions.
            conn.close()

        return jsonify({
            "success": True,
            "database_connection": "OK",
            "table_exists": table_exists is not None,
            "table_structure": [{"field": row[0], "type": row[1], "null": row[2]} for row in table_structure],
            "total_records": total_count,
            "sample_data": sample_data,
            "available_dates": available_dates,
            "message": f"Database test completed. Found {total_count} records in scan1_orders table.{message_addendum}"
        })

    except Exception as e:
        print(f"DEBUG: Database test failed: {e}")
        return jsonify({
            "success": False,
            "message": f"Database connection failed: {e}"
        })
|
|
|
|
@bp.route('/etichete')
def etichete():
    """Show the labels ("etichete") landing page to authorised roles.

    Sessions whose role is 'superadmin' or 'etichete' get the page; every
    other visitor (including one with no role in the session) is shown a
    flash message and redirected to the main dashboard.
    """
    permitted_roles = ('superadmin', 'etichete')
    if session.get('role') in permitted_roles:
        return render_template('main_page_etichete.html')

    flash('Access denied: Etichete users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/upload_data')
def upload_data():
    """Render the data upload page."""
    page_template = 'upload_data.html'
    return render_template(page_template)
|
|
|
|
@bp.route('/print_module')
def print_module():
    """Render the print module page."""
    page_template = 'print_module.html'
    return render_template(page_template)
|
|
|
|
@bp.route('/download_extension')
def download_extension():
    """Render the page from which the Chrome extension can be downloaded."""
    page_template = 'download_extension.html'
    return render_template(page_template)
|
|
|
|
@bp.route('/extension_files/<path:filename>')
def extension_files(filename):
    """Send one file from the chrome_extension directory.

    Args:
        filename: Path of the requested file, relative to the
            chrome_extension directory that sits next to the application
            package (one level above current_app.root_path).

    Returns:
        The response produced by send_from_directory for that file.
    """
    import os
    from flask import send_from_directory, current_app

    app_parent = os.path.dirname(current_app.root_path)
    extension_root = os.path.join(app_parent, 'chrome_extension')
    return send_from_directory(extension_root, filename)
|
|
|
|
@bp.route('/create_extension_package', methods=['POST'])
def create_extension_package():
    """Zip the Chrome extension into the app's static folder and report the URL.

    The chrome_extension directory next to the application package is walked;
    files with one of the packaged suffixes are added to
    static/quality_recticel_print_helper.zip under their path relative to the
    extension directory, followed by a generated README.txt.

    Returns:
        On success, JSON with 'success', 'download_url', 'files_included' and
        'zip_size'. On any failure, JSON with 'success': False and an 'error'
        message, with HTTP status 500.
    """
    import os
    import zipfile
    from flask import current_app, jsonify

    packaged_suffixes = ('.json', '.js', '.html', '.css', '.png', '.md', '.txt')

    try:
        # The extension lives beside the app package (py_app), not inside it.
        ext_root = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension')
        print(f"Looking for extension files in: {ext_root}")

        if not os.path.exists(ext_root):
            return jsonify({
                'success': False,
                'error': f'Extension directory not found: {ext_root}'
            }), 500

        # Log everything found, purely as a debugging aid.
        discovered = [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(ext_root)
            for name in names
        ]
        print(f"Found files: {discovered}")

        # The archive is written straight into the static folder so it can be
        # fetched through the /static/ URL returned below.
        static_root = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_root, exist_ok=True)

        archive_name = 'quality_recticel_print_helper.zip'
        archive_path = os.path.join(static_root, archive_name)

        # Installation instructions shipped inside the archive as README.txt.
        readme_text = """# Quality Label Printing Helper Chrome Extension

## Installation Instructions:

1. Extract this ZIP file to a folder on your computer
2. Open Chrome and go to: chrome://extensions/
3. Enable "Developer mode" in the top right
4. Click "Load unpacked" and select the extracted folder
5. The extension icon 🖨️ should appear in your toolbar

## Usage:

1. Go to the Print Module in the Quality Label Printing application
2. Select an order from the table
3. Click the "🖨️ Print Direct" button
4. The label will print automatically to your default printer

## Troubleshooting:

- Make sure your default printer is set up correctly
- Click the extension icon to test printer connection
- Check Chrome printer settings: chrome://settings/printing

For support, contact your system administrator.
"""

        packed = 0
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive:
            for folder, _subdirs, names in os.walk(ext_root):
                for name in names:
                    if not name.endswith(packaged_suffixes):
                        continue
                    source = os.path.join(folder, name)
                    # Store members relative to the extension directory.
                    member = os.path.relpath(source, ext_root)

                    print(f"Adding file: {source} as {member}")
                    archive.write(source, member)
                    packed += 1

            archive.writestr('README.txt', readme_text)
            packed += 1

        print(f"Total files added to ZIP: {packed}")

        # Verify the archive exists and is non-empty before advertising it.
        if not os.path.exists(archive_path):
            return jsonify({
                'success': False,
                'error': 'Failed to create ZIP file'
            }), 500

        archive_size = os.path.getsize(archive_path)
        print(f"ZIP file created: {archive_path}, size: {archive_size} bytes")

        if archive_size <= 0:
            return jsonify({
                'success': False,
                'error': 'ZIP file was created but is empty'
            }), 500

        return jsonify({
            'success': True,
            'download_url': f'/static/{archive_name}',
            'files_included': packed,
            'zip_size': archive_size
        })

    except Exception as e:
        print(f"Error creating extension package: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
@bp.route('/create_service_package', methods=['POST'])
def create_service_package():
    """Publish a Windows print service ZIP in the static folder and report its URL.

    Two paths are possible:

    1. If QualityPrintService_COMPLETE_ZERO_DEPENDENCIES.zip exists inside the
       windows_print_service directory, it is copied into static/ under the
       name quality_print_service_enhanced_with_error_1053_fixes.zip and a
       JSON description of that package is returned.
    2. Otherwise a ZIP is built from the files in windows_print_service whose
       names end in one of the listed suffixes, plus a generated
       INSTALLATION_README.txt, and written to
       static/quality_print_service_complete_package.zip.

    Returns:
        On success, JSON with 'success': True, a '/static/...' download URL
        and package details. On failure, JSON with 'success': False and an
        'error' message, with HTTP status 500.
    """
    import os
    import zipfile
    # NOTE(review): send_file is imported but not used anywhere in this function.
    from flask import current_app, jsonify, send_file

    try:
        # Path to the windows_print_service directory
        # (two directory levels above current_app.root_path).
        service_dir = os.path.join(os.path.dirname(os.path.dirname(current_app.root_path)), 'windows_print_service')
        print(f"Looking for service files in: {service_dir}")

        # Check if the enhanced package already exists
        enhanced_package_path = os.path.join(service_dir, 'QualityPrintService_COMPLETE_ZERO_DEPENDENCIES.zip')

        if os.path.exists(enhanced_package_path):
            # Serve the pre-built enhanced package with Error 1053 fixes
            print(f"Serving pre-built enhanced package: {enhanced_package_path}")

            # Copy to static directory for download
            static_dir = os.path.join(current_app.root_path, 'static')
            os.makedirs(static_dir, exist_ok=True)

            zip_filename = 'quality_print_service_enhanced_with_error_1053_fixes.zip'
            static_zip_path = os.path.join(static_dir, zip_filename)

            # Copy the enhanced package to static directory
            import shutil
            shutil.copy2(enhanced_package_path, static_zip_path)

            zip_size = os.path.getsize(static_zip_path)

            # The feature list and installation_methods count below are fixed
            # text; they are not derived from the contents of the copied ZIP.
            return jsonify({
                'success': True,
                'download_url': f'/static/{zip_filename}',
                'package_type': 'Enhanced with Error 1053 fixes',
                'features': [
                    'Embedded Python 3.11.9 (zero dependencies)',
                    'Multiple installation methods with automatic fallback',
                    'Windows Service Error 1053 comprehensive fixes',
                    'Enhanced service wrapper with SCM communication',
                    'Task Scheduler and Startup Script fallbacks',
                    'Diagnostic and troubleshooting tools',
                    'Complete Chrome extension integration'
                ],
                'zip_size': zip_size,
                'installation_methods': 4
            })

        # Fallback: create basic package if enhanced one not available
        if not os.path.exists(service_dir):
            return jsonify({
                'success': False,
                'error': f'Windows service directory not found: {service_dir}'
            }), 500

        # Create static directory if it doesn't exist
        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)

        zip_filename = 'quality_print_service_complete_package.zip'
        zip_path = os.path.join(static_dir, zip_filename)

        # Create ZIP file with Complete Windows service package (all dependencies included)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            files_added = 0

            # Add all service files to ZIP (complete self-contained version)
            for root, dirs, files in os.walk(service_dir):
                for file in files:
                    # Include all files for complete package
                    # (only names ending in one of these suffixes are added).
                    if file.endswith(('.py', '.bat', '.md', '.txt', '.json', '.js', '.html', '.css', '.png')):
                        file_path = os.path.join(root, file)
                        # Create relative path for archive
                        arcname = os.path.relpath(file_path, service_dir)

                        print(f"Adding service file: {file_path} as {arcname}")
                        zipf.write(file_path, arcname)
                        files_added += 1

            # Add main installation instructions for complete self-contained solution
            # (plain string literal: each "\\" below yields one backslash in the file).
            installation_readme = """# Quality Label Printing Service - Complete Self-Contained Package

## 🎯 ZERO DEPENDENCIES INSTALLATION - WORKS ON ANY WINDOWS SYSTEM!

### What's Included:
✅ Complete Python-based print service (uses only standard library)
✅ Windows Service installer with automatic recovery
✅ Chrome extension for web integration
✅ Multiple printing method fallbacks
✅ Comprehensive logging and error handling
✅ No external dependencies required (works with any Python 3.7+)

### Prerequisites:
- Windows 10/11 or Windows Server 2016+
- Administrator privileges
- Google Chrome browser
- Python 3.7+ (system or portable - installer detects automatically)

### 🚀 Quick Installation (5 Minutes):

#### Step 1: Extract Package
Extract this ZIP to any temporary location (Desktop, Downloads, etc.)
No permanent installation directory needed - files are copied automatically

#### Step 2: Install Windows Service
Right-click `install_service_complete.bat` and select "Run as administrator"

The installer will:
- ✅ Check for Python (system or use included portable version)
- ✅ Create service directory: C:\\QualityPrintService\\
- ✅ Install Windows service with auto-restart
- ✅ Configure logging: %USERPROFILE%\\PrintService\\logs\\
- ✅ Start service on port 8765

#### Step 3: Install Chrome Extension
- Open Chrome → chrome://extensions/
- Enable "Developer mode" (top right toggle)
- Click "Load unpacked" → Select the 'chrome_extension' folder

#### Step 4: Verify Installation
Visit: http://localhost:8765/health
Expected: {"status": "healthy", "service": "Windows Print Service"}

### 🔧 Files Included:

#### Core Service:
- `print_service_complete.py` - Complete service (zero external dependencies)
- `install_service_complete.bat` - Full installer (detects Python automatically)
- `uninstall_service_complete.bat` - Complete removal script
- `requirements_complete.txt` - Dependencies list (all standard library)

#### Chrome Integration:
- `chrome_extension/` - Complete Chrome extension
- `chrome_extension/manifest.json` - Extension configuration
- `chrome_extension/background.js` - Service communication

#### Documentation:
- `README_COMPLETE.md` - Comprehensive documentation
- `INSTALLATION_COMPLETE.md` - Detailed installation guide
- `PORTABLE_PYTHON_INSTRUCTIONS.txt` - Python distribution options

#### Build Tools:
- `build_package.py` - Package builder
- `build_executable.bat` - Creates standalone .exe (optional)

### 🛠️ Technical Features:

#### Printing Methods (Automatic Fallback):
1. Adobe Reader command line
2. SumatraPDF automation
3. PowerShell printing
4. Microsoft Edge integration
5. Windows system default

#### Service Architecture:
```
Quality Web App → Chrome Extension → Windows Service → Physical Printer
(localhost only) (port 8765) (any printer)
```

#### Service Endpoints:
- `GET /health` - Service health check
- `GET /printers` - List available printers
- `GET /status` - Service statistics
- `POST /print_pdf` - Print PDF (page-by-page supported)

### 🔒 Security & Performance:
- Runs on localhost only (127.0.0.1:8765)
- Memory usage: ~15-30 MB
- CPU usage: <1% (idle)
- Automatic temp file cleanup
- Secure PDF handling

### 🚨 Troubleshooting:

#### Service Won't Start:
```cmd
# Check service status
sc query QualityPrintService

# Check port availability
netstat -an | find "8765"

# View service logs
type "%USERPROFILE%\\PrintService\\logs\\print_service_*.log"
```

#### Python Issues:
The installer automatically detects Python or uses portable version.
If you see Python errors, ensure Python 3.7+ is installed.

#### Chrome Extension Issues:
1. Reload extension in chrome://extensions/
2. Check "Developer mode" is enabled
3. Verify service responds at http://localhost:8765/health

### 📞 Support:
1. Check README_COMPLETE.md for detailed documentation
2. Review INSTALLATION_COMPLETE.md for step-by-step guide
3. Check service logs for specific error messages

### 🎯 Why This Package is Better:
✅ Zero external dependencies (pure Python standard library)
✅ Works on any Windows system with Python
✅ Automatic service recovery and restart
✅ Multiple printing method fallbacks
✅ Complete documentation and support
✅ Chrome extension included
✅ Professional logging and error handling

Installation Time: ~5 minutes
Maintenance Required: Zero (auto-starts with Windows)

Ready to use immediately after installation!"""

            zipf.writestr('INSTALLATION_README.txt', installation_readme)
            files_added += 1

        print(f"Total service files added to ZIP: {files_added}")

        # Verify ZIP was created
        if os.path.exists(zip_path):
            zip_size = os.path.getsize(zip_path)
            print(f"Complete service ZIP file created: {zip_path}, size: {zip_size} bytes")

            if zip_size > 0:
                return jsonify({
                    'success': True,
                    'download_url': f'/static/{zip_filename}',
                    'files_included': files_added,
                    'zip_size': zip_size,
                    'package_type': 'Complete Self-Contained Package',
                    'dependencies': 'All included (Python standard library only)'
                })
            else:
                return jsonify({
                    'success': False,
                    'error': 'ZIP file was created but is empty'
                }), 500
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to create service ZIP file'
            }), 500

    except Exception as e:
        # Any unexpected failure is logged with a traceback and reported as a 500.
        print(f"Error creating complete service package: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
@bp.route('/create_zero_dependency_service_package', methods=['POST'])
def create_zero_dependency_service_package():
    """Build a print service ZIP that bundles an embeddable Python distribution.

    Steps, as performed below:
      1. Locate the windows_print_service directory (two levels above
         current_app.root_path); respond with 500 if it is missing.
      2. Download the python.org "embed-amd64" ZIP for PYTHON_VERSION into a
         temporary directory, extract it, and append "import site" to the
         first "._pth" file found.
      3. Write static/QualityPrintService_ZERO_DEPENDENCIES.zip containing the
         extracted Python files, the listed service files that exist, the
         chrome_extension folder (skipping dot-files), a generated batch
         installer and a generated README.

    Returns:
        On success, JSON with 'success': True, the '/static/...' download URL,
        file count, size and package details. On failure, JSON with
        'success': False and an 'error' message, with HTTP status 500.
    """
    import os
    import tempfile
    import zipfile
    import urllib.request
    # NOTE(review): subprocess is imported but not used anywhere in this function.
    import subprocess
    from flask import current_app, jsonify

    try:
        print("🚀 Creating Zero-Dependency Service Package...")

        # Path to the windows_print_service directory
        service_dir = os.path.join(os.path.dirname(os.path.dirname(current_app.root_path)), 'windows_print_service')
        print(f"Looking for service files in: {service_dir}")

        if not os.path.exists(service_dir):
            return jsonify({
                'success': False,
                'error': f'Windows service directory not found: {service_dir}'
            }), 500

        # Create static directory if it doesn't exist
        static_dir = os.path.join(current_app.root_path, 'static')
        os.makedirs(static_dir, exist_ok=True)

        zip_filename = 'QualityPrintService_ZERO_DEPENDENCIES.zip'
        zip_path = os.path.join(static_dir, zip_filename)

        # Python embeddable version details
        PYTHON_VERSION = "3.11.9"
        PYTHON_DOWNLOAD_URL = f"https://www.python.org/ftp/python/{PYTHON_VERSION}/python-{PYTHON_VERSION}-embed-amd64.zip"
        # Folder name used both inside the archive and by the generated installer.
        PYTHON_DIR_NAME = "python_embedded"

        print(f"📥 Will download Python {PYTHON_VERSION} embedded...")

        # Create the complete zero-dependency package
        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"🔧 Using temporary directory: {temp_dir}")

            # Download Python embedded
            python_zip_path = os.path.join(temp_dir, "python-embed.zip")
            python_extract_dir = os.path.join(temp_dir, PYTHON_DIR_NAME)

            try:
                print(f"📥 Downloading Python embedded from: {PYTHON_DOWNLOAD_URL}")
                # NOTE(review): this is a network download performed inside the
                # request, and no timeout is passed to urlretrieve.
                urllib.request.urlretrieve(PYTHON_DOWNLOAD_URL, python_zip_path)
                print(f"✅ Downloaded Python to: {python_zip_path}")

                # Extract Python
                os.makedirs(python_extract_dir, exist_ok=True)
                with zipfile.ZipFile(python_zip_path, 'r') as zip_ref:
                    zip_ref.extractall(python_extract_dir)
                print(f"✅ Extracted Python to: {python_extract_dir}")

                # Enable site-packages by modifying pth file
                pth_files = [f for f in os.listdir(python_extract_dir) if f.endswith('._pth')]
                if pth_files:
                    # Only the first matching file is modified.
                    pth_file = os.path.join(python_extract_dir, pth_files[0])
                    with open(pth_file, 'a') as f:
                        f.write('\nimport site\n')
                    print("✅ Enabled site-packages in embedded Python")

            except Exception as e:
                # Download, extraction and ._pth failures are all reported with
                # this same "Failed to download" message.
                return jsonify({
                    'success': False,
                    'error': f'Failed to download Python embedded: {str(e)}'
                }), 500

            # Create the complete package ZIP
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                files_added = 0

                # Add Python embedded distribution
                print("📁 Adding Python embedded distribution...")
                for root, dirs, files in os.walk(python_extract_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Place every Python file under PYTHON_DIR_NAME/ in the archive.
                        arcname = os.path.join(PYTHON_DIR_NAME, os.path.relpath(file_path, python_extract_dir))
                        zipf.write(file_path, arcname)
                        files_added += 1
                        if files_added % 10 == 0:  # Progress indicator
                            print(f" 📄 Added {files_added} Python files...")

                # Add service files
                print("📁 Adding service files...")
                service_files = [
                    "print_service_complete.py",
                    "service_wrapper.py",
                    "install_service_complete.bat",
                    "uninstall_service_complete.bat",
                    "test_service.bat",
                    "TROUBLESHOOTING_1053.md",
                    "INSTALLATION_COMPLETE.md",
                    "PACKAGE_SUMMARY.md",
                    "README_COMPLETE.md"
                ]

                for file_name in service_files:
                    file_path = os.path.join(service_dir, file_name)
                    # Files missing from service_dir are skipped without an error.
                    if os.path.exists(file_path):
                        zipf.write(file_path, file_name)
                        files_added += 1
                        print(f" ✅ Added: {file_name}")

                # Add Chrome extension
                chrome_ext_dir = os.path.join(service_dir, "chrome_extension")
                if os.path.exists(chrome_ext_dir):
                    print("📁 Adding Chrome extension...")
                    for root, dirs, files in os.walk(chrome_ext_dir):
                        for file in files:
                            if not file.startswith('.'):
                                file_path = os.path.join(root, file)
                                # Relative to service_dir, so members keep the
                                # "chrome_extension/" prefix.
                                arcname = os.path.relpath(file_path, service_dir)
                                zipf.write(file_path, arcname)
                                files_added += 1

                # Create updated zero-dependency installer
                # (f-string: {PYTHON_VERSION} and {PYTHON_DIR_NAME} are substituted;
                # each "\\" yields one backslash in the generated batch file).
                installer_content = f'''@echo off
setlocal enabledelayedexpansion

echo ========================================
echo Quality Label Print Service Installer
echo ZERO DEPENDENCIES - COMPLETE PACKAGE
echo ========================================
echo.
echo This package includes EVERYTHING needed:
echo ✅ Embedded Python {PYTHON_VERSION}
echo ✅ Complete Print Service
echo ✅ Windows Service Installer
echo ✅ Chrome Extension
echo ✅ Auto-recovery System
echo.

REM Check for administrator privileges
net session >nul 2>&1
if %errorLevel% neq 0 (
echo ❌ ERROR: Administrator privileges required
echo Please right-click this file and select "Run as administrator"
echo.
pause
exit /b 1
)

echo [1/7] Administrator privileges confirmed ✅
echo.

REM Set variables
set CURRENT_DIR=%~dp0
set SERVICE_NAME=QualityPrintService
set SERVICE_DISPLAY_NAME=Quality Label Print Service
set INSTALL_DIR=C:\\QualityPrintService
set PYTHON_EXE=%INSTALL_DIR%\\{PYTHON_DIR_NAME}\\python.exe
set PYTHON_SCRIPT=%INSTALL_DIR%\\print_service_complete.py
set LOG_DIR=%USERPROFILE%\\PrintService\\logs

echo [2/7] Using embedded Python distribution ✅
echo Python location: %PYTHON_EXE%
echo.

REM Stop existing service if running
echo [3/7] Stopping existing service (if any)...
sc query "%SERVICE_NAME%" >nul 2>&1
if %errorLevel% equ 0 (
echo Stopping existing service...
net stop "%SERVICE_NAME%" >nul 2>&1
sc delete "%SERVICE_NAME%" >nul 2>&1
timeout /t 2 >nul
)
echo Service cleanup completed ✅
echo.

REM Create installation directory
echo [4/7] Creating installation directory...
if exist "%INSTALL_DIR%" (
echo Removing old installation...
rmdir /s /q "%INSTALL_DIR%" >nul 2>&1
)
mkdir "%INSTALL_DIR%" >nul 2>&1
echo Installation directory: %INSTALL_DIR% ✅
echo.

REM Copy all files to installation directory
echo [5/7] Installing service files...
echo Copying embedded Python...
xcopy "%CURRENT_DIR%{PYTHON_DIR_NAME}" "%INSTALL_DIR%\\{PYTHON_DIR_NAME}\\" /E /I /Y >nul
echo Copying service script...
copy "%CURRENT_DIR%print_service_complete.py" "%INSTALL_DIR%\\" >nul
echo Service files installed ✅
echo.

REM Create log directory
echo [6/7] Setting up logging...
mkdir "%LOG_DIR%" >nul 2>&1
echo Log directory: %LOG_DIR% ✅
echo.

REM Install and start Windows service
echo [7/7] Installing Windows service...
sc create "%SERVICE_NAME%" binPath= "\\"%PYTHON_EXE%\\" \\"%PYTHON_SCRIPT%\\"" DisplayName= "%SERVICE_DISPLAY_NAME%" start= auto
if %errorLevel% neq 0 (
echo ❌ Failed to create Windows service
pause
exit /b 1
)

REM Configure service recovery
sc failure "%SERVICE_NAME%" reset= 60 actions= restart/10000/restart/30000/restart/60000
sc config "%SERVICE_NAME%" depend= ""

REM Start the service
echo Starting service...
net start "%SERVICE_NAME%"
if %errorLevel% neq 0 (
echo ⚠️ Service created but failed to start immediately
echo This is normal - the service will start automatically on reboot
) else (
echo Service started successfully ✅
)

echo.
echo ========================================
echo INSTALLATION COMPLETED! 🎉
echo ========================================
echo.
echo ✅ Python embedded distribution installed
echo ✅ Windows Print Service installed and configured
echo ✅ Auto-recovery enabled (restarts on failure)
echo ✅ Service will start automatically on boot
echo.
echo 🌐 Service URL: http://localhost:8765
echo 📊 Health check: http://localhost:8765/health
echo 📁 Logs location: %LOG_DIR%
echo.
echo 📋 NEXT STEPS:
echo 1. Install Chrome extension from 'chrome_extension' folder
echo 2. Test service: http://localhost:8765/health
echo 3. Configure web application to use service
echo.
echo Press any key to test service connection...
pause >nul

REM Test service
echo Testing service connection...
timeout /t 3 >nul
curl -s http://localhost:8765/health >nul 2>&1
if %errorLevel% equ 0 (
echo ✅ Service is responding correctly!
) else (
echo ⚠️ Service test failed - may need a moment to start
echo Check logs in: %LOG_DIR%
)

echo.
echo Installation complete! Service is ready to use.
pause
'''

                zipf.writestr("INSTALL_ZERO_DEPENDENCIES.bat", installer_content)
                files_added += 1

                # Add comprehensive README
                # (f-string: "{{" and "}}" produce literal braces in the output).
                readme_content = f'''# Quality Print Service - ZERO DEPENDENCIES Package

## 🎯 COMPLETE SELF-CONTAINED INSTALLATION

This package contains EVERYTHING needed to run the Quality Print Service:

### ✅ What's Included:
- **Embedded Python {PYTHON_VERSION}** - No system Python required!
- **Complete Print Service** - Zero external dependencies
- **Windows Service Installer** - Automatic installation and recovery
- **Chrome Extension** - Web browser integration
- **Comprehensive Documentation** - Installation and usage guides

### 🚀 INSTALLATION (5 Minutes):

#### Requirements:
- Windows 10/11 or Windows Server 2016+
- Administrator privileges (for service installation)
- Google Chrome browser

#### Step 1: Extract Package
- Extract this ZIP file to any location (Desktop, Downloads, etc.)
- No permanent location needed - installer copies files automatically

#### Step 2: Install Service
- Right-click `INSTALL_ZERO_DEPENDENCIES.bat`
- Select "Run as administrator"
- Follow the installation prompts

#### Step 3: Install Chrome Extension
- Open Chrome browser
- Navigate to `chrome://extensions/`
- Enable "Developer mode" (toggle in top-right)
- Click "Load unpacked"
- Select the `chrome_extension` folder from extracted package

#### Step 4: Test Installation
- Visit: http://localhost:8765/health
- Should return: {{"status": "healthy", "service": "Windows Print Service"}}

### 🔧 Technical Details:

**Service Architecture:**
```
Quality Web App → Chrome Extension → Windows Service → Printer
```

**Printing Methods (automatic fallback):**
1. Adobe Reader (silent printing)
2. SumatraPDF (if Adobe unavailable)
3. PowerShell Print-Document
4. Microsoft Edge (fallback)
5. Windows default printer

**Service Management:**
- Automatic startup on Windows boot
- Auto-recovery on failure (3 restart attempts)
- Comprehensive logging in: `%USERPROFILE%\\PrintService\\logs\\`

**Network Configuration:**
- Service runs on: http://localhost:8765
- Chrome extension communicates via this local endpoint
- No external network access required

### 📦 Package Size: ~15MB (includes full Python runtime)
### ⏱️ Installation Time: ~5 minutes
### 🔧 Maintenance Required: Zero (auto-starts with Windows)

Ready to use immediately after installation!
'''

                zipf.writestr("README_ZERO_DEPENDENCIES.md", readme_content)
                files_added += 1

        # Verify ZIP was created successfully
        if os.path.exists(zip_path):
            zip_size = os.path.getsize(zip_path)
            print(f"✅ Zero-dependency package created: {zip_path}")
            print(f"📄 Total files: {files_added}")
            print(f"📏 Size: {zip_size / 1024 / 1024:.1f} MB")

            if zip_size > 0:
                return jsonify({
                    'success': True,
                    'download_url': f'/static/{zip_filename}',
                    'files_included': files_added,
                    'zip_size': zip_size,
                    'package_type': 'Zero Dependencies - Complete Package',
                    'python_version': PYTHON_VERSION,
                    'dependencies': 'None - Everything included!',
                    'estimated_size_mb': round(zip_size / 1024 / 1024, 1)
                })
            else:
                return jsonify({
                    'success': False,
                    'error': 'ZIP file was created but is empty'
                }), 500
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to create zero-dependency ZIP file'
            }), 500

    except Exception as e:
        # Any unexpected failure is logged with a traceback and reported as a 500.
        print(f"❌ Error creating zero-dependency service package: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
@bp.route('/test_extension_files')
def test_extension_files():
    """Report which files exist in the chrome_extension directory.

    Returns:
        JSON with 'extension_dir' (the directory looked at), 'dir_exists'
        and 'files', a list with one entry per file found giving its full
        path, its path relative to the extension directory, and its size in
        bytes. The list is empty when the directory does not exist.
    """
    import os
    from flask import current_app, jsonify

    ext_root = os.path.join(os.path.dirname(current_app.root_path), 'chrome_extension')

    report = {
        'extension_dir': ext_root,
        'dir_exists': os.path.exists(ext_root),
        'files': []
    }

    if not os.path.exists(ext_root):
        return jsonify(report)

    for folder, _subdirs, names in os.walk(ext_root):
        for name in names:
            full_path = os.path.join(folder, name)
            byte_count = os.path.getsize(full_path)
            report['files'].append({
                'path': full_path,
                'relative_path': os.path.relpath(full_path, ext_root),
                'size': byte_count
            })

    return jsonify(report)
|
|
|
|
@bp.route('/label_templates')
def label_templates():
    """Render the label templates overview page."""
    page = 'label_templates.html'
    return render_template(page)
|
|
|
|
@bp.route('/create_template')
def create_template():
    """Render the page used to create a new label template."""
    page = 'create_template.html'
    return render_template(page)
|
|
|
|
@bp.route('/get_database_tables', methods=['GET'])
def get_database_tables():
    """Return the database tables that are relevant for template creation.

    Only the ``superadmin``, ``warehouse_manager`` and ``etichete`` roles may
    call this route; any other session gets a 403 JSON error.

    Connection settings are read as ``key=value`` lines from
    ``external_server.conf`` in the Flask instance folder. The response lists
    ``order_for_labels`` first (when it exists), followed by every other table
    whose name contains ``order``, ``label``, ``product`` or ``customer``.

    Returns:
        JSON ``{'success': True, 'tables': [...], 'recommended': 'order_for_labels'}``
        on success, or ``{'error': ...}`` with status 500 on any failure.
    """
    if session.get('role') not in ['superadmin', 'warehouse_manager', 'etichete']:
        return jsonify({'error': 'Access denied'}), 403

    keywords = ('order', 'label', 'product', 'customer')
    connection = None
    try:
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        settings = {}
        with open(settings_file, 'r') as f:
            for raw_line in f:
                entry = raw_line.strip()
                # Blank or malformed lines would break the key/value unpacking
                if '=' not in entry:
                    continue
                key, value = entry.split('=', 1)
                settings[key] = value

        connection = mariadb.connect(
            user=settings['username'],
            password=settings['password'],
            host=settings['server_domain'],
            port=int(settings['port']),
            database=settings['database_name'],
        )
        cursor = connection.cursor()

        # Get all tables in the database
        cursor.execute("SHOW TABLES")
        all_tables = [row[0] for row in cursor.fetchall()]

        # The recommended table always comes first when it exists
        relevant_tables = []
        if 'order_for_labels' in all_tables:
            relevant_tables.append('order_for_labels')

        # Add other potentially relevant tables, keeping the server's order
        for table in all_tables:
            if table in relevant_tables:
                continue
            lowered = table.lower()
            if any(keyword in lowered for keyword in keywords):
                relevant_tables.append(table)

        return jsonify({
            'success': True,
            'tables': relevant_tables,
            'recommended': 'order_for_labels',
        })

    except Exception as e:
        return jsonify({'error': f'Database error: {str(e)}'}), 500

    finally:
        # Release the connection on every path, including failures
        if connection is not None:
            connection.close()
|
|
|
|
@bp.route('/get_table_columns/<table_name>', methods=['GET'])
def get_table_columns(table_name):
    """Return column metadata for one table of the external database.

    Only the ``superadmin``, ``warehouse_manager`` and ``etichete`` roles may
    call this route; any other session gets a 403 JSON error.

    Args:
        table_name: Name of the table, taken from the URL. It must match an
            existing table exactly; otherwise a 404 JSON error is returned.

    Returns:
        JSON ``{'success': True, 'table': ..., 'columns': [...]}`` where each
        column entry has ``name``, ``type``, ``nullable``, ``default``,
        ``description`` (the column comment, or a title-cased version of the
        name when there is no comment) and ``field_id`` (``db_<name>``).
        Any failure yields ``{'error': ...}`` with status 500.
    """
    if session.get('role') not in ['superadmin', 'warehouse_manager', 'etichete']:
        return jsonify({'error': 'Access denied'}), 403

    connection = None
    try:
        # Read connection settings (key=value per line)
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        settings = {}
        with open(settings_file, 'r') as f:
            for raw_line in f:
                entry = raw_line.strip()
                # Blank or malformed lines would break the key/value unpacking
                if '=' not in entry:
                    continue
                key, value = entry.split('=', 1)
                settings[key] = value

        connection = mariadb.connect(
            user=settings['username'],
            password=settings['password'],
            host=settings['server_domain'],
            port=int(settings['port']),
            database=settings['database_name'],
        )
        cursor = connection.cursor()

        # Verify the table exists with an exact comparison. A LIKE pattern
        # would treat '%' and '_' in the URL value as wildcards.
        cursor.execute("SHOW TABLES")
        known_tables = {row[0] for row in cursor.fetchall()}
        if table_name not in known_tables:
            return jsonify({'error': f'Table {table_name} not found'}), 404

        # Identifiers cannot be bound as parameters, so quote the verified
        # name with backticks (doubling any embedded backtick).
        quoted_name = '`' + table_name.replace('`', '``') + '`'
        cursor.execute(f"DESCRIBE {quoted_name}")
        columns_info = cursor.fetchall()

        # Fetch every column comment in one query instead of one per column
        cursor.execute("""
            SELECT COLUMN_NAME, COLUMN_COMMENT
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = %s AND TABLE_SCHEMA = DATABASE()
        """, (table_name,))
        comments = {name: comment for name, comment in cursor.fetchall()}

        columns = []
        for col_info in columns_info:
            # DESCRIBE rows: Field, Type, Null, Key, Default, Extra
            column_name = col_info[0]
            column_comment = comments.get(column_name) or ''

            columns.append({
                'name': column_name,
                'type': col_info[1],
                'nullable': col_info[2] == 'YES',
                'default': col_info[4],
                # User-friendly description: comment first, else prettified name
                'description': column_comment or column_name.replace('_', ' ').title(),
                'field_id': f"db_{column_name}",  # Unique ID for template fields
            })

        return jsonify({
            'success': True,
            'table': table_name,
            'columns': columns,
        })

    except Exception as e:
        return jsonify({'error': f'Database error: {str(e)}'}), 500

    finally:
        # Release the connection on every path (404, success, failure)
        if connection is not None:
            connection.close()
|
|
|
|
@bp.route('/edit_template/<int:template_id>')
def edit_template(template_id):
    """Placeholder for the template editor; echoes the requested template ID."""
    # Logic for editing a template will go here
    message = "Edit template with ID {}".format(template_id)
    return message
|
|
|
|
@bp.route('/delete_template/<int:template_id>', methods=['POST'])
def delete_template(template_id):
    """Placeholder for template deletion; echoes the requested template ID."""
    # Logic for deleting a template will go here
    message = "Delete template with ID {}".format(template_id)
    return message
|
|
|
|
@bp.route('/get_tables')
def get_tables():
    """Return a fixed placeholder list of table names as JSON."""
    # Replace with logic to fetch tables from your database
    payload = {'tables': ['table1', 'table2', 'table3']}
    return jsonify(payload)
|
|
|
|
@bp.route('/get_columns')
def get_columns():
    """Return placeholder column names for the table given in the ``table`` query arg.

    An empty list is returned when no table (or an empty value) is supplied.
    """
    selected_table = request.args.get('table')
    # Replace with logic to fetch columns for the selected table
    if selected_table:
        columns = ['column1', 'column2', 'column3']
    else:
        columns = []
    return jsonify({'columns': columns})
|
|
|
|
@bp.route('/save_template', methods=['POST'])
def save_template():
    """Placeholder save endpoint: logs the posted JSON and reports success."""
    payload = request.get_json()
    # Replace with logic to save the template to the database
    print(f"Saving template: {payload}")
    confirmation = {'message': 'Template saved successfully!'}
    return jsonify(confirmation)
|
|
|
|
@bp.route('/generate_pdf', methods=['POST'])
def generate_pdf():
    """Generate a simple label-template PDF from the posted JSON description.

    Expected JSON body (all keys optional):
        width: Label width in millimetres (default 100).
        height: Label height in millimetres (default 50).
        columns: List of column names, each printed on its own line.

    The PDF is written to ``static/label_templates/label_template.pdf``
    (overwriting any previous file) and its URL path is returned.

    Returns:
        JSON with ``message`` and ``pdf_path`` on success, or ``{'error': ...}``
        with status 400 when the body is not a JSON object, the dimensions are
        not positive finite numbers, or ``columns`` is not a list.
    """
    import math

    mm_to_points = 2.83465  # 1 mm = 2.83465 points

    # silent=True yields None instead of raising on a missing/invalid body
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    try:
        width = float(data.get('width', 100))    # Default width in mm
        height = float(data.get('height', 50))   # Default height in mm
    except (TypeError, ValueError):
        return jsonify({'error': 'width and height must be numbers'}), 400

    if not (math.isfinite(width) and math.isfinite(height)) or width <= 0 or height <= 0:
        return jsonify({'error': 'width and height must be positive numbers'}), 400

    # A missing or null "columns" key means "no columns"
    columns = data.get('columns') or []
    if not isinstance(columns, (list, tuple)):
        return jsonify({'error': 'columns must be a list'}), 400

    # Convert dimensions from mm to points
    width_points = width * mm_to_points
    height_points = height * mm_to_points

    # Ensure the /static/label_templates folder exists
    label_templates_folder = os.path.join(current_app.root_path, 'static', 'label_templates')
    os.makedirs(label_templates_folder, exist_ok=True)

    # Define the path for the PDF file
    pdf_file_path = os.path.join(label_templates_folder, 'label_template.pdf')

    # Create the PDF with a page exactly the size of the label
    c = canvas.Canvas(pdf_file_path, pagesize=(width_points, height_points))

    # Title line, then one line per column, moving down 20 points each time
    c.drawString(10, height_points - 20, "Label Template")
    y_position = height_points - 40
    for column in columns:
        c.drawString(10, y_position, f"Column: {column}")
        y_position -= 20

    # Save the PDF
    c.save()

    return jsonify({
        'message': 'PDF generated successfully!',
        'pdf_path': '/static/label_templates/label_template.pdf',
    })
|
|
|
|
# Order Labels Upload Module Routes
|
|
@bp.route('/upload_orders', methods=['GET', 'POST'])
def upload_orders():
    """Upload order CSV files for label generation.

    Sessions whose role is superadmin, warehouse or warehouse_manager are
    handed to ``upload_orders_handler``; everyone else is flashed an error
    and redirected to the dashboard.
    """
    permitted_roles = ('superadmin', 'warehouse', 'warehouse_manager')

    if session.get('role') in permitted_roles:
        # Imported lazily, exactly when the handler is needed
        from app.order_labels import upload_orders_handler
        return upload_orders_handler()

    flash('Access denied: Warehouse management permissions required.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/view_orders')
def view_orders():
    """Show the most recently uploaded orders.

    Sessions whose role is superadmin, warehouse, warehouse_manager or
    warehouse_worker see the last 200 orders; everyone else is flashed an
    error and redirected to the dashboard.
    """
    permitted_roles = ('superadmin', 'warehouse', 'warehouse_manager', 'warehouse_worker')

    if session.get('role') in permitted_roles:
        from app.order_labels import get_orders_from_database
        recent_orders = get_orders_from_database(200)  # Get last 200 orders
        return render_template('view_orders.html', orders=recent_orders)

    flash('Access denied: Warehouse access required.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/get_unprinted_orders', methods=['GET'])
def get_unprinted_orders():
    """Return the result of ``get_unprinted_orders_data()`` as JSON.

    Restricted to the superadmin, warehouse_manager and etichete roles
    (403 otherwise). Any exception is logged with a traceback and reported
    as a 500 JSON error.
    """
    current_role = session.get('role')
    print(f"DEBUG: get_unprinted_orders called. Session role: {current_role}")

    if current_role not in ('superadmin', 'warehouse_manager', 'etichete'):
        print(f"DEBUG: Access denied for role: {current_role}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    try:
        print("DEBUG: Calling get_unprinted_orders_data()")
        orders = get_unprinted_orders_data()
        print(f"DEBUG: Retrieved {len(orders)} orders")
        return jsonify(orders)
    except Exception as exc:
        import traceback

        print(f"DEBUG: Error in get_unprinted_orders: {exc}")
        traceback.print_exc()
        return jsonify({'error': str(exc)}), 500
|
|
|
|
@bp.route('/generate_labels_pdf/<int:order_id>', methods=['POST'])
@bp.route('/generate_labels_pdf/<int:order_id>/<paper_saving_mode>', methods=['POST'])
def generate_labels_pdf(order_id, paper_saving_mode='true'):
    """Generate the label PDF for one order and mark the order as printed.

    Restricted to the superadmin, warehouse_manager and etichete roles
    (403 otherwise).

    Args:
        order_id: Primary key of the row in ``order_for_labels``.
        paper_saving_mode: URL segment; the string ``'true'`` (any letter
            case) enables paper-saving mode, any other value disables it.

    Returns:
        The PDF as an inline ``application/pdf`` response, a 404 JSON error
        when the order does not exist, or a 500 JSON error on any failure.
        A failed printed-status update is only logged and does not fail the
        request.
    """
    print(f"DEBUG: generate_labels_pdf called for order_id: {order_id}")

    if session.get('role') not in ['superadmin', 'warehouse_manager', 'etichete']:
        print(f"DEBUG: Access denied for role: {session.get('role')}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    # Selected columns, in order; also used as the keys of the order dict so
    # the SELECT list and the mapping cannot drift apart.
    order_columns = (
        'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
        'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client', 'customer_name',
        'customer_article_number', 'open_for_order', 'line_number',
        'printed_labels', 'created_at', 'updated_at',
    )

    try:
        from .pdf_generator import generate_order_labels_pdf, update_order_printed_status
        from .print_module import get_db_connection
        from flask import make_response

        # Get order data from database; always release the connection,
        # even when the query itself fails.
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT {', '.join(order_columns)} FROM order_for_labels WHERE id = %s",
                (order_id,),
            )
            row = cursor.fetchone()
        finally:
            conn.close()

        if not row:
            return jsonify({'error': 'Order not found'}), 404

        # Create order data dictionary
        order_data = dict(zip(order_columns, row))
        if order_data['printed_labels'] is None:
            order_data['printed_labels'] = 0

        print(f"DEBUG: Generating PDF for order {order_id} with quantity {order_data['cantitate']}")

        # Check if paper-saving mode is enabled (default: true)
        use_paper_saving = paper_saving_mode.lower() == 'true'
        print(f"DEBUG: Paper-saving mode: {use_paper_saving}")

        # Generate PDF with paper-saving option
        pdf_buffer = generate_order_labels_pdf(order_id, order_data, paper_saving_mode=use_paper_saving)

        # Update printed status in database (best effort)
        update_success = update_order_printed_status(order_id)
        if not update_success:
            print(f"Warning: Could not update printed status for order {order_id}")

        # Create response with PDF
        response = make_response(pdf_buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename="labels_order_{order_id}_{order_data["comanda_productie"]}.pdf"'

        print(f"DEBUG: PDF generated successfully for order {order_id}")
        return response

    except Exception as e:
        print(f"DEBUG: Error generating PDF for order {order_id}: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@bp.route('/update_printed_status/<int:order_id>', methods=['POST'])
def update_printed_status(order_id):
    """Mark an order as printed for direct printing (no PDF is generated).

    Restricted to the superadmin, warehouse_manager and etichete roles
    (403 otherwise). Responds with a success message, or a 500 JSON error
    when the update reports failure or raises.
    """
    print(f"DEBUG: update_printed_status called for order_id: {order_id}")

    current_role = session.get('role')
    if current_role not in ('superadmin', 'warehouse_manager', 'etichete'):
        print(f"DEBUG: Access denied for role: {current_role}")
        return jsonify({'error': 'Access denied. Required roles: superadmin, warehouse_manager, etichete'}), 403

    try:
        from .pdf_generator import update_order_printed_status

        # Update printed status in database; handle the failure case first
        if not update_order_printed_status(order_id):
            print(f"WARNING: Could not update printed status for order {order_id}")
            return jsonify({'error': 'Could not update printed status'}), 500

        print(f"DEBUG: Successfully updated printed status for order {order_id}")
        return jsonify({'success': True, 'message': f'Order {order_id} marked as printed'})

    except Exception as exc:
        print(f"DEBUG: Error in update_printed_status: {exc}")
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
|
|
@bp.route('/download_print_service')
def download_print_service():
    """Send the direct print service ZIP as a download.

    The ``package`` query argument selects the archive: ``complete`` serves
    ``QualityPrintService_Complete.zip``; any other value (default ``basic``)
    serves the legacy ``RecticelPrintService.zip``. Archives live in
    ``static/downloads``. A missing archive yields a 404 JSON response and
    any other failure a 500 JSON response.
    """
    try:
        # Check if complete package is requested
        package_type = request.args.get('package', 'basic')
        filename = (
            'QualityPrintService_Complete.zip'
            if package_type == 'complete'
            else 'RecticelPrintService.zip'  # basic package (legacy)
        )

        # Path to the print service files; create the directory if needed
        service_path = os.path.join(current_app.root_path, 'static', 'downloads')
        os.makedirs(service_path, exist_ok=True)

        if os.path.exists(os.path.join(service_path, filename)):
            return send_from_directory(service_path, filename, as_attachment=True)

        # The zip doesn't exist: return information about getting it
        return jsonify({
            'error': f'Print service package ({filename}) not found',
            'message': f'The {package_type} print service package is not available',
            'suggestion': 'Please contact the administrator or use the basic package'
        }), 404

    except Exception as exc:
        print(f"DEBUG: Error downloading print service: {exc}")
        return jsonify({'error': 'Failed to download print service'}), 500
|
|
|
|
@bp.route('/download/desktop-app')
def download_desktop_app():
    """Send the Quality Print Desktop v1.0.0 ZIP as a download.

    The archive is served from ``static/downloads``. A missing archive
    yields a 404 JSON response and any other failure a 500 JSON response.
    """
    try:
        filename = 'Quality_Print_Desktop_v1.0.0.zip'
        downloads_path = os.path.join(current_app.root_path, 'static', 'downloads')

        # Serve the file when present; otherwise explain that it is unavailable
        if os.path.exists(os.path.join(downloads_path, filename)):
            return send_from_directory(downloads_path, filename, as_attachment=True)

        return jsonify({
            'error': 'Desktop app package not found',
            'message': 'Quality Print Desktop v1.0.0 is not available',
            'suggestion': 'Please contact the administrator'
        }), 404

    except Exception as exc:
        print(f"DEBUG: Error downloading desktop app: {exc}")
        return jsonify({'error': 'Failed to download desktop app'}), 500
|
|
|
|
@bp.route('/get_order_data/<int:order_id>', methods=['GET'])
def get_order_data(order_id):
    """Return one ``order_for_labels`` row as JSON for the label preview.

    Restricted to the superadmin, warehouse_manager and etichete roles
    (403 otherwise).

    Args:
        order_id: Primary key of the row in ``order_for_labels``.

    Returns:
        JSON object keyed by column name. ``data_livrare``, ``created_at`` and
        ``updated_at`` are stringified (``'N/A'`` when empty) and a NULL
        ``printed_labels`` becomes 0. A missing order yields a 404 JSON error
        and any failure a 500 JSON error.
    """
    print(f"DEBUG: get_order_data called for order_id: {order_id}")

    if session.get('role') not in ['superadmin', 'warehouse_manager', 'etichete']:
        return jsonify({'error': 'Access denied'}), 403

    # Selected columns, in order; also used as the keys of the response so
    # the SELECT list and the mapping cannot drift apart.
    order_columns = (
        'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
        'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client', 'customer_name',
        'customer_article_number', 'open_for_order', 'line_number',
        'printed_labels', 'created_at', 'updated_at',
    )

    try:
        from .print_module import get_db_connection

        # Always release the connection, even when the query itself fails
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT {', '.join(order_columns)} FROM order_for_labels WHERE id = %s",
                (order_id,),
            )
            row = cursor.fetchone()
        finally:
            conn.close()

        if not row:
            return jsonify({'error': 'Order not found'}), 404

        order_data = dict(zip(order_columns, row))

        # Date/time values are sent as strings, with a placeholder when empty
        for date_key in ('data_livrare', 'created_at', 'updated_at'):
            order_data[date_key] = str(order_data[date_key]) if order_data[date_key] else 'N/A'

        if order_data['printed_labels'] is None:
            order_data['printed_labels'] = 0

        return jsonify(order_data)

    except Exception as e:
        print(f"DEBUG: Error getting order data for {order_id}: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
|
|
|
|
@warehouse_bp.route('/create_locations', methods=['GET', 'POST'])
def create_locations():
    """Delegate the warehouse location creation page to its handler."""
    # Imported at call time rather than at module import
    from app.warehouse import create_locations_handler as handler
    return handler()
|
|
|
|
@warehouse_bp.route('/import_locations_csv', methods=['GET', 'POST'])
def import_locations_csv():
    """Delegate the warehouse locations CSV import to its handler."""
    # Imported at call time rather than at module import
    from app.warehouse import import_locations_csv_handler as handler
    return handler()
|
|
|
|
|
|
# NOTE for frontend/extension developers:
|
|
# To print labels, call the Chrome extension and pass the PDF URL:
|
|
# /generate_labels_pdf/<order_id>
|
|
# The extension should POST to http://localhost:8765/print/silent with:
|
|
# {
|
|
# "pdf_url": "https://your-linux-server/generate_labels_pdf/15",
|
|
# "printer_name": "default",
|
|
# "copies": 1
|
|
# } |