Files
quality_app-v2/app/modules/warehouse/warehouse.py
Quality App Developer f54e1bebc3 feat: Add Set Orders on Boxes feature with debouncing and page refresh
- Created warehouse_orders.py module with 8 order management functions
- Added /warehouse/set-orders-on-boxes route and 7 API endpoints
- Implemented 4-tab interface: assign, find, move, and view orders
- Changed assign input from dropdown to text field with BOX validation
- Fixed location join issue in warehouse.py (use boxes_crates.location_id)
- Added debouncing flag to prevent multiple rapid form submissions
- Added page refresh after successful order assignment
- Disabled assign button during processing
- Added page refresh with 2 second delay for UX feedback
- Added CP code validation in inventory page
- Improved modal styling with theme support
- Fixed set_boxes_locations page to refresh box info after assignments
2026-02-02 01:06:03 +02:00

679 lines
21 KiB
Python

"""
Warehouse Module - Helper Functions
Provides functions for warehouse operations
"""
import logging
import pymysql
from app.database import get_db
logger = logging.getLogger(__name__)
def ensure_warehouse_locations_table():
    """Create the warehouse_locations table if it is not present yet.

    Returns:
        bool: True after the CREATE TABLE IF NOT EXISTS statement has been
        executed and committed; False when any exception occurs (the error
        is logged).
    """
    create_sql = """
        CREATE TABLE IF NOT EXISTS warehouse_locations (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            location_code VARCHAR(12) UNIQUE NOT NULL,
            size INT,
            description VARCHAR(250),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_location_code (location_code)
        )
    """
    try:
        db = get_db()
        cur = db.cursor()
        cur.execute(create_sql)
        db.commit()
        cur.close()
        logger.info("warehouse_locations table ensured")
    except Exception as e:
        logger.error(f"Error ensuring warehouse_locations table: {e}")
        return False
    return True
def add_location(location_code, size, description):
    """Insert a new row into warehouse_locations.

    Args:
        location_code: Code of the new location.
        size: Size value; any falsy value is stored as NULL.
        description: Free-text description.

    Returns:
        tuple: (success: bool, message: str). A failure whose error text
        contains "Duplicate entry" is reported as an already-existing code.
    """
    insert_sql = """
        INSERT INTO warehouse_locations (location_code, size, description)
        VALUES (%s, %s, %s)
    """
    try:
        # Make sure the target table exists before inserting into it.
        ensure_warehouse_locations_table()
        db = get_db()
        cur = db.cursor()
        values = (location_code, size or None, description)
        cur.execute(insert_sql, values)
        db.commit()
        cur.close()
    except Exception as e:
        if "Duplicate entry" in str(e):
            return False, f"Failed: Location code '{location_code}' already exists."
        logger.error(f"Error adding location: {e}")
        return False, f"Error adding location: {str(e)}"
    return True, "Location added successfully."
def get_all_locations():
    """Return every warehouse location, newest id first.

    Returns:
        list[dict]: One dict per row with the keys id, location_code, size,
        description, created_at and updated_at. An empty list is returned
        when an exception occurs (the error is logged).
    """
    field_names = (
        'id',
        'location_code',
        'size',
        'description',
        'created_at',
        'updated_at',
    )
    try:
        ensure_warehouse_locations_table()
        db = get_db()
        cur = db.cursor()
        cur.execute("""
            SELECT id, location_code, size, description, created_at, updated_at
            FROM warehouse_locations
            ORDER BY id DESC
        """)
        rows = cur.fetchall()
        cur.close()
        # Map each positional row onto the column names selected above.
        return [
            {name: row[pos] for pos, name in enumerate(field_names)}
            for row in rows
        ]
    except Exception as e:
        logger.error(f"Error getting locations: {e}")
        return []
def get_location_by_id(location_id):
    """Fetch a single warehouse location by its primary key.

    Args:
        location_id: Value matched against warehouse_locations.id.

    Returns:
        dict | None: Dict with the keys id, location_code, size,
        description, created_at and updated_at, or None when no row matches
        or an exception occurs (the error is logged).
    """
    field_names = (
        'id',
        'location_code',
        'size',
        'description',
        'created_at',
        'updated_at',
    )
    try:
        db = get_db()
        cur = db.cursor()
        cur.execute("""
            SELECT id, location_code, size, description, created_at, updated_at
            FROM warehouse_locations
            WHERE id = %s
        """, (location_id,))
        row = cur.fetchone()
        cur.close()
        if not row:
            return None
        return {name: row[pos] for pos, name in enumerate(field_names)}
    except Exception as e:
        logger.error(f"Error getting location: {e}")
        return None
def update_location(location_id, location_code=None, size=None, description=None):
    """Update a warehouse location.

    Only the fields that are supplied are written. The SET clause is built
    before any database resource is acquired, so a call with nothing to
    update returns without opening a connection or cursor.

    Args:
        location_id: ID of location to update
        location_code: New location code (optional; skipped when falsy)
        size: New size (optional; skipped only when None)
        description: New description (optional; skipped only when None)

    Returns:
        tuple: (success: bool, message: str)
    """
    # Column names below are fixed literals; only values are parameterized.
    updates = []
    params = []
    if location_code:
        updates.append("location_code = %s")
        params.append(location_code)
    if size is not None:
        updates.append("size = %s")
        params.append(size)
    if description is not None:
        updates.append("description = %s")
        params.append(description)

    if not updates:
        return False, "No fields to update"

    params.append(location_id)
    query = f"UPDATE warehouse_locations SET {', '.join(updates)} WHERE id = %s"

    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute(query, params)
            conn.commit()
        finally:
            # Release the cursor on success and on failure alike.
            cursor.close()
        return True, "Location updated successfully."
    except Exception as e:
        if "Duplicate entry" in str(e):
            return False, "Failed: Location code already exists."
        logger.error(f"Error updating location: {e}")
        return False, f"Error updating location: {str(e)}"
def delete_location(location_id):
    """Remove one warehouse location by id.

    Args:
        location_id: Value matched against warehouse_locations.id.

    Returns:
        tuple: (success: bool, message: str). Success is reported whenever
        the DELETE statement executes and commits without raising.
    """
    delete_sql = "DELETE FROM warehouse_locations WHERE id = %s"
    try:
        db = get_db()
        cur = db.cursor()
        cur.execute(delete_sql, (location_id,))
        db.commit()
        cur.close()
    except Exception as e:
        logger.error(f"Error deleting location: {e}")
        return False, f"Error deleting location: {str(e)}"
    return True, "Location deleted successfully."
def delete_multiple_locations(location_ids):
    """Delete multiple warehouse locations on a best-effort basis.

    Each id is converted with int() and deleted individually. An id that
    cannot be converted, or whose DELETE raises, is logged and skipped so
    the remaining ids are still processed; all deletions are committed
    together at the end.

    Args:
        location_ids: Iterable of location ids (ints or numeric strings).

    Returns:
        tuple: (success: bool, message: str). The success message carries
        the number of rows actually deleted.
    """
    try:
        if not location_ids:
            return False, "No locations to delete."
        conn = get_db()
        cursor = conn.cursor()
        deleted_count = 0
        try:
            for loc_id in location_ids:
                try:
                    numeric_id = int(loc_id)
                except (TypeError, ValueError):
                    logger.warning(f"Skipping invalid location id: {loc_id!r}")
                    continue
                try:
                    cursor.execute(
                        "DELETE FROM warehouse_locations WHERE id = %s",
                        (numeric_id,),
                    )
                except Exception as e:
                    # Keep going: one failing row must not abort the batch.
                    logger.warning(f"Could not delete location {numeric_id}: {e}")
                    continue
                if cursor.rowcount > 0:
                    deleted_count += 1
            conn.commit()
        finally:
            cursor.close()
        return True, f"Deleted {deleted_count} location(s)."
    except Exception as e:
        logger.error(f"Error deleting multiple locations: {e}")
        return False, f"Error deleting locations: {str(e)}"
# ============================================================================
# Set Boxes Locations - Functions for assigning boxes to locations
# ============================================================================
def search_box_by_number(box_number):
    """Look up one box by its exact box number.

    Args:
        box_number: Box number to search for; it is converted to str and
            stripped of surrounding whitespace before the lookup.

    Returns:
        tuple: (success: bool, box_data: dict or None, status_code: int)
            - 400 when the box number is missing or blank
            - 404 when no box has that number
            - 200 with the box dict (id, box_number, status, location_id,
              location_code, created_at as str) on success
            - 500 when an exception occurs (the error is logged)
    """
    try:
        needle = str(box_number).strip() if box_number else ""
        if not needle:
            return False, None, 400

        db = get_db()
        cur = db.cursor()
        cur.execute("""
            SELECT
                b.id,
                b.box_number,
                b.status,
                b.location_id,
                COALESCE(l.location_code, 'Not assigned') as location_code,
                b.created_at
            FROM boxes_crates b
            LEFT JOIN warehouse_locations l ON b.location_id = l.id
            WHERE b.box_number = %s
        """, (needle,))
        row = cur.fetchone()
        cur.close()

        if not row:
            return False, None, 404

        return True, {
            'id': row[0],
            'box_number': row[1],
            'status': row[2],
            'location_id': row[3],
            'location_code': row[4],
            'created_at': str(row[5]),
        }, 200
    except Exception as e:
        logger.error(f"Error searching box: {e}")
        return False, None, 500
def search_location_with_boxes(location_code):
    """Search for a location and get all boxes assigned to it.

    Args:
        location_code: Location code to look up; surrounding whitespace is
            ignored for the lookup.

    Returns:
        tuple: (success: bool, data: dict, status_code: int)
            - 400 with an empty dict when the code is missing or blank
            - 404 with {'error': ...} when no location has that code
            - 200 with {'location': {...}, 'boxes': [...]} on success
            - 500 with {'error': ...} when an exception occurs
    """
    try:
        if not location_code or not str(location_code).strip():
            return False, {}, 400

        conn = get_db()
        cursor = conn.cursor()
        try:
            # Get location info
            cursor.execute("""
                SELECT id, location_code, size, description
                FROM warehouse_locations
                WHERE location_code = %s
            """, (str(location_code).strip(),))
            location = cursor.fetchone()
            if not location:
                return False, {'error': f'Location "{location_code}" not found'}, 404

            # Get all boxes in this location, newest id first
            cursor.execute("""
                SELECT
                    id,
                    box_number,
                    status,
                    created_at
                FROM boxes_crates
                WHERE location_id = %s
                ORDER BY id DESC
            """, (location[0],))
            boxes = cursor.fetchall()
        finally:
            # Single release point for every exit, including query errors.
            cursor.close()

        location_data = {
            'id': location[0],
            'location_code': location[1],
            'size': location[2],
            'description': location[3],
        }
        boxes_list = [
            {
                'id': box[0],
                'box_number': box[1],
                'status': box[2],
                'created_at': str(box[3]),
            }
            for box in boxes
        ]
        return True, {'location': location_data, 'boxes': boxes_list}, 200
    except Exception as e:
        logger.error(f"Error searching location: {e}")
        return False, {'error': str(e)}, 500
def assign_box_to_location(box_id, location_code):
    """Assign a box to a warehouse location.

    Args:
        box_id: Value matched against boxes_crates.id.
        location_code: Code of the target location.

    Returns:
        tuple: (success: bool, message: str, status_code: int)
            - 400 when either argument is missing/falsy
            - 404 when the location or the box does not exist
            - 200 once the box's location_id has been updated and committed
            - 500 when an exception occurs (the error is logged)
    """
    try:
        if not box_id or not location_code:
            return False, 'Box ID and location code are required', 400

        conn = get_db()
        cursor = conn.cursor()
        try:
            # Check if location exists
            cursor.execute("""
                SELECT id FROM warehouse_locations
                WHERE location_code = %s
            """, (location_code,))
            location = cursor.fetchone()
            if not location:
                return False, f'Location "{location_code}" not found', 404
            location_id = location[0]

            # Get box info (the box number is used in the success message)
            cursor.execute("""
                SELECT box_number FROM boxes_crates WHERE id = %s
            """, (box_id,))
            box = cursor.fetchone()
            if not box:
                return False, 'Box not found', 404

            # Update box location
            cursor.execute("""
                UPDATE boxes_crates
                SET location_id = %s, updated_at = NOW()
                WHERE id = %s
            """, (location_id, box_id))
            conn.commit()
        finally:
            # Single release point for every exit, including query errors.
            cursor.close()

        return True, f'Box "{box[0]}" assigned to location "{location_code}"', 200
    except Exception as e:
        logger.error(f"Error assigning box to location: {e}")
        return False, f'Error: {str(e)}', 500
def move_box_to_new_location(box_id, new_location_code):
    """Move a box from its current location to a new location.

    Args:
        box_id: Value matched against boxes_crates.id.
        new_location_code: Code of the destination location.

    Returns:
        tuple: (success: bool, message: str, status_code: int)
            - 400 when either argument is missing/falsy
            - 404 when the location or the box does not exist
            - 200 once the box's location_id has been updated and committed
            - 500 when an exception occurs (the error is logged)
    """
    try:
        if not box_id or not new_location_code:
            return False, 'Box ID and new location code are required', 400

        conn = get_db()
        cursor = conn.cursor()
        try:
            # Check if new location exists
            cursor.execute("""
                SELECT id FROM warehouse_locations
                WHERE location_code = %s
            """, (new_location_code,))
            location = cursor.fetchone()
            if not location:
                return False, f'Location "{new_location_code}" not found', 404
            new_location_id = location[0]

            # Get box info (the box number is used in the success message)
            cursor.execute("""
                SELECT box_number FROM boxes_crates WHERE id = %s
            """, (box_id,))
            box = cursor.fetchone()
            if not box:
                return False, 'Box not found', 404

            # Update box location
            cursor.execute("""
                UPDATE boxes_crates
                SET location_id = %s, updated_at = NOW()
                WHERE id = %s
            """, (new_location_id, box_id))
            conn.commit()
        finally:
            # Single release point for every exit, including query errors.
            cursor.close()

        return True, f'Box "{box[0]}" moved to location "{new_location_code}"', 200
    except Exception as e:
        logger.error(f"Error moving box: {e}")
        return False, f'Error: {str(e)}', 500
def get_cp_inventory_list(limit=100, offset=0):
    """
    Get CP articles from scanfg_orders with box and location info.

    Rows are grouped by the first 10 characters of CP_full_code (the CP
    base, e.g. "CP00000001") together with box_id, and ordered by the most
    recent created_at in each group, latest first.

    Args:
        limit: Maximum number of grouped rows to return.
        offset: Number of grouped rows to skip.

    Returns:
        List of CP inventory records (dicts keyed by the selected column
        names) with box and location info; an empty list when an exception
        occurs (the error is logged).
    """
    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            # NOTE: Use box's CURRENT location (from boxes_crates), not the
            # historical scan location
            cursor.execute("""
                SELECT
                    s.id,
                    s.CP_full_code,
                    SUBSTRING(s.CP_full_code, 1, 10) as cp_base,
                    COUNT(*) as total_entries,
                    s.box_id,
                    bc.box_number,
                    wl.location_code,
                    wl.id as location_id,
                    MAX(s.date) as latest_date,
                    MAX(s.time) as latest_time,
                    SUM(s.approved_quantity) as total_approved,
                    SUM(s.rejected_quantity) as total_rejected
                FROM scanfg_orders s
                LEFT JOIN boxes_crates bc ON s.box_id = bc.id
                LEFT JOIN warehouse_locations wl ON bc.location_id = wl.id
                GROUP BY SUBSTRING(s.CP_full_code, 1, 10), s.box_id
                ORDER BY MAX(s.created_at) DESC
                LIMIT %s OFFSET %s
            """, (limit, offset))
            # Convert tuples to dicts using cursor description
            columns = [col[0] for col in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()

        results = []
        for row in rows:
            record = dict(zip(columns, row))
            # Stringify date/time values for JSON. Test against None rather
            # than truthiness: a zero timedelta (midnight) is falsy and
            # would otherwise be left unconverted.
            for key in ('latest_date', 'latest_time'):
                if record.get(key) is not None:
                    record[key] = str(record[key])
            results.append(record)
        return results
    except Exception as e:
        logger.error(f"Error getting CP inventory: {e}")
        return []
def search_cp_code(cp_code_search):
    """
    Search for CP codes by prefix - accepts a full CP code or a CP base.

    Hyphens are removed from both the search string and the stored
    CP_full_code before a prefix (LIKE 'term%') comparison; the search
    string is also stripped and upper-cased.

    Args:
        cp_code_search: Search string (can be "CP00000001" or "CP00000001-0001")

    Returns:
        List of matching CP inventory records (dicts keyed by the selected
        column names), grouped by CP base and box; an empty list when an
        exception occurs (the error is logged).
    """
    try:
        # Normalize first so an invalid argument fails before a cursor is opened.
        search_term = cp_code_search.replace('-', '').strip().upper()

        conn = get_db()
        cursor = conn.cursor()
        try:
            # NOTE: Use box's CURRENT location (from boxes_crates), not the
            # historical scan location
            cursor.execute("""
                SELECT
                    s.id,
                    s.CP_full_code,
                    SUBSTRING(s.CP_full_code, 1, 10) as cp_base,
                    COUNT(*) as total_entries,
                    s.box_id,
                    bc.box_number,
                    wl.location_code,
                    wl.id as location_id,
                    MAX(s.date) as latest_date,
                    MAX(s.time) as latest_time,
                    SUM(s.approved_quantity) as total_approved,
                    SUM(s.rejected_quantity) as total_rejected
                FROM scanfg_orders s
                LEFT JOIN boxes_crates bc ON s.box_id = bc.id
                LEFT JOIN warehouse_locations wl ON bc.location_id = wl.id
                WHERE REPLACE(s.CP_full_code, '-', '') LIKE %s
                GROUP BY SUBSTRING(s.CP_full_code, 1, 10), s.box_id
                ORDER BY MAX(s.created_at) DESC
            """, (f"{search_term}%",))
            # Convert tuples to dicts using cursor description
            columns = [col[0] for col in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()

        results = []
        for row in rows:
            record = dict(zip(columns, row))
            # Stringify date/time values for JSON. Test against None rather
            # than truthiness: a zero timedelta (midnight) is falsy and
            # would otherwise be left unconverted.
            for key in ('latest_date', 'latest_time'):
                if record.get(key) is not None:
                    record[key] = str(record[key])
            results.append(record)
        return results
    except Exception as e:
        logger.error(f"Error searching CP code: {e}")
        return []
def search_by_box_number(box_number_search):
    """
    Search for box number and get all CP codes in that box.

    The search string is stripped and upper-cased, then matched as a
    substring (LIKE '%term%') against boxes_crates.box_number. At most 500
    rows are returned, newest created_at first.

    Args:
        box_number_search: Box number to search for

    Returns:
        List of CP entries in the box (dicts keyed by the selected column
        names); an empty list when an exception occurs (the error is logged).
    """
    try:
        # Normalize first so an invalid argument fails before a cursor is opened.
        box_search = box_number_search.strip().upper()

        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT
                    s.id,
                    s.CP_full_code,
                    s.operator_code,
                    s.quality_code,
                    s.date,
                    s.time,
                    s.approved_quantity,
                    s.rejected_quantity,
                    bc.box_number,
                    bc.id as box_id,
                    wl.location_code,
                    wl.id as location_id,
                    s.created_at
                FROM scanfg_orders s
                LEFT JOIN boxes_crates bc ON s.box_id = bc.id
                LEFT JOIN warehouse_locations wl ON bc.location_id = wl.id
                WHERE bc.box_number LIKE %s
                ORDER BY s.created_at DESC
                LIMIT 500
            """, (f"%{box_search}%",))
            # Convert tuples to dicts using cursor description
            columns = [col[0] for col in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()

        results = []
        for row in rows:
            record = dict(zip(columns, row))
            # Stringify date/time values for JSON. Test against None rather
            # than truthiness: a zero timedelta (midnight) is falsy and
            # would otherwise be left unconverted.
            for key in ('date', 'time', 'created_at'):
                if record.get(key) is not None:
                    record[key] = str(record[key])
            results.append(record)
        return results
    except Exception as e:
        logger.error(f"Error searching by box number: {e}")
        return []
def get_cp_details(cp_code):
    """
    Get detailed information for a specific CP base code.

    Returns every scanfg_orders row whose CP_full_code starts with the given
    10-character base ("CP" + 8 digits), i.e. all variations with different
    suffixes, together with their box and current location. At most 1000
    rows are returned, newest created_at first.

    Args:
        cp_code: CP base code (e.g., "CP00000001"); it is upper-cased before
            the comparison.

    Returns:
        List of all CP variations with their box and location info (dicts
        keyed by the selected column names); an empty list when an
        exception occurs (the error is logged).
    """
    try:
        # Normalize first so an invalid argument fails before a cursor is opened.
        cp_base = cp_code.upper()

        conn = get_db()
        cursor = conn.cursor()
        try:
            # NOTE: Use box's CURRENT location (from boxes_crates), not the
            # historical scan location
            cursor.execute("""
                SELECT
                    s.id,
                    s.CP_full_code,
                    s.operator_code,
                    s.OC1_code,
                    s.OC2_code,
                    s.quality_code,
                    s.date,
                    s.time,
                    s.approved_quantity,
                    s.rejected_quantity,
                    bc.box_number,
                    bc.id as box_id,
                    wl.location_code,
                    wl.id as location_id,
                    wl.description,
                    s.created_at
                FROM scanfg_orders s
                LEFT JOIN boxes_crates bc ON s.box_id = bc.id
                LEFT JOIN warehouse_locations wl ON bc.location_id = wl.id
                WHERE SUBSTRING(s.CP_full_code, 1, 10) = %s
                ORDER BY s.created_at DESC
                LIMIT 1000
            """, (cp_base,))
            # Convert tuples to dicts using cursor description
            columns = [col[0] for col in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()

        results = []
        for row in rows:
            record = dict(zip(columns, row))
            # Stringify date/time values for JSON. Test against None rather
            # than truthiness: a zero timedelta (midnight) is falsy and
            # would otherwise be left unconverted.
            for key in ('date', 'time', 'created_at'):
                if record.get(key) is not None:
                    record[key] = str(record[key])
            results.append(record)
        return results
    except Exception as e:
        logger.error(f"Error getting CP details: {e}")
        return []