Files
quality_app-v2/app/modules/quality/routes.py
Quality App Developer b15cc93b9d FG Scan form validation improvements with warehouse module updates
- Fixed 3 JavaScript syntax errors in fg_scan.html (lines 951, 840-950, 1175-1215)
- Restored form field validation with proper null safety checks
- Re-enabled auto-advance between form fields
- Re-enabled CP code auto-complete with hyphen detection
- Updated validation error messages with clear format specifications and examples
- Added autocomplete='off' to all input fields
- Removed auto-prefix correction feature
- Updated warehouse routes and modules for box assignment workflow
- Added/improved database initialization scripts
- Updated requirements.txt dependencies

Format specifications implemented:
- Operator Code: OP + 2 digits (example: OP01, OP99)
- CP Code: CP + 8 digits + hyphen + 4 digits (example: CP00000000-0001)
- OC1/OC2 Codes: OC + 2 digits (example: OC01, OC99)
- Defect Code: 3 digits only
2026-01-30 10:50:06 +02:00

440 lines
16 KiB
Python

"""
Quality Module Routes
"""
from flask import Blueprint, render_template, session, redirect, url_for, request, jsonify, flash
from app.database import get_db
from app.modules.quality.quality import (
ensure_scanfg_orders_table,
save_fg_scan,
get_latest_scans,
get_fg_report,
get_daily_statistics,
get_cp_statistics
)
import logging
import base64
from io import BytesIO
from reportlab.lib.units import mm
from reportlab.pdfgen import canvas
from reportlab.graphics.barcode import code128
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Blueprint that groups every route in this file under the /quality URL prefix.
quality_bp = Blueprint('quality', __name__, url_prefix='/quality')
@quality_bp.route('/', methods=['GET'])
def quality_index():
    """Render the quality module landing page.

    Requests whose session has no ``user_id`` are redirected to the
    ``main.login`` endpoint instead.
    """
    logged_in = 'user_id' in session
    if logged_in:
        return render_template('modules/quality/index.html')
    return redirect(url_for('main.login'))
@quality_bp.route('/reports', methods=['GET'])
def quality_reports():
    """Render the FG scan reports page.

    Requests whose session has no ``user_id`` are redirected to the
    ``main.login`` endpoint. Otherwise ``ensure_scanfg_orders_table()`` is
    called before the template is rendered.
    """
    if 'user_id' in session:
        # Make sure the scanfg_orders table exists before showing reports.
        ensure_scanfg_orders_table()
        return render_template('modules/quality/fg_reports.html')
    return redirect(url_for('main.login'))
@quality_bp.route('/fg_scan', methods=['GET', 'POST'])
def fg_scan():
    """Finished goods scan page.

    GET renders the scan form together with the latest scans.

    POST reads the scan fields from the submitted form and stores them via
    ``save_fg_scan``. AJAX callers (``X-Requested-With: XMLHttpRequest`` or a
    preferred ``application/json`` Accept type) receive a JSON result that
    reflects whether the save actually succeeded; regular form posts are
    redirected back to this page (POST-Redirect-GET).

    Requests whose session has no ``user_id`` are redirected to the
    ``main.login`` endpoint.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    # Ensure scanfg_orders table exists
    ensure_scanfg_orders_table()

    if request.method == 'POST':
        form = request.form
        success = False
        scan_id = None
        try:
            # Save the scan using the business logic function. The approved /
            # rejected counters it returns are not used by this route.
            success, scan_id, _approved_count, _rejected_count = save_fg_scan(
                form.get('operator_code'),
                form.get('cp_code'),
                form.get('oc1_code'),
                form.get('oc2_code'),
                form.get('defect_code'),
                form.get('date'),
                form.get('time'),
            )
            # Note: Flash messages are NOT used here - JS notifications handle
            # this on the frontend. This prevents wide flash message alerts
            # from appearing.
        except Exception as e:
            logger.exception("Error saving finish goods scan data: %s", e)
            success = False
            scan_id = None

        # Check if this is an AJAX request (for scan-to-boxes feature)
        wants_json = (
            request.headers.get('X-Requested-With') == 'XMLHttpRequest'
            or request.accept_mimetypes.best == 'application/json'
        )
        if wants_json:
            if not success:
                # Report the failure instead of claiming the scan was stored,
                # so the client never works with a missing scan ID.
                return jsonify({
                    'success': False,
                    'message': 'Failed to record scan',
                    'scan_id': None
                }), 500
            return jsonify({
                'success': True,
                'message': 'Scan recorded successfully',
                'scan_id': scan_id
            })

        # For normal form submissions, redirect to prevent form resubmission
        # (POST-Redirect-GET pattern)
        return redirect(url_for('quality.fg_scan'))

    # GET request - fetch and display latest scans; fall back to an empty
    # list so the form still renders when the lookup fails.
    try:
        scan_groups = get_latest_scans(limit=25)
    except Exception as e:
        logger.exception("Error fetching latest scans: %s", e)
        scan_groups = []
    return render_template('modules/quality/fg_scan.html', scan_groups=scan_groups)
# API Routes for AJAX requests
@quality_bp.route('/api/fg_report', methods=['POST'])
def api_fg_report():
    """
    API endpoint for generating FG reports via AJAX

    Expected JSON body:
    {
        'report_type': 'daily|select-day|date-range|5-day|defects-today|defects-date|defects-range|defects-5day|all',
        'filter_date': 'YYYY-MM-DD' (optional, for select-day/defects-date),
        'start_date': 'YYYY-MM-DD' (optional, for date-range/defects-range),
        'end_date': 'YYYY-MM-DD' (optional, for date-range/defects-range)
    }

    Responses:
        401 - no ``user_id`` in the session
        400 - body is not a JSON object, or ``report_type`` is not recognised
        500 - ``get_fg_report`` (or serialising its result) raised
        otherwise the value returned by ``get_fg_report`` as JSON
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Unauthorized'}), 401

    # silent=True turns a wrong content type or malformed JSON into None, so
    # a bad request is reported as 400 rather than falling into the 500 path.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'Invalid or missing JSON body'}), 400

    report_type = data.get('report_type')
    filter_date = data.get('filter_date')
    start_date = data.get('start_date')
    end_date = data.get('end_date')

    # Validate report type (tuple membership also tolerates unhashable input)
    valid_types = ('daily', 'select-day', 'date-range', '5-day', 'defects-today',
                   'defects-date', 'defects-range', 'defects-5day', 'all')
    if report_type not in valid_types:
        return jsonify({'success': False, 'message': 'Invalid report type'}), 400

    try:
        # Generate report
        result = get_fg_report(report_type, filter_date, start_date, end_date)
        return jsonify(result)
    except Exception as e:
        logger.exception("Error in API fg_report: %s", e)
        return jsonify({
            'success': False,
            'message': f'Error processing report: {str(e)}'
        }), 500
@quality_bp.route('/api/daily_stats', methods=['GET'])
def api_daily_stats():
    """Return the result of ``get_daily_statistics()`` as a JSON payload.

    Responds with 401 when the session has no ``user_id`` and with 500
    (including the error text) when fetching or serialising the data fails.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Unauthorized'}), 401

    try:
        payload = {'success': True, 'data': get_daily_statistics()}
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"Error in API daily_stats: {exc}")
        failure = {
            'success': False,
            'message': f'Error fetching statistics: {str(exc)}'
        }
        return jsonify(failure), 500
@quality_bp.route('/api/cp_stats/<cp_code>', methods=['GET'])
def api_cp_stats(cp_code):
    """Return the result of ``get_cp_statistics(cp_code)`` as a JSON payload.

    ``cp_code`` is taken from the URL path. Responds with 401 when the
    session has no ``user_id`` and with 500 (including the error text) when
    fetching or serialising the data fails.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Unauthorized'}), 401

    try:
        payload = {'success': True, 'data': get_cp_statistics(cp_code)}
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"Error in API cp_stats: {exc}")
        failure = {
            'success': False,
            'message': f'Error fetching CP statistics: {str(exc)}'
        }
        return jsonify(failure), 500
# ============================================================================
# QUICK BOX CHECKPOINT ROUTES - For "Scan To Boxes" Feature
# ============================================================================
@quality_bp.route('/api/create-quick-box', methods=['POST'])
def create_quick_box():
    """Create a new box with auto-incremented number for quick box checkpoint and assign to FG_INCOMING.

    The box row and its initial ``cp_location_history`` row are written in a
    single transaction: both are committed together, and any failure rolls
    the transaction back so no box is left without its history entry.

    Responses:
        401 - no ``user_id`` in the session
        500 - FG_INCOMING location missing, or any database error
        otherwise JSON with ``success``, ``box_number``, ``box_id``,
        ``location`` and ``message``
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    conn = None
    cursor = None
    try:
        conn = get_db()
        cursor = conn.cursor()

        # Get FG_INCOMING location ID
        cursor.execute("""
            SELECT id FROM warehouse_locations
            WHERE location_code = 'FG_INCOMING'
            LIMIT 1
        """)
        fg_incoming_result = cursor.fetchone()
        if not fg_incoming_result:
            logger.error("FG_INCOMING location not found in database")
            return jsonify({'error': 'FG_INCOMING location not configured'}), 500
        fg_incoming_id = fg_incoming_result[0]

        # Get the next box number by finding max and incrementing.
        # NOTE(review): MAX+1 can hand the same number to two concurrent
        # requests unless box_number carries a unique constraint - confirm.
        cursor.execute("""
            SELECT MAX(CAST(SUBSTRING(box_number, 4) AS UNSIGNED))
            FROM boxes_crates
            WHERE box_number LIKE 'BOX%'
        """)
        result = cursor.fetchone()
        current_max = result[0] if result and result[0] else 0
        next_num = current_max + 1
        box_number = f"BOX{str(next_num).zfill(8)}"

        # Insert new box with FG_INCOMING location
        user_id = session.get('user_id')
        cursor.execute("""
            INSERT INTO boxes_crates (box_number, status, location_id, created_by, created_at)
            VALUES (%s, %s, %s, %s, NOW())
        """, (box_number, 'open', fg_incoming_id, user_id))
        # Read the generated ID right after the INSERT, before anything else
        # runs on this cursor.
        box_id = cursor.lastrowid

        # Create initial location history entry
        cursor.execute("""
            INSERT INTO cp_location_history (cp_code, box_id, from_location_id, to_location_id, moved_by, reason)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (box_number, box_id, None, fg_incoming_id, user_id, 'Box created'))

        # Single commit keeps the box and its history entry atomic.
        conn.commit()

        logger.info(f"Quick box created: {box_number} (ID: {box_id}) assigned to FG_INCOMING")
        return jsonify({
            'success': True,
            'box_number': box_number,
            'box_id': box_id,
            'location': 'FG_INCOMING',
            'message': f'Box {box_number} created and assigned to FG_INCOMING location'
        })
    except Exception as e:
        logger.exception("Error creating quick box: %s", e)
        if conn is not None:
            # Discard any partially written rows from this request.
            try:
                conn.rollback()
            except Exception:
                logger.exception("Rollback failed after quick box error")
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the cursor on every path, including early returns.
        if cursor is not None:
            cursor.close()
@quality_bp.route('/api/generate-box-label-pdf', methods=['POST'])
def generate_box_label_pdf():
    """Build an 80 mm x 50 mm PDF box label and return it base64-encoded as JSON.

    The label carries the text "BOX Nr:" followed by the box number, plus a
    Code128 barcode of the box number underneath. ``box_number`` is read from
    the submitted form and must start with ``BOX``.

    Responses:
        401 - no ``user_id`` in the session
        400 - missing box number, or one that does not start with ``BOX``
        500 - any other failure while building the PDF
        otherwise JSON with ``success``, ``pdf_base64`` and ``box_number``
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        box_number = request.form.get('box_number', 'Unknown')
        if not (box_number and box_number.startswith('BOX')):
            return jsonify({'error': 'Invalid box number'}), 400

        # Label geometry: 8 cm wide, 5 cm tall, with a 2 mm margin all round.
        page_width = 80 * mm
        page_height = 50 * mm
        margin = 2 * mm
        usable_height = page_height - (2 * margin)

        # The top 12 mm is reserved for text; the barcode gets the remainder
        # minus a 1 mm gap.
        text_height = 12 * mm
        barcode_height = usable_height - text_height - (1 * mm)

        stream = BytesIO()
        label = canvas.Canvas(stream, pagesize=(page_width, page_height))
        label.setPageCompression(1)
        label.setCreator("Quality App - Box Label System")

        # Caption and box number share one baseline near the top edge.
        baseline = page_height - margin - 8 * mm
        label.setFont("Helvetica-Bold", 12)
        label.drawString(margin, baseline, "BOX Nr:")
        label.setFont("Courier-Bold", 14)
        label.drawString(margin + 18 * mm, baseline, box_number)

        # Barcode is horizontally centred; if it cannot be generated the
        # label is still produced with text only.
        try:
            barcode = code128.Code128(
                box_number,
                barWidth=0.5 * mm,
                barHeight=barcode_height - (2 * mm),
                humanReadable=False
            )
            left = (page_width - barcode.width) / 2
            bottom = margin + 2 * mm
            barcode.drawOn(label, left, bottom)
        except Exception as barcode_error:
            logger.warning(f"Barcode generation warning: {barcode_error}")

        label.save()

        # Encode the finished PDF bytes as base64 text for the JSON payload.
        encoded_pdf = base64.b64encode(stream.getvalue()).decode('utf-8')
        logger.info(f"Generated PDF label for box: {box_number}")
        response = {
            'success': True,
            'pdf_base64': encoded_pdf,
            'box_number': box_number
        }
        return jsonify(response)
    except Exception as exc:
        logger.error(f"Error generating box label PDF: {exc}")
        return jsonify({'error': str(exc)}), 500
@quality_bp.route('/api/assign-cp-to-box', methods=['POST'])
def assign_cp_to_box():
    """Assign CP code to box and update traceability.

    Expected JSON body:
        box_number - target box (required)
        scan_id    - ID of the ``scanfg_orders`` row to link; when present
                     the CP code is read from that row
        cp_code    - fallback CP code for legacy requests without ``scan_id``
        quantity   - quantity stored in ``box_contents`` (default 1)

    The ``box_contents`` insert, the ``scanfg_orders`` update and the
    ``cp_location_history`` insert are committed together; any failure rolls
    the transaction back.

    Responses:
        401 - no ``user_id`` in the session
        400 - body is not a non-empty JSON object, or box/CP code missing
        404 - the scan or the box does not exist
        500 - any database error
        otherwise JSON with ``success``, ``message``, ``cp_code``,
        ``box_id`` and ``box_number``
    """
    if 'user_id' not in session:
        logger.warning("Unauthorized assign_cp_to_box request")
        return jsonify({'error': 'Unauthorized'}), 401

    # silent=True turns a wrong content type or malformed JSON into None so
    # it is reported as a client error rather than a 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        logger.error("No JSON data in request")
        return jsonify({'error': 'No JSON data provided'}), 400

    conn = None
    cursor = None
    try:
        # str(... or '') tolerates JSON null and non-string values, which
        # would otherwise fail on .strip().
        box_number = str(data.get('box_number') or '').strip()
        scan_id = data.get('scan_id')
        cp_code = str(data.get('cp_code') or '').strip()  # Fallback for legacy requests
        quantity = data.get('quantity', 1)
        logger.info(f"Assigning to box {box_number}, scan_id: {scan_id}, cp_code: {cp_code}, qty: {quantity}")

        conn = get_db()
        cursor = conn.cursor()

        # If scan_id is provided, fetch the CP code from the scan record
        if scan_id:
            cursor.execute("""
                SELECT CP_full_code FROM scanfg_orders
                WHERE id = %s
            """, (scan_id,))
            scan_result = cursor.fetchone()
            if not scan_result:
                logger.error(f"Scan {scan_id} not found")
                return jsonify({'error': f'Scan {scan_id} not found'}), 404
            cp_code = scan_result[0]
            logger.info(f"Retrieved CP code {cp_code} from scan {scan_id}")

        if not box_number or not cp_code:
            logger.error(f"Missing required fields: box_number={box_number}, cp_code={cp_code}")
            return jsonify({'error': 'Missing box_number or cp_code'}), 400

        # Get box ID and location
        cursor.execute("""
            SELECT id, location_id FROM boxes_crates
            WHERE box_number = %s
        """, (box_number,))
        box_result = cursor.fetchone()
        if not box_result:
            logger.error(f"Box {box_number} not found")
            return jsonify({'error': f'Box {box_number} not found'}), 404
        box_id, location_id = box_result[0], box_result[1]
        logger.info(f"Found box_id={box_id}, location_id={location_id}")

        # Insert into box_contents
        cursor.execute("""
            INSERT INTO box_contents (box_id, cp_code, quantity, added_at)
            VALUES (%s, %s, %s, NOW())
        """, (box_id, cp_code, quantity))
        logger.info("Inserted into box_contents")

        # Update scanfg_orders to link CP to box and location
        if scan_id:
            # If we have a scan_id, update that specific scan record
            cursor.execute("""
                UPDATE scanfg_orders
                SET box_id = %s, location_id = %s
                WHERE id = %s
            """, (box_id, location_id, scan_id))
            logger.info(f"Updated scanfg_orders scan {scan_id}")
        else:
            # Legacy behavior: update by CP code (last one)
            cursor.execute("""
                UPDATE scanfg_orders
                SET box_id = %s, location_id = %s
                WHERE CP_full_code = %s
                ORDER BY created_at DESC
                LIMIT 1
            """, (box_id, location_id, cp_code))
            logger.info(f"Updated scanfg_orders for CP {cp_code}")

        # Create location history entry
        user_id = session.get('user_id')
        cursor.execute("""
            INSERT INTO cp_location_history (cp_code, box_id, from_location_id, to_location_id, moved_by, reason)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (cp_code, box_id, None, location_id, user_id, 'Assigned to box'))
        logger.info("Created cp_location_history entry")

        conn.commit()
        logger.info(f"✅ CP {cp_code} successfully assigned to box {box_number} (qty: {quantity}) in location {location_id}")
        return jsonify({
            'success': True,
            'message': f'CP {cp_code} assigned to box {box_number}',
            'cp_code': cp_code,
            'box_id': box_id,
            'box_number': box_number
        })
    except Exception as e:
        logger.error(f"❌ Error assigning CP to box: {str(e)}", exc_info=True)
        if conn is not None:
            # Discard any partial insert/update from this request so it
            # cannot be committed by later work on the same connection.
            try:
                conn.rollback()
            except Exception:
                logger.exception("Rollback failed after assign_cp_to_box error")
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the cursor on every path, including early returns.
        if cursor is not None:
            cursor.close()