import csv
import io
import os
import tempfile

import mariadb
from flask import (
    current_app,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from reportlab.graphics.barcode import code128
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import cm
from reportlab.pdfgen import canvas


def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The settings file lives in the Flask instance folder and is read as
    ``key=value`` lines. Blank lines, lines without ``=`` and lines starting
    with ``#`` are ignored. Whitespace around keys and values is stripped so
    that ``key = value`` is accepted as well as ``key=value``.

    Returns:
        A new ``mariadb`` connection. The caller must close it.

    Raises:
        OSError: If the settings file cannot be opened.
        KeyError: If one of the required settings is missing.
        ValueError: If ``port`` is not an integer.
        mariadb.Error: If the connection attempt fails.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line and '=' in line and not line.startswith('#'):
                key, value = line.split('=', 1)
                settings[key.strip()] = value.strip()
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )


def _ensure_table(table_name, create_sql, announce=False):
    """Create ``table_name`` with ``create_sql`` if it does not exist yet.

    Best-effort: any failure is printed and swallowed so that page handlers
    calling the ``ensure_*`` functions keep running.

    Args:
        table_name: Trusted, hard-coded table name (interpolated into SQL).
        create_sql: The ``CREATE TABLE IF NOT EXISTS`` statement to run.
        announce: When true, print a confirmation after creating the table.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            # table_name is an internal constant, never user input.
            cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
            if not cursor.fetchone():
                cursor.execute(create_sql)
                conn.commit()
                if announce:
                    print(f"{table_name} table created successfully")
        finally:
            # Always release the connection, even when a statement fails.
            conn.close()
    except Exception as e:
        print(f"Error ensuring {table_name} table: {e}")


def ensure_warehouse_locations_table():
    """Ensure the warehouse_locations table exists.

    ``location_code`` is declared UNIQUE so that the duplicate handling in
    ``add_location`` (which relies on ``IntegrityError``) actually triggers,
    matching the schema used by ``recreate_warehouse_locations_table``.
    """
    _ensure_table('warehouse_locations', '''
        CREATE TABLE IF NOT EXISTS warehouse_locations (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            location_code VARCHAR(12) NOT NULL UNIQUE,
            size INT,
            description VARCHAR(250)
        )
    ''')


def ensure_boxes_crates_table():
    """Ensure the boxes_crates table exists."""
    _ensure_table('boxes_crates', '''
        CREATE TABLE IF NOT EXISTS boxes_crates (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            box_number VARCHAR(8) NOT NULL UNIQUE,
            status ENUM('open', 'closed') DEFAULT 'open',
            location_id BIGINT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            created_by VARCHAR(100),
            FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE SET NULL
        )
    ''')


def ensure_box_contents_table():
    """Ensure box_contents table exists for tracking CP codes in boxes"""
    _ensure_table('box_contents', '''
        CREATE TABLE IF NOT EXISTS box_contents (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            box_id BIGINT NOT NULL,
            cp_code VARCHAR(15) NOT NULL,
            scan_id BIGINT,
            scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            scanned_by VARCHAR(100),
            FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE,
            INDEX idx_box_id (box_id),
            INDEX idx_cp_code (cp_code)
        )
    ''', announce=True)


def ensure_location_contents_table():
    """Ensure location_contents table exists for tracking boxes in locations"""
    _ensure_table('location_contents', '''
        CREATE TABLE IF NOT EXISTS location_contents (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            location_id BIGINT NOT NULL,
            box_id BIGINT NOT NULL,
            placed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            placed_by VARCHAR(100),
            removed_at TIMESTAMP NULL,
            removed_by VARCHAR(100),
            status ENUM('active', 'removed') DEFAULT 'active',
            FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE CASCADE,
            FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE,
            INDEX idx_location_id (location_id),
            INDEX idx_box_id (box_id),
            INDEX idx_status (status)
        )
    ''', announce=True)


# Add warehouse-specific functions below
def add_location(location_code, size, description):
    """Insert one warehouse location.

    Args:
        location_code: Code of the new location.
        size: Size value; any falsy value (``''``, ``None``, ``0``) is stored
            as NULL.
        description: Free-text description.

    Returns:
        A human-readable status string. Callers test it for the substring
        ``"success"``.

    Raises:
        mariadb.Error: For database errors other than an integrity violation.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO warehouse_locations (location_code, size, description) "
            "VALUES (?, ?, ?)",
            (location_code, size or None, description),
        )
        conn.commit()
        return "Location added successfully."
    except mariadb.IntegrityError:
        return "Failed: Location code already exists."
    finally:
        conn.close()


def get_locations():
    """Return all warehouse locations, newest id first.

    Returns:
        A list of ``(id, location_code, size, description)`` rows.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, location_code, size, description "
            "FROM warehouse_locations ORDER BY id DESC"
        )
        return cursor.fetchall()
    finally:
        conn.close()


def recreate_warehouse_locations_table():
    """Drop and recreate the warehouse_locations table (destroys all rows).

    ``id`` is BIGINT so it matches the BIGINT ``location_id`` columns that
    reference it from boxes_crates and location_contents.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS warehouse_locations")
        cursor.execute("""
            CREATE TABLE warehouse_locations (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                location_code VARCHAR(12) UNIQUE NOT NULL,
                size INT,
                description VARCHAR(250)
            )
        """)
        conn.commit()
    finally:
        conn.close()


def delete_locations_by_ids(ids_str):
    """Delete warehouse locations given a comma-separated string of ids.

    Entries that are not purely digits are ignored.

    Args:
        ids_str: Text such as ``"1, 2,3"``; ``None`` is treated as empty.

    Returns:
        A status string with the number of rows deleted, or
        ``"No valid IDs provided."`` when nothing usable was given.
    """
    location_ids = [
        part.strip()
        for part in (ids_str or '').split(',')
        if part.strip().isdigit()
    ]
    if not location_ids:
        return "No valid IDs provided."

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        deleted = 0
        for location_id in location_ids:
            cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,))
            if cursor.rowcount > 0:
                deleted += 1
        conn.commit()
    finally:
        conn.close()
    return f"Deleted {deleted} location(s)."
# Values accepted by the boxes_crates.status ENUM column.
_VALID_BOX_STATUSES = ('open', 'closed')


def _error_page(heading, error_trace):
    """Build the 500 response shared by the page handlers.

    NOTE(review): the traceback is sent to the client as in the original
    code; consider hiding it outside of debug deployments.
    """
    return f"\n\n{heading}\n\n{error_trace}\n", 500


def _flash_text(result):
    """Turn a helper result into a flash string.

    Result dicts (``{"success": ..., "message"/"error": ...}``) are reduced
    to their text; plain strings are passed through unchanged.
    """
    if isinstance(result, dict):
        if result.get("success"):
            return result.get("message")
        return f"Error: {result.get('error')}"
    return result


def create_locations_handler():
    """Handle the create-locations page.

    POST either deletes the locations listed in ``delete_ids`` (when the
    ``delete_locations`` field is present) or adds a new location, stores the
    outcome in ``session['flash_message']`` and redirects so that a page
    reload does not resubmit the form. GET renders the location list with any
    pending flash message.
    """
    try:
        # Ensure table exists
        ensure_warehouse_locations_table()

        if request.method == "POST":
            if request.form.get("delete_locations"):
                message = delete_locations_by_ids(request.form.get("delete_ids", ""))
            else:
                message = add_location(
                    request.form.get("location_code"),
                    request.form.get("size"),
                    request.form.get("description"),
                )
            session['flash_message'] = message
            # Redirect to prevent form resubmission on page reload
            return redirect(url_for('warehouse.create_locations'))

        # Get flash message from session if any
        message = session.pop('flash_message', None)
        locations = get_locations()
        return render_template("create_locations.html", locations=locations, message=message)
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"Error in create_locations_handler: {e}")
        print(error_trace)
        return _error_page("Error loading warehouse locations", error_trace)


def import_locations_csv_handler():
    """Handle CSV upload and bulk creation of warehouse locations.

    A POST with a ``csv_file`` upload parses the file and keeps the parsed
    rows in ``session['csv_locations']`` for preview. A POST with the
    ``create_locations`` field inserts the pending rows, stores a summary
    report in the session and redirects; the report is shown on the next
    render of the page.
    """
    # The report is produced right before a redirect, so it has to travel
    # through the session to reach the page that is rendered afterwards.
    report = session.pop('csv_import_report', None)
    locations = []

    if request.method == 'POST':
        file = request.files.get('csv_file')
        if file and file.filename and file.filename.lower().endswith('.csv'):
            # Keep only the final path component of the client-supplied name
            # so the upload cannot be written outside the temp directory.
            safe_name = os.path.basename(file.filename.replace('\\', '/'))
            temp_path = os.path.join(tempfile.gettempdir(), safe_name)
            file.save(temp_path)
            session['csv_filename'] = file.filename
            session['csv_filepath'] = temp_path
            # utf-8-sig also reads plain UTF-8 and drops a leading BOM that
            # would otherwise become part of the first header name.
            with open(temp_path, 'r', encoding='utf-8-sig', newline='') as f:
                locations = [
                    (
                        row.get('Location Code') or row.get('location_code') or '',
                        row.get('Size') or row.get('size') or '',
                        row.get('Description') or row.get('description') or '',
                    )
                    for row in csv.DictReader(f)
                ]
            session['csv_locations'] = locations
        elif 'csv_locations' in session:
            locations = session['csv_locations']

        if request.form.get('create_locations') and locations:
            added = 0
            failed_codes = []
            for location_code, size, description in locations:
                try:
                    result = add_location(location_code, size, description)
                except mariadb.Error as e:
                    # One bad row must not abort the rest of the import.
                    print(f"Error importing location {location_code!r}: {e}")
                    result = None
                if result and 'success' in result.lower():
                    added += 1
                else:
                    failed_codes.append(location_code or '')
            session['csv_import_report'] = (
                f"{added} locations were added to warehouse_locations table.\n"
                f"{len(failed_codes)} locations failed: {', '.join(failed_codes)}"
            )
            session.pop('csv_locations', None)
            session.pop('csv_filename', None)
            session.pop('csv_filepath', None)
            return redirect(url_for('warehouse.import_locations_csv') + '#created')
    elif 'csv_locations' in session:
        locations = session['csv_locations']

    return render_template('import_locations_csv.html', report=report, locations=locations)


def generate_location_label_pdf():
    """Generate PDF for location barcode label (8x4cm)

    Expects a JSON body with ``location_code``. Returns the PDF inline, a 400
    JSON error when the code is missing, or a 500 JSON error when PDF
    generation fails.
    """
    try:
        import re

        # silent=True: a missing or malformed JSON body becomes a 400 below
        # instead of an exception.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        location_code = str(data.get('location_code') or '')

        if not location_code:
            return jsonify({'error': 'Location code is required'}), 400

        # Create PDF in memory
        buffer = io.BytesIO()

        # Create PDF with 8x4cm page size (width x height)
        page_width = 8 * cm
        page_height = 4 * cm
        c = canvas.Canvas(buffer, pagesize=(page_width, page_height))

        # Generate Code128 barcode
        barcode = code128.Code128(location_code, barWidth=1.0, humanReadable=False)

        # Target size of the barcode on the label
        desired_barcode_width = 7 * cm   # Almost full width
        desired_barcode_height = 2.5 * cm  # Most of the height

        # Uniform scale that fills the width but never exceeds the height
        scale = min(
            desired_barcode_width / barcode.width,
            desired_barcode_height / barcode.height,
        )
        actual_width = barcode.width * scale
        actual_height = barcode.height * scale

        # Center the barcode, slightly above center to leave room for text
        barcode_x = (page_width - actual_width) / 2
        barcode_y = (page_height - actual_height) / 2 + 0.3 * cm

        # Draw barcode with scaling
        c.saveState()
        c.translate(barcode_x, barcode_y)
        c.scale(scale, scale)
        barcode.drawOn(c, 0, 0)
        c.restoreState()

        # Add location code text below barcode
        c.setFont("Helvetica-Bold", 10)
        text_width = c.stringWidth(location_code, "Helvetica-Bold", 10)
        text_x = (page_width - text_width) / 2
        text_y = barcode_y - 0.5 * cm
        c.drawString(text_x, text_y, location_code)

        # Finalize PDF
        c.save()

        # The code comes from the request; keep only characters that are
        # safe inside an HTTP header value / file name.
        safe_code = re.sub(r'[^A-Za-z0-9._-]', '_', location_code)

        response = make_response(buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = (
            f'inline; filename=location_{safe_code}_label.pdf'
        )
        return response
    except Exception as e:
        print(f"Error generating location label PDF: {e}")
        return jsonify({'error': str(e)}), 500


def update_location(location_id, location_code, size, description):
    """Update an existing warehouse location

    Returns:
        ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": ...}``. Never raises.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Check if location exists
            cursor.execute("SELECT id FROM warehouse_locations WHERE id = %s", (location_id,))
            if not cursor.fetchone():
                return {"success": False, "error": "Location not found"}

            # Check if location code already exists for different location
            cursor.execute(
                "SELECT id FROM warehouse_locations WHERE location_code = %s AND id != %s",
                (location_code, location_id),
            )
            if cursor.fetchone():
                return {"success": False, "error": "Location code already exists"}

            # Update location
            cursor.execute(
                "UPDATE warehouse_locations SET location_code = %s, size = %s, "
                "description = %s WHERE id = %s",
                (location_code, size or None, description, location_id),
            )
            conn.commit()
            return {"success": True, "message": "Location updated successfully"}
        finally:
            conn.close()
    except Exception as e:
        print(f"Error updating location: {e}")
        return {"success": False, "error": str(e)}


# ============================================================================
# Boxes/Crates Functions
# ============================================================================

def generate_box_number():
    """Generate next box number with 8 digits (00000001, 00000002, etc.)

    The number is one more than the highest numeric ``box_number`` currently
    stored, or 1 when the table is empty.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT MAX(CAST(box_number AS UNSIGNED)) FROM boxes_crates")
        result = cursor.fetchone()
    finally:
        conn.close()

    next_number = int(result[0]) + 1 if result and result[0] else 1
    return str(next_number).zfill(8)


def add_box(location_id=None, created_by=None):
    """Add a new box/crate

    Args:
        location_id: Optional warehouse location id; falsy values store NULL.
        created_by: Name recorded as the creator.

    Returns:
        ``"Box <number> created successfully"`` or ``"Error creating box: ..."``.
        ``manage_boxes_handler`` parses the box number out of the success
        text, so its format must stay stable.
    """
    # Fetch the number first so a failure here cannot leak the connection
    # opened below.
    box_number = generate_box_number()
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO boxes_crates (box_number, status, location_id, created_by) "
            "VALUES (%s, %s, %s, %s)",
            (box_number, 'open', location_id or None, created_by),
        )
        conn.commit()
        return f"Box {box_number} created successfully"
    except Exception as e:
        return f"Error creating box: {e}"
    finally:
        conn.close()


def get_boxes():
    """Get all boxes with location information

    Returns:
        Rows of ``(id, box_number, status, location_code, created_at,
        updated_at, created_by, location_id)``, newest id first.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT b.id, b.box_number, b.status, l.location_code,
                   b.created_at, b.updated_at, b.created_by, b.location_id
            FROM boxes_crates b
            LEFT JOIN warehouse_locations l ON b.location_id = l.id
            ORDER BY b.id DESC
        """)
        return cursor.fetchall()
    finally:
        conn.close()


def update_box_status(box_id, status):
    """Update box status (open/closed)

    Returns:
        A result dict with ``success`` plus ``message`` or ``error``. A
        status outside ``open``/``closed`` is rejected without touching the
        database.
    """
    if status not in _VALID_BOX_STATUSES:
        return {"success": False, "error": f"Invalid status: {status}"}

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE boxes_crates SET status = %s WHERE id = %s",
            (status, box_id),
        )
        conn.commit()
        return {"success": True, "message": f"Box status updated to {status}"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def update_box_location(box_id, location_id):
    """Update box location

    A falsy ``location_id`` clears the box's location (stores NULL).

    Returns:
        A result dict with ``success`` plus ``message`` or ``error``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE boxes_crates SET location_id = %s WHERE id = %s",
            (location_id or None, box_id),
        )
        conn.commit()
        return {"success": True, "message": "Box location updated"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def delete_boxes_by_ids(ids_str):
    """Delete boxes by comma-separated IDs

    Entries that are not purely digits are ignored; ``None`` is treated as
    an empty string.

    Returns:
        A status string with the number of rows deleted, or
        ``"No valid IDs provided."`` when nothing usable was given.
    """
    box_ids = [
        part.strip()
        for part in (ids_str or '').split(',')
        if part.strip().isdigit()
    ]
    if not box_ids:
        return "No valid IDs provided."

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        deleted = 0
        for box_id in box_ids:
            cursor.execute("DELETE FROM boxes_crates WHERE id = %s", (box_id,))
            if cursor.rowcount > 0:
                deleted += 1
        conn.commit()
    finally:
        conn.close()
    return f"Deleted {deleted} box(es)."


def manage_boxes_handler():
    """Handler for boxes/crates management page

    POST dispatches on the ``action`` form field (``delete_boxes``,
    ``add_box``, ``update_status``, ``update_location``), stores a text flash
    message and redirects. An ``add_box`` request carrying
    ``X-Requested-With: XMLHttpRequest`` gets a JSON answer instead. GET
    renders the boxes and locations.
    """
    try:
        # Ensure table exists
        ensure_boxes_crates_table()
        ensure_warehouse_locations_table()

        if request.method == "POST":
            action = request.form.get("action")

            if action == "delete_boxes":
                message = delete_boxes_by_ids(request.form.get("delete_ids", ""))
                session['flash_message'] = message
            elif action == "add_box":
                created_by = session.get('user', 'Unknown')
                message = add_box(None, created_by)  # Create box without location

                # Check if this is an AJAX request
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    # Extract box number from message
                    # (format: "Box 12345678 created successfully")
                    import re
                    match = re.search(r'Box (\d{8})', message)
                    if match:
                        return jsonify({
                            'success': True,
                            'box_number': match.group(1),
                            'message': message,
                        })
                    return jsonify({'success': False, 'error': message})

                session['flash_message'] = message
            elif action == "update_status":
                result = update_box_status(
                    request.form.get("box_id"),
                    request.form.get("new_status"),
                )
                # The helper returns a dict; flash its text like the other
                # actions do.
                session['flash_message'] = _flash_text(result)
            elif action == "update_location":
                result = update_box_location(
                    request.form.get("box_id"),
                    request.form.get("new_location_id"),
                )
                session['flash_message'] = _flash_text(result)

            return redirect(url_for('warehouse.manage_boxes'))

        # Get flash message from session if any
        message = session.pop('flash_message', None)
        boxes = get_boxes()
        locations = get_locations()
        return render_template("manage_boxes.html", boxes=boxes, locations=locations, message=message)
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"Error in manage_boxes_handler: {e}")
        print(error_trace)
        return _error_page("Error loading boxes management", error_trace)


def delete_location_by_id(location_id):
    """Delete a warehouse location by ID

    Returns:
        ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": ...}``. Never raises.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Check if location exists
            cursor.execute(
                "SELECT location_code FROM warehouse_locations WHERE id = %s",
                (location_id,),
            )
            location = cursor.fetchone()
            if not location:
                return {"success": False, "error": "Location not found"}

            # Delete location
            cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,))
            conn.commit()
            return {"success": True, "message": f"Location '{location[0]}' deleted successfully"}
        finally:
            conn.close()
    except Exception as e:
        print(f"Error deleting location: {e}")
        return {"success": False, "error": str(e)}


def assign_cp_to_box_handler():
    """Handle assigning CP code to a box

    Expects a JSON body with ``box_number`` and ``cp_code``. Responds with
    400 when either is missing (including an unparsable body), 404 when the
    box does not exist, 200 on success and 500 on unexpected errors.
    """
    try:
        # Ensure box_contents table exists
        ensure_box_contents_table()

        # force=True parses the body regardless of Content-Type;
        # silent=True turns malformed JSON into None instead of an exception.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            data = {}
        box_number = data.get('box_number')
        cp_code = data.get('cp_code')
        scanned_by = session.get('user', 'Unknown')

        if not box_number or not cp_code:
            return jsonify({'success': False, 'error': 'Missing box_number or cp_code'}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Find the box by number
            cursor.execute("SELECT id FROM boxes_crates WHERE box_number = %s", (box_number,))
            box = cursor.fetchone()
            if not box:
                return jsonify({'success': False, 'error': f'Box {box_number} not found'}), 404

            # Insert into box_contents
            cursor.execute(
                """
                INSERT INTO box_contents (box_id, cp_code, scanned_by)
                VALUES (%s, %s, %s)
                """,
                (box[0], cp_code, scanned_by),
            )
            conn.commit()
        finally:
            conn.close()

        return jsonify({
            'success': True,
            'message': f'CP {cp_code} assigned to box {box_number}'
        }), 200
    except Exception as e:
        import traceback
        print(f"Error in assign_cp_to_box_handler: {e}")
        print(traceback.format_exc())
        return jsonify({'success': False, 'error': str(e)}), 500


def view_warehouse_inventory_handler():
    """Handle warehouse inventory view - shows CP codes, boxes, and locations

    Optional query-string filters ``search_cp``, ``search_box`` and
    ``search_location`` are applied as substring (``LIKE``) matches.
    """
    try:
        # Ensure tables exist
        ensure_box_contents_table()
        ensure_location_contents_table()

        # Get search parameters
        search_cp = request.args.get('search_cp', '').strip()
        search_box = request.args.get('search_box', '').strip()
        search_location = request.args.get('search_location', '').strip()

        # Build query with joins and filters
        query = """
            SELECT bc.cp_code, b.box_number, wl.location_code,
                   bc.scanned_at, bc.scanned_by,
                   lc.placed_at, lc.placed_by,
                   b.status as box_status, lc.status as location_status
            FROM box_contents bc
            INNER JOIN boxes_crates b ON bc.box_id = b.id
            LEFT JOIN location_contents lc ON b.id = lc.box_id AND lc.status = 'active'
            LEFT JOIN warehouse_locations wl ON lc.location_id = wl.id
            WHERE 1=1
        """
        params = []
        # Each active filter appends one parameterized LIKE clause.
        for column, term in (
            ("bc.cp_code", search_cp),
            ("b.box_number", search_box),
            ("wl.location_code", search_location),
        ):
            if term:
                query += f" AND {column} LIKE %s"
                params.append(f"%{term}%")
        query += " ORDER BY bc.scanned_at DESC"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, tuple(params))
            inventory_data = cursor.fetchall()
        finally:
            conn.close()

        return render_template(
            'warehouse_inventory.html',
            inventory_data=inventory_data,
            search_cp=search_cp,
            search_box=search_box,
            search_location=search_location
        )
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"Error in view_warehouse_inventory_handler: {e}")
        print(error_trace)
        return _error_page("Error loading warehouse inventory", error_trace)