""" Warehouse Module - Helper Functions Provides functions for warehouse operations """ import logging import pymysql from app.database import get_db logger = logging.getLogger(__name__) def ensure_warehouse_locations_table(): """Ensure warehouse_locations table exists""" try: conn = get_db() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS warehouse_locations ( id BIGINT AUTO_INCREMENT PRIMARY KEY, location_code VARCHAR(12) UNIQUE NOT NULL, size INT, description VARCHAR(250), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_location_code (location_code) ) """) conn.commit() cursor.close() logger.info("warehouse_locations table ensured") return True except Exception as e: logger.error(f"Error ensuring warehouse_locations table: {e}") return False def add_location(location_code, size, description): """Add a new warehouse location""" try: ensure_warehouse_locations_table() conn = get_db() cursor = conn.cursor() cursor.execute(""" INSERT INTO warehouse_locations (location_code, size, description) VALUES (%s, %s, %s) """, (location_code, size if size else None, description)) conn.commit() cursor.close() return True, "Location added successfully." except Exception as e: if "Duplicate entry" in str(e): return False, f"Failed: Location code '{location_code}' already exists." logger.error(f"Error adding location: {e}") return False, f"Error adding location: {str(e)}" def get_all_locations(): """Get all warehouse locations""" try: ensure_warehouse_locations_table() conn = get_db() cursor = conn.cursor() cursor.execute(""" SELECT id, location_code, size, description, created_at, updated_at FROM warehouse_locations ORDER BY id DESC """) locations = cursor.fetchall() cursor.close() result = [] for loc in locations: result.append({ 'id': loc[0], 'location_code': loc[1], 'size': loc[2], 'description': loc[3], 'created_at': loc[4], 'updated_at': loc[5] }) return result except Exception as e: logger.error(f"Error getting locations: {e}") return [] def get_location_by_id(location_id): """Get a specific location by ID""" try: conn = get_db() cursor = conn.cursor() cursor.execute(""" SELECT id, location_code, size, description, created_at, updated_at FROM warehouse_locations WHERE id = %s """, (location_id,)) loc = cursor.fetchone() cursor.close() if loc: return { 'id': loc[0], 'location_code': loc[1], 'size': loc[2], 'description': loc[3], 'created_at': loc[4], 'updated_at': loc[5] } return None except Exception as e: logger.error(f"Error getting location: {e}") return None def update_location(location_id, location_code=None, size=None, description=None): """Update a warehouse location Args: location_id: ID of location to update location_code: New location code (optional - cannot be changed in form) size: New size (optional) description: New description (optional) """ try: conn = get_db() cursor = conn.cursor() # Build update query dynamically updates = [] params = [] if location_code: updates.append("location_code = %s") params.append(location_code) if size is not None: updates.append("size = %s") params.append(size) if description is not None: updates.append("description = %s") params.append(description) if not updates: return False, "No fields to update" params.append(location_id) query = f"UPDATE warehouse_locations SET {', '.join(updates)} WHERE id = %s" cursor.execute(query, params) conn.commit() cursor.close() return True, "Location updated successfully." except Exception as e: if "Duplicate entry" in str(e): return False, f"Failed: Location code already exists." logger.error(f"Error updating location: {e}") return False, f"Error updating location: {str(e)}" def delete_location(location_id): """Delete a warehouse location""" try: conn = get_db() cursor = conn.cursor() cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,)) conn.commit() cursor.close() return True, "Location deleted successfully." except Exception as e: logger.error(f"Error deleting location: {e}") return False, f"Error deleting location: {str(e)}" def delete_multiple_locations(location_ids): """Delete multiple warehouse locations""" try: if not location_ids: return False, "No locations to delete." conn = get_db() cursor = conn.cursor() deleted_count = 0 for loc_id in location_ids: try: cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (int(loc_id),)) if cursor.rowcount > 0: deleted_count += 1 except: pass conn.commit() cursor.close() return True, f"Deleted {deleted_count} location(s)." except Exception as e: logger.error(f"Error deleting multiple locations: {e}") return False, f"Error deleting locations: {str(e)}" # ============================================================================ # Set Boxes Locations - Functions for assigning boxes to locations # ============================================================================ def search_box_by_number(box_number): """Search for a box by its number Returns: tuple: (success: bool, box_data: dict or None, status_code: int) """ try: if not box_number or not str(box_number).strip(): return False, None, 400 conn = get_db() cursor = conn.cursor() cursor.execute(""" SELECT b.id, b.box_number, b.status, b.location_id, COALESCE(l.location_code, 'Not assigned') as location_code, b.created_at FROM boxes_crates b LEFT JOIN warehouse_locations l ON b.location_id = l.id WHERE b.box_number = %s """, (str(box_number).strip(),)) result = cursor.fetchone() cursor.close() if not result: return False, None, 404 box_data = { 'id': result[0], 'box_number': result[1], 'status': result[2], 'location_id': result[3], 'location_code': result[4], 'created_at': str(result[5]) } return True, box_data, 200 except Exception as e: logger.error(f"Error searching box: {e}") return False, None, 500 def search_location_with_boxes(location_code): """Search for a location and get all boxes assigned to it Returns: tuple: (success: bool, data: dict, status_code: int) """ try: if not location_code or not str(location_code).strip(): return False, {}, 400 conn = get_db() cursor = conn.cursor() # Get location info cursor.execute(""" SELECT id, location_code, size, description FROM warehouse_locations WHERE location_code = %s """, (str(location_code).strip(),)) location = cursor.fetchone() if not location: cursor.close() return False, {'error': f'Location "{location_code}" not found'}, 404 location_id = location[0] # Get all boxes in this location cursor.execute(""" SELECT id, box_number, status, created_at FROM boxes_crates WHERE location_id = %s ORDER BY id DESC """, (location_id,)) boxes = cursor.fetchall() cursor.close() location_data = { 'id': location[0], 'location_code': location[1], 'size': location[2], 'description': location[3] } boxes_list = [] for box in boxes: boxes_list.append({ 'id': box[0], 'box_number': box[1], 'status': box[2], 'created_at': str(box[3]) }) return True, {'location': location_data, 'boxes': boxes_list}, 200 except Exception as e: logger.error(f"Error searching location: {e}") return False, {'error': str(e)}, 500 def assign_box_to_location(box_id, location_code): """Assign a box to a warehouse location Returns: tuple: (success: bool, message: str, status_code: int) """ try: if not box_id or not location_code: return False, 'Box ID and location code are required', 400 conn = get_db() cursor = conn.cursor() # Check if location exists cursor.execute(""" SELECT id FROM warehouse_locations WHERE location_code = %s """, (location_code,)) location = cursor.fetchone() if not location: cursor.close() return False, f'Location "{location_code}" not found', 404 location_id = location[0] # Get box info cursor.execute(""" SELECT box_number FROM boxes_crates WHERE id = %s """, (box_id,)) box = cursor.fetchone() if not box: cursor.close() return False, 'Box not found', 404 # Update box location cursor.execute(""" UPDATE boxes_crates SET location_id = %s, updated_at = NOW() WHERE id = %s """, (location_id, box_id)) conn.commit() cursor.close() return True, f'Box "{box[0]}" assigned to location "{location_code}"', 200 except Exception as e: logger.error(f"Error assigning box to location: {e}") return False, f'Error: {str(e)}', 500 def move_box_to_new_location(box_id, new_location_code): """Move a box from current location to a new location Returns: tuple: (success: bool, message: str, status_code: int) """ try: if not box_id or not new_location_code: return False, 'Box ID and new location code are required', 400 conn = get_db() cursor = conn.cursor() # Check if new location exists cursor.execute(""" SELECT id FROM warehouse_locations WHERE location_code = %s """, (new_location_code,)) location = cursor.fetchone() if not location: cursor.close() return False, f'Location "{new_location_code}" not found', 404 new_location_id = location[0] # Get box info cursor.execute(""" SELECT box_number FROM boxes_crates WHERE id = %s """, (box_id,)) box = cursor.fetchone() if not box: cursor.close() return False, 'Box not found', 404 # Update box location cursor.execute(""" UPDATE boxes_crates SET location_id = %s, updated_at = NOW() WHERE id = %s """, (new_location_id, box_id)) conn.commit() cursor.close() return True, f'Box "{box[0]}" moved to location "{new_location_code}"', 200 except Exception as e: logger.error(f"Error moving box: {e}") return False, f'Error: {str(e)}', 500 def get_cp_inventory_list(limit=100, offset=0): """ Get CP articles from scanfg_orders with box and location info Groups by CP_full_code (8 digits) to show all entries with that base CP Returns latest entries first Returns: List of CP inventory records with box and location info """ try: conn = get_db() cursor = conn.cursor() # Get all CP codes with their box and location info (latest entries first) cursor.execute(""" SELECT s.id, s.CP_full_code, SUBSTRING(s.CP_full_code, 1, 10) as cp_base, COUNT(*) as total_entries, s.box_id, bc.box_number, wl.location_code, wl.id as location_id, MAX(s.date) as latest_date, MAX(s.time) as latest_time, SUM(s.approved_quantity) as total_approved, SUM(s.rejected_quantity) as total_rejected FROM scanfg_orders s LEFT JOIN boxes_crates bc ON s.box_id = bc.id LEFT JOIN warehouse_locations wl ON s.location_id = wl.id GROUP BY SUBSTRING(s.CP_full_code, 1, 10), s.box_id ORDER BY MAX(s.created_at) DESC LIMIT %s OFFSET %s """, (limit, offset)) # Convert tuples to dicts using cursor description columns = [col[0] for col in cursor.description] results = [{columns[i]: row[i] for i in range(len(columns))} for row in cursor.fetchall()] cursor.close() return results if results else [] except Exception as e: logger.error(f"Error getting CP inventory: {e}") return [] def search_cp_code(cp_code_search): """ Search for CP codes - can search by full CP code or CP base (8 digits) Args: cp_code_search: Search string (can be "CP00000001" or "CP00000001-0001") Returns: List of matching CP inventory records """ try: conn = get_db() cursor = conn.cursor() # Remove hyphen and get base CP code if full CP provided search_term = cp_code_search.replace('-', '').strip().upper() # Search for matching CP codes cursor.execute(""" SELECT s.id, s.CP_full_code, SUBSTRING(s.CP_full_code, 1, 10) as cp_base, COUNT(*) as total_entries, s.box_id, bc.box_number, wl.location_code, wl.id as location_id, MAX(s.date) as latest_date, MAX(s.time) as latest_time, SUM(s.approved_quantity) as total_approved, SUM(s.rejected_quantity) as total_rejected FROM scanfg_orders s LEFT JOIN boxes_crates bc ON s.box_id = bc.id LEFT JOIN warehouse_locations wl ON s.location_id = wl.id WHERE REPLACE(s.CP_full_code, '-', '') LIKE %s GROUP BY SUBSTRING(s.CP_full_code, 1, 10), s.box_id ORDER BY MAX(s.created_at) DESC """, (f"{search_term}%",)) # Convert tuples to dicts using cursor description columns = [col[0] for col in cursor.description] results = [{columns[i]: row[i] for i in range(len(columns))} for row in cursor.fetchall()] cursor.close() return results if results else [] except Exception as e: logger.error(f"Error searching CP code: {e}") return [] def search_by_box_number(box_number_search): """ Search for box number and get all CP codes in that box Args: box_number_search: Box number to search for Returns: List of CP entries in the box """ try: conn = get_db() cursor = conn.cursor() box_search = box_number_search.strip().upper() cursor.execute(""" SELECT s.id, s.CP_full_code, s.operator_code, s.quality_code, s.date, s.time, s.approved_quantity, s.rejected_quantity, bc.box_number, bc.id as box_id, wl.location_code, wl.id as location_id, s.created_at FROM scanfg_orders s LEFT JOIN boxes_crates bc ON s.box_id = bc.id LEFT JOIN warehouse_locations wl ON s.location_id = wl.id WHERE bc.box_number LIKE %s ORDER BY s.created_at DESC LIMIT 500 """, (f"%{box_search}%",)) # Convert tuples to dicts using cursor description columns = [col[0] for col in cursor.description] results = [{columns[i]: row[i] for i in range(len(columns))} for row in cursor.fetchall()] cursor.close() return results if results else [] except Exception as e: logger.error(f"Error searching by box number: {e}") return [] def get_cp_details(cp_code): """ Get detailed information for a specific CP code (8 digits) Shows all variations (with different 4-digit suffixes) and their locations Args: cp_code: CP base code (8 digits, e.g., "CP00000001") Returns: List of all CP variations with their box and location info """ try: conn = get_db() cursor = conn.cursor() # Search for all entries with this CP base cursor.execute(""" SELECT s.id, s.CP_full_code, s.operator_code, s.OC1_code, s.OC2_code, s.quality_code, s.date, s.time, s.approved_quantity, s.rejected_quantity, bc.box_number, bc.id as box_id, wl.location_code, wl.id as location_id, wl.description, s.created_at FROM scanfg_orders s LEFT JOIN boxes_crates bc ON s.box_id = bc.id LEFT JOIN warehouse_locations wl ON s.location_id = wl.id WHERE SUBSTRING(s.CP_full_code, 1, 10) = %s ORDER BY s.created_at DESC LIMIT 1000 """, (cp_code.upper(),)) # Convert tuples to dicts using cursor description columns = [col[0] for col in cursor.description] results = [{columns[i]: row[i] for i in range(len(columns))} for row in cursor.fetchall()] cursor.close() return results if results else [] except Exception as e: logger.error(f"Error getting CP details: {e}") return []