Files
Quality App Developer 59e82c0209 Fix: Resolve 'Column date cannot be null' error in FG scan
- Added default date/time handling in save_fg_scan() function
- Backend now uses current date/time if values not provided
- Added hidden date/time form fields in frontend
- Updated JavaScript to populate hidden fields before submission
- Prevents null database errors when scanning orders
2026-01-27 17:52:57 +02:00

349 lines
13 KiB
Python

"""
Quality Module Business Logic
Handles database operations and business logic for the quality module
"""
from app.database import get_db
from flask import flash
import logging
logger = logging.getLogger(__name__)
def ensure_scanfg_orders_table():
"""Ensure the scanfg_orders table exists with proper schema"""
try:
db = get_db()
cursor = db.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS scanfg_orders (
Id INT AUTO_INCREMENT PRIMARY KEY,
operator_code VARCHAR(4) NOT NULL,
CP_full_code VARCHAR(15) NOT NULL,
OC1_code VARCHAR(4) NOT NULL,
OC2_code VARCHAR(4) NOT NULL,
quality_code TINYINT(3) NOT NULL,
date DATE NOT NULL,
time TIME NOT NULL,
approved_quantity INT DEFAULT 0,
rejected_quantity INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_cp (CP_full_code),
INDEX idx_date (date),
INDEX idx_operator (operator_code),
UNIQUE KEY unique_cp_date (CP_full_code, date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
""")
db.commit()
logger.info("Table 'scanfg_orders' ready")
return True
except Exception as e:
logger.error(f"Error ensuring scanfg_orders table: {e}")
raise
def save_fg_scan(operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time):
"""
Save a finish goods scan to the database
Args:
operator_code: Operator code (e.g., OP0001)
cp_code: CP full code (e.g., CP00002042-0001)
oc1_code: OC1 code (e.g., OC0001)
oc2_code: OC2 code (e.g., OC0002)
defect_code: Quality code / defect code (e.g., 000 for approved)
date: Scan date
time: Scan time
Returns:
tuple: (success: bool, approved_count: int, rejected_count: int)
"""
try:
from datetime import datetime
db = get_db()
cursor = db.cursor()
# Default to current date/time if not provided
if not date:
date = datetime.now().strftime('%Y-%m-%d')
if not time:
time = datetime.now().strftime('%H:%M:%S')
# Insert a new entry - each scan is a separate record
insert_query = """
INSERT INTO scanfg_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
db.commit()
# Get the quantities from the table for feedback
cursor.execute("""
SELECT COUNT(*) as total_scans,
SUM(CASE WHEN quality_code = '000' THEN 1 ELSE 0 END) as approved_count,
SUM(CASE WHEN quality_code != '000' THEN 1 ELSE 0 END) as rejected_count
FROM scanfg_orders
WHERE CP_full_code = %s
""", (cp_code,))
result = cursor.fetchone()
approved_count = result[1] if result and result[1] else 0
rejected_count = result[2] if result and result[2] else 0
logger.info(f"Scan saved successfully: {cp_code} by {operator_code}")
return True, approved_count, rejected_count
except Exception as e:
logger.error(f"Error saving finish goods scan data: {e}")
raise
def get_latest_scans(limit=25):
"""
Fetch the latest scan records from the database
Args:
limit: Maximum number of scans to fetch (default: 25)
Returns:
list: List of scan dictionaries with calculated approved/rejected counts
"""
scan_groups = []
try:
db = get_db()
cursor = db.cursor()
# Get all scans ordered by date/time descending
cursor.execute("""
SELECT Id, operator_code, CP_full_code as cp_code, OC1_code as oc1_code, OC2_code as oc2_code,
quality_code as defect_code, date, time, created_at
FROM scanfg_orders
ORDER BY created_at DESC, Id DESC
LIMIT %s
""", (limit,))
results = cursor.fetchall()
if results:
# Convert result tuples to dictionaries for template access
columns = ['id', 'operator_code', 'cp_code', 'oc1_code', 'oc2_code', 'defect_code', 'date', 'time', 'created_at']
scan_groups = [dict(zip(columns, row)) for row in results]
# Now calculate approved and rejected counts for each CP code
for scan in scan_groups:
cp_code = scan['cp_code']
cursor.execute("""
SELECT
SUM(CASE WHEN quality_code = 0 OR quality_code = '000' THEN 1 ELSE 0 END) as approved_qty,
SUM(CASE WHEN quality_code != 0 AND quality_code != '000' THEN 1 ELSE 0 END) as rejected_qty
FROM scanfg_orders
WHERE CP_full_code = %s
""", (cp_code,))
count_result = cursor.fetchone()
scan['approved_qty'] = count_result[0] if count_result[0] else 0
scan['rejected_qty'] = count_result[1] if count_result[1] else 0
logger.info(f"Fetched {len(scan_groups)} scan records for display")
else:
logger.info("No scan records found in database")
except Exception as e:
logger.error(f"Error fetching finish goods scan data: {e}")
raise
return scan_groups
# Report Generation Functions
def get_fg_report(report_type, filter_date=None, start_date=None, end_date=None):
"""
Generate FG scan reports based on report type and filters
Args:
report_type: Type of report ('daily', 'select-day', 'date-range', '5-day',
'defects-today', 'defects-date', 'defects-range',
'defects-5day', 'all')
filter_date: Specific date filter (YYYY-MM-DD format)
start_date: Start date for range (YYYY-MM-DD format)
end_date: End date for range (YYYY-MM-DD format)
Returns:
dict: {
'success': bool,
'title': str,
'data': list of dicts,
'summary': {'approved_count': int, 'rejected_count': int}
}
"""
try:
db = get_db()
cursor = db.cursor()
# Build query based on report type
query = """
SELECT Id as id, operator_code, CP_full_code as cp_code, OC1_code as oc1_code,
OC2_code as oc2_code, quality_code as defect_code, date, time, created_at
FROM scanfg_orders
"""
params = []
title = "FG Scan Report"
is_defects_only = False
# Build WHERE clause based on report type
if report_type == 'daily':
title = "Today's FG Scans Report"
query += " WHERE DATE(date) = CURDATE()"
elif report_type == 'select-day':
title = f"FG Scans Report for {filter_date}"
query += " WHERE DATE(date) = %s"
params.append(filter_date)
elif report_type == 'date-range':
title = f"FG Scans Report ({start_date} to {end_date})"
query += " WHERE DATE(date) >= %s AND DATE(date) <= %s"
params.extend([start_date, end_date])
elif report_type == '5-day':
title = "Last 5 Days FG Scans Report"
query += " WHERE DATE(date) >= DATE_SUB(CURDATE(), INTERVAL 4 DAY)"
elif report_type == 'defects-today':
title = "Today's FG Defects Report"
query += " WHERE DATE(date) = CURDATE() AND quality_code != '000' AND quality_code != 0"
is_defects_only = True
elif report_type == 'defects-date':
title = f"FG Defects Report for {filter_date}"
query += " WHERE DATE(date) = %s AND quality_code != '000' AND quality_code != 0"
params.append(filter_date)
is_defects_only = True
elif report_type == 'defects-range':
title = f"FG Defects Report ({start_date} to {end_date})"
query += " WHERE DATE(date) >= %s AND DATE(date) <= %s AND quality_code != '000' AND quality_code != 0"
params.extend([start_date, end_date])
is_defects_only = True
elif report_type == 'defects-5day':
title = "Last 5 Days FG Defects Report"
query += " WHERE DATE(date) >= DATE_SUB(CURDATE(), INTERVAL 4 DAY) AND quality_code != '000' AND quality_code != 0"
is_defects_only = True
elif report_type == 'all':
title = "Complete FG Scans Database Report"
# No additional WHERE clause
# Add ORDER BY
query += " ORDER BY date DESC, time DESC"
# Execute query
cursor.execute(query, params)
results = cursor.fetchall()
# Convert to list of dicts and convert datetime objects to strings
columns = ['id', 'operator_code', 'cp_code', 'oc1_code', 'oc2_code', 'defect_code', 'date', 'time', 'created_at']
data = []
for row in results:
row_dict = dict(zip(columns, row))
# Convert date/time/datetime objects to strings for JSON serialization
for key in ['date', 'time', 'created_at']:
if row_dict[key] is not None:
row_dict[key] = str(row_dict[key])
data.append(row_dict)
# Calculate summary statistics
approved_count = sum(1 for row in data if row['defect_code'] == 0 or row['defect_code'] == '0' or str(row['defect_code']) == '000')
rejected_count = len(data) - approved_count
logger.info(f"Generated {report_type} report: {len(data)} records")
return {
'success': True,
'title': title,
'data': data,
'summary': {
'approved_count': approved_count,
'rejected_count': rejected_count
}
}
except Exception as e:
logger.error(f"Error generating FG report ({report_type}): {e}")
return {
'success': False,
'message': f"Error generating report: {str(e)}",
'data': [],
'summary': {'approved_count': 0, 'rejected_count': 0}
}
def get_daily_statistics():
"""
Get today's statistics for dashboard/summary
Returns:
dict: {'total': int, 'approved': int, 'rejected': int}
"""
try:
db = get_db()
cursor = db.cursor()
cursor.execute("""
SELECT COUNT(*) as total,
SUM(CASE WHEN quality_code = '000' OR quality_code = 0 THEN 1 ELSE 0 END) as approved,
SUM(CASE WHEN quality_code != '000' AND quality_code != 0 THEN 1 ELSE 0 END) as rejected
FROM scanfg_orders
WHERE DATE(date) = CURDATE()
""")
result = cursor.fetchone()
if result:
return {
'total': result[0] or 0,
'approved': result[1] or 0,
'rejected': result[2] or 0
}
return {'total': 0, 'approved': 0, 'rejected': 0}
except Exception as e:
logger.error(f"Error getting daily statistics: {e}")
return {'total': 0, 'approved': 0, 'rejected': 0}
def get_cp_statistics(cp_code):
"""
Get statistics for a specific CP code
Args:
cp_code: The CP code to get statistics for
Returns:
dict: {'total': int, 'approved': int, 'rejected': int}
"""
try:
db = get_db()
cursor = db.cursor()
cursor.execute("""
SELECT COUNT(*) as total,
SUM(CASE WHEN quality_code = '000' OR quality_code = 0 THEN 1 ELSE 0 END) as approved,
SUM(CASE WHEN quality_code != '000' AND quality_code != 0 THEN 1 ELSE 0 END) as rejected
FROM scanfg_orders
WHERE CP_full_code = %s
""", (cp_code,))
result = cursor.fetchone()
if result:
return {
'total': result[0] or 0,
'approved': result[1] or 0,
'rejected': result[2] or 0
}
return {'total': 0, 'approved': 0, 'rejected': 0}
except Exception as e:
logger.error(f"Error getting CP statistics for {cp_code}: {e}")
return {'total': 0, 'approved': 0, 'rejected': 0}