import csv
import io
import os
import tempfile

import mariadb
from flask import (
    current_app,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from reportlab.graphics.barcode import code128
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import cm
from reportlab.pdfgen import canvas


def get_db_connection():
    """Open a MariaDB connection configured by ``external_server.conf``.

    The settings file is read from the Flask instance folder and holds one
    ``key=value`` pair per line. Blank lines, lines starting with ``#`` and
    lines without ``=`` are ignored instead of aborting the parse.

    Returns:
        A new ``mariadb`` connection. The caller is responsible for closing it.

    Raises:
        OSError: The settings file cannot be opened.
        KeyError: A required setting (``username``, ``password``,
            ``server_domain``, ``port``, ``database_name``) is missing.
        ValueError: ``port`` is not an integer.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    settings = {}
    with open(settings_file, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            # Split on the first '=' only so values (e.g. passwords) may
            # themselves contain '='.
            key, value = line.split('=', 1)
            settings[key.strip()] = value
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )


def ensure_warehouse_locations_table():
    """Create the ``warehouse_locations`` table if it does not exist yet.

    This is best-effort: any failure is printed and swallowed so the caller
    is never interrupted. ``location_code`` is declared UNIQUE so duplicate
    codes are rejected by the database, which is what ``add_location``
    relies on to report "already exists".
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            # IF NOT EXISTS makes this idempotent; no separate SHOW TABLES
            # probe is needed.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS warehouse_locations (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    location_code VARCHAR(12) UNIQUE NOT NULL,
                    size INT,
                    description VARCHAR(250)
                )
            ''')
            conn.commit()
        finally:
            # Release the connection even when a statement fails.
            conn.close()
    except Exception as e:
        print(f"Error ensuring warehouse_locations table: {e}")


# Add warehouse-specific functions below
def add_location(location_code, size, description):
    """Insert one row into ``warehouse_locations``.

    Args:
        location_code: Code of the new location; must not be empty or blank.
        size: Size value; any falsy value (``None``, ``''``, ``0``) is
            stored as NULL.
        description: Free-text description.

    Returns:
        A status message. It contains the word "successfully" only when the
        row was inserted; every failure message starts with ``"Failed:"``.
    """
    # Reject a missing code up front; otherwise the NOT NULL violation would
    # be reported as a misleading "already exists" error.
    if not location_code or not str(location_code).strip():
        return "Failed: Location code is required."

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO warehouse_locations (location_code, size, description) VALUES (?, ?, ?)",
            (location_code, size if size else None, description),
        )
        conn.commit()
        return "Location added successfully."
    except mariadb.IntegrityError:
        return "Failed: Location code already exists."
    except mariadb.Error as e:
        # Data errors (e.g. a code that is too long) are reported as a failed
        # row rather than aborting a whole bulk import.
        return f"Failed: {e}"
    finally:
        conn.close()
def get_locations():
    """Return every warehouse location, newest first.

    Returns:
        The rows fetched from ``warehouse_locations`` as
        ``(id, location_code, size, description)`` sequences, ordered by
        ``id`` descending.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, location_code, size, description FROM warehouse_locations ORDER BY id DESC"
        )
        return cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()


def recreate_warehouse_locations_table():
    """Drop and re-create ``warehouse_locations``.

    Destructive: all existing rows are lost.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS warehouse_locations")
        cursor.execute("""
            CREATE TABLE warehouse_locations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                location_code VARCHAR(12) UNIQUE NOT NULL,
                size INT,
                description VARCHAR(250)
            )
        """)
        conn.commit()
    finally:
        conn.close()


def delete_locations_by_ids(ids_str):
    """Delete the locations whose ids appear in a comma-separated string.

    Args:
        ids_str: Text such as ``"3, 7,12"``. Tokens that are not plain ASCII
            digits are ignored; ``None`` is treated as an empty string.

    Returns:
        ``"No valid IDs provided."`` when no usable id is found (the
        database is not contacted in that case), otherwise
        ``"Deleted N location(s)."`` where N counts the ids that matched a
        row.
    """
    tokens = (token.strip() for token in (ids_str or '').split(','))
    # isascii() filters out characters such as '²' that str.isdigit()
    # accepts but that are not valid integer literals.
    location_ids = [
        int(token) for token in tokens if token.isascii() and token.isdigit()
    ]
    if not location_ids:
        return "No valid IDs provided."

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        deleted = 0
        for location_id in location_ids:
            cursor.execute(
                "DELETE FROM warehouse_locations WHERE id = ?", (location_id,)
            )
            if cursor.rowcount > 0:
                deleted += 1
        conn.commit()
    finally:
        conn.close()
    return f"Deleted {deleted} location(s)."


def create_locations_handler():
    """Handle the "create locations" page.

    On POST, either deletes the ids listed in the ``delete_ids`` form field
    (when ``delete_locations`` is set) or adds one location from the
    ``location_code`` / ``size`` / ``description`` fields. The page is then
    rendered with the current locations and the resulting message.
    """
    message = None
    if request.method == "POST":
        if request.form.get("delete_locations"):
            ids_str = request.form.get("delete_ids", "")
            message = delete_locations_by_ids(ids_str)
        else:
            location_code = request.form.get("location_code")
            size = request.form.get("size")
            description = request.form.get("description")
            message = add_location(location_code, size, description)
    locations = get_locations()
    return render_template("create_locations.html", locations=locations, message=message)


def _read_locations_csv(path):
    """Parse the CSV at *path* into ``(location_code, size, description)`` rows."""
    # utf-8-sig drops a leading byte-order mark, which would otherwise become
    # part of the first header name; newline='' is what the csv module expects.
    with open(path, 'r', encoding='utf-8-sig', newline='') as f:
        return [
            (
                row.get('Location Code') or row.get('location_code') or '',
                row.get('Size') or row.get('size') or '',
                row.get('Description') or row.get('description') or '',
            )
            for row in csv.DictReader(f)
        ]


def _remove_temp_file(path):
    """Best-effort removal of an uploaded CSV stored in the temp directory."""
    if not path:
        return
    temp_root = os.path.abspath(tempfile.gettempdir())
    # Only delete files that live directly in the temp directory.
    if os.path.dirname(os.path.abspath(path)) != temp_root:
        return
    try:
        os.remove(path)
    except OSError:
        # Already gone or not removable; nothing useful to do.
        pass


def import_locations_csv_handler():
    """Handle CSV upload, preview and bulk creation of locations.

    Flow:
      * POST with a ``csv_file`` upload: the file is stored under a
        server-generated temp name, parsed, and the rows are kept in the
        session for preview.
      * POST with ``create_locations``: each row is passed to
        ``add_location``; a summary is stored in the session, the preview
        state and temp file are cleared, and the client is redirected.
      * Otherwise: the page is rendered with the rows found in the session
        (if any) and the summary left by a preceding creation (if any).
    """
    # The summary is produced right before a redirect, so it travels through
    # the session and is consumed by the next render.
    report = session.pop('csv_import_report', None)
    locations = []

    if request.method == 'POST':
        file = request.files.get('csv_file')
        if file and file.filename.lower().endswith('.csv'):
            # Never build the path from the client-supplied filename: it may
            # contain directory components. Let tempfile pick a unique name.
            fd, temp_path = tempfile.mkstemp(suffix='.csv')
            os.close(fd)
            file.save(temp_path)
            session['csv_filename'] = file.filename
            session['csv_filepath'] = temp_path
            locations = _read_locations_csv(temp_path)
            session['csv_locations'] = locations
        elif 'csv_locations' in session:
            locations = session['csv_locations']

        if request.form.get('create_locations') and locations:
            added = 0
            failed_codes = []
            for location_code, size, description in locations:
                result = add_location(location_code, size, description)
                if result and 'success' in result.lower():
                    added += 1
                else:
                    failed_codes.append(location_code or '')
            session['csv_import_report'] = (
                f"{added} locations were added to warehouse_locations table. "
                f"{len(failed_codes)} locations failed: {', '.join(failed_codes)}"
            )
            session.pop('csv_locations', None)
            session.pop('csv_filename', None)
            _remove_temp_file(session.pop('csv_filepath', None))
            return redirect(url_for('warehouse.import_locations_csv') + '#created')
    elif 'csv_locations' in session:
        locations = session['csv_locations']

    return render_template('import_locations_csv.html', report=report, locations=locations)


def generate_location_label_pdf():
    """Generate PDF for location barcode label (8x4cm).

    Reads ``location_code`` from the JSON request body, draws it as a
    Code128 barcode scaled to the label with the code printed underneath,
    and returns the PDF inline.

    Returns:
        A PDF response on success; a JSON ``{'error': ...}`` body with
        status 400 when the code is missing (including a missing or
        non-object JSON body), or 500 when PDF generation fails.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed body, so the request is answered with 400 below.
        data = request.get_json(silent=True)
        location_code = data.get('location_code', '') if isinstance(data, dict) else ''
        if not location_code:
            return jsonify({'error': 'Location code is required'}), 400
        location_code = str(location_code)

        # Create PDF in memory with an 8x4cm page (width x height)
        buffer = io.BytesIO()
        page_width = 8 * cm
        page_height = 4 * cm
        c = canvas.Canvas(buffer, pagesize=(page_width, page_height))

        # Generate Code128 barcode
        barcode = code128.Code128(location_code, barWidth=1.0, humanReadable=False)

        # Target area for the barcode (most of the label)
        desired_barcode_width = 7 * cm
        desired_barcode_height = 2.5 * cm

        # Uniform scale that fits the target area. The width is expected to
        # be the limiting dimension; the height bound is a safeguard so the
        # barcode can never leave the label.
        scale = min(
            desired_barcode_width / barcode.width,
            desired_barcode_height / barcode.height,
        )
        actual_width = barcode.width * scale
        actual_height = barcode.height * scale

        # Center the barcode, shifted slightly up to leave room for the text
        barcode_x = (page_width - actual_width) / 2
        barcode_y = (page_height - actual_height) / 2 + 0.3 * cm

        # Draw barcode with scaling
        c.saveState()
        c.translate(barcode_x, barcode_y)
        c.scale(scale, scale)
        barcode.drawOn(c, 0, 0)
        c.restoreState()

        # Add location code text below barcode
        c.setFont("Helvetica-Bold", 10)
        text_width = c.stringWidth(location_code, "Helvetica-Bold", 10)
        text_x = (page_width - text_width) / 2
        text_y = barcode_y - 0.5 * cm
        c.drawString(text_x, text_y, location_code)

        # Finalize PDF
        c.save()

        # Keep only filename-safe characters so the user-supplied code
        # cannot break or inject into the Content-Disposition header.
        safe_name = ''.join(
            ch if (ch.isascii() and ch.isalnum()) or ch in '-_' else '_'
            for ch in location_code
        )

        # Prepare response
        buffer.seek(0)
        response = make_response(buffer.getvalue())
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename=location_{safe_name}_label.pdf'
        return response
    except Exception as e:
        print(f"Error generating location label PDF: {e}")
        return jsonify({'error': str(e)}), 500


def update_location(location_id, location_code, size, description):
    """Update an existing warehouse location.

    Args:
        location_id: Id of the row to update.
        location_code: New code; must not be used by a different row.
        size: New size; any falsy value is stored as NULL.
        description: New description.

    Returns:
        ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": ...}``. Exceptions are printed and
        reported through the ``error`` field rather than raised.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Check if location exists
            cursor.execute("SELECT id FROM warehouse_locations WHERE id = ?", (location_id,))
            if not cursor.fetchone():
                return {"success": False, "error": "Location not found"}

            # Check if location code already exists for different location
            cursor.execute(
                "SELECT id FROM warehouse_locations WHERE location_code = ? AND id != ?",
                (location_code, location_id),
            )
            if cursor.fetchone():
                return {"success": False, "error": "Location code already exists"}

            # Update location
            cursor.execute(
                "UPDATE warehouse_locations SET location_code = ?, size = ?, description = ? WHERE id = ?",
                (location_code, size if size else None, description, location_id),
            )
            conn.commit()
            return {"success": True, "message": "Location updated successfully"}
        finally:
            # Runs on every path, including when a statement raises.
            conn.close()
    except Exception as e:
        print(f"Error updating location: {e}")
        return {"success": False, "error": str(e)}


def delete_location_by_id(location_id):
    """Delete a warehouse location by ID.

    Returns:
        ``{"success": True, "message": ...}`` naming the deleted location's
        code, or ``{"success": False, "error": ...}`` when the id is unknown
        or an exception occurs (the exception is printed, not raised).
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Check if location exists (and fetch its code for the message)
            cursor.execute(
                "SELECT location_code FROM warehouse_locations WHERE id = ?", (location_id,)
            )
            location = cursor.fetchone()
            if not location:
                return {"success": False, "error": "Location not found"}

            # Delete location
            cursor.execute("DELETE FROM warehouse_locations WHERE id = ?", (location_id,))
            conn.commit()
            return {"success": True, "message": f"Location '{location[0]}' deleted successfully"}
        finally:
            # Runs on every path, including when a statement raises.
            conn.close()
    except Exception as e:
        print(f"Error deleting location: {e}")
        return {"success": False, "error": str(e)}