277 lines
10 KiB
Python
Executable File
277 lines
10 KiB
Python
Executable File
import mariadb
|
|
from flask import current_app, request, render_template, session, redirect, url_for, jsonify, make_response
|
|
import csv, os, tempfile
|
|
from reportlab.lib.pagesizes import letter
|
|
from reportlab.pdfgen import canvas
|
|
from reportlab.lib.units import cm
|
|
from reportlab.graphics.barcode import code128
|
|
import io
|
|
|
|
def get_db_connection():
    """Open a MariaDB connection using the instance's external server settings.

    Settings are read from ``external_server.conf`` inside the Flask instance
    folder, one ``key=value`` pair per line. The keys ``username``,
    ``password``, ``server_domain``, ``port`` and ``database_name`` are
    required.

    Returns:
        The connection object returned by ``mariadb.connect``.

    Raises:
        OSError: If the settings file cannot be opened.
        KeyError: If a required setting is missing from the file.
        ValueError: If ``port`` cannot be converted to an integer.
    """
    settings_file = current_app.instance_path + '/external_server.conf'
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            entry = line.strip()
            # Blank lines (e.g. a trailing newline) and lines without '='
            # carry no setting; unpacking them would raise ValueError.
            if '=' not in entry:
                continue
            key, value = entry.split('=', 1)
            settings[key] = value
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
|
|
|
|
def ensure_warehouse_locations_table():
    """Create the ``warehouse_locations`` table if it does not exist yet.

    Best effort: any exception is printed and swallowed, so this function
    returns ``None`` whether or not the table could be created.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("SHOW TABLES LIKE 'warehouse_locations'")
            if not cursor.fetchone():
                # location_code is UNIQUE so that add_location() can detect
                # duplicates through mariadb.IntegrityError, matching the
                # schema used by recreate_warehouse_locations_table().
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS warehouse_locations (
                        id BIGINT AUTO_INCREMENT PRIMARY KEY,
                        location_code VARCHAR(12) UNIQUE NOT NULL,
                        size INT,
                        description VARCHAR(250)
                    )
                ''')
                conn.commit()
        finally:
            # Release the connection even when a statement above fails.
            conn.close()
    except Exception as e:
        print(f"Error ensuring warehouse_locations table: {e}")
|
|
|
|
# Add warehouse-specific functions below
|
|
def add_location(location_code, size, description):
    """Insert one row into ``warehouse_locations``.

    Args:
        location_code: Value for the ``location_code`` column.
        size: Value for the ``size`` column; any falsy value (``None``,
            ``""``, ``0``) is stored as NULL.
        description: Value for the ``description`` column.

    Returns:
        ``"Location added successfully."`` when the insert is committed, or
        ``"Failed: Location code already exists."`` when the insert raises
        ``mariadb.IntegrityError``.

    Any other exception propagates to the caller; the connection is closed
    in every case.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO warehouse_locations (location_code, size, description) VALUES (?, ?, ?)",
            (location_code, size if size else None, description),
        )
        conn.commit()
        return "Location added successfully."
    except mariadb.IntegrityError:
        return "Failed: Location code already exists."
    finally:
        # Runs on success, on IntegrityError and on any other error, so the
        # connection can no longer leak.
        conn.close()
|
|
|
|
def get_locations():
    """Return every row of ``warehouse_locations``, newest id first.

    Returns:
        The result of ``cursor.fetchall()`` for the columns
        ``id, location_code, size, description`` ordered by ``id`` descending.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, location_code, size, description FROM warehouse_locations ORDER BY id DESC")
        return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
|
|
|
|
def recreate_warehouse_locations_table():
    """Drop ``warehouse_locations`` (if present) and create it again empty.

    WARNING: all existing rows in the table are lost.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS warehouse_locations")
        cursor.execute("""
            CREATE TABLE warehouse_locations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                location_code VARCHAR(12) UNIQUE NOT NULL,
                size INT,
                description VARCHAR(250)
            )
        """)
        conn.commit()
    finally:
        # Close the connection even if DROP or CREATE fails.
        conn.close()
|
|
|
|
def delete_locations_by_ids(ids_str):
    """Delete the locations whose ids appear in a comma-separated string.

    Args:
        ids_str: Comma-separated ids, e.g. ``"3, 7,12"``. Entries that are
            not made up purely of digits are ignored. ``None`` is treated
            like an empty string.

    Returns:
        ``"No valid IDs provided."`` when no usable id remains (no database
        connection is opened in that case), otherwise
        ``"Deleted N location(s)."`` where N counts the ids for which the
        DELETE affected at least one row.
    """
    tokens = (token.strip() for token in (ids_str or "").split(','))
    ids = [token for token in tokens if token.isdigit()]
    if not ids:
        return "No valid IDs provided."

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        deleted = 0
        for location_id in ids:
            cursor.execute("DELETE FROM warehouse_locations WHERE id = ?", (location_id,))
            if cursor.rowcount:
                deleted += 1
        conn.commit()
    finally:
        # Close the connection even if one of the deletes raises.
        conn.close()
    return f"Deleted {deleted} location(s)."
|
|
|
|
def create_locations_handler():
    """Handle the create-locations page.

    On POST, either deletes the ids listed in the ``delete_ids`` form field
    (when ``delete_locations`` is set) or adds a location built from the
    ``location_code``, ``size`` and ``description`` form fields. In every
    case the page is rendered with the current locations and the message
    produced by the action (``None`` when no action ran).
    """
    message = None
    if request.method == "POST":
        form = request.form
        if form.get("delete_locations"):
            message = delete_locations_by_ids(form.get("delete_ids", ""))
        else:
            message = add_location(
                form.get("location_code"),
                form.get("size"),
                form.get("description"),
            )
    return render_template(
        "create_locations.html",
        locations=get_locations(),
        message=message,
    )
|
|
|
|
def _read_locations_csv(path):
    """Parse a locations CSV into ``(location_code, size, description)`` tuples."""
    # utf-8-sig strips a leading BOM (otherwise it becomes part of the first
    # header name and that column is never matched); newline='' is what the
    # csv module asks for when it is handed a file object.
    with open(path, 'r', encoding='utf-8-sig', newline='') as f:
        return [
            (
                row.get('Location Code') or row.get('location_code') or '',
                row.get('Size') or row.get('size') or '',
                row.get('Description') or row.get('description') or '',
            )
            for row in csv.DictReader(f)
        ]


def import_locations_csv_handler():
    """Handle the CSV import page for warehouse locations.

    POST with a ``csv_file`` upload: the file is saved to the temp
    directory, parsed, and the parsed rows are kept in the session under
    ``csv_locations`` (with ``csv_filename`` and ``csv_filepath``).
    POST with ``create_locations`` set: every pending row is passed to
    ``add_location``, the session entries are cleared, and the client is
    redirected to ``warehouse.import_locations_csv`` with a ``#created``
    fragment.
    Otherwise the page is rendered with whatever rows are pending in the
    session.
    """
    report = None
    locations = []
    if request.method == 'POST':
        file = request.files.get('csv_file')
        if file and file.filename.lower().endswith('.csv'):
            # The filename is client-controlled: keep only its final path
            # component so "../x.csv" or an absolute path cannot make the
            # upload land outside the temp directory.
            safe_name = os.path.basename(file.filename.replace('\\', '/'))
            temp_path = os.path.join(tempfile.gettempdir(), safe_name)
            file.save(temp_path)
            session['csv_filename'] = file.filename
            session['csv_filepath'] = temp_path
            locations = _read_locations_csv(temp_path)
            session['csv_locations'] = locations
        elif 'csv_locations' in session:
            locations = session['csv_locations']

        if request.form.get('create_locations') and locations:
            added = 0
            failed = 0
            errors = []
            for location_code, size, description in locations:
                result = add_location(location_code, size, description)
                if result and 'success' in result.lower():
                    added += 1
                else:
                    failed += 1
                    errors.append(location_code or '')
            # NOTE(review): this report is built but not passed on; the
            # redirect below discards it. Confirm whether it should be
            # surfaced to the user.
            report = f"{added} locations were added to warehouse_locations table. {failed} locations failed: {', '.join(errors)}"
            session.pop('csv_locations', None)
            session.pop('csv_filename', None)
            session.pop('csv_filepath', None)
            return redirect(url_for('warehouse.import_locations_csv') + '#created')
    elif 'csv_locations' in session:
        locations = session['csv_locations']
    return render_template('import_locations_csv.html', report=report, locations=locations)
|
|
|
|
def generate_location_label_pdf():
    """Generate PDF for location barcode label (8x4cm)

    Reads ``location_code`` from the JSON request body, draws it as a
    Code128 barcode scaled to a 7 cm width on an 8 x 4 cm page, and prints
    the code as text underneath.

    Returns:
        A response carrying the PDF bytes (``Content-Type: application/pdf``)
        on success; ``({'error': ...}, 400)`` as JSON when no location code
        was supplied; ``({'error': ...}, 500)`` as JSON when anything raises
        while building the PDF.
    """
    try:
        # silent=True returns None instead of raising on a missing or
        # malformed JSON body, so a bad request gets the 400 below rather
        # than ending up in the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        location_code = data.get('location_code', '')

        if not location_code:
            return jsonify({'error': 'Location code is required'}), 400

        # Create PDF in memory
        buffer = io.BytesIO()

        # Create PDF with 8x4cm page size (width x height)
        page_width = 8 * cm
        page_height = 4 * cm

        c = canvas.Canvas(buffer, pagesize=(page_width, page_height))

        # Generate Code128 barcode
        barcode = code128.Code128(location_code, barWidth=1.0, humanReadable=False)

        # Scale uniformly so the barcode spans almost the full label width;
        # the height follows from the same factor.
        desired_barcode_width = 7 * cm
        scale = desired_barcode_width / barcode.width

        # Calculate actual dimensions after scaling
        actual_width = barcode.width * scale
        actual_height = barcode.height * scale

        # Center the barcode on the label
        barcode_x = (page_width - actual_width) / 2
        barcode_y = (page_height - actual_height) / 2 + 0.3 * cm  # Slightly above center for text space

        # Draw barcode with scaling
        c.saveState()
        c.translate(barcode_x, barcode_y)
        c.scale(scale, scale)
        barcode.drawOn(c, 0, 0)
        c.restoreState()

        # Add location code text below barcode
        c.setFont("Helvetica-Bold", 10)
        text_width = c.stringWidth(location_code, "Helvetica-Bold", 10)
        text_x = (page_width - text_width) / 2
        text_y = barcode_y - 0.5 * cm  # Below the barcode
        c.drawString(text_x, text_y, location_code)

        # Finalize PDF
        c.save()

        # Prepare response
        buffer.seek(0)
        response = make_response(buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename=location_{location_code}_label.pdf'

        return response

    except Exception as e:
        print(f"Error generating location label PDF: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
def update_location(location_id, location_code, size, description):
    """Update an existing warehouse location

    Args:
        location_id: ``id`` of the row to update.
        location_code: New ``location_code``; rejected when another row
            already uses it.
        size: New ``size``; any falsy value is stored as NULL.
        description: New ``description``.

    Returns:
        ``{"success": True, "message": ...}`` when the update is committed,
        otherwise ``{"success": False, "error": ...}`` (row not found, code
        already in use, or the text of any exception that was raised).
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Check if location exists
            cursor.execute("SELECT id FROM warehouse_locations WHERE id = ?", (location_id,))
            if not cursor.fetchone():
                return {"success": False, "error": "Location not found"}

            # Check if location code already exists for different location
            cursor.execute("SELECT id FROM warehouse_locations WHERE location_code = ? AND id != ?", (location_code, location_id))
            if cursor.fetchone():
                return {"success": False, "error": "Location code already exists"}

            # Update location
            cursor.execute(
                "UPDATE warehouse_locations SET location_code = ?, size = ?, description = ? WHERE id = ?",
                (location_code, size if size else None, description, location_id),
            )
            conn.commit()

            return {"success": True, "message": "Location updated successfully"}
        finally:
            # Single close point for every return path and for errors, so a
            # failing statement no longer leaks the connection.
            conn.close()

    except Exception as e:
        print(f"Error updating location: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
def delete_location_by_id(location_id):
    """Delete a warehouse location by ID

    Args:
        location_id: ``id`` of the row to delete.

    Returns:
        ``{"success": True, "message": ...}`` naming the deleted location
        code, otherwise ``{"success": False, "error": ...}`` (row not found,
        or the text of any exception that was raised).
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Check if location exists (and fetch its code for the message)
            cursor.execute("SELECT location_code FROM warehouse_locations WHERE id = ?", (location_id,))
            location = cursor.fetchone()
            if not location:
                return {"success": False, "error": "Location not found"}

            # Delete location
            cursor.execute("DELETE FROM warehouse_locations WHERE id = ?", (location_id,))
            conn.commit()

            return {"success": True, "message": f"Location '{location[0]}' deleted successfully"}
        finally:
            # Single close point for every return path and for errors, so a
            # failing statement no longer leaks the connection.
            conn.close()

    except Exception as e:
        print(f"Error deleting location: {e}")
        return {"success": False, "error": str(e)}
|