""" Quality Module Business Logic Handles database operations and business logic for the quality module """ from app.database import get_db from flask import flash import logging logger = logging.getLogger(__name__) def ensure_scanfg_orders_table(): """Ensure the scanfg_orders table exists with proper schema""" try: db = get_db() cursor = db.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS scanfg_orders ( Id INT AUTO_INCREMENT PRIMARY KEY, operator_code VARCHAR(4) NOT NULL, CP_full_code VARCHAR(15) NOT NULL, OC1_code VARCHAR(4) NOT NULL, OC2_code VARCHAR(4) NOT NULL, quality_code TINYINT(3) NOT NULL, date DATE NOT NULL, time TIME NOT NULL, approved_quantity INT DEFAULT 0, rejected_quantity INT DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_cp (CP_full_code), INDEX idx_date (date), INDEX idx_operator (operator_code), UNIQUE KEY unique_cp_date (CP_full_code, date) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """) db.commit() logger.info("Table 'scanfg_orders' ready") return True except Exception as e: logger.error(f"Error ensuring scanfg_orders table: {e}") raise def save_fg_scan(operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time): """ Save a finish goods scan to the database Args: operator_code: Operator code (e.g., OP0001) cp_code: CP full code (e.g., CP00002042-0001) oc1_code: OC1 code (e.g., OC0001) oc2_code: OC2 code (e.g., OC0002) defect_code: Quality code / defect code (e.g., 000 for approved) date: Scan date time: Scan time Returns: tuple: (success: bool, approved_count: int, rejected_count: int) """ try: db = get_db() cursor = db.cursor() # Insert a new entry - each scan is a separate record insert_query = """ INSERT INTO scanfg_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time) VALUES (%s, %s, %s, %s, %s, %s, %s) """ cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time)) db.commit() # Get the quantities from the table for feedback cursor.execute(""" SELECT COUNT(*) as total_scans, SUM(CASE WHEN quality_code = '000' THEN 1 ELSE 0 END) as approved_count, SUM(CASE WHEN quality_code != '000' THEN 1 ELSE 0 END) as rejected_count FROM scanfg_orders WHERE CP_full_code = %s """, (cp_code,)) result = cursor.fetchone() approved_count = result[1] if result and result[1] else 0 rejected_count = result[2] if result and result[2] else 0 logger.info(f"Scan saved successfully: {cp_code} by {operator_code}") return True, approved_count, rejected_count except Exception as e: logger.error(f"Error saving finish goods scan data: {e}") raise def get_latest_scans(limit=25): """ Fetch the latest scan records from the database Args: limit: Maximum number of scans to fetch (default: 25) Returns: list: List of scan dictionaries with calculated approved/rejected counts """ scan_groups = [] try: db = get_db() cursor = db.cursor() # Get all scans ordered by date/time descending cursor.execute(""" SELECT Id, operator_code, CP_full_code as cp_code, OC1_code as oc1_code, OC2_code as oc2_code, quality_code as defect_code, date, time, created_at FROM scanfg_orders ORDER BY created_at DESC, Id DESC LIMIT %s """, (limit,)) results = cursor.fetchall() if results: # Convert result tuples to dictionaries for template access columns = ['id', 'operator_code', 'cp_code', 'oc1_code', 'oc2_code', 'defect_code', 'date', 'time', 'created_at'] scan_groups = [dict(zip(columns, row)) for row in results] # Now calculate approved and rejected counts for each CP code for scan in scan_groups: cp_code = scan['cp_code'] cursor.execute(""" SELECT SUM(CASE WHEN quality_code = 0 OR quality_code = '000' THEN 1 ELSE 0 END) as approved_qty, SUM(CASE WHEN quality_code != 0 AND quality_code != '000' THEN 1 ELSE 0 END) as rejected_qty FROM scanfg_orders WHERE CP_full_code = %s """, (cp_code,)) count_result = cursor.fetchone() scan['approved_qty'] = count_result[0] if count_result[0] else 0 scan['rejected_qty'] = count_result[1] if count_result[1] else 0 logger.info(f"Fetched {len(scan_groups)} scan records for display") else: logger.info("No scan records found in database") except Exception as e: logger.error(f"Error fetching finish goods scan data: {e}") raise return scan_groups # Report Generation Functions def get_fg_report(report_type, filter_date=None, start_date=None, end_date=None): """ Generate FG scan reports based on report type and filters Args: report_type: Type of report ('daily', 'select-day', 'date-range', '5-day', 'defects-today', 'defects-date', 'defects-range', 'defects-5day', 'all') filter_date: Specific date filter (YYYY-MM-DD format) start_date: Start date for range (YYYY-MM-DD format) end_date: End date for range (YYYY-MM-DD format) Returns: dict: { 'success': bool, 'title': str, 'data': list of dicts, 'summary': {'approved_count': int, 'rejected_count': int} } """ try: db = get_db() cursor = db.cursor() # Build query based on report type query = """ SELECT Id as id, operator_code, CP_full_code as cp_code, OC1_code as oc1_code, OC2_code as oc2_code, quality_code as defect_code, date, time, created_at FROM scanfg_orders """ params = [] title = "FG Scan Report" is_defects_only = False # Build WHERE clause based on report type if report_type == 'daily': title = "Today's FG Scans Report" query += " WHERE DATE(date) = CURDATE()" elif report_type == 'select-day': title = f"FG Scans Report for {filter_date}" query += " WHERE DATE(date) = %s" params.append(filter_date) elif report_type == 'date-range': title = f"FG Scans Report ({start_date} to {end_date})" query += " WHERE DATE(date) >= %s AND DATE(date) <= %s" params.extend([start_date, end_date]) elif report_type == '5-day': title = "Last 5 Days FG Scans Report" query += " WHERE DATE(date) >= DATE_SUB(CURDATE(), INTERVAL 4 DAY)" elif report_type == 'defects-today': title = "Today's FG Defects Report" query += " WHERE DATE(date) = CURDATE() AND quality_code != '000' AND quality_code != 0" is_defects_only = True elif report_type == 'defects-date': title = f"FG Defects Report for {filter_date}" query += " WHERE DATE(date) = %s AND quality_code != '000' AND quality_code != 0" params.append(filter_date) is_defects_only = True elif report_type == 'defects-range': title = f"FG Defects Report ({start_date} to {end_date})" query += " WHERE DATE(date) >= %s AND DATE(date) <= %s AND quality_code != '000' AND quality_code != 0" params.extend([start_date, end_date]) is_defects_only = True elif report_type == 'defects-5day': title = "Last 5 Days FG Defects Report" query += " WHERE DATE(date) >= DATE_SUB(CURDATE(), INTERVAL 4 DAY) AND quality_code != '000' AND quality_code != 0" is_defects_only = True elif report_type == 'all': title = "Complete FG Scans Database Report" # No additional WHERE clause # Add ORDER BY query += " ORDER BY date DESC, time DESC" # Execute query cursor.execute(query, params) results = cursor.fetchall() # Convert to list of dicts and convert datetime objects to strings columns = ['id', 'operator_code', 'cp_code', 'oc1_code', 'oc2_code', 'defect_code', 'date', 'time', 'created_at'] data = [] for row in results: row_dict = dict(zip(columns, row)) # Convert date/time/datetime objects to strings for JSON serialization for key in ['date', 'time', 'created_at']: if row_dict[key] is not None: row_dict[key] = str(row_dict[key]) data.append(row_dict) # Calculate summary statistics approved_count = sum(1 for row in data if row['defect_code'] == 0 or row['defect_code'] == '0' or str(row['defect_code']) == '000') rejected_count = len(data) - approved_count logger.info(f"Generated {report_type} report: {len(data)} records") return { 'success': True, 'title': title, 'data': data, 'summary': { 'approved_count': approved_count, 'rejected_count': rejected_count } } except Exception as e: logger.error(f"Error generating FG report ({report_type}): {e}") return { 'success': False, 'message': f"Error generating report: {str(e)}", 'data': [], 'summary': {'approved_count': 0, 'rejected_count': 0} } def get_daily_statistics(): """ Get today's statistics for dashboard/summary Returns: dict: {'total': int, 'approved': int, 'rejected': int} """ try: db = get_db() cursor = db.cursor() cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN quality_code = '000' OR quality_code = 0 THEN 1 ELSE 0 END) as approved, SUM(CASE WHEN quality_code != '000' AND quality_code != 0 THEN 1 ELSE 0 END) as rejected FROM scanfg_orders WHERE DATE(date) = CURDATE() """) result = cursor.fetchone() if result: return { 'total': result[0] or 0, 'approved': result[1] or 0, 'rejected': result[2] or 0 } return {'total': 0, 'approved': 0, 'rejected': 0} except Exception as e: logger.error(f"Error getting daily statistics: {e}") return {'total': 0, 'approved': 0, 'rejected': 0} def get_cp_statistics(cp_code): """ Get statistics for a specific CP code Args: cp_code: The CP code to get statistics for Returns: dict: {'total': int, 'approved': int, 'rejected': int} """ try: db = get_db() cursor = db.cursor() cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN quality_code = '000' OR quality_code = 0 THEN 1 ELSE 0 END) as approved, SUM(CASE WHEN quality_code != '000' AND quality_code != 0 THEN 1 ELSE 0 END) as rejected FROM scanfg_orders WHERE CP_full_code = %s """, (cp_code,)) result = cursor.fetchone() if result: return { 'total': result[0] or 0, 'approved': result[1] or 0, 'rejected': result[2] or 0 } return {'total': 0, 'approved': 0, 'rejected': 0} except Exception as e: logger.error(f"Error getting CP statistics for {cp_code}: {e}") return {'total': 0, 'approved': 0, 'rejected': 0}