import mariadb from flask import current_app, request, render_template, session, redirect, url_for, jsonify, make_response import csv, os, tempfile from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas from reportlab.lib.units import cm, mm from reportlab.graphics.barcode import code128 import io def get_db_connection(): settings_file = current_app.instance_path + '/external_server.conf' settings = {} with open(settings_file, 'r') as f: for line in f: line = line.strip() if line and '=' in line and not line.startswith('#'): key, value = line.split('=', 1) settings[key] = value return mariadb.connect( user=settings['username'], password=settings['password'], host=settings['server_domain'], port=int(settings['port']), database=settings['database_name'] ) def ensure_warehouse_locations_table(): try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'warehouse_locations'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS warehouse_locations ( id BIGINT AUTO_INCREMENT PRIMARY KEY, location_code VARCHAR(12) NOT NULL, size INT, description VARCHAR(250) ) ''') conn.commit() conn.close() except Exception as e: print(f"Error ensuring warehouse_locations table: {e}") def ensure_boxes_crates_table(): try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'boxes_crates'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS boxes_crates ( id BIGINT AUTO_INCREMENT PRIMARY KEY, box_number VARCHAR(8) NOT NULL UNIQUE, status ENUM('open', 'closed') DEFAULT 'open', location_id BIGINT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, created_by VARCHAR(100), FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE SET NULL ) ''') conn.commit() conn.close() except Exception as e: print(f"Error ensuring boxes_crates table: {e}") def ensure_box_contents_table(): """Ensure box_contents table exists for tracking CP codes in boxes""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'box_contents'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS box_contents ( id BIGINT AUTO_INCREMENT PRIMARY KEY, box_id BIGINT NOT NULL, cp_code VARCHAR(15) NOT NULL, scan_id BIGINT, scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, scanned_by VARCHAR(100), FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE, INDEX idx_box_id (box_id), INDEX idx_cp_code (cp_code) ) ''') conn.commit() print("box_contents table created successfully") conn.close() except Exception as e: print(f"Error ensuring box_contents table: {e}") def ensure_location_contents_table(): """Ensure location_contents table exists for tracking boxes in locations""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'location_contents'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS location_contents ( id BIGINT AUTO_INCREMENT PRIMARY KEY, location_id BIGINT NOT NULL, box_id BIGINT NOT NULL, placed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, placed_by VARCHAR(100), removed_at TIMESTAMP NULL, removed_by VARCHAR(100), status ENUM('active', 'removed') DEFAULT 'active', FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE CASCADE, FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE, INDEX idx_location_id (location_id), INDEX idx_box_id (box_id), INDEX idx_status (status) ) ''') conn.commit() print("location_contents table created successfully") conn.close() except Exception as e: print(f"Error ensuring location_contents table: {e}") # Add warehouse-specific functions below def add_location(location_code, size, description): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "INSERT INTO warehouse_locations (location_code, size, description) VALUES (?, ?, ?)", (location_code, size if size else None, description) ) conn.commit() conn.close() return "Location added successfully." except mariadb.IntegrityError: conn.close() return "Failed: Location code already exists." def get_locations(): conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id, location_code, size, description FROM warehouse_locations ORDER BY id DESC") locations = cursor.fetchall() conn.close() return locations def recreate_warehouse_locations_table(): conn = get_db_connection() cursor = conn.cursor() cursor.execute("DROP TABLE IF EXISTS warehouse_locations") cursor.execute(""" CREATE TABLE warehouse_locations ( id INT AUTO_INCREMENT PRIMARY KEY, location_code VARCHAR(12) UNIQUE NOT NULL, size INT, description VARCHAR(250) ) """) conn.commit() conn.close() def delete_locations_by_ids(ids_str): ids = [id.strip() for id in ids_str.split(',') if id.strip().isdigit()] if not ids: return "No valid IDs provided." conn = get_db_connection() cursor = conn.cursor() deleted = 0 for id in ids: cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (id,)) if cursor.rowcount: deleted += 1 conn.commit() conn.close() return f"Deleted {deleted} location(s)." def create_locations_handler(): try: # Ensure table exists ensure_warehouse_locations_table() if request.method == "POST": if request.form.get("delete_locations"): ids_str = request.form.get("delete_ids", "") message = delete_locations_by_ids(ids_str) session['flash_message'] = message else: location_code = request.form.get("location_code") size = request.form.get("size") description = request.form.get("description") message = add_location(location_code, size, description) session['flash_message'] = message # Redirect to prevent form resubmission on page reload return redirect(url_for('warehouse.create_locations')) # Get flash message from session if any message = session.pop('flash_message', None) locations = get_locations() return render_template("create_locations.html", locations=locations, message=message) except Exception as e: import traceback error_trace = traceback.format_exc() print(f"Error in create_locations_handler: {e}") print(error_trace) return f"

Error loading warehouse locations

{error_trace}
", 500 def import_locations_csv_handler(): report = None locations = [] errors = [] temp_dir = tempfile.gettempdir() if request.method == 'POST': file = request.files.get('csv_file') if file and file.filename.endswith('.csv'): temp_path = os.path.join(temp_dir, file.filename) file.save(temp_path) session['csv_filename'] = file.filename session['csv_filepath'] = temp_path with open(temp_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) locations = [] for row in reader: location_code = row.get('Location Code') or row.get('location_code') or '' size = row.get('Size') or row.get('size') or '' description = row.get('Description') or row.get('description') or '' locations.append((location_code, size, description)) session['csv_locations'] = locations elif 'csv_locations' in session: locations = session['csv_locations'] if request.form.get('create_locations') and locations: added = 0 failed = 0 errors = [] for loc in locations: location_code, size, description = loc result = add_location(location_code, size, description) if result and 'success' in result.lower(): added += 1 else: failed += 1 errors.append(location_code or '') report = f"{added} locations were added to warehouse_locations table. {failed} locations failed: {', '.join(errors)}" session.pop('csv_locations', None) session.pop('csv_filename', None) session.pop('csv_filepath', None) return redirect(url_for('warehouse.import_locations_csv') + '#created') elif 'csv_locations' in session: locations = session['csv_locations'] return render_template('import_locations_csv.html', report=report, locations=locations) def generate_location_label_pdf(): """Generate PDF for location barcode label (8cm x 5cm landscape)""" try: data = request.get_json() location_code = data.get('location_code', '') if not location_code: return jsonify({'error': 'Location code is required'}), 400 print(f"DEBUG: Generating location label PDF for: {location_code}") # Create PDF in memory buffer = io.BytesIO() # Create PDF with 8cm x 5cm page size in landscape orientation page_width = 80 * mm # 8 cm page_height = 50 * mm # 5 cm c = canvas.Canvas(buffer, pagesize=(page_width, page_height)) # Optimize for label printer c.setPageCompression(1) c.setCreator("Trasabilitate Location Label System") c.setTitle("Location Label - Optimized for Label Printers") # Define margins and usable area margin = 2 * mm usable_width = page_width - (2 * margin) usable_height = page_height - (2 * margin) # Calculate vertical layout # Top section: "Location nr: XXXX" text # Bottom section: Barcode text_height = 12 * mm # Space for text at top barcode_height = usable_height - text_height - (1 * mm) # Rest for barcode with minimal spacing # === TOP SECTION: Location Nr Label === # Position from top of usable area text_y = page_height - margin - text_height # Draw "Location nr:" label c.setFont("Helvetica-Bold", 14) label_text = "Location nr:" label_width = c.stringWidth(label_text, "Helvetica-Bold", 14) # Draw location code c.setFont("Helvetica-Bold", 18) code_text = location_code code_width = c.stringWidth(code_text, "Helvetica-Bold", 18) # Calculate total width and center everything total_text_width = label_width + 3*mm + code_width # 3mm spacing between label and code start_x = margin + (usable_width - total_text_width) / 2 # Draw label text c.setFont("Helvetica-Bold", 14) c.drawString(start_x, text_y + 5*mm, label_text) # Draw location code c.setFont("Helvetica-Bold", 18) c.drawString(start_x + label_width + 3*mm, text_y + 5*mm, code_text) # === BOTTOM SECTION: Barcode === barcode_y = margin try: # Create barcode for location code barcode = code128.Code128( location_code, barWidth=0.4*mm, # Thicker bars for better scanning barHeight=barcode_height, humanReadable=True, fontSize=10 ) # Calculate scaling to fit width scale_factor = usable_width / barcode.width # Center the barcode horizontally barcode_x = margin + (usable_width - (barcode.width * scale_factor)) / 2 # Draw the barcode c.saveState() c.translate(barcode_x, barcode_y) c.scale(scale_factor, 1) barcode.drawOn(c, 0, 0) c.restoreState() print(f"DEBUG: Barcode generated successfully with scale factor: {scale_factor}") except Exception as e: print(f"DEBUG: Error generating barcode: {e}") # Fallback: draw text if barcode fails c.setFont("Helvetica-Bold", 12) text_width = c.stringWidth(location_code, "Helvetica-Bold", 12) c.drawString((page_width - text_width) / 2, barcode_y + barcode_height/2, location_code) # Finalize PDF c.save() # Prepare response buffer.seek(0) response = make_response(buffer.getvalue()) response.headers['Content-Type'] = 'application/pdf' response.headers['Content-Disposition'] = f'inline; filename=location_{location_code}_label.pdf' print(f"DEBUG: Location label PDF generated successfully") return response except Exception as e: print(f"Error generating location label PDF: {e}") import traceback traceback.print_exc() return jsonify({'error': str(e)}), 500 def update_location(location_id, location_code, size, description): """Update an existing warehouse location""" try: conn = get_db_connection() cursor = conn.cursor() # Check if location exists cursor.execute("SELECT id FROM warehouse_locations WHERE id = %s", (location_id,)) if not cursor.fetchone(): conn.close() return {"success": False, "error": "Location not found"} # Check if location code already exists for different location cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = %s AND id != %s", (location_code, location_id)) if cursor.fetchone(): conn.close() return {"success": False, "error": "Location code already exists"} # Update location cursor.execute( "UPDATE warehouse_locations SET location_code = %s, size = %s, description = %s WHERE id = %s", (location_code, size if size else None, description, location_id) ) conn.commit() conn.close() return {"success": True, "message": "Location updated successfully"} except Exception as e: print(f"Error updating location: {e}") return {"success": False, "error": str(e)} # ============================================================================ # Boxes/Crates Functions # ============================================================================ def generate_box_number(): """Generate next box number with 8 digits (00000001, 00000002, etc.)""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT MAX(CAST(box_number AS UNSIGNED)) FROM boxes_crates") result = cursor.fetchone() conn.close() if result and result[0]: next_number = int(result[0]) + 1 else: next_number = 1 return str(next_number).zfill(8) def add_box(location_id=None, created_by=None): """Add a new box/crate""" conn = get_db_connection() cursor = conn.cursor() box_number = generate_box_number() try: cursor.execute( "INSERT INTO boxes_crates (box_number, status, location_id, created_by) VALUES (%s, %s, %s, %s)", (box_number, 'open', location_id if location_id else None, created_by) ) conn.commit() conn.close() return f"Box {box_number} created successfully" except Exception as e: conn.close() return f"Error creating box: {e}" def get_boxes(): """Get all boxes with location information""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT b.id, b.box_number, b.status, l.location_code, b.created_at, b.updated_at, b.created_by, b.location_id FROM boxes_crates b LEFT JOIN warehouse_locations l ON b.location_id = l.id ORDER BY b.id DESC """) boxes = cursor.fetchall() conn.close() return boxes def update_box_status(box_id, status): """Update box status (open/closed)""" conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "UPDATE boxes_crates SET status = %s WHERE id = %s", (status, box_id) ) conn.commit() conn.close() return {"success": True, "message": f"Box status updated to {status}"} except Exception as e: conn.close() return {"success": False, "error": str(e)} def update_box_location(box_id, location_id): """Update box location""" conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "UPDATE boxes_crates SET location_id = %s WHERE id = %s", (location_id if location_id else None, box_id) ) conn.commit() conn.close() return {"success": True, "message": "Box location updated"} except Exception as e: conn.close() return {"success": False, "error": str(e)} def delete_boxes_by_ids(ids_str): """Delete boxes by comma-separated IDs""" ids = [id.strip() for id in ids_str.split(',') if id.strip().isdigit()] if not ids: return "No valid IDs provided." conn = get_db_connection() cursor = conn.cursor() deleted = 0 for id in ids: cursor.execute("DELETE FROM boxes_crates WHERE id = %s", (id,)) if cursor.rowcount: deleted += 1 conn.commit() conn.close() return f"Deleted {deleted} box(es)." def manage_boxes_handler(): """Handler for boxes/crates management page""" try: # Ensure table exists ensure_boxes_crates_table() ensure_warehouse_locations_table() if request.method == "POST": action = request.form.get("action") if action == "delete_boxes": ids_str = request.form.get("delete_ids", "") message = delete_boxes_by_ids(ids_str) session['flash_message'] = message elif action == "add_box": created_by = session.get('user', 'Unknown') message = add_box(None, created_by) # Create box without location # Check if this is an AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': # Extract box number from message (format: "Box 12345678 created successfully") import re match = re.search(r'Box (\d{8})', message) if match: return jsonify({'success': True, 'box_number': match.group(1), 'message': message}) else: return jsonify({'success': False, 'error': message}) session['flash_message'] = message elif action == "update_status": box_id = request.form.get("box_id") new_status = request.form.get("new_status") message = update_box_status(box_id, new_status) session['flash_message'] = message elif action == "update_location": box_id = request.form.get("box_id") new_location_id = request.form.get("new_location_id") message = update_box_location(box_id, new_location_id) session['flash_message'] = message return redirect(url_for('warehouse.manage_boxes')) # Get flash message from session if any message = session.pop('flash_message', None) boxes = get_boxes() locations = get_locations() return render_template("manage_boxes.html", boxes=boxes, locations=locations, message=message) except Exception as e: import traceback error_trace = traceback.format_exc() print(f"Error in manage_boxes_handler: {e}") print(error_trace) return f"

Error loading boxes management

{error_trace}
", 500 def delete_location_by_id(location_id): """Delete a warehouse location by ID""" try: conn = get_db_connection() cursor = conn.cursor() # Check if location exists cursor.execute("SELECT location_code FROM warehouse_locations WHERE id = %s", (location_id,)) location = cursor.fetchone() if not location: conn.close() return {"success": False, "error": "Location not found"} # Delete location cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,)) conn.commit() conn.close() return {"success": True, "message": f"Location '{location[0]}' deleted successfully"} except Exception as e: print(f"Error deleting location: {e}") return {"success": False, "error": str(e)} def assign_cp_to_box_handler(): """Handle assigning CP code to a box""" from flask import request, jsonify, session import json try: # Ensure box_contents table exists ensure_box_contents_table() data = json.loads(request.data) box_number = data.get('box_number') cp_code = data.get('cp_code') scanned_by = session.get('user', 'Unknown') if not box_number or not cp_code: return jsonify({'success': False, 'error': 'Missing box_number or cp_code'}), 400 conn = get_db_connection() cursor = conn.cursor() # Find the box by number cursor.execute("SELECT id FROM boxes_crates WHERE box_number = %s", (box_number,)) box = cursor.fetchone() if not box: conn.close() return jsonify({'success': False, 'error': f'Box {box_number} not found'}), 404 box_id = box[0] # Insert into box_contents cursor.execute(""" INSERT INTO box_contents (box_id, cp_code, scanned_by) VALUES (%s, %s, %s) """, (box_id, cp_code, scanned_by)) conn.commit() conn.close() return jsonify({ 'success': True, 'message': f'CP {cp_code} assigned to box {box_number}' }), 200 except Exception as e: import traceback print(f"Error in assign_cp_to_box_handler: {e}") print(traceback.format_exc()) return jsonify({'success': False, 'error': str(e)}), 500 def view_warehouse_inventory_handler(): """Handle warehouse inventory view - shows CP codes, boxes, and locations""" from flask import render_template, request try: # Ensure tables exist ensure_box_contents_table() ensure_location_contents_table() # Get search parameters search_cp = request.args.get('search_cp', '').strip() search_box = request.args.get('search_box', '').strip() search_location = request.args.get('search_location', '').strip() conn = get_db_connection() cursor = conn.cursor() # Build query with joins and filters query = """ SELECT bc.cp_code, b.box_number, wl.location_code, bc.scanned_at, bc.scanned_by, lc.placed_at, lc.placed_by, b.status as box_status, lc.status as location_status FROM box_contents bc INNER JOIN boxes_crates b ON bc.box_id = b.id LEFT JOIN location_contents lc ON b.id = lc.box_id AND lc.status = 'active' LEFT JOIN warehouse_locations wl ON lc.location_id = wl.id WHERE 1=1 """ params = [] if search_cp: query += " AND bc.cp_code LIKE %s" params.append(f"%{search_cp}%") if search_box: query += " AND b.box_number LIKE %s" params.append(f"%{search_box}%") if search_location: query += " AND wl.location_code LIKE %s" params.append(f"%{search_location}%") query += " ORDER BY bc.scanned_at DESC" cursor.execute(query, params) inventory_data = cursor.fetchall() conn.close() return render_template( 'warehouse_inventory.html', inventory_data=inventory_data, search_cp=search_cp, search_box=search_box, search_location=search_location ) except Exception as e: import traceback error_trace = traceback.format_exc() print(f"Error in view_warehouse_inventory_handler: {e}") print(error_trace) return f"

Error loading warehouse inventory

{error_trace}
", 500 # Box Location Management Functions def search_box_by_number(box_number): """ Search for a box by box number and return its details including location Args: box_number (str): The box number to search for Returns: tuple: (success: bool, data: dict, status_code: int) """ try: if not box_number: return False, {'message': 'Box number is required'}, 400 conn = get_db_connection() cursor = conn.cursor() # Search for the box and get its location info cursor.execute(""" SELECT b.id, b.box_number, b.status, b.location_id, w.location_code FROM boxes_crates b LEFT JOIN warehouse_locations w ON b.location_id = w.id WHERE b.box_number = %s """, (box_number,)) result = cursor.fetchone() conn.close() if result: return True, { 'box': { 'id': result[0], 'box_number': result[1], 'status': result[2], 'location_id': result[3], 'location_code': result[4] } }, 200 else: return False, {'message': f'Box "{box_number}" not found in the system'}, 404 except Exception as e: return False, {'message': f'Error searching for box: {str(e)}'}, 500 def assign_box_to_location(box_id, location_code): """ Assign a box to a warehouse location Args: box_id (int): The ID of the box to assign location_code (str): The location code to assign the box to Returns: tuple: (success: bool, data: dict, status_code: int) """ try: if not box_id or not location_code: return False, {'message': 'Box ID and location code are required'}, 400 conn = get_db_connection() cursor = conn.cursor() # Check if location exists cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = %s", (location_code,)) location_result = cursor.fetchone() if not location_result: conn.close() return False, {'message': f'Location "{location_code}" not found in the system'}, 404 location_id = location_result[0] # Update box location cursor.execute(""" UPDATE boxes_crates SET location_id = %s, updated_at = NOW() WHERE id = %s """, (location_id, box_id)) conn.commit() conn.close() return True, {'message': f'Box successfully assigned to location "{location_code}"'}, 200 except Exception as e: return False, {'message': f'Error assigning box to location: {str(e)}'}, 500 def search_location_with_boxes(location_code): """ Search for a location and get all boxes assigned to it Args: location_code (str): The location code to search for Returns: tuple: (success: bool, data: dict, status_code: int) """ try: if not location_code: return False, {'message': 'Location code is required'}, 400 conn = get_db_connection() cursor = conn.cursor() # Search for the location cursor.execute(""" SELECT id, location_code, size, description FROM warehouse_locations WHERE location_code = %s """, (location_code,)) location_result = cursor.fetchone() if not location_result: conn.close() return False, {'message': f'Location "{location_code}" not found in the system'}, 404 location_id = location_result[0] # Get all boxes assigned to this location cursor.execute(""" SELECT b.id, b.box_number, b.status, b.created_at FROM boxes_crates b WHERE b.location_id = %s ORDER BY b.box_number """, (location_id,)) boxes_results = cursor.fetchall() conn.close() boxes = [] for box in boxes_results: boxes.append({ 'id': box[0], 'box_number': box[1], 'status': box[2], 'created_at': box[3].strftime('%Y-%m-%d %H:%M:%S') if box[3] else None }) return True, { 'location': { 'id': location_result[0], 'location_code': location_result[1], 'size': location_result[2], 'description': location_result[3] }, 'boxes': boxes }, 200 except Exception as e: return False, {'message': f'Error searching for location: {str(e)}'}, 500 def move_box_to_new_location(box_id, new_location_code): """ Move a box from its current location to a new location Args: box_id (int): The ID of the box to move new_location_code (str): The new location code Returns: tuple: (success: bool, data: dict, status_code: int) """ try: if not box_id or not new_location_code: return False, {'message': 'Box ID and new location code are required'}, 400 conn = get_db_connection() cursor = conn.cursor() # Check if new location exists cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = %s", (new_location_code,)) location_result = cursor.fetchone() if not location_result: conn.close() return False, {'message': f'Location "{new_location_code}" not found in the system'}, 404 new_location_id = location_result[0] # Get box number for response message cursor.execute("SELECT box_number FROM boxes_crates WHERE id = %s", (box_id,)) box_result = cursor.fetchone() if not box_result: conn.close() return False, {'message': 'Box not found'}, 404 box_number = box_result[0] # Update box location cursor.execute(""" UPDATE boxes_crates SET location_id = %s, updated_at = NOW() WHERE id = %s """, (new_location_id, box_id)) conn.commit() conn.close() return True, {'message': f'Box "{box_number}" successfully moved to location "{new_location_code}"'}, 200 except Exception as e: return False, {'message': f'Error moving box to new location: {str(e)}'}, 500 def change_box_status(box_id, new_status): """ Change the status of a box (open/closed) Args: box_id (int): The ID of the box new_status (str): The new status ('open' or 'closed') Returns: tuple: (success: bool, data: dict, status_code: int) """ try: if not box_id: return False, {'message': 'Box ID is required'}, 400 if new_status not in ['open', 'closed']: return False, {'message': 'Invalid status. Must be "open" or "closed"'}, 400 conn = get_db_connection() cursor = conn.cursor() # Get box number for response message cursor.execute("SELECT box_number FROM boxes_crates WHERE id = %s", (box_id,)) box_result = cursor.fetchone() if not box_result: conn.close() return False, {'message': 'Box not found'}, 404 box_number = box_result[0] # Update box status cursor.execute(""" UPDATE boxes_crates SET status = %s, updated_at = NOW() WHERE id = %s """, (new_status, box_id)) conn.commit() conn.close() return True, {'message': f'Box "{box_number}" status changed to "{new_status}"'}, 200 except Exception as e: return False, {'message': f'Error changing box status: {str(e)}'}, 500