import mariadb from flask import current_app, request, render_template, session, redirect, url_for, jsonify, make_response import csv, os, tempfile from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas from reportlab.lib.units import cm from reportlab.graphics.barcode import code128 import io def get_db_connection(): settings_file = current_app.instance_path + '/external_server.conf' settings = {} with open(settings_file, 'r') as f: for line in f: line = line.strip() if line and '=' in line and not line.startswith('#'): key, value = line.split('=', 1) settings[key] = value return mariadb.connect( user=settings['username'], password=settings['password'], host=settings['server_domain'], port=int(settings['port']), database=settings['database_name'] ) def ensure_warehouse_locations_table(): try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'warehouse_locations'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS warehouse_locations ( id BIGINT AUTO_INCREMENT PRIMARY KEY, location_code VARCHAR(12) NOT NULL, size INT, description VARCHAR(250) ) ''') conn.commit() conn.close() except Exception as e: print(f"Error ensuring warehouse_locations table: {e}") def ensure_boxes_crates_table(): try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'boxes_crates'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS boxes_crates ( id BIGINT AUTO_INCREMENT PRIMARY KEY, box_number VARCHAR(8) NOT NULL UNIQUE, status ENUM('open', 'closed') DEFAULT 'open', location_id BIGINT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, created_by VARCHAR(100), FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE SET NULL ) ''') conn.commit() conn.close() except Exception as e: print(f"Error ensuring boxes_crates table: {e}") def ensure_box_contents_table(): """Ensure box_contents table exists for tracking CP codes in boxes""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'box_contents'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS box_contents ( id BIGINT AUTO_INCREMENT PRIMARY KEY, box_id BIGINT NOT NULL, cp_code VARCHAR(15) NOT NULL, scan_id BIGINT, scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, scanned_by VARCHAR(100), FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE, INDEX idx_box_id (box_id), INDEX idx_cp_code (cp_code) ) ''') conn.commit() print("box_contents table created successfully") conn.close() except Exception as e: print(f"Error ensuring box_contents table: {e}") def ensure_location_contents_table(): """Ensure location_contents table exists for tracking boxes in locations""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SHOW TABLES LIKE 'location_contents'") result = cursor.fetchone() if not result: cursor.execute(''' CREATE TABLE IF NOT EXISTS location_contents ( id BIGINT AUTO_INCREMENT PRIMARY KEY, location_id BIGINT NOT NULL, box_id BIGINT NOT NULL, placed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, placed_by VARCHAR(100), removed_at TIMESTAMP NULL, removed_by VARCHAR(100), status ENUM('active', 'removed') DEFAULT 'active', FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE CASCADE, FOREIGN KEY (box_id) REFERENCES boxes_crates(id) ON DELETE CASCADE, INDEX idx_location_id (location_id), INDEX idx_box_id (box_id), INDEX idx_status (status) ) ''') conn.commit() print("location_contents table created successfully") conn.close() except Exception as e: print(f"Error ensuring location_contents table: {e}") # Add warehouse-specific functions below def add_location(location_code, size, description): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "INSERT INTO warehouse_locations (location_code, size, description) VALUES (?, ?, ?)", (location_code, size if size else None, description) ) conn.commit() conn.close() return "Location added successfully." except mariadb.IntegrityError: conn.close() return "Failed: Location code already exists." def get_locations(): conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id, location_code, size, description FROM warehouse_locations ORDER BY id DESC") locations = cursor.fetchall() conn.close() return locations def recreate_warehouse_locations_table(): conn = get_db_connection() cursor = conn.cursor() cursor.execute("DROP TABLE IF EXISTS warehouse_locations") cursor.execute(""" CREATE TABLE warehouse_locations ( id INT AUTO_INCREMENT PRIMARY KEY, location_code VARCHAR(12) UNIQUE NOT NULL, size INT, description VARCHAR(250) ) """) conn.commit() conn.close() def delete_locations_by_ids(ids_str): ids = [id.strip() for id in ids_str.split(',') if id.strip().isdigit()] if not ids: return "No valid IDs provided." conn = get_db_connection() cursor = conn.cursor() deleted = 0 for id in ids: cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (id,)) if cursor.rowcount: deleted += 1 conn.commit() conn.close() return f"Deleted {deleted} location(s)." def create_locations_handler(): try: # Ensure table exists ensure_warehouse_locations_table() if request.method == "POST": if request.form.get("delete_locations"): ids_str = request.form.get("delete_ids", "") message = delete_locations_by_ids(ids_str) session['flash_message'] = message else: location_code = request.form.get("location_code") size = request.form.get("size") description = request.form.get("description") message = add_location(location_code, size, description) session['flash_message'] = message # Redirect to prevent form resubmission on page reload return redirect(url_for('warehouse.create_locations')) # Get flash message from session if any message = session.pop('flash_message', None) locations = get_locations() return render_template("create_locations.html", locations=locations, message=message) except Exception as e: import traceback error_trace = traceback.format_exc() print(f"Error in create_locations_handler: {e}") print(error_trace) return f"
{error_trace}", 500
def import_locations_csv_handler():
report = None
locations = []
errors = []
temp_dir = tempfile.gettempdir()
if request.method == 'POST':
file = request.files.get('csv_file')
if file and file.filename.endswith('.csv'):
temp_path = os.path.join(temp_dir, file.filename)
file.save(temp_path)
session['csv_filename'] = file.filename
session['csv_filepath'] = temp_path
with open(temp_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
locations = []
for row in reader:
location_code = row.get('Location Code') or row.get('location_code') or ''
size = row.get('Size') or row.get('size') or ''
description = row.get('Description') or row.get('description') or ''
locations.append((location_code, size, description))
session['csv_locations'] = locations
elif 'csv_locations' in session:
locations = session['csv_locations']
if request.form.get('create_locations') and locations:
added = 0
failed = 0
errors = []
for loc in locations:
location_code, size, description = loc
result = add_location(location_code, size, description)
if result and 'success' in result.lower():
added += 1
else:
failed += 1
errors.append(location_code or '')
report = f"{added} locations were added to warehouse_locations table. {failed} locations failed: {', '.join(errors)}"
session.pop('csv_locations', None)
session.pop('csv_filename', None)
session.pop('csv_filepath', None)
return redirect(url_for('warehouse.import_locations_csv') + '#created')
elif 'csv_locations' in session:
locations = session['csv_locations']
return render_template('import_locations_csv.html', report=report, locations=locations)
def generate_location_label_pdf():
"""Generate PDF for location barcode label (8x4cm)"""
try:
data = request.get_json()
location_code = data.get('location_code', '')
if not location_code:
return jsonify({'error': 'Location code is required'}), 400
# Create PDF in memory
buffer = io.BytesIO()
# Create PDF with 8x4cm page size (width x height)
page_width = 8 * cm
page_height = 4 * cm
c = canvas.Canvas(buffer, pagesize=(page_width, page_height))
# Generate Code128 barcode
barcode = code128.Code128(location_code, barWidth=1.0, humanReadable=False)
# Calculate the desired barcode dimensions (fill most of the label)
desired_barcode_width = 7 * cm # Almost full width
desired_barcode_height = 2.5 * cm # Most of the height
# Calculate scaling factor to fit the desired width
scale = desired_barcode_width / barcode.width
# Calculate actual dimensions after scaling
actual_width = barcode.width * scale
actual_height = barcode.height * scale
# Center the barcode on the label
barcode_x = (page_width - actual_width) / 2
barcode_y = (page_height - actual_height) / 2 + 0.3 * cm # Slightly above center for text space
# Draw barcode with scaling
c.saveState()
c.translate(barcode_x, barcode_y)
c.scale(scale, scale)
barcode.drawOn(c, 0, 0)
c.restoreState()
# Add location code text below barcode
c.setFont("Helvetica-Bold", 10)
text_width = c.stringWidth(location_code, "Helvetica-Bold", 10)
text_x = (page_width - text_width) / 2
text_y = barcode_y - 0.5 * cm # Below the barcode
c.drawString(text_x, text_y, location_code)
# Finalize PDF
c.save()
# Prepare response
buffer.seek(0)
response = make_response(buffer.getvalue())
response.headers['Content-Type'] = 'application/pdf'
response.headers['Content-Disposition'] = f'inline; filename=location_{location_code}_label.pdf'
return response
except Exception as e:
print(f"Error generating location label PDF: {e}")
return jsonify({'error': str(e)}), 500
def update_location(location_id, location_code, size, description):
"""Update an existing warehouse location"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Check if location exists
cursor.execute("SELECT id FROM warehouse_locations WHERE id = %s", (location_id,))
if not cursor.fetchone():
conn.close()
return {"success": False, "error": "Location not found"}
# Check if location code already exists for different location
cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = %s AND id != %s", (location_code, location_id))
if cursor.fetchone():
conn.close()
return {"success": False, "error": "Location code already exists"}
# Update location
cursor.execute(
"UPDATE warehouse_locations SET location_code = %s, size = %s, description = %s WHERE id = %s",
(location_code, size if size else None, description, location_id)
)
conn.commit()
conn.close()
return {"success": True, "message": "Location updated successfully"}
except Exception as e:
print(f"Error updating location: {e}")
return {"success": False, "error": str(e)}
# ============================================================================
# Boxes/Crates Functions
# ============================================================================
def generate_box_number():
"""Generate next box number with 8 digits (00000001, 00000002, etc.)"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT MAX(CAST(box_number AS UNSIGNED)) FROM boxes_crates")
result = cursor.fetchone()
conn.close()
if result and result[0]:
next_number = int(result[0]) + 1
else:
next_number = 1
return str(next_number).zfill(8)
def add_box(location_id=None, created_by=None):
"""Add a new box/crate"""
conn = get_db_connection()
cursor = conn.cursor()
box_number = generate_box_number()
try:
cursor.execute(
"INSERT INTO boxes_crates (box_number, status, location_id, created_by) VALUES (%s, %s, %s, %s)",
(box_number, 'open', location_id if location_id else None, created_by)
)
conn.commit()
conn.close()
return f"Box {box_number} created successfully"
except Exception as e:
conn.close()
return f"Error creating box: {e}"
def get_boxes():
"""Get all boxes with location information"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
b.id,
b.box_number,
b.status,
l.location_code,
b.created_at,
b.updated_at,
b.created_by,
b.location_id
FROM boxes_crates b
LEFT JOIN warehouse_locations l ON b.location_id = l.id
ORDER BY b.id DESC
""")
boxes = cursor.fetchall()
conn.close()
return boxes
def update_box_status(box_id, status):
"""Update box status (open/closed)"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"UPDATE boxes_crates SET status = %s WHERE id = %s",
(status, box_id)
)
conn.commit()
conn.close()
return {"success": True, "message": f"Box status updated to {status}"}
except Exception as e:
conn.close()
return {"success": False, "error": str(e)}
def update_box_location(box_id, location_id):
"""Update box location"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"UPDATE boxes_crates SET location_id = %s WHERE id = %s",
(location_id if location_id else None, box_id)
)
conn.commit()
conn.close()
return {"success": True, "message": "Box location updated"}
except Exception as e:
conn.close()
return {"success": False, "error": str(e)}
def delete_boxes_by_ids(ids_str):
"""Delete boxes by comma-separated IDs"""
ids = [id.strip() for id in ids_str.split(',') if id.strip().isdigit()]
if not ids:
return "No valid IDs provided."
conn = get_db_connection()
cursor = conn.cursor()
deleted = 0
for id in ids:
cursor.execute("DELETE FROM boxes_crates WHERE id = %s", (id,))
if cursor.rowcount:
deleted += 1
conn.commit()
conn.close()
return f"Deleted {deleted} box(es)."
def manage_boxes_handler():
"""Handler for boxes/crates management page"""
try:
# Ensure table exists
ensure_boxes_crates_table()
ensure_warehouse_locations_table()
if request.method == "POST":
action = request.form.get("action")
if action == "delete_boxes":
ids_str = request.form.get("delete_ids", "")
message = delete_boxes_by_ids(ids_str)
session['flash_message'] = message
elif action == "add_box":
created_by = session.get('user', 'Unknown')
message = add_box(None, created_by) # Create box without location
# Check if this is an AJAX request
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
# Extract box number from message (format: "Box 12345678 created successfully")
import re
match = re.search(r'Box (\d{8})', message)
if match:
return jsonify({'success': True, 'box_number': match.group(1), 'message': message})
else:
return jsonify({'success': False, 'error': message})
session['flash_message'] = message
elif action == "update_status":
box_id = request.form.get("box_id")
new_status = request.form.get("new_status")
message = update_box_status(box_id, new_status)
session['flash_message'] = message
elif action == "update_location":
box_id = request.form.get("box_id")
new_location_id = request.form.get("new_location_id")
message = update_box_location(box_id, new_location_id)
session['flash_message'] = message
return redirect(url_for('warehouse.manage_boxes'))
# Get flash message from session if any
message = session.pop('flash_message', None)
boxes = get_boxes()
locations = get_locations()
return render_template("manage_boxes.html", boxes=boxes, locations=locations, message=message)
except Exception as e:
import traceback
error_trace = traceback.format_exc()
print(f"Error in manage_boxes_handler: {e}")
print(error_trace)
return f"{error_trace}", 500
def delete_location_by_id(location_id):
"""Delete a warehouse location by ID"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Check if location exists
cursor.execute("SELECT location_code FROM warehouse_locations WHERE id = %s", (location_id,))
location = cursor.fetchone()
if not location:
conn.close()
return {"success": False, "error": "Location not found"}
# Delete location
cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,))
conn.commit()
conn.close()
return {"success": True, "message": f"Location '{location[0]}' deleted successfully"}
except Exception as e:
print(f"Error deleting location: {e}")
return {"success": False, "error": str(e)}
def assign_cp_to_box_handler():
"""Handle assigning CP code to a box"""
from flask import request, jsonify, session
import json
try:
# Ensure box_contents table exists
ensure_box_contents_table()
data = json.loads(request.data)
box_number = data.get('box_number')
cp_code = data.get('cp_code')
scanned_by = session.get('user', 'Unknown')
if not box_number or not cp_code:
return jsonify({'success': False, 'error': 'Missing box_number or cp_code'}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Find the box by number
cursor.execute("SELECT id FROM boxes_crates WHERE box_number = %s", (box_number,))
box = cursor.fetchone()
if not box:
conn.close()
return jsonify({'success': False, 'error': f'Box {box_number} not found'}), 404
box_id = box[0]
# Insert into box_contents
cursor.execute("""
INSERT INTO box_contents (box_id, cp_code, scanned_by)
VALUES (%s, %s, %s)
""", (box_id, cp_code, scanned_by))
conn.commit()
conn.close()
return jsonify({
'success': True,
'message': f'CP {cp_code} assigned to box {box_number}'
}), 200
except Exception as e:
import traceback
print(f"Error in assign_cp_to_box_handler: {e}")
print(traceback.format_exc())
return jsonify({'success': False, 'error': str(e)}), 500
def view_warehouse_inventory_handler():
"""Handle warehouse inventory view - shows CP codes, boxes, and locations"""
from flask import render_template, request
try:
# Ensure tables exist
ensure_box_contents_table()
ensure_location_contents_table()
# Get search parameters
search_cp = request.args.get('search_cp', '').strip()
search_box = request.args.get('search_box', '').strip()
search_location = request.args.get('search_location', '').strip()
conn = get_db_connection()
cursor = conn.cursor()
# Build query with joins and filters
query = """
SELECT
bc.cp_code,
b.box_number,
wl.location_code,
bc.scanned_at,
bc.scanned_by,
lc.placed_at,
lc.placed_by,
b.status as box_status,
lc.status as location_status
FROM box_contents bc
INNER JOIN boxes_crates b ON bc.box_id = b.id
LEFT JOIN location_contents lc ON b.id = lc.box_id AND lc.status = 'active'
LEFT JOIN warehouse_locations wl ON lc.location_id = wl.id
WHERE 1=1
"""
params = []
if search_cp:
query += " AND bc.cp_code LIKE %s"
params.append(f"%{search_cp}%")
if search_box:
query += " AND b.box_number LIKE %s"
params.append(f"%{search_box}%")
if search_location:
query += " AND wl.location_code LIKE %s"
params.append(f"%{search_location}%")
query += " ORDER BY bc.scanned_at DESC"
cursor.execute(query, params)
inventory_data = cursor.fetchall()
conn.close()
return render_template(
'warehouse_inventory.html',
inventory_data=inventory_data,
search_cp=search_cp,
search_box=search_box,
search_location=search_location
)
except Exception as e:
import traceback
error_trace = traceback.format_exc()
print(f"Error in view_warehouse_inventory_handler: {e}")
print(error_trace)
return f"{error_trace}", 500