- Add boxes_crates database table with BIGINT IDs and 8-digit auto-numbered box_numbers - Implement boxes CRUD operations (add, edit, update, delete, delete_multiple) - Create boxes route handlers with POST actions for all operations - Add boxes.html template with 3-panel layout matching warehouse locations module - Implement barcode generation and printing with JsBarcode and QZ Tray integration - Add browser print fallback for when QZ Tray is not available - Simplify create box form to single button with auto-generation - Fix JavaScript null reference errors with proper element validation - Convert tuple data to dictionaries for Jinja2 template compatibility - Register boxes blueprint in Flask app initialization
248 lines
7.8 KiB
Python
248 lines
7.8 KiB
Python
"""
|
|
Settings Module - App Statistics Helper
|
|
Provides functions to collect various app statistics for the overview
|
|
"""
|
|
import os
|
|
import pymysql
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from app.database import get_db
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_user_count():
|
|
"""Get total number of existing users"""
|
|
try:
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*) as count FROM users WHERE is_active = 1")
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
return result[0] if result else 0
|
|
except Exception as e:
|
|
logger.error(f"Error getting user count: {e}")
|
|
return 0
|
|
|
|
|
|
def get_database_size():
|
|
"""Get size of the database in MB"""
|
|
try:
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
try:
|
|
# Get database name from connection
|
|
cursor.execute("SELECT DATABASE()")
|
|
result = cursor.fetchone()
|
|
if not result:
|
|
cursor.close()
|
|
return 0
|
|
|
|
db_name = result[0]
|
|
|
|
# Get database size
|
|
query = f"""
|
|
SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2)
|
|
FROM information_schema.tables
|
|
WHERE table_schema = %s
|
|
"""
|
|
cursor.execute(query, (db_name,))
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
return result[0] if result and result[0] else 0
|
|
except Exception as e:
|
|
cursor.close()
|
|
logger.error(f"Error executing database size query: {e}")
|
|
return 0
|
|
except Exception as e:
|
|
logger.error(f"Error getting database size: {e}")
|
|
return 0
|
|
|
|
|
|
def get_logs_size():
|
|
"""Get total size of log files in MB"""
|
|
try:
|
|
log_dir = './data/logs'
|
|
if not os.path.exists(log_dir):
|
|
return 0
|
|
|
|
total_size = 0
|
|
for filename in os.listdir(log_dir):
|
|
filepath = os.path.join(log_dir, filename)
|
|
if os.path.isfile(filepath):
|
|
total_size += os.path.getsize(filepath)
|
|
|
|
return round(total_size / 1024 / 1024, 2)
|
|
except Exception as e:
|
|
logger.error(f"Error getting logs size: {e}")
|
|
return 0
|
|
|
|
|
|
def get_database_count():
|
|
"""Get number of existing databases (user accessible)"""
|
|
try:
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
try:
|
|
cursor.execute("SHOW DATABASES")
|
|
result = cursor.fetchall()
|
|
cursor.close()
|
|
|
|
# Filter out system databases
|
|
if result:
|
|
excluded_dbs = {'information_schema', 'mysql', 'performance_schema', 'sys'}
|
|
user_dbs = [db for db in result if db[0] not in excluded_dbs]
|
|
return len(user_dbs)
|
|
return 0
|
|
except Exception as e:
|
|
cursor.close()
|
|
logger.error(f"Error executing show databases query: {e}")
|
|
return 0
|
|
except Exception as e:
|
|
logger.error(f"Error getting database count: {e}")
|
|
return 0
|
|
|
|
|
|
def get_backup_count():
|
|
"""Get number of scheduled backups for the database"""
|
|
try:
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if backups table exists
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM information_schema.tables
|
|
WHERE table_schema = DATABASE() AND table_name = 'backup_schedules'
|
|
""")
|
|
|
|
if cursor.fetchone()[0] > 0:
|
|
cursor.execute("SELECT COUNT(*) FROM backup_schedules WHERE is_active = 1")
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
return result[0] if result else 0
|
|
else:
|
|
cursor.close()
|
|
# Count backup files if no table exists
|
|
backup_dir = './data/backups'
|
|
if os.path.exists(backup_dir):
|
|
return len([f for f in os.listdir(backup_dir) if f.endswith('.sql')])
|
|
return 0
|
|
except Exception as e:
|
|
logger.error(f"Error getting backup count: {e}")
|
|
# Fallback to counting backup files
|
|
try:
|
|
backup_dir = './data/backups'
|
|
if os.path.exists(backup_dir):
|
|
return len([f for f in os.listdir(backup_dir) if f.endswith('.sql')])
|
|
except:
|
|
pass
|
|
return 0
|
|
|
|
|
|
def get_printer_keys_count():
|
|
"""Get number of keys for printers (pairing keys)"""
|
|
try:
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
# Check if qz_pairing_keys table exists
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM information_schema.tables
|
|
WHERE table_schema = DATABASE() AND table_name = 'qz_pairing_keys'
|
|
""")
|
|
|
|
if cursor.fetchone()[0] > 0:
|
|
cursor.execute("SELECT COUNT(*) FROM qz_pairing_keys")
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
return result[0] if result else 0
|
|
else:
|
|
cursor.close()
|
|
return 0
|
|
except Exception as e:
|
|
cursor.close()
|
|
logger.error(f"Error executing printer keys query: {e}")
|
|
return 0
|
|
except Exception as e:
|
|
logger.error(f"Error getting printer keys count: {e}")
|
|
return 0
|
|
|
|
|
|
def check_app_key_availability():
|
|
"""Check app key availability"""
|
|
try:
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
# Check if api_keys table exists
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM information_schema.tables
|
|
WHERE table_schema = DATABASE() AND table_name = 'api_keys'
|
|
""")
|
|
|
|
if cursor.fetchone()[0] > 0:
|
|
cursor.execute("SELECT COUNT(*) FROM api_keys WHERE is_active = 1")
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
count = result[0] if result else 0
|
|
return {
|
|
'available': count > 0,
|
|
'count': count,
|
|
'status': f'{count} active key(s)' if count > 0 else 'No active keys'
|
|
}
|
|
else:
|
|
cursor.close()
|
|
return {
|
|
'available': False,
|
|
'count': 0,
|
|
'status': 'API Keys table not found'
|
|
}
|
|
except Exception as e:
|
|
cursor.close()
|
|
logger.error(f"Error executing api_keys query: {e}")
|
|
return {
|
|
'available': False,
|
|
'count': 0,
|
|
'status': f'Error: {str(e)}'
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error checking app key availability: {e}")
|
|
return {
|
|
'available': False,
|
|
'count': 0,
|
|
'status': f'Error: {str(e)}'
|
|
}
|
|
|
|
|
|
def get_all_stats():
|
|
"""Get all statistics for the overview"""
|
|
try:
|
|
return {
|
|
'user_count': get_user_count(),
|
|
'database_size_mb': get_database_size(),
|
|
'logs_size_mb': get_logs_size(),
|
|
'database_count': get_database_count(),
|
|
'backup_count': get_backup_count(),
|
|
'printer_keys_count': get_printer_keys_count(),
|
|
'app_key_availability': check_app_key_availability()
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting all stats: {e}")
|
|
# Return defaults on error
|
|
return {
|
|
'user_count': 0,
|
|
'database_size_mb': 0,
|
|
'logs_size_mb': 0,
|
|
'database_count': 0,
|
|
'backup_count': 0,
|
|
'printer_keys_count': 0,
|
|
'app_key_availability': {
|
|
'available': False,
|
|
'count': 0,
|
|
'status': 'Error loading data'
|
|
}
|
|
}
|