- Renamed Store Articles card to Set Boxes Locations on warehouse main page - Created new mobile-optimized page with two tabs for box location management: - Tab 1: Assign box to location (scan box, change status to closed, assign location) - Tab 2: Move box from location (scan location, list boxes, move to new location) - Added box status management (open/closed) with status change button - Enforced rule: only closed boxes can be assigned to locations - Moved API logic to warehouse.py module: - search_box_by_number() - assign_box_to_location() - search_location_with_boxes() - move_box_to_new_location() - change_box_status() - Added API routes in routes.py as thin wrappers - Aligned page theme colors with application Bootstrap theme - Added dark mode support for the new page - Added Warehouse Main button to page header - Removed 'INNOFA ROMANIA SRL' branding from: - Print module label preview and PDF generation - Print lost labels page - pdf_generator.py PDF creation function
980 lines
34 KiB
Python
980 lines
34 KiB
Python
import mariadb
|
|
from flask import current_app, request, render_template, session, redirect, url_for, jsonify, make_response
|
|
import csv, os, tempfile
|
|
from reportlab.lib.pagesizes import letter
|
|
from reportlab.pdfgen import canvas
|
|
from reportlab.lib.units import cm, mm
|
|
from reportlab.graphics.barcode import code128
|
|
import io
|
|
|
|
def get_db_connection():
    """Open a new MariaDB connection from the instance's settings file.

    Reads ``external_server.conf`` from ``current_app.instance_path`` and
    parses it as ``key=value`` lines. Blank lines, lines without ``=`` and
    lines starting with ``#`` are ignored.

    Returns:
        The connection object returned by ``mariadb.connect``.

    Raises:
        OSError: If the settings file cannot be opened.
        KeyError: If one of ``username``, ``password``, ``server_domain``,
            ``port`` or ``database_name`` is missing from the file.
        ValueError: If ``port`` is not an integer.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line and '=' in line and not line.startswith('#'):
                key, value = line.split('=', 1)
                # Strip both sides so "key = value" is accepted as well as
                # "key=value"; otherwise the lookups below raise KeyError.
                settings[key.strip()] = value.strip()
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name']
    )
|
|
|
|
def ensure_warehouse_locations_table():
    """Create the ``warehouse_locations`` table if it does not exist.

    Best-effort: any error is printed and swallowed, so the function always
    returns ``None``. The connection is closed even when a statement fails.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'warehouse_locations'")
        result = cursor.fetchone()
        if not result:
            # location_code is UNIQUE so that add_location()'s IntegrityError
            # handling ("Location code already exists") can actually trigger,
            # matching the schema used by recreate_warehouse_locations_table().
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS warehouse_locations (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    location_code VARCHAR(12) NOT NULL UNIQUE,
                    size INT,
                    description VARCHAR(250)
                )
            ''')
            conn.commit()
    except Exception as e:
        print(f"Error ensuring warehouse_locations table: {e}")
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def ensure_boxes_crates_table():
    """Create the ``boxes_crates`` table if it does not exist.

    The table references ``warehouse_locations(id)`` through ``location_id``.
    Best-effort: any error is printed and swallowed. The connection is closed
    even when a statement fails.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'boxes_crates'")
        result = cursor.fetchone()
        if not result:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS boxes_crates (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    box_number VARCHAR(8) NOT NULL UNIQUE,
                    status ENUM('open', 'closed') DEFAULT 'open',
                    location_id BIGINT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    created_by VARCHAR(100),
                    FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE SET NULL
                )
            ''')
            conn.commit()
    except Exception as e:
        print(f"Error ensuring boxes_crates table: {e}")
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def ensure_box_contents_table():
    """Ensure box_contents table exists for tracking CP codes in boxes.

    Best-effort: any error is printed and swallowed. The connection is closed
    even when a statement fails.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'box_contents'")
        result = cursor.fetchone()
        if not result:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS box_contents (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    box_id BIGINT NOT NULL,
                    cp_code VARCHAR(15) NOT NULL,
                    scan_id BIGINT,
                    scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    scanned_by VARCHAR(100),
                    FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE,
                    INDEX idx_box_id (box_id),
                    INDEX idx_cp_code (cp_code)
                )
            ''')
            conn.commit()
            print("box_contents table created successfully")
    except Exception as e:
        print(f"Error ensuring box_contents table: {e}")
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def ensure_location_contents_table():
    """Ensure location_contents table exists for tracking boxes in locations.

    Best-effort: any error is printed and swallowed. The connection is closed
    even when a statement fails.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'location_contents'")
        result = cursor.fetchone()
        if not result:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS location_contents (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    location_id BIGINT NOT NULL,
                    box_id BIGINT NOT NULL,
                    placed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    placed_by VARCHAR(100),
                    removed_at TIMESTAMP NULL,
                    removed_by VARCHAR(100),
                    status ENUM('active', 'removed') DEFAULT 'active',
                    FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE CASCADE,
                    FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE,
                    INDEX idx_location_id (location_id),
                    INDEX idx_box_id (box_id),
                    INDEX idx_status (status)
                )
            ''')
            conn.commit()
            print("location_contents table created successfully")
    except Exception as e:
        print(f"Error ensuring location_contents table: {e}")
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
# Add warehouse-specific functions below
|
|
def add_location(location_code, size, description):
    """Insert a new row into ``warehouse_locations``.

    Args:
        location_code: Code of the location; must be non-empty.
        size: Size value; a falsy value (``None``, ``''``, ``0``) is stored
            as NULL.
        description: Free-text description.

    Returns:
        A human-readable status string. Success messages contain the word
        "successfully"; failures start with "Failed:".

    Raises:
        Exception: Database errors other than ``mariadb.IntegrityError``
            propagate to the caller (the connection is still closed).
    """
    # Reject an empty code up front instead of letting the database report a
    # constraint error that would be mislabelled as a duplicate.
    if not location_code:
        return "Failed: Location code is required."

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO warehouse_locations (location_code, size, description) VALUES (?, ?, ?)",
            (location_code, size if size else None, description)
        )
        conn.commit()
        return "Location added successfully."
    except mariadb.IntegrityError:
        return "Failed: Location code already exists."
    finally:
        conn.close()
|
|
|
|
def get_locations():
    """Return all warehouse locations, newest first.

    Returns:
        The rows fetched for ``(id, location_code, size, description)``,
        ordered by ``id`` descending.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, location_code, size, description FROM warehouse_locations ORDER BY id DESC")
        return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
|
|
|
|
def recreate_warehouse_locations_table():
    """Drop and recreate the ``warehouse_locations`table, discarding all rows.

    The primary key is BIGINT so it matches the ``location_id BIGINT`` foreign
    key columns declared by ensure_boxes_crates_table() and
    ensure_location_contents_table(); foreign key columns must have the same
    integer type as the referenced column.

    NOTE(review): the DROP will presumably fail while other tables still hold
    foreign keys to this table - confirm the intended call order.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS warehouse_locations")
        cursor.execute("""
            CREATE TABLE warehouse_locations (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                location_code VARCHAR(12) UNIQUE NOT NULL,
                size INT,
                description VARCHAR(250)
            )
        """)
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def delete_locations_by_ids(ids_str):
    """Delete warehouse locations given a comma-separated list of IDs.

    Entries that are not purely digits after stripping whitespace are
    ignored.

    Args:
        ids_str: Comma-separated IDs, e.g. ``"1, 2,3"``. ``None`` is treated
            as an empty string.

    Returns:
        ``"No valid IDs provided."`` when nothing usable was supplied,
        otherwise ``"Deleted N location(s)."`` where N counts the IDs whose
        DELETE affected at least one row.
    """
    ids = [part.strip() for part in (ids_str or "").split(',') if part.strip().isdigit()]
    if not ids:
        return "No valid IDs provided."

    conn = get_db_connection()
    deleted = 0
    try:
        cursor = conn.cursor()
        for location_id in ids:
            cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,))
            if cursor.rowcount:
                deleted += 1
        conn.commit()
    finally:
        # Close the connection even if one of the deletes raises.
        conn.close()
    return f"Deleted {deleted} location(s)."
|
|
|
|
def create_locations_handler():
    """Handle the create-locations page (list, add and bulk-delete).

    On POST, either deletes the IDs listed in ``delete_ids`` (when the
    ``delete_locations`` field is present) or adds a location from the
    ``location_code``/``size``/``description`` fields. The outcome is stored
    in ``session['flash_message']`` and the client is redirected so a page
    reload does not resubmit the form. On other methods, renders
    ``create_locations.html`` with all locations and the pending flash
    message.

    Returns:
        A redirect, the rendered page, or a ``(html, 500)`` tuple containing
        the traceback when anything raises.
    """
    try:
        # Ensure table exists
        ensure_warehouse_locations_table()

        if request.method == "POST":
            if request.form.get("delete_locations"):
                ids_str = request.form.get("delete_ids", "")
                message = delete_locations_by_ids(ids_str)
            else:
                location_code = request.form.get("location_code")
                size = request.form.get("size")
                description = request.form.get("description")
                message = add_location(location_code, size, description)
            session['flash_message'] = message
            # Redirect to prevent form resubmission on page reload
            return redirect(url_for('warehouse.create_locations'))

        # Get flash message from session if any
        message = session.pop('flash_message', None)
        locations = get_locations()
        return render_template("create_locations.html", locations=locations, message=message)
    except Exception as e:
        import traceback
        from html import escape

        error_trace = traceback.format_exc()
        print(f"Error in create_locations_handler: {e}")
        print(error_trace)
        # Escape the traceback: it can contain user-supplied values and is
        # embedded in an HTML response.
        # NOTE(review): returning tracebacks to clients discloses internals;
        # consider showing a generic message outside of debugging.
        return f"<h1>Error loading warehouse locations</h1><pre>{escape(error_trace)}</pre>", 500
|
|
|
|
def import_locations_csv_handler():
    """Handle the two-step CSV import of warehouse locations.

    Step 1 (POST with ``csv_file``): the upload is saved to the system temp
    directory, parsed, and the parsed rows are kept in
    ``session['csv_locations']`` for preview.
    Step 2 (POST with ``create_locations``): every previewed row is passed to
    add_location(); a summary is stored in the session and the client is
    redirected to the import page, where the summary is shown once.

    Recognised column headers are ``Location Code``/``location_code``,
    ``Size``/``size`` and ``Description``/``description``.

    Returns:
        A redirect after creating locations, otherwise the rendered
        ``import_locations_csv.html`` with ``report`` and ``locations``.
    """
    locations = []

    if request.method == 'POST':
        file = request.files.get('csv_file')
        if file and file.filename.endswith('.csv'):
            # Keep only the final path component so a crafted filename such
            # as "../../x.csv" cannot write outside the temp directory.
            safe_name = os.path.basename(file.filename.replace('\\', '/'))
            temp_path = os.path.join(tempfile.gettempdir(), safe_name)
            file.save(temp_path)
            session['csv_filename'] = safe_name
            session['csv_filepath'] = temp_path

            # utf-8-sig transparently drops a BOM (written by Excel), which
            # would otherwise become part of the first header name.
            with open(temp_path, 'r', encoding='utf-8-sig', newline='') as f:
                for row in csv.DictReader(f):
                    location_code = row.get('Location Code') or row.get('location_code') or ''
                    size = row.get('Size') or row.get('size') or ''
                    description = row.get('Description') or row.get('description') or ''
                    locations.append((location_code, size, description))
            session['csv_locations'] = locations
        elif 'csv_locations' in session:
            locations = session['csv_locations']

        if request.form.get('create_locations') and locations:
            added = 0
            errors = []
            for location_code, size, description in locations:
                result = add_location(location_code, size, description)
                if result and 'success' in result.lower():
                    added += 1
                else:
                    errors.append(location_code or '')
            failed = len(errors)

            # Stored in the session because the redirect below would
            # otherwise discard the report before it is ever rendered.
            session['csv_import_report'] = (
                f"{added} locations were added to warehouse_locations table. "
                f"{failed} locations failed: {', '.join(errors)}"
            )

            session.pop('csv_locations', None)
            session.pop('csv_filename', None)
            uploaded_path = session.pop('csv_filepath', None)
            if uploaded_path:
                try:
                    os.remove(uploaded_path)
                except OSError:
                    # Best-effort cleanup; the import itself already succeeded.
                    pass
            return redirect(url_for('warehouse.import_locations_csv') + '#created')
    elif 'csv_locations' in session:
        locations = session['csv_locations']

    report = session.pop('csv_import_report', None)
    return render_template('import_locations_csv.html', report=report, locations=locations)
|
|
|
|
def generate_location_label_pdf():
    """Generate PDF for location barcode label (8cm x 5cm landscape).

    Reads ``location_code`` from the JSON request body and draws a single
    page: the text "Location nr: <code>" centred at the top and a Code 128
    barcode of the code stretched to the usable width below it. If the
    barcode cannot be drawn, the code is printed as plain text instead.

    Returns:
        A PDF response (``inline`` disposition) on success, a JSON error with
        status 400 when ``location_code`` is missing or the body is not a
        JSON object, or a JSON error with status 500 on any other failure.
    """
    try:
        # silent=True: a missing/invalid JSON body becomes a 400 below rather
        # than an exception that would be reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        location_code = data.get('location_code', '')

        if not location_code:
            return jsonify({'error': 'Location code is required'}), 400

        # The text and barcode drawing calls expect a string, but JSON may
        # carry the code as a number.
        location_code = str(location_code)

        print(f"DEBUG: Generating location label PDF for: {location_code}")

        # Create PDF in memory
        buffer = io.BytesIO()

        # 8cm x 5cm page, landscape
        page_width = 80 * mm
        page_height = 50 * mm

        c = canvas.Canvas(buffer, pagesize=(page_width, page_height))

        # Optimize for label printer
        c.setPageCompression(1)
        c.setCreator("Trasabilitate Location Label System")
        c.setTitle("Location Label - Optimized for Label Printers")

        # Margins and usable area
        margin = 2 * mm
        usable_width = page_width - (2 * margin)
        usable_height = page_height - (2 * margin)

        # Vertical layout: text band on top, barcode fills the rest
        text_height = 12 * mm
        barcode_height = usable_height - text_height - (1 * mm)

        # === TOP SECTION: "Location nr:" label followed by the code ===
        text_y = page_height - margin - text_height
        label_text = "Location nr:"
        code_text = location_code
        gap = 3 * mm  # spacing between label and code
        label_width = c.stringWidth(label_text, "Helvetica-Bold", 14)
        code_width = c.stringWidth(code_text, "Helvetica-Bold", 18)

        # Centre label + gap + code as one unit
        total_text_width = label_width + gap + code_width
        start_x = margin + (usable_width - total_text_width) / 2

        c.setFont("Helvetica-Bold", 14)
        c.drawString(start_x, text_y + 5*mm, label_text)

        c.setFont("Helvetica-Bold", 18)
        c.drawString(start_x + label_width + gap, text_y + 5*mm, code_text)

        # === BOTTOM SECTION: Barcode ===
        barcode_y = margin

        try:
            barcode = code128.Code128(
                location_code,
                barWidth=0.4*mm,  # Thicker bars for better scanning
                barHeight=barcode_height,
                humanReadable=True,
                fontSize=10
            )

            # Horizontal scale that makes the barcode span the usable width
            scale_factor = usable_width / barcode.width
            barcode_x = margin + (usable_width - (barcode.width * scale_factor)) / 2

            c.saveState()
            try:
                c.translate(barcode_x, barcode_y)
                c.scale(scale_factor, 1)
                barcode.drawOn(c, 0, 0)
            finally:
                # Always undo translate/scale so the fallback text below is
                # not drawn in the transformed coordinate system.
                c.restoreState()

            print(f"DEBUG: Barcode generated successfully with scale factor: {scale_factor}")

        except Exception as e:
            print(f"DEBUG: Error generating barcode: {e}")
            # Fallback: draw text if barcode fails
            c.setFont("Helvetica-Bold", 12)
            text_width = c.stringWidth(location_code, "Helvetica-Bold", 12)
            c.drawString((page_width - text_width) / 2, barcode_y + barcode_height/2, location_code)

        # Finalize PDF
        c.save()

        # Prepare response
        buffer.seek(0)
        response = make_response(buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename=location_{location_code}_label.pdf'

        print("DEBUG: Location label PDF generated successfully")
        return response

    except Exception as e:
        print(f"Error generating location label PDF: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
|
|
|
|
def update_location(location_id, location_code, size, description):
    """Update an existing warehouse location.

    Args:
        location_id: ID of the row to update.
        location_code: New location code; must not be used by another row.
        size: New size; a falsy value is stored as NULL.
        description: New description.

    Returns:
        ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": ...}`` (location missing, duplicate
        code, or the text of any exception raised).
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if location exists
        cursor.execute("SELECT id FROM warehouse_locations WHERE id = %s", (location_id,))
        if not cursor.fetchone():
            return {"success": False, "error": "Location not found"}

        # Check if location code already exists for different location
        cursor.execute(
            "SELECT id FROM warehouse_locations WHERE location_code = %s AND id != %s",
            (location_code, location_id)
        )
        if cursor.fetchone():
            return {"success": False, "error": "Location code already exists"}

        # Update location
        cursor.execute(
            "UPDATE warehouse_locations SET location_code = %s, size = %s, description = %s WHERE id = %s",
            (location_code, size if size else None, description, location_id)
        )
        conn.commit()

        return {"success": True, "message": "Location updated successfully"}

    except Exception as e:
        print(f"Error updating location: {e}")
        return {"success": False, "error": str(e)}
    finally:
        # Runs on every exit path, including the exception path that
        # previously leaked the connection.
        if conn is not None:
            conn.close()
|
|
|
|
# ============================================================================
|
|
# Boxes/Crates Functions
|
|
# ============================================================================
|
|
|
|
def generate_box_number():
    """Generate next box number with 8 digits (00000001, 00000002, etc.).

    The next number is the current maximum numeric ``box_number`` in
    ``boxes_crates`` plus one, or 1 when the table is empty.

    NOTE(review): the number is read, not reserved, so two concurrent callers
    can presumably obtain the same value; the UNIQUE constraint on
    ``box_number`` would then reject the second insert.

    Returns:
        The next box number as a zero-padded 8-character string.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT MAX(CAST(box_number AS UNSIGNED)) FROM boxes_crates")
        result = cursor.fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()

    next_number = int(result[0]) + 1 if result and result[0] else 1
    return str(next_number).zfill(8)
|
|
|
|
def add_box(location_id=None, created_by=None):
    """Add a new box/crate with status 'open'.

    Args:
        location_id: Optional ID of the location; a falsy value is stored as
            NULL.
        created_by: Optional name recorded in ``created_by``.

    Returns:
        ``"Box <number> created successfully"`` on success, or
        ``"Error creating box: <details>"`` when the insert fails.
    """
    # Obtain the number before opening our own connection:
    # generate_box_number() uses a separate connection, and a failure there
    # would otherwise leave this one open.
    box_number = generate_box_number()

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO boxes_crates (box_number, status, location_id, created_by) VALUES (%s, %s, %s, %s)",
            (box_number, 'open', location_id if location_id else None, created_by)
        )
        conn.commit()
        return f"Box {box_number} created successfully"
    except Exception as e:
        return f"Error creating box: {e}"
    finally:
        conn.close()
|
|
|
|
def get_boxes():
    """Get all boxes with location information, newest first.

    Returns:
        Rows of ``(id, box_number, status, location_code, created_at,
        updated_at, created_by, location_id)``; ``location_code`` is NULL for
        boxes without a matching location.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                b.id,
                b.box_number,
                b.status,
                l.location_code,
                b.created_at,
                b.updated_at,
                b.created_by,
                b.location_id
            FROM boxes_crates b
            LEFT JOIN warehouse_locations l ON b.location_id = l.id
            ORDER BY b.id DESC
        """)
        return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
|
|
|
|
def update_box_status(box_id, status):
    """Update box status (open/closed).

    Args:
        box_id: ID of the box to update.
        status: New status; must be ``'open'`` or ``'closed'``.

    Returns:
        ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": ...}``.
    """
    # Validate against the values of the status ENUM before touching the
    # database, consistent with change_box_status().
    if status not in ('open', 'closed'):
        return {"success": False, "error": 'Invalid status. Must be "open" or "closed"'}

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE boxes_crates SET status = %s WHERE id = %s",
            (status, box_id)
        )
        conn.commit()
        return {"success": True, "message": f"Box status updated to {status}"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()
|
|
|
|
def update_box_location(box_id, location_id):
    """Update box location.

    Args:
        box_id: ID of the box to update.
        location_id: ID of the new location; a falsy value clears the
            location (stored as NULL).

    Returns:
        ``{"success": True, "message": "Box location updated"}`` on success,
        otherwise ``{"success": False, "error": <exception text>}``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE boxes_crates SET location_id = %s WHERE id = %s",
            (location_id if location_id else None, box_id)
        )
        conn.commit()
        return {"success": True, "message": "Box location updated"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        # Single close point for both the success and the error path.
        conn.close()
|
|
|
|
def delete_boxes_by_ids(ids_str):
    """Delete boxes by comma-separated IDs.

    Entries that are not purely digits after stripping whitespace are
    ignored.

    Args:
        ids_str: Comma-separated IDs, e.g. ``"1, 2,3"``. ``None`` is treated
            as an empty string.

    Returns:
        ``"No valid IDs provided."`` when nothing usable was supplied,
        otherwise ``"Deleted N box(es)."`` where N counts the IDs whose
        DELETE affected at least one row.
    """
    ids = [part.strip() for part in (ids_str or "").split(',') if part.strip().isdigit()]
    if not ids:
        return "No valid IDs provided."

    conn = get_db_connection()
    deleted = 0
    try:
        cursor = conn.cursor()
        for box_id in ids:
            cursor.execute("DELETE FROM boxes_crates WHERE id = %s", (box_id,))
            if cursor.rowcount:
                deleted += 1
        conn.commit()
    finally:
        # Close the connection even if one of the deletes raises.
        conn.close()
    return f"Deleted {deleted} box(es)."
|
|
|
|
def manage_boxes_handler():
    """Handler for boxes/crates management page.

    On POST, dispatches on the ``action`` form field:

    * ``delete_boxes``    - delete the IDs listed in ``delete_ids``.
    * ``add_box``         - create a box without a location; for AJAX
      requests (``X-Requested-With: XMLHttpRequest``) a JSON result is
      returned directly.
    * ``update_status``   - set ``new_status`` on ``box_id``.
    * ``update_location`` - set ``new_location_id`` on ``box_id``.

    The outcome text is stored in ``session['flash_message']`` and the client
    is redirected. On other methods, renders ``manage_boxes.html`` with all
    boxes, all locations and the pending flash message.

    Returns:
        A redirect, a JSON response, the rendered page, or a ``(html, 500)``
        tuple containing the traceback when anything raises.
    """
    try:
        # Ensure tables exist
        ensure_boxes_crates_table()
        ensure_warehouse_locations_table()

        if request.method == "POST":
            action = request.form.get("action")

            if action == "delete_boxes":
                ids_str = request.form.get("delete_ids", "")
                session['flash_message'] = delete_boxes_by_ids(ids_str)

            elif action == "add_box":
                created_by = session.get('user', 'Unknown')
                message = add_box(None, created_by)  # Create box without location

                # Check if this is an AJAX request
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    # Extract box number from message (format: "Box 12345678 created successfully")
                    import re
                    match = re.search(r'Box (\d{8})', message)
                    if match:
                        return jsonify({'success': True, 'box_number': match.group(1), 'message': message})
                    return jsonify({'success': False, 'error': message})

                session['flash_message'] = message

            elif action == "update_status":
                box_id = request.form.get("box_id")
                new_status = request.form.get("new_status")
                result = update_box_status(box_id, new_status)
                # The helper returns a result dict; flash its text rather
                # than the dict itself.
                session['flash_message'] = result.get('message') or result.get('error')

            elif action == "update_location":
                box_id = request.form.get("box_id")
                new_location_id = request.form.get("new_location_id")
                result = update_box_location(box_id, new_location_id)
                session['flash_message'] = result.get('message') or result.get('error')

            return redirect(url_for('warehouse.manage_boxes'))

        # Get flash message from session if any
        message = session.pop('flash_message', None)
        boxes = get_boxes()
        locations = get_locations()
        return render_template("manage_boxes.html", boxes=boxes, locations=locations, message=message)
    except Exception as e:
        import traceback
        from html import escape

        error_trace = traceback.format_exc()
        print(f"Error in manage_boxes_handler: {e}")
        print(error_trace)
        # Escape the traceback: it can contain user-supplied values and is
        # embedded in an HTML response.
        # NOTE(review): returning tracebacks to clients discloses internals;
        # consider showing a generic message outside of debugging.
        return f"<h1>Error loading boxes management</h1><pre>{escape(error_trace)}</pre>", 500
|
|
|
|
def delete_location_by_id(location_id):
    """Delete a warehouse location by ID.

    Args:
        location_id: ID of the location to delete.

    Returns:
        ``{"success": True, "message": ...}`` naming the deleted location
        code, or ``{"success": False, "error": ...}`` when the location does
        not exist or an exception is raised.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if location exists (and fetch its code for the message)
        cursor.execute("SELECT location_code FROM warehouse_locations WHERE id = %s", (location_id,))
        location = cursor.fetchone()
        if not location:
            return {"success": False, "error": "Location not found"}

        # Delete location
        cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,))
        conn.commit()

        return {"success": True, "message": f"Location '{location[0]}' deleted successfully"}

    except Exception as e:
        print(f"Error deleting location: {e}")
        return {"success": False, "error": str(e)}
    finally:
        # Runs on every exit path, including the exception path that
        # previously leaked the connection.
        if conn is not None:
            conn.close()
|
|
|
|
def assign_cp_to_box_handler():
    """Handle assigning CP code to a box.

    Expects a JSON object body with ``box_number`` and ``cp_code``. Looks up
    the box by number and inserts a ``box_contents`` row attributed to
    ``session['user']`` (``'Unknown'`` when absent).

    Returns:
        A ``(json, status)`` tuple: 200 on success, 400 when a field is
        missing or the body is not a JSON object, 404 when the box does not
        exist, 500 on any other error.
    """
    conn = None
    try:
        # Ensure box_contents table exists
        ensure_box_contents_table()

        # force=True parses the body regardless of Content-Type; silent=True
        # turns malformed JSON into the 400 below instead of a 500.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            data = {}
        box_number = data.get('box_number')
        cp_code = data.get('cp_code')
        scanned_by = session.get('user', 'Unknown')

        if not box_number or not cp_code:
            return jsonify({'success': False, 'error': 'Missing box_number or cp_code'}), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Find the box by number
        cursor.execute("SELECT id FROM boxes_crates WHERE box_number = %s", (box_number,))
        box = cursor.fetchone()

        if not box:
            return jsonify({'success': False, 'error': f'Box {box_number} not found'}), 404

        box_id = box[0]

        # Insert into box_contents
        cursor.execute("""
            INSERT INTO box_contents (box_id, cp_code, scanned_by)
            VALUES (%s, %s, %s)
        """, (box_id, cp_code, scanned_by))
        conn.commit()

        return jsonify({
            'success': True,
            'message': f'CP {cp_code} assigned to box {box_number}'
        }), 200

    except Exception as e:
        import traceback
        print(f"Error in assign_cp_to_box_handler: {e}")
        print(traceback.format_exc())
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
def view_warehouse_inventory_handler():
    """Handle warehouse inventory view - shows CP codes, boxes, and locations.

    Optional query-string filters ``search_cp``, ``search_box`` and
    ``search_location`` are applied as substring (``LIKE %value%``) matches.
    Results are ordered by scan time, newest first.

    Returns:
        The rendered ``warehouse_inventory.html``, or a ``(html, 500)`` tuple
        containing the traceback when anything raises.
    """
    conn = None
    try:
        # Ensure tables exist
        ensure_box_contents_table()
        ensure_location_contents_table()

        # Get search parameters
        search_cp = request.args.get('search_cp', '').strip()
        search_box = request.args.get('search_box', '').strip()
        search_location = request.args.get('search_location', '').strip()

        conn = get_db_connection()
        cursor = conn.cursor()

        # Build query with joins and filters. Only fixed SQL fragments are
        # concatenated; user values always go through bound parameters.
        query = """
            SELECT
                bc.cp_code,
                b.box_number,
                wl.location_code,
                bc.scanned_at,
                bc.scanned_by,
                lc.placed_at,
                lc.placed_by,
                b.status as box_status,
                lc.status as location_status
            FROM box_contents bc
            INNER JOIN boxes_crates b ON bc.box_id = b.id
            LEFT JOIN location_contents lc ON b.id = lc.box_id AND lc.status = 'active'
            LEFT JOIN warehouse_locations wl ON lc.location_id = wl.id
            WHERE 1=1
        """

        params = []
        filters = (
            (search_cp, " AND bc.cp_code LIKE %s"),
            (search_box, " AND b.box_number LIKE %s"),
            (search_location, " AND wl.location_code LIKE %s"),
        )
        for value, clause in filters:
            if value:
                query += clause
                params.append(f"%{value}%")

        query += " ORDER BY bc.scanned_at DESC"

        cursor.execute(query, params)
        inventory_data = cursor.fetchall()

        return render_template(
            'warehouse_inventory.html',
            inventory_data=inventory_data,
            search_cp=search_cp,
            search_box=search_box,
            search_location=search_location
        )

    except Exception as e:
        import traceback
        from html import escape

        error_trace = traceback.format_exc()
        print(f"Error in view_warehouse_inventory_handler: {e}")
        print(error_trace)
        # Escape the traceback: it can contain user-supplied values and is
        # embedded in an HTML response.
        # NOTE(review): returning tracebacks to clients discloses internals;
        # consider showing a generic message outside of debugging.
        return f"<h1>Error loading warehouse inventory</h1><pre>{escape(error_trace)}</pre>", 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
# Box Location Management Functions
|
|
|
|
def search_box_by_number(box_number):
    """
    Search for a box by box number and return its details including location

    Args:
        box_number (str): The box number to search for

    Returns:
        tuple: (success: bool, data: dict, status_code: int). On success
        ``data['box']`` holds ``id``, ``box_number``, ``status``,
        ``location_id`` and ``location_code``; otherwise ``data['message']``
        describes the problem (400 missing number, 404 not found, 500 error).
    """
    conn = None
    try:
        if not box_number:
            return False, {'message': 'Box number is required'}, 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Search for the box and get its location info
        cursor.execute("""
            SELECT
                b.id,
                b.box_number,
                b.status,
                b.location_id,
                w.location_code
            FROM boxes_crates b
            LEFT JOIN warehouse_locations w ON b.location_id = w.id
            WHERE b.box_number = %s
        """, (box_number,))
        result = cursor.fetchone()

        if not result:
            return False, {'message': f'Box "{box_number}" not found in the system'}, 404

        return True, {
            'box': {
                'id': result[0],
                'box_number': result[1],
                'status': result[2],
                'location_id': result[3],
                'location_code': result[4]
            }
        }, 200

    except Exception as e:
        return False, {'message': f'Error searching for box: {str(e)}'}, 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def assign_box_to_location(box_id, location_code):
    """
    Assign a box to a warehouse location

    Args:
        box_id (int): The ID of the box to assign
        location_code (str): The location code to assign the box to

    Returns:
        tuple: (success: bool, data: dict, status_code: int). ``data`` always
        carries a ``message``; status is 200 on success, 400 for missing
        arguments, 404 when the location or the box does not exist, 500 on
        any other error.
    """
    conn = None
    try:
        if not box_id or not location_code:
            return False, {'message': 'Box ID and location code are required'}, 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if location exists
        cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = %s", (location_code,))
        location_result = cursor.fetchone()

        if not location_result:
            return False, {'message': f'Location "{location_code}" not found in the system'}, 404

        location_id = location_result[0]

        # Check that the box exists, as move_box_to_new_location() does;
        # otherwise the UPDATE matches no row and success is reported for a
        # box that is not there.
        cursor.execute("SELECT id FROM boxes_crates WHERE id = %s", (box_id,))
        if not cursor.fetchone():
            return False, {'message': 'Box not found'}, 404

        # Update box location
        cursor.execute("""
            UPDATE boxes_crates
            SET location_id = %s, updated_at = NOW()
            WHERE id = %s
        """, (location_id, box_id))
        conn.commit()

        return True, {'message': f'Box successfully assigned to location "{location_code}"'}, 200

    except Exception as e:
        return False, {'message': f'Error assigning box to location: {str(e)}'}, 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def search_location_with_boxes(location_code):
    """
    Search for a location and get all boxes assigned to it

    Args:
        location_code (str): The location code to search for

    Returns:
        tuple: (success: bool, data: dict, status_code: int). On success
        ``data['location']`` holds ``id``, ``location_code``, ``size`` and
        ``description`` and ``data['boxes']`` is a list of dicts with ``id``,
        ``box_number``, ``status`` and ``created_at`` (formatted as
        ``YYYY-MM-DD HH:MM:SS`` or ``None``), ordered by box number.
        Otherwise ``data['message']`` describes the problem (400, 404, 500).
    """
    conn = None
    try:
        if not location_code:
            return False, {'message': 'Location code is required'}, 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Search for the location
        cursor.execute("""
            SELECT id, location_code, size, description
            FROM warehouse_locations
            WHERE location_code = %s
        """, (location_code,))
        location_result = cursor.fetchone()

        if not location_result:
            return False, {'message': f'Location "{location_code}" not found in the system'}, 404

        location_id = location_result[0]

        # Get all boxes assigned to this location
        cursor.execute("""
            SELECT
                b.id,
                b.box_number,
                b.status,
                b.created_at
            FROM boxes_crates b
            WHERE b.location_id = %s
            ORDER BY b.box_number
        """, (location_id,))

        boxes = [
            {
                'id': row[0],
                'box_number': row[1],
                'status': row[2],
                'created_at': row[3].strftime('%Y-%m-%d %H:%M:%S') if row[3] else None
            }
            for row in cursor.fetchall()
        ]

        return True, {
            'location': {
                'id': location_result[0],
                'location_code': location_result[1],
                'size': location_result[2],
                'description': location_result[3]
            },
            'boxes': boxes
        }, 200

    except Exception as e:
        return False, {'message': f'Error searching for location: {str(e)}'}, 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def move_box_to_new_location(box_id, new_location_code):
    """
    Move a box from its current location to a new location

    Args:
        box_id (int): The ID of the box to move
        new_location_code (str): The new location code

    Returns:
        tuple: (success: bool, data: dict, status_code: int). ``data`` always
        carries a ``message``; status is 200 on success, 400 for missing
        arguments, 404 when the location or the box does not exist, 500 on
        any other error.
    """
    conn = None
    try:
        if not box_id or not new_location_code:
            return False, {'message': 'Box ID and new location code are required'}, 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if new location exists
        cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = %s", (new_location_code,))
        location_result = cursor.fetchone()

        if not location_result:
            return False, {'message': f'Location "{new_location_code}" not found in the system'}, 404

        new_location_id = location_result[0]

        # Get box number for response message (also confirms the box exists)
        cursor.execute("SELECT box_number FROM boxes_crates WHERE id = %s", (box_id,))
        box_result = cursor.fetchone()

        if not box_result:
            return False, {'message': 'Box not found'}, 404

        box_number = box_result[0]

        # Update box location
        cursor.execute("""
            UPDATE boxes_crates
            SET location_id = %s, updated_at = NOW()
            WHERE id = %s
        """, (new_location_id, box_id))
        conn.commit()

        return True, {'message': f'Box "{box_number}" successfully moved to location "{new_location_code}"'}, 200

    except Exception as e:
        return False, {'message': f'Error moving box to new location: {str(e)}'}, 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def change_box_status(box_id, new_status):
    """
    Change the status of a box (open/closed)

    Args:
        box_id (int): The ID of the box
        new_status (str): The new status ('open' or 'closed')

    Returns:
        tuple: (success: bool, data: dict, status_code: int). ``data`` always
        carries a ``message``; status is 200 on success, 400 for a missing ID
        or invalid status, 404 when the box does not exist, 500 on any other
        error.
    """
    conn = None
    try:
        if not box_id:
            return False, {'message': 'Box ID is required'}, 400

        if new_status not in ['open', 'closed']:
            return False, {'message': 'Invalid status. Must be "open" or "closed"'}, 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Get box number for response message (also confirms the box exists)
        cursor.execute("SELECT box_number FROM boxes_crates WHERE id = %s", (box_id,))
        box_result = cursor.fetchone()

        if not box_result:
            return False, {'message': 'Box not found'}, 404

        box_number = box_result[0]

        # Update box status
        cursor.execute("""
            UPDATE boxes_crates
            SET status = %s, updated_at = NOW()
            WHERE id = %s
        """, (new_status, box_id))
        conn.commit()

        return True, {'message': f'Box "{box_number}" status changed to "{new_status}"'}, 200

    except Exception as e:
        return False, {'message': f'Error changing box status: {str(e)}'}, 500
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|