Implement boxes management module with auto-numbered box creation

- Add boxes_crates database table with BIGINT IDs and 8-digit auto-numbered box_numbers
- Implement boxes CRUD operations (add, edit, update, delete, delete_multiple)
- Create boxes route handlers with POST actions for all operations
- Add boxes.html template with 3-panel layout matching warehouse locations module
- Implement barcode generation and printing with JsBarcode and QZ Tray integration
- Add browser print fallback for when QZ Tray is not available
- Simplify create box form to single button with auto-generation
- Fix JavaScript null reference errors with proper element validation
- Convert tuple data to dictionaries for Jinja2 template compatibility
- Register boxes blueprint in Flask app initialization
This commit is contained in:
Quality App Developer
2026-01-26 22:08:31 +02:00
parent 3c5a273a89
commit e1f3302c6b
37 changed files with 8429 additions and 66 deletions

View File

@@ -26,15 +26,6 @@ def quality_index():
return render_template('modules/quality/index.html')
@quality_bp.route('/inspections', methods=['GET'])
def inspections():
"""View and manage quality inspections"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/quality/inspections.html')
@quality_bp.route('/reports', methods=['GET'])
def quality_reports():
"""Quality reports page - displays FG scan reports"""

View File

@@ -0,0 +1,231 @@
"""
Settings Module - Log Explorer Helper
Provides functions to explore and manage application logs
"""
import os
from datetime import datetime
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def get_log_files():
"""Get list of all log files in the logs folder"""
try:
log_dir = './data/logs'
if not os.path.exists(log_dir):
return []
log_files = []
for filename in sorted(os.listdir(log_dir)):
filepath = os.path.join(log_dir, filename)
if os.path.isfile(filepath):
try:
stat_info = os.stat(filepath)
log_files.append({
'name': filename,
'size': stat_info.st_size,
'size_mb': round(stat_info.st_size / 1024 / 1024, 2),
'modified_at': datetime.fromtimestamp(stat_info.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'path': filepath
})
except Exception as e:
logger.error(f"Error getting stat info for {filename}: {e}")
continue
return sorted(log_files, key=lambda x: x['modified_at'], reverse=True)
except Exception as e:
logger.error(f"Error getting log files: {e}")
return []
def get_log_content(filename, lines=100):
"""Get content of a log file
Args:
filename: Name of the log file (without path)
lines: Number of lines to read from the end (None for all)
Returns:
Dictionary with file info and content
"""
try:
log_dir = './data/logs'
filepath = os.path.join(log_dir, filename)
# Security check - ensure filepath is within log_dir
if not os.path.abspath(filepath).startswith(os.path.abspath(log_dir)):
logger.error(f"Attempted to access file outside log directory: {filepath}")
return {
'success': False,
'error': 'Invalid file path',
'filename': filename
}
if not os.path.exists(filepath):
return {
'success': False,
'error': f'File not found: {filename}',
'filename': filename
}
# Read file content
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
if lines:
# Read all lines and get the last N lines
all_lines = f.readlines()
content_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines
content = ''.join(content_lines)
total_lines = len(all_lines)
else:
content = f.read()
total_lines = len(content.splitlines())
stat_info = os.stat(filepath)
return {
'success': True,
'filename': filename,
'size': stat_info.st_size,
'size_mb': round(stat_info.st_size / 1024 / 1024, 2),
'modified_at': datetime.fromtimestamp(stat_info.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'content': content,
'total_lines': total_lines,
'displayed_lines': len(content.splitlines()),
'truncated': lines and total_lines > lines
}
except Exception as e:
return {
'success': False,
'error': f'Error reading file: {str(e)}',
'filename': filename
}
except Exception as e:
logger.error(f"Error getting log content for {filename}: {e}")
return {
'success': False,
'error': f'Error: {str(e)}',
'filename': filename
}
def get_log_file_path(filename):
"""Get safe file path for download/save
Args:
filename: Name of the log file (without path)
Returns:
Full file path if valid, None otherwise
"""
try:
log_dir = './data/logs'
filepath = os.path.join(log_dir, filename)
# Security check - ensure filepath is within log_dir
if not os.path.abspath(filepath).startswith(os.path.abspath(log_dir)):
logger.error(f"Attempted to access file outside log directory: {filepath}")
return None
if not os.path.exists(filepath):
logger.error(f"File not found: {filepath}")
return None
return filepath
except Exception as e:
logger.error(f"Error getting log file path for {filename}: {e}")
return None
def get_log_statistics():
"""Get statistics about log files"""
try:
log_files = get_log_files()
if not log_files:
return {
'total_files': 0,
'total_size_mb': 0,
'oldest_log': None,
'newest_log': None
}
total_size = sum(f['size'] for f in log_files)
return {
'total_files': len(log_files),
'total_size_mb': round(total_size / 1024 / 1024, 2),
'oldest_log': log_files[-1]['modified_at'] if log_files else None,
'newest_log': log_files[0]['modified_at'] if log_files else None
}
except Exception as e:
logger.error(f"Error getting log statistics: {e}")
return {
'total_files': 0,
'total_size_mb': 0,
'oldest_log': None,
'newest_log': None
}
def search_in_logs(search_term, filename=None, max_results=50):
"""Search for a term in log files
Args:
search_term: Term to search for
filename: Optional specific file to search in
max_results: Maximum number of results to return
Returns:
List of matching lines with context
"""
try:
log_dir = './data/logs'
results = []
if filename:
# Search in specific file
filepath = os.path.join(log_dir, filename)
if not os.path.abspath(filepath).startswith(os.path.abspath(log_dir)):
return []
if os.path.exists(filepath):
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
if search_term.lower() in line.lower():
results.append({
'file': filename,
'line_num': line_num,
'line': line.strip()
})
if len(results) >= max_results:
break
except Exception as e:
logger.error(f"Error searching in {filename}: {e}")
else:
# Search in all log files
for log_file in get_log_files():
try:
with open(log_file['path'], 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
if search_term.lower() in line.lower():
results.append({
'file': log_file['name'],
'line_num': line_num,
'line': line.strip()
})
if len(results) >= max_results:
break
if len(results) >= max_results:
break
except Exception as e:
logger.error(f"Error searching in {log_file['name']}: {e}")
return results
except Exception as e:
logger.error(f"Error searching logs: {e}")
return []

View File

@@ -7,6 +7,8 @@ import hashlib
import secrets
from datetime import datetime, timedelta
from app.database import get_db
from app.modules.settings.stats import get_all_stats
from app.modules.settings.logs import get_log_files, get_log_content, get_log_file_path, get_log_statistics, search_in_logs
import subprocess
import os
import json
@@ -19,11 +21,30 @@ settings_bp = Blueprint('settings', __name__, url_prefix='/settings')
@settings_bp.route('/', methods=['GET'])
def settings_index():
"""Settings module main page"""
"""Settings module main page with app overview"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/settings/index.html')
# Get all app statistics
try:
stats = get_all_stats()
except Exception as e:
logger.error(f"Error getting stats in settings_index: {e}", exc_info=True)
stats = {
'user_count': 0,
'database_size_mb': 0,
'logs_size_mb': 0,
'database_count': 0,
'backup_count': 0,
'printer_keys_count': 0,
'app_key_availability': {
'available': False,
'count': 0,
'status': 'Error loading data'
}
}
return render_template('modules/settings/index.html', stats=stats)
@settings_bp.route('/general', methods=['GET', 'POST'])
@@ -1254,3 +1275,100 @@ def toggle_backup_schedule(schedule_id):
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# Log Explorer Routes
# ============================================================================
@settings_bp.route('/logs', methods=['GET'])
def logs_explorer():
"""Log explorer main page - list all log files"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
try:
log_files = get_log_files()
log_stats = get_log_statistics()
return render_template('modules/settings/logs_explorer.html',
log_files=log_files,
log_stats=log_stats)
except Exception as e:
logger.error(f"Error loading logs explorer: {e}")
flash(f"Error loading logs: {str(e)}", 'error')
return render_template('modules/settings/logs_explorer.html',
log_files=[],
log_stats={})
@settings_bp.route('/logs/view/<filename>', methods=['GET'])
def view_log(filename):
"""View content of a specific log file"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
try:
lines = request.args.get('lines', default=100, type=int)
log_data = get_log_content(filename, lines=lines)
if not log_data.get('success'):
flash(log_data.get('error', 'Error reading log file'), 'error')
return redirect(url_for('settings.logs_explorer'))
return render_template('modules/settings/view_log.html', log_data=log_data)
except Exception as e:
logger.error(f"Error viewing log {filename}: {e}")
flash(f"Error viewing log: {str(e)}", 'error')
return redirect(url_for('settings.logs_explorer'))
@settings_bp.route('/logs/download/<filename>', methods=['GET'])
def download_log(filename):
"""Download a log file"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
try:
filepath = get_log_file_path(filename)
if not filepath:
flash('Invalid file or file not found', 'error')
return redirect(url_for('settings.logs_explorer'))
return send_file(
filepath,
as_attachment=True,
download_name=f"{filename}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
except Exception as e:
logger.error(f"Error downloading log {filename}: {e}")
flash(f"Error downloading log: {str(e)}", 'error')
return redirect(url_for('settings.logs_explorer'))
@settings_bp.route('/logs/search', methods=['GET'])
def search_logs():
"""Search for terms in log files"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
try:
search_term = request.args.get('q', '').strip()
filename = request.args.get('file', default=None)
results = []
if search_term:
results = search_in_logs(search_term, filename=filename)
log_files = get_log_files()
return render_template('modules/settings/search_logs.html',
search_term=search_term,
results=results,
log_files=log_files,
selected_file=filename)
except Exception as e:
logger.error(f"Error searching logs: {e}")
flash(f"Error searching logs: {str(e)}", 'error')
return redirect(url_for('settings.logs_explorer'))

View File

@@ -0,0 +1,247 @@
"""
Settings Module - App Statistics Helper
Provides functions to collect various app statistics for the overview
"""
import os
import pymysql
from datetime import datetime
from pathlib import Path
from app.database import get_db
import logging
logger = logging.getLogger(__name__)
def get_user_count():
"""Get total number of existing users"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as count FROM users WHERE is_active = 1")
result = cursor.fetchone()
cursor.close()
return result[0] if result else 0
except Exception as e:
logger.error(f"Error getting user count: {e}")
return 0
def get_database_size():
"""Get size of the database in MB"""
try:
conn = get_db()
cursor = conn.cursor()
try:
# Get database name from connection
cursor.execute("SELECT DATABASE()")
result = cursor.fetchone()
if not result:
cursor.close()
return 0
db_name = result[0]
# Get database size
query = f"""
SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2)
FROM information_schema.tables
WHERE table_schema = %s
"""
cursor.execute(query, (db_name,))
result = cursor.fetchone()
cursor.close()
return result[0] if result and result[0] else 0
except Exception as e:
cursor.close()
logger.error(f"Error executing database size query: {e}")
return 0
except Exception as e:
logger.error(f"Error getting database size: {e}")
return 0
def get_logs_size():
"""Get total size of log files in MB"""
try:
log_dir = './data/logs'
if not os.path.exists(log_dir):
return 0
total_size = 0
for filename in os.listdir(log_dir):
filepath = os.path.join(log_dir, filename)
if os.path.isfile(filepath):
total_size += os.path.getsize(filepath)
return round(total_size / 1024 / 1024, 2)
except Exception as e:
logger.error(f"Error getting logs size: {e}")
return 0
def get_database_count():
"""Get number of existing databases (user accessible)"""
try:
conn = get_db()
cursor = conn.cursor()
try:
cursor.execute("SHOW DATABASES")
result = cursor.fetchall()
cursor.close()
# Filter out system databases
if result:
excluded_dbs = {'information_schema', 'mysql', 'performance_schema', 'sys'}
user_dbs = [db for db in result if db[0] not in excluded_dbs]
return len(user_dbs)
return 0
except Exception as e:
cursor.close()
logger.error(f"Error executing show databases query: {e}")
return 0
except Exception as e:
logger.error(f"Error getting database count: {e}")
return 0
def get_backup_count():
"""Get number of scheduled backups for the database"""
try:
conn = get_db()
cursor = conn.cursor()
# Check if backups table exists
cursor.execute("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = DATABASE() AND table_name = 'backup_schedules'
""")
if cursor.fetchone()[0] > 0:
cursor.execute("SELECT COUNT(*) FROM backup_schedules WHERE is_active = 1")
result = cursor.fetchone()
cursor.close()
return result[0] if result else 0
else:
cursor.close()
# Count backup files if no table exists
backup_dir = './data/backups'
if os.path.exists(backup_dir):
return len([f for f in os.listdir(backup_dir) if f.endswith('.sql')])
return 0
except Exception as e:
logger.error(f"Error getting backup count: {e}")
# Fallback to counting backup files
try:
backup_dir = './data/backups'
if os.path.exists(backup_dir):
return len([f for f in os.listdir(backup_dir) if f.endswith('.sql')])
except:
pass
return 0
def get_printer_keys_count():
"""Get number of keys for printers (pairing keys)"""
try:
conn = get_db()
cursor = conn.cursor()
try:
# Check if qz_pairing_keys table exists
cursor.execute("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = DATABASE() AND table_name = 'qz_pairing_keys'
""")
if cursor.fetchone()[0] > 0:
cursor.execute("SELECT COUNT(*) FROM qz_pairing_keys")
result = cursor.fetchone()
cursor.close()
return result[0] if result else 0
else:
cursor.close()
return 0
except Exception as e:
cursor.close()
logger.error(f"Error executing printer keys query: {e}")
return 0
except Exception as e:
logger.error(f"Error getting printer keys count: {e}")
return 0
def check_app_key_availability():
"""Check app key availability"""
try:
conn = get_db()
cursor = conn.cursor()
try:
# Check if api_keys table exists
cursor.execute("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = DATABASE() AND table_name = 'api_keys'
""")
if cursor.fetchone()[0] > 0:
cursor.execute("SELECT COUNT(*) FROM api_keys WHERE is_active = 1")
result = cursor.fetchone()
cursor.close()
count = result[0] if result else 0
return {
'available': count > 0,
'count': count,
'status': f'{count} active key(s)' if count > 0 else 'No active keys'
}
else:
cursor.close()
return {
'available': False,
'count': 0,
'status': 'API Keys table not found'
}
except Exception as e:
cursor.close()
logger.error(f"Error executing api_keys query: {e}")
return {
'available': False,
'count': 0,
'status': f'Error: {str(e)}'
}
except Exception as e:
logger.error(f"Error checking app key availability: {e}")
return {
'available': False,
'count': 0,
'status': f'Error: {str(e)}'
}
def get_all_stats():
"""Get all statistics for the overview"""
try:
return {
'user_count': get_user_count(),
'database_size_mb': get_database_size(),
'logs_size_mb': get_logs_size(),
'database_count': get_database_count(),
'backup_count': get_backup_count(),
'printer_keys_count': get_printer_keys_count(),
'app_key_availability': check_app_key_availability()
}
except Exception as e:
logger.error(f"Error getting all stats: {e}")
# Return defaults on error
return {
'user_count': 0,
'database_size_mb': 0,
'logs_size_mb': 0,
'database_count': 0,
'backup_count': 0,
'printer_keys_count': 0,
'app_key_availability': {
'available': False,
'count': 0,
'status': 'Error loading data'
}
}

View File

@@ -0,0 +1,3 @@
"""
Warehouse Module - Initialization
"""

View File

@@ -0,0 +1,254 @@
"""
Boxes Management Module
Handles CRUD operations for warehouse boxes
Uses boxes_crates table matching the old app structure
"""
from app.database import get_db
import logging
logger = logging.getLogger(__name__)
def ensure_boxes_table():
"""Create boxes_crates table if it doesn't exist"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("SHOW TABLES LIKE 'boxes_crates'")
result = cursor.fetchone()
if not result:
cursor.execute('''
CREATE TABLE IF NOT EXISTS boxes_crates (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
box_number VARCHAR(8) NOT NULL UNIQUE,
status ENUM('open', 'closed') DEFAULT 'open',
location_id BIGINT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_by VARCHAR(100),
FOREIGN KEY (location_id) REFERENCES warehouse_locations(id) ON DELETE SET NULL,
INDEX idx_box_number (box_number),
INDEX idx_status (status)
)
''')
conn.commit()
cursor.close()
logger.info("boxes_crates table ensured")
return True
except Exception as e:
logger.error(f"Error ensuring boxes_crates table: {e}")
return False
def generate_box_number():
"""Generate next box number with 8 digits (00000001, 00000002, etc.)"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("SELECT MAX(CAST(box_number AS UNSIGNED)) FROM boxes_crates")
result = cursor.fetchone()
cursor.close()
if result and result[0]:
next_number = int(result[0]) + 1
else:
next_number = 1
return str(next_number).zfill(8)
except Exception as e:
logger.error(f"Error generating box number: {e}")
return "00000001"
def add_box(created_by=None):
"""Add a new box/crate with auto-generated number"""
try:
ensure_boxes_table()
box_number = generate_box_number()
conn = get_db()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO boxes_crates (box_number, status, created_by) VALUES (%s, %s, %s)",
(box_number, 'open', created_by)
)
conn.commit()
cursor.close()
logger.info(f"Box {box_number} created successfully")
return True, f"Box {box_number} created successfully"
except Exception as e:
logger.error(f"Error adding box: {e}")
return False, f"Error creating box: {str(e)}"
def get_all_boxes():
"""Get all boxes with their location information"""
try:
ensure_boxes_table()
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT
b.id,
b.box_number,
b.status,
COALESCE(l.location_code, 'Not assigned') as location_code,
b.created_at,
b.updated_at,
b.created_by,
b.location_id
FROM boxes_crates b
LEFT JOIN warehouse_locations l ON b.location_id = l.id
ORDER BY b.created_at DESC
''')
boxes = cursor.fetchall()
cursor.close()
return boxes if boxes else []
except Exception as e:
logger.error(f"Error getting all boxes: {e}")
return []
def get_box_by_id(box_id):
"""Get a single box by ID"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT
b.id,
b.box_number,
b.status,
b.location_id,
b.created_by,
b.created_at,
b.updated_at
FROM boxes_crates b
WHERE b.id = %s
''', (box_id,))
box = cursor.fetchone()
cursor.close()
return box
except Exception as e:
logger.error(f"Error getting box by ID: {e}")
return None
def update_box(box_id, status=None, location_id=None):
"""Update box status or location"""
try:
conn = get_db()
cursor = conn.cursor()
if status and location_id is not None:
cursor.execute(
"UPDATE boxes_crates SET status = %s, location_id = %s WHERE id = %s",
(status, location_id if location_id else None, box_id)
)
elif status:
cursor.execute(
"UPDATE boxes_crates SET status = %s WHERE id = %s",
(status, box_id)
)
elif location_id is not None:
cursor.execute(
"UPDATE boxes_crates SET location_id = %s WHERE id = %s",
(location_id if location_id else None, box_id)
)
conn.commit()
cursor.close()
logger.info(f"Box {box_id} updated successfully")
return True, "Box updated successfully"
except Exception as e:
logger.error(f"Error updating box: {e}")
return False, f"Error updating box: {str(e)}"
def delete_box(box_id):
"""Delete a single box"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("DELETE FROM boxes_crates WHERE id = %s", (box_id,))
conn.commit()
cursor.close()
logger.info(f"Box {box_id} deleted successfully")
return True, "Box deleted successfully"
except Exception as e:
logger.error(f"Error deleting box: {e}")
return False, f"Error deleting box: {str(e)}"
def delete_multiple_boxes(box_ids_str):
"""Delete multiple boxes"""
try:
if not box_ids_str:
return False, "No boxes selected"
# Parse box IDs
box_ids = [int(x) for x in box_ids_str.split(',') if x.strip()]
if not box_ids:
return False, "No valid box IDs provided"
conn = get_db()
cursor = conn.cursor()
placeholders = ','.join(['%s'] * len(box_ids))
cursor.execute(f"DELETE FROM boxes_crates WHERE id IN ({placeholders})", box_ids)
conn.commit()
cursor.close()
logger.info(f"Deleted {len(box_ids)} boxes")
return True, f"Deleted {len(box_ids)} box(es) successfully"
except Exception as e:
logger.error(f"Error deleting multiple boxes: {e}")
return False, f"Error deleting boxes: {str(e)}"
def get_box_stats():
"""Get box statistics"""
try:
ensure_boxes_table()
conn = get_db()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM boxes_crates")
total = cursor.fetchone()[0] if cursor.fetchone() else 0
cursor.execute("SELECT COUNT(*) FROM boxes_crates WHERE status = 'open'")
result = cursor.fetchone()
open_count = result[0] if result else 0
cursor.execute("SELECT COUNT(*) FROM boxes_crates WHERE status = 'closed'")
result = cursor.fetchone()
closed_count = result[0] if result else 0
cursor.close()
return {
'total': total,
'open': open_count,
'closed': closed_count
}
except Exception as e:
logger.error(f"Error getting box statistics: {e}")
return {'total': 0, 'open': 0, 'closed': 0}

View File

@@ -0,0 +1,101 @@
"""
Boxes Management Routes
"""
from flask import Blueprint, render_template, session, redirect, url_for, request, flash
from app.modules.warehouse.boxes import (
get_all_boxes, add_box, update_box, delete_box, delete_multiple_boxes,
get_box_by_id, get_box_stats, ensure_boxes_table
)
from app.modules.warehouse.warehouse import get_all_locations
import logging
logger = logging.getLogger(__name__)
boxes_bp = Blueprint('boxes', __name__, url_prefix='/warehouse/boxes')
@boxes_bp.route('/', methods=['GET', 'POST'])
def manage_boxes():
"""Manage warehouse boxes page"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
# Ensure table exists
ensure_boxes_table()
message = None
message_type = 'info'
if request.method == 'POST':
action = request.form.get('action', '')
user_id = session.get('user_id', 'System')
# Add new box (auto-numbered)
if action == 'add_box':
success, msg = add_box(created_by=user_id)
message = msg
message_type = 'success' if success else 'danger'
# Update box status or location
elif action == 'edit_box':
box_id = request.form.get('box_id', '')
status = request.form.get('status', '')
location_id = request.form.get('location_id', '')
location_id = int(location_id) if location_id and location_id.isdigit() else None
success, msg = update_box(box_id, status=status if status else None, location_id=location_id)
message = msg
message_type = 'success' if success else 'danger'
# Change status
elif action == 'toggle_status':
box_id = request.form.get('box_id', '')
current_status = request.form.get('current_status', 'open')
new_status = 'closed' if current_status == 'open' else 'open'
success, msg = update_box(box_id, status=new_status)
message = msg
message_type = 'success' if success else 'danger'
# Delete single box
elif action == 'delete_box':
box_id = request.form.get('box_id', '')
success, msg = delete_box(box_id)
message = msg
message_type = 'success' if success else 'danger'
# Delete multiple boxes
elif action == 'delete_multiple':
box_ids = request.form.get('delete_ids', '')
success, msg = delete_multiple_boxes(box_ids)
message = msg
message_type = 'success' if success else 'danger'
# Get data and convert tuples to dictionaries
boxes_data = get_all_boxes()
boxes = []
for box_tuple in boxes_data:
boxes.append({
'id': box_tuple[0],
'box_number': box_tuple[1],
'status': box_tuple[2],
'location_code': box_tuple[3],
'created_at': box_tuple[4],
'updated_at': box_tuple[5],
'created_by': box_tuple[6],
'location_id': box_tuple[7]
})
locations = get_all_locations()
stats = get_box_stats()
return render_template(
'modules/warehouse/boxes.html',
boxes=boxes,
locations=locations,
stats=stats,
message=message,
message_type=message_type
)

View File

@@ -0,0 +1,125 @@
"""
Warehouse Module Routes
"""
from flask import Blueprint, render_template, session, redirect, url_for, request, flash, jsonify
from app.modules.warehouse.warehouse import (
get_all_locations, add_location, update_location, delete_location,
delete_multiple_locations, get_location_by_id
)
import logging
logger = logging.getLogger(__name__)
warehouse_bp = Blueprint('warehouse', __name__, url_prefix='/warehouse')
@warehouse_bp.route('/', methods=['GET'])
def warehouse_index():
"""Warehouse module main page - launcher for all warehouse operations"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/warehouse/index.html')
@warehouse_bp.route('/set-boxes-locations', methods=['GET', 'POST'])
def set_boxes_locations():
"""Set boxes locations - add or update articles in warehouse inventory"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/warehouse/set_boxes_locations.html')
@warehouse_bp.route('/locations', methods=['GET', 'POST'])
def locations():
"""Create and manage warehouse locations"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
message = None
message_type = 'info'
if request.method == 'POST':
# Handle edit location
if request.form.get('edit_location'):
location_id = request.form.get('location_id', '')
size = request.form.get('edit_size', '').strip()
description = request.form.get('edit_description', '').strip()
try:
location_id = int(location_id)
success, msg = update_location(location_id, None, size if size else None, description if description else None)
message = msg
message_type = 'success' if success else 'error'
except Exception as e:
message = f"Error: {str(e)}"
message_type = 'error'
# Handle delete locations
elif request.form.get('delete_locations'):
delete_ids_str = request.form.get('delete_ids', '')
try:
location_ids = [int(id.strip()) for id in delete_ids_str.split(',') if id.strip().isdigit()]
success, msg = delete_multiple_locations(location_ids)
message = msg
message_type = 'success' if success else 'error'
except Exception as e:
message = f"Error: {str(e)}"
message_type = 'error'
# Handle add location
elif request.form.get('add_location'):
location_code = request.form.get('location_code', '').strip()
size = request.form.get('size', '').strip()
description = request.form.get('description', '').strip()
if not location_code:
message = "Location code is required"
message_type = 'error'
else:
success, msg = add_location(location_code, size if size else None, description if description else None)
message = msg
message_type = 'success' if success else 'error'
# Get all locations
locations_list = get_all_locations()
return render_template('modules/warehouse/locations.html',
locations=locations_list,
message=message,
message_type=message_type)
@warehouse_bp.route('/boxes', methods=['GET', 'POST'])
def boxes():
"""Manage boxes and crates in the warehouse"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/warehouse/boxes.html')
@warehouse_bp.route('/inventory', methods=['GET'])
def inventory():
"""View warehouse inventory - products, boxes, and locations"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/warehouse/inventory.html')
@warehouse_bp.route('/reports', methods=['GET'])
def reports():
"""Warehouse activity and inventory reports"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/warehouse/reports.html')
@warehouse_bp.route('/test-barcode', methods=['GET'])
def test_barcode():
"""Test barcode printing functionality"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
return render_template('modules/warehouse/test_barcode.html')

View File

@@ -0,0 +1,213 @@
"""
Warehouse Module - Helper Functions
Provides functions for warehouse operations
"""
import logging
from app.database import get_db
logger = logging.getLogger(__name__)
def ensure_warehouse_locations_table():
"""Ensure warehouse_locations table exists"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS warehouse_locations (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
location_code VARCHAR(12) UNIQUE NOT NULL,
size INT,
description VARCHAR(250),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_location_code (location_code)
)
""")
conn.commit()
cursor.close()
logger.info("warehouse_locations table ensured")
return True
except Exception as e:
logger.error(f"Error ensuring warehouse_locations table: {e}")
return False
def add_location(location_code, size, description):
"""Add a new warehouse location"""
try:
ensure_warehouse_locations_table()
conn = get_db()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO warehouse_locations (location_code, size, description)
VALUES (%s, %s, %s)
""", (location_code, size if size else None, description))
conn.commit()
cursor.close()
return True, "Location added successfully."
except Exception as e:
if "Duplicate entry" in str(e):
return False, f"Failed: Location code '{location_code}' already exists."
logger.error(f"Error adding location: {e}")
return False, f"Error adding location: {str(e)}"
def get_all_locations():
"""Get all warehouse locations"""
try:
ensure_warehouse_locations_table()
conn = get_db()
cursor = conn.cursor()
cursor.execute("""
SELECT id, location_code, size, description, created_at, updated_at
FROM warehouse_locations
ORDER BY id DESC
""")
locations = cursor.fetchall()
cursor.close()
result = []
for loc in locations:
result.append({
'id': loc[0],
'location_code': loc[1],
'size': loc[2],
'description': loc[3],
'created_at': loc[4],
'updated_at': loc[5]
})
return result
except Exception as e:
logger.error(f"Error getting locations: {e}")
return []
def get_location_by_id(location_id):
"""Get a specific location by ID"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("""
SELECT id, location_code, size, description, created_at, updated_at
FROM warehouse_locations
WHERE id = %s
""", (location_id,))
loc = cursor.fetchone()
cursor.close()
if loc:
return {
'id': loc[0],
'location_code': loc[1],
'size': loc[2],
'description': loc[3],
'created_at': loc[4],
'updated_at': loc[5]
}
return None
except Exception as e:
logger.error(f"Error getting location: {e}")
return None
def update_location(location_id, location_code=None, size=None, description=None):
"""Update a warehouse location
Args:
location_id: ID of location to update
location_code: New location code (optional - cannot be changed in form)
size: New size (optional)
description: New description (optional)
"""
try:
conn = get_db()
cursor = conn.cursor()
# Build update query dynamically
updates = []
params = []
if location_code:
updates.append("location_code = %s")
params.append(location_code)
if size is not None:
updates.append("size = %s")
params.append(size)
if description is not None:
updates.append("description = %s")
params.append(description)
if not updates:
return False, "No fields to update"
params.append(location_id)
query = f"UPDATE warehouse_locations SET {', '.join(updates)} WHERE id = %s"
cursor.execute(query, params)
conn.commit()
cursor.close()
return True, "Location updated successfully."
except Exception as e:
if "Duplicate entry" in str(e):
return False, f"Failed: Location code already exists."
logger.error(f"Error updating location: {e}")
return False, f"Error updating location: {str(e)}"
def delete_location(location_id):
"""Delete a warehouse location"""
try:
conn = get_db()
cursor = conn.cursor()
cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (location_id,))
conn.commit()
cursor.close()
return True, "Location deleted successfully."
except Exception as e:
logger.error(f"Error deleting location: {e}")
return False, f"Error deleting location: {str(e)}"
def delete_multiple_locations(location_ids):
"""Delete multiple warehouse locations"""
try:
if not location_ids:
return False, "No locations to delete."
conn = get_db()
cursor = conn.cursor()
deleted_count = 0
for loc_id in location_ids:
try:
cursor.execute("DELETE FROM warehouse_locations WHERE id = %s", (int(loc_id),))
if cursor.rowcount > 0:
deleted_count += 1
except:
pass
conn.commit()
cursor.close()
return True, f"Deleted {deleted_count} location(s)."
except Exception as e:
logger.error(f"Error deleting multiple locations: {e}")
return False, f"Error deleting locations: {str(e)}"