- Add boxes_crates database table with BIGINT IDs and 8-digit auto-numbered box_numbers - Implement boxes CRUD operations (add, edit, update, delete, delete_multiple) - Create boxes route handlers with POST actions for all operations - Add boxes.html template with 3-panel layout matching warehouse locations module - Implement barcode generation and printing with JsBarcode and QZ Tray integration - Add browser print fallback for when QZ Tray is not available - Simplify create box form to single button with auto-generation - Fix JavaScript null reference errors with proper element validation - Convert tuple data to dictionaries for Jinja2 template compatibility - Register boxes blueprint in Flask app initialization
1375 lines
47 KiB
Python
1375 lines
47 KiB
Python
"""
|
|
Settings Module Routes
|
|
"""
|
|
from flask import Blueprint, render_template, session, redirect, url_for, request, flash, jsonify, send_file
|
|
import pymysql
|
|
import hashlib
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from app.database import get_db
|
|
from app.modules.settings.stats import get_all_stats
|
|
from app.modules.settings.logs import get_log_files, get_log_content, get_log_file_path, get_log_statistics, search_in_logs
|
|
import subprocess
|
|
import os
|
|
import json
|
|
from pathlib import Path
|
|
import io
|
|
import logging
|
|
|
|
settings_bp = Blueprint('settings', __name__, url_prefix='/settings')
|
|
|
|
|
|
@settings_bp.route('/', methods=['GET'])
def settings_index():
    """Render the settings landing page with an application overview.

    Visitors without ``user_id`` in the session are redirected to the login
    page.  If ``get_all_stats()`` raises, the failure is logged and a zeroed
    placeholder dictionary is rendered instead so the page still loads.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        stats = get_all_stats()
    except Exception as e:
        # No module-level ``logger`` is defined in the visible part of this
        # file, so a bare ``logger.error`` here would raise NameError and
        # mask the real failure.  Fetch the module logger explicitly.
        logger = logging.getLogger(__name__)
        logger.error(f"Error getting stats in settings_index: {e}", exc_info=True)
        stats = {
            'user_count': 0,
            'database_size_mb': 0,
            'logs_size_mb': 0,
            'database_count': 0,
            'backup_count': 0,
            'printer_keys_count': 0,
            'app_key_availability': {
                'available': False,
                'count': 0,
                'status': 'Error loading data'
            }
        }

    return render_template('modules/settings/index.html', stats=stats)
|
|
|
|
|
|
@settings_bp.route('/general', methods=['GET', 'POST'])
def general_settings():
    """Show (GET) or save (POST) the general application settings.

    GET reads ``app_name``, ``app_version`` and ``session_timeout`` from
    ``application_settings`` and renders the form, falling back to built-in
    defaults for missing keys.

    POST upserts ``app_name`` and ``session_timeout``.  A blank name falls
    back to ``'Quality App v2'``; a non-numeric or non-positive timeout
    falls back to ``480``.  On a database error the transaction is rolled
    back and the form is re-rendered with an error message.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    conn = get_db()
    cursor = conn.cursor()
    try:
        if request.method == 'GET':
            cursor.execute("""
                SELECT setting_key, setting_value FROM application_settings
                WHERE setting_key IN ('app_name', 'app_version', 'session_timeout')
            """)
            settings = {row[0]: row[1] for row in cursor.fetchall()}
            return render_template('modules/settings/general.html',
                                   app_name=settings.get('app_name', 'Quality App v2'),
                                   app_version=settings.get('app_version', '2.0.0'),
                                   session_timeout=settings.get('session_timeout', '480'))

        # Handle POST - save settings
        try:
            app_name = request.form.get('app_name', 'Quality App v2').strip()
            session_timeout = request.form.get('session_timeout', '480').strip()

            if not app_name:
                app_name = 'Quality App v2'
            try:
                timeout = int(session_timeout)
                if timeout < 1:
                    timeout = 480
            except (TypeError, ValueError):
                # Only conversion failures mean "bad input"; a bare except
                # would also swallow KeyboardInterrupt/SystemExit.
                timeout = 480

            upsert_sql = """
                INSERT INTO application_settings (setting_key, setting_value, setting_type)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE setting_value = %s
            """
            # Reuse the cursor opened above rather than creating a second
            # one and leaking the first.
            for key, value, value_type in (
                ('app_name', app_name, 'string'),
                ('session_timeout', str(timeout), 'integer'),
            ):
                cursor.execute(upsert_sql, (key, value, value_type, value))

            conn.commit()

            return render_template('modules/settings/general.html',
                                   app_name=app_name,
                                   app_version='2.0.0',
                                   session_timeout=timeout,
                                   success='Settings saved successfully!')

        except Exception as e:
            # Discard a half-applied upsert so a later commit on the same
            # connection cannot persist it.
            try:
                conn.rollback()
            except Exception:
                logging.getLogger(__name__).exception(
                    "Rollback failed in general_settings")
            return render_template('modules/settings/general.html',
                                   app_name='Quality App v2',
                                   app_version='2.0.0',
                                   session_timeout='480',
                                   error=f'Error saving settings: {str(e)}')
    finally:
        cursor.close()
|
|
|
|
|
|
@settings_bp.route('/users', methods=['GET'])
def user_management():
    """List every user, ordered by username, on the user management page.

    Redirects to the login page when the session has no ``user_id``.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    # Column order mirrors the SELECT list below.
    columns = ('id', 'username', 'email', 'full_name', 'role', 'is_active')

    db = get_db()
    cur = db.cursor()
    cur.execute("SELECT id, username, email, full_name, role, is_active FROM users ORDER BY username")
    users = [
        {name: record[position] for position, name in enumerate(columns)}
        for record in cur.fetchall()
    ]
    cur.close()

    return render_template('modules/settings/users.html', users=users)
|
|
|
|
|
|
@settings_bp.route('/database', methods=['GET'])
def database_settings():
    """Render the database configuration page for a logged-in user.

    Anyone without ``user_id`` in the session is sent to the login page.
    """
    if 'user_id' in session:
        return render_template('modules/settings/database.html')
    return redirect(url_for('main.login'))
|
|
|
|
|
|
@settings_bp.route('/users/create', methods=['GET', 'POST'])
def create_user():
    """Show the new-user form (GET) or create a user (POST).

    POST validates the submitted fields, rejects an existing username, then
    inserts the user, its password hash and its module grants in a single
    transaction.  On success it redirects to the user list.  On a validation
    or database error the form is re-rendered with an ``error`` message, and
    any partial insert is rolled back.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    available_modules = ['quality', 'settings']

    def load_roles():
        """Return all roles as dicts, highest level first."""
        cur = get_db().cursor()
        try:
            cur.execute("SELECT id, name, level, description FROM roles ORDER BY level DESC")
            return [{'id': r[0], 'name': r[1], 'level': r[2], 'description': r[3]}
                    for r in cur.fetchall()]
        finally:
            cur.close()

    def render_form(roles, error=None):
        """Render the user form; ``error`` is passed only when set."""
        context = {
            'user': None,
            'roles': roles,
            'available_modules': available_modules,
            'user_modules': [],
        }
        if error is not None:
            context['error'] = error
        return render_template('modules/settings/user_form.html', **context)

    if request.method == 'GET':
        return render_form(load_roles())

    # Handle POST - create user
    conn = None
    cursor = None
    try:
        username = request.form.get('username', '').strip()
        email = request.form.get('email', '').strip()
        full_name = request.form.get('full_name', '').strip()
        role = request.form.get('role', '').strip()
        password = request.form.get('password', '').strip()
        confirm_password = request.form.get('confirm_password', '').strip()
        is_active = request.form.get('is_active') == 'on'
        modules = request.form.getlist('modules')

        # Validation
        errors = []
        if not username:
            errors.append("Username is required")
        if not full_name:
            errors.append("Full name is required")
        if not role:
            errors.append("Role is required")
        if not password:
            errors.append("Password is required")
        if password != confirm_password:
            errors.append("Passwords do not match")
        if len(password) < 8:
            errors.append("Password must be at least 8 characters")
        if not modules:
            errors.append("Select at least one module")

        if errors:
            return render_form(load_roles(), "; ".join(errors))

        conn = get_db()
        cursor = conn.cursor()

        # Check if username already exists
        cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
        if cursor.fetchone():
            # Reload the roles so the re-rendered form still has a usable
            # role dropdown (it was previously rendered with an empty list).
            return render_form(load_roles(), "Username already exists")

        # Insert user
        cursor.execute(
            "INSERT INTO users (username, email, full_name, role, is_active) VALUES (%s, %s, %s, %s, %s)",
            (username, email, full_name, role, is_active)
        )

        # Get user ID
        cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
        user_id = cursor.fetchone()[0]

        # NOTE(security): unsalted SHA-256 is a weak password hash.  It is
        # kept because the login check (not visible here) presumably compares
        # against the same format; migrate both sides together.
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        cursor.execute(
            "INSERT INTO user_credentials (user_id, password_hash) VALUES (%s, %s)",
            (user_id, password_hash)
        )

        # Grant module access
        for module in modules:
            cursor.execute(
                "INSERT IGNORE INTO user_modules (user_id, module_name) VALUES (%s, %s)",
                (user_id, module)
            )

        conn.commit()
        return redirect(url_for('settings.user_management'))

    except Exception as e:
        # Undo a partially created user (row without credentials/modules).
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                logging.getLogger(__name__).exception(
                    "Rollback failed in create_user")
        # Best effort: keep the role dropdown populated if the DB still answers.
        try:
            roles = load_roles()
        except Exception:
            roles = []
        return render_form(roles, f"Error creating user: {str(e)}")
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
|
|
@settings_bp.route('/users/<int:user_id>/edit', methods=['GET', 'POST'])
def edit_user(user_id):
    """Show the edit form for a user (GET) or apply changes (POST).

    An unknown ``user_id`` redirects to the user list.  POST updates the
    user's e-mail, full name, role and active flag, optionally replaces the
    password hash, and rewrites the module grants (delete then insert).  On
    a database error the transaction is rolled back, so the user does not
    end up with its module grants deleted but not re-created, and the form
    is re-rendered with an error message.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    available_modules = ['quality', 'settings']
    conn = get_db()
    cursor = conn.cursor()
    try:
        # Get user
        cursor.execute("""
            SELECT id, username, email, full_name, role, is_active
            FROM users WHERE id = %s
        """, (user_id,))
        user_row = cursor.fetchone()
        if not user_row:
            return redirect(url_for('settings.user_management'))

        user = {
            'id': user_row[0],
            'username': user_row[1],
            'email': user_row[2],
            'full_name': user_row[3],
            'role': user_row[4],
            'is_active': user_row[5]
        }

        # Get user modules
        cursor.execute("SELECT module_name FROM user_modules WHERE user_id = %s", (user_id,))
        user_modules = [row[0] for row in cursor.fetchall()]

        # Get all roles
        cursor.execute("SELECT id, name, level, description FROM roles ORDER BY level DESC")
        roles = [{'id': r[0], 'name': r[1], 'level': r[2], 'description': r[3]}
                 for r in cursor.fetchall()]

        def render_form(error=None):
            """Render the form with the stored user; ``error`` only when set."""
            context = {
                'user': user,
                'roles': roles,
                'available_modules': available_modules,
                'user_modules': user_modules,
            }
            if error is not None:
                context['error'] = error
            return render_template('modules/settings/user_form.html', **context)

        if request.method == 'GET':
            return render_form()

        # Handle POST - update user
        try:
            email = request.form.get('email', '').strip()
            full_name = request.form.get('full_name', '').strip()
            role = request.form.get('role', '').strip()
            password = request.form.get('password', '').strip()
            confirm_password = request.form.get('confirm_password', '').strip()
            is_active = request.form.get('is_active') == 'on'
            modules = request.form.getlist('modules')

            # Validation (the password is optional when editing)
            errors = []
            if not full_name:
                errors.append("Full name is required")
            if not role:
                errors.append("Role is required")
            if password and password != confirm_password:
                errors.append("Passwords do not match")
            if password and len(password) < 8:
                errors.append("Password must be at least 8 characters")
            if not modules:
                errors.append("Select at least one module")

            if errors:
                return render_form("; ".join(errors))

            # Update user
            cursor.execute(
                "UPDATE users SET email = %s, full_name = %s, role = %s, is_active = %s WHERE id = %s",
                (email, full_name, role, is_active, user_id)
            )

            # Update password if provided
            if password:
                # NOTE(security): unsalted SHA-256 is kept for compatibility
                # with the existing stored hashes; it is a weak scheme.
                password_hash = hashlib.sha256(password.encode()).hexdigest()
                cursor.execute(
                    "UPDATE user_credentials SET password_hash = %s WHERE user_id = %s",
                    (password_hash, user_id)
                )

            # Replace module access: drop all grants, then re-insert.
            cursor.execute("DELETE FROM user_modules WHERE user_id = %s", (user_id,))
            for module in modules:
                cursor.execute(
                    "INSERT INTO user_modules (user_id, module_name) VALUES (%s, %s)",
                    (user_id, module)
                )

            conn.commit()
            return redirect(url_for('settings.user_management'))

        except Exception as e:
            # Without a rollback, the DELETE above could be committed by a
            # later statement on the same connection.
            try:
                conn.rollback()
            except Exception:
                logging.getLogger(__name__).exception(
                    "Rollback failed in edit_user")
            return render_form(f"Error updating user: {str(e)}")
    finally:
        cursor.close()
|
|
|
|
|
|
@settings_bp.route('/users/<int:user_id>/delete', methods=['POST'])
def delete_user(user_id):
    """Delete the user with the given id, then return to the user list.

    The route still redirects to the user list when the delete fails, but
    the failure is now logged and flashed instead of being discarded.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    cursor = None
    try:
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
        conn.commit()
    except Exception as e:
        logging.getLogger(__name__).error(
            f"Error deleting user {user_id}: {e}", exc_info=True)
        flash(f'Error deleting user: {str(e)}', 'error')
    finally:
        if cursor is not None:
            cursor.close()

    return redirect(url_for('settings.user_management'))
|
|
|
|
|
|
@settings_bp.route('/app-keys', methods=['GET'])
def app_keys():
    """List QZ Tray pairing keys and active API keys.

    Each pairing key carries a formatted ``valid_until`` and a
    ``days_remaining`` count clamped at zero.  A key whose ``valid_until``
    is NULL is shown as ``'N/A'`` with ``days_remaining`` of 0.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    conn = get_db()
    cursor = conn.cursor()
    try:
        # Get pairing keys
        cursor.execute("""
            SELECT id, printer_name, pairing_key, valid_until
            FROM qz_pairing_keys
            ORDER BY created_at DESC
        """)

        today = datetime.now().date()
        pairing_keys = []
        for row in cursor.fetchall():
            valid_until = row[3]
            if valid_until:
                valid_until_text = valid_until.strftime('%Y-%m-%d')
                days_remaining = max(0, (valid_until - today).days)
            else:
                # The subtraction used to run before this check, so a NULL
                # expiry raised TypeError and broke the whole page.
                valid_until_text = 'N/A'
                days_remaining = 0
            pairing_keys.append({
                'id': row[0],
                'printer_name': row[1],
                'pairing_key': row[2],
                'valid_until': valid_until_text,
                'days_remaining': days_remaining
            })

        # Get API keys
        cursor.execute("""
            SELECT id, key_name, key_type, api_key, created_at
            FROM api_keys
            WHERE is_active = 1
            ORDER BY created_at DESC
        """)

        api_keys = [
            {
                'id': row[0],
                'key_name': row[1],
                'key_type': row[2],
                'api_key': row[3],
                'created_at': row[4]
            }
            for row in cursor.fetchall()
        ]
    finally:
        cursor.close()

    return render_template('modules/settings/app_keys.html',
                           pairing_keys=pairing_keys,
                           api_keys=api_keys)
|
|
|
|
|
|
@settings_bp.route('/app-keys/pairing/generate', methods=['POST'])
def generate_pairing_key():
    """Generate and store a new QZ Tray pairing key for a printer.

    Form fields: ``printer_name`` (required) and ``validity_days`` (default
    90, must be a positive whole number).  On success the user is sent back
    to the app-keys page; on any problem the app-keys page is re-rendered
    with an error message via ``redirect_with_error``.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    cursor = None
    try:
        printer_name = request.form.get('printer_name', '').strip()
        if not printer_name:
            return redirect_with_error('app_keys', 'Printer name is required')

        # Validate explicitly: a non-numeric value used to surface the raw
        # int() error text, and zero/negative produced an expired key.
        try:
            validity_days = int(request.form.get('validity_days', '90'))
        except (TypeError, ValueError):
            return redirect_with_error('app_keys', 'Validity must be a whole number of days')
        if validity_days < 1:
            return redirect_with_error('app_keys', 'Validity must be at least 1 day')

        # Generate secure pairing key
        pairing_key = secrets.token_urlsafe(32)
        valid_until = (datetime.now() + timedelta(days=validity_days)).date()

        # Save to database
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO qz_pairing_keys (printer_name, pairing_key, valid_until)
            VALUES (%s, %s, %s)
        """, (printer_name, pairing_key, valid_until))
        conn.commit()

        return redirect(url_for('settings.app_keys'))

    except Exception as e:
        return redirect_with_error('app_keys', f'Error generating pairing key: {str(e)}')
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
|
|
@settings_bp.route('/app-keys/pairing/<int:key_id>/delete', methods=['POST'])
def delete_pairing_key(key_id):
    """Delete a QZ Tray pairing key, then return to the app-keys page.

    The route still redirects to the app-keys page when the delete fails,
    but the failure is now logged and flashed instead of being discarded.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    cursor = None
    try:
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM qz_pairing_keys WHERE id = %s", (key_id,))
        conn.commit()
    except Exception as e:
        logging.getLogger(__name__).error(
            f"Error deleting pairing key {key_id}: {e}", exc_info=True)
        flash(f'Error deleting pairing key: {str(e)}', 'error')
    finally:
        if cursor is not None:
            cursor.close()

    return redirect(url_for('settings.app_keys'))
|
|
|
|
|
|
@settings_bp.route('/app-keys/api/generate', methods=['POST'])
def generate_api_key():
    """Create an active API key from the posted ``key_name``/``key_type``.

    A blank ``key_name`` or any exception re-renders the app-keys page with
    an error message; success redirects to the app-keys page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        form = request.form
        name = form.get('key_name', '').strip()
        kind = form.get('key_type', 'app_key').strip()

        if name:
            # Random URL-safe token built from 32 bytes of entropy.
            token = secrets.token_urlsafe(32)

            db = get_db()
            cur = db.cursor()
            cur.execute("""
                INSERT INTO api_keys (key_name, key_type, api_key, is_active)
                VALUES (%s, %s, %s, 1)
            """, (name, kind, token))
            db.commit()
            cur.close()

            return redirect(url_for('settings.app_keys'))

        return redirect_with_error('app_keys', 'Key name is required')

    except Exception as exc:
        return redirect_with_error('app_keys', f'Error generating API key: {str(exc)}')
|
|
|
|
|
|
@settings_bp.route('/app-keys/api/<int:key_id>/delete', methods=['POST'])
def delete_api_key(key_id):
    """Delete an API key row, then return to the app-keys page.

    The route still redirects to the app-keys page when the delete fails,
    but the failure is now logged and flashed instead of being discarded.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    cursor = None
    try:
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM api_keys WHERE id = %s", (key_id,))
        conn.commit()
    except Exception as e:
        logging.getLogger(__name__).error(
            f"Error deleting API key {key_id}: {e}", exc_info=True)
        flash(f'Error deleting API key: {str(e)}', 'error')
    finally:
        if cursor is not None:
            cursor.close()

    return redirect(url_for('settings.app_keys'))
|
|
|
|
|
|
def redirect_with_error(endpoint, error_message):
    """Render the app-keys page with an error message.

    Despite the name, this does not issue an HTTP redirect: it reloads the
    pairing keys and active API keys and renders the app-keys template
    directly with ``error`` set.

    Args:
        endpoint: Accepted for caller compatibility; not used by the body.
        error_message: Text passed to the template as ``error``.
    """
    conn = get_db()
    cursor = conn.cursor()
    try:
        # Get pairing keys
        cursor.execute("""
            SELECT id, printer_name, pairing_key, valid_until
            FROM qz_pairing_keys
            ORDER BY created_at DESC
        """)

        today = datetime.now().date()
        pairing_keys = []
        for row in cursor.fetchall():
            valid_until = row[3]
            if valid_until:
                valid_until_text = valid_until.strftime('%Y-%m-%d')
                days_remaining = max(0, (valid_until - today).days)
            else:
                # Guard first: subtracting from a NULL expiry raised
                # TypeError, turning an error page into a second failure.
                valid_until_text = 'N/A'
                days_remaining = 0
            pairing_keys.append({
                'id': row[0],
                'printer_name': row[1],
                'pairing_key': row[2],
                'valid_until': valid_until_text,
                'days_remaining': days_remaining
            })

        # Get API keys
        cursor.execute("""
            SELECT id, key_name, key_type, api_key, created_at
            FROM api_keys
            WHERE is_active = 1
            ORDER BY created_at DESC
        """)

        api_keys = [
            {
                'id': row[0],
                'key_name': row[1],
                'key_type': row[2],
                'api_key': row[3],
                'created_at': row[4]
            }
            for row in cursor.fetchall()
        ]
    finally:
        cursor.close()

    return render_template('modules/settings/app_keys.html',
                           pairing_keys=pairing_keys,
                           api_keys=api_keys,
                           error=error_message)
|
|
|
|
|
|
# Database Management Routes
|
|
|
|
@settings_bp.route('/database-management', methods=['GET'])
def database_management():
    """Render the database management page.

    Supplies the template with the tables of the current schema (name and
    row count as reported by ``information_schema``) and the stored backup
    retention settings.  Any exception is flashed and the user is sent back
    to the settings index.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        db = get_db()
        cur = db.cursor()

        # Tables in the active schema
        cur.execute("""
            SELECT TABLE_NAME, TABLE_ROWS
            FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = DATABASE()
            ORDER BY TABLE_NAME
        """)
        tables = []
        for record in cur.fetchall():
            tables.append({'name': record[0], 'rows': record[1]})

        # Stored retention policy
        cur.execute("""
            SELECT setting_key, setting_value FROM application_settings
            WHERE setting_key IN ('backup_retention_days', 'backup_auto_cleanup')
        """)
        retention = {}
        for record in cur.fetchall():
            retention[record[0]] = record[1]

        days = retention.get('backup_retention_days', '30')
        cleanup_enabled = retention.get('backup_auto_cleanup') == '1'

        cur.close()

        return render_template('modules/settings/database_management.html',
                               tables=tables,
                               backup_retention_days=days,
                               auto_cleanup=cleanup_enabled)
    except Exception as exc:
        flash(f'Error loading database management: {str(exc)}', 'error')
        return redirect(url_for('settings.settings_index'))
|
|
|
|
|
|
@settings_bp.route('/api/backups', methods=['GET'])
def get_backups_list():
    """Return backup files and the database size as JSON.

    Only the first ten directory entries, sorted by name in descending
    order, are considered, and non-files among them are skipped.  Responds
    with 401 when not logged in and 500 with the error text on any failure.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        backups_dir = '/app/data/backups'
        if not os.path.exists(backups_dir):
            os.makedirs(backups_dir)

        names_desc = sorted(os.listdir(backups_dir), reverse=True)

        backups = []
        for entry in names_desc[:10]:
            full_path = os.path.join(backups_dir, entry)
            if not os.path.isfile(full_path):
                continue

            megabytes = os.path.getsize(full_path) / (1024 * 1024)
            modified = datetime.fromtimestamp(os.path.getmtime(full_path))

            backups.append({
                'name': entry,
                'size': f'{megabytes:.2f} MB',
                'date': modified.strftime('%Y-%m-%d %H:%M:%S'),
                # Data-only dumps are recognised purely by their file name.
                'type': 'data' if 'data-only' in entry else 'full'
            })

        # Total size of the current schema in MB
        db = get_db()
        cur = db.cursor()
        cur.execute("""
            SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2)
            FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = DATABASE()
        """)
        db_size = cur.fetchone()[0]
        cur.close()

        if db_size:
            size_label = f'{db_size} MB'
        else:
            size_label = 'Unknown'

        return jsonify({'backups': backups, 'db_size': size_label})

    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
|
|
|
|
|
|
@settings_bp.route('/api/backup', methods=['POST'])
def create_backup():
    """Create a database backup with ``mysqldump`` and report the result as JSON.

    Form fields:
        backup_type: ``'data'`` for a data-only dump (``--no-create-info``);
            any other value produces a full dump.
        backup_name: Optional file-name prefix.  It is sanitised before use
            so it cannot escape the backups directory.

    Returns 401 when not logged in, 500 with an ``error`` when the dump
    fails or is empty, otherwise ``success``/``file``/``size``.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    filepath = None
    try:
        # Imported locally, matching how upload_backup_file does it.
        from werkzeug.utils import secure_filename

        data_only = request.form.get('backup_type', 'full') == 'data'

        # The raw form value used to go straight into the path, so a name
        # such as "../../x" could write outside the backups directory.
        backup_name = secure_filename(request.form.get('backup_name', '').strip())

        backups_dir = '/app/data/backups'
        os.makedirs(backups_dir, exist_ok=True)

        # Generate filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if backup_name:
            filename = f"{backup_name}_{timestamp}.sql"
        elif data_only:
            filename = f"backup_data-only_{timestamp}.sql"
        else:
            filename = f"backup_{timestamp}.sql"

        filepath = os.path.join(backups_dir, filename)

        # Get database credentials from environment
        db_host = os.getenv('DB_HOST', 'mariadb')
        db_user = os.getenv('DB_USER', 'quality_user')
        db_password = os.getenv('DB_PASSWORD', 'quality_pass')
        db_name = os.getenv('DB_NAME', 'quality_db')

        # NOTE(security): the password is passed on the command line and is
        # visible in the process list while the dump runs.
        cmd = [
            'mysqldump',
            '-h', db_host,
            '-u', db_user,
            f'-p{db_password}',
            '--skip-ssl',
        ]
        if data_only:
            cmd.append('--no-create-info')
        # Options first, database name last.
        cmd.append(db_name)

        # Only the first four items are logged, which excludes the password.
        logger.info(f"Creating backup with command: {' '.join(cmd[:4])}...")

        with open(filepath, 'w') as f:
            result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)

        # Check if backup was successful
        if result.returncode != 0:
            error_msg = result.stderr if result.stderr else f"mysqldump returned code {result.returncode}"
            logger.error(f"Backup failed: {error_msg}")
            # Delete the partial/empty file
            if os.path.exists(filepath):
                os.remove(filepath)
            return jsonify({'error': f'Backup failed: {error_msg}'}), 500

        file_size = os.path.getsize(filepath)
        if file_size == 0:
            logger.warning(f"Warning: Backup file is empty: {filename}")
            # Do not leave a zero-byte file among the listed backups.
            os.remove(filepath)
            return jsonify({
                'success': False,
                'error': 'Backup file is empty - the database may not have any data or mysqldump failed silently'
            }), 500

        logger.info(f"Backup created successfully: {filename} ({file_size} bytes)")
        return jsonify({
            'success': True,
            'message': 'Backup created successfully',
            'file': filename,
            'size': file_size
        })

    except Exception as e:
        logger.error(f"Exception in create_backup: {str(e)}")
        # If mysqldump could not even start, open() has already created an
        # empty file; remove it, but never touch a non-empty dump.
        try:
            if filepath and os.path.isfile(filepath) and os.path.getsize(filepath) == 0:
                os.remove(filepath)
        except OSError:
            logger.warning(f"Could not remove empty backup file: {filepath}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@settings_bp.route('/api/backup/download', methods=['GET'])
def download_backup():
    """Send a backup file named by the ``file`` query argument.

    The requested name is resolved and must lie inside the backups
    directory.  Any problem is flashed and the user is sent back to the
    database management page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        filename = request.args.get('file')
        if not filename:
            flash('Invalid backup file', 'error')
            return redirect(url_for('settings.database_management'))

        backups_dir = os.path.realpath('/app/data/backups')
        filepath = os.path.realpath(os.path.join(backups_dir, filename))

        # Security: a plain startswith() prefix test would also accept
        # sibling paths such as "/app/data/backups_old/x".  Compare whole
        # path components on the resolved path instead.
        inside = os.path.commonpath([backups_dir, filepath]) == backups_dir
        if not inside or filepath == backups_dir:
            flash('Invalid backup file', 'error')
            return redirect(url_for('settings.database_management'))

        # isfile() also rejects directories, which cannot be sent.
        if not os.path.isfile(filepath):
            flash('Backup file not found', 'error')
            return redirect(url_for('settings.database_management'))

        return send_file(filepath, as_attachment=True,
                         download_name=os.path.basename(filepath))

    except Exception as e:
        flash(f'Error downloading backup: {str(e)}', 'error')
        return redirect(url_for('settings.database_management'))
|
|
|
|
|
|
@settings_bp.route('/api/database/restore', methods=['POST'])
def restore_database():
    """Restore the database by replaying a backup file's SQL statements.

    Expects a JSON body ``{"backup": "<file name>"}``.  The file must lie
    inside the backups directory.  Its content is split on ``;`` and each
    statement is executed; a failing statement is logged and counted but
    does not abort the restore.

    Returns 401/400/404 for auth, input and lookup problems, 500 on an
    unexpected error, otherwise ``success`` plus ``failed_statements``.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    cursor = None
    try:
        # silent=True returns None for a missing or invalid JSON body, so
        # bad input becomes a clean 400 rather than a 500.
        data = request.get_json(silent=True)
        backup_file = data.get('backup') if isinstance(data, dict) else None

        if not backup_file or not isinstance(backup_file, str):
            return jsonify({'error': 'Backup file not specified'}), 400

        backups_dir = os.path.realpath('/app/data/backups')
        filepath = os.path.realpath(os.path.join(backups_dir, backup_file))

        # Security check: compare whole path components.  A startswith()
        # prefix test would also accept "/app/data/backups_evil/x.sql".
        inside = os.path.commonpath([backups_dir, filepath]) == backups_dir
        if not inside or filepath == backups_dir:
            return jsonify({'error': 'Invalid backup file'}), 400

        if not os.path.isfile(filepath):
            return jsonify({'error': 'Backup file not found'}), 404

        with open(filepath, 'r') as f:
            sql_content = f.read()

        conn = get_db()
        cursor = conn.cursor()

        # NOTE: splitting on ';' is naive and breaks statements whose string
        # data contains a semicolon; such fragments fail and are counted.
        failed_statements = 0
        for statement in sql_content.split(';'):
            statement = statement.strip()
            if not statement:
                continue
            try:
                cursor.execute(statement)
            except Exception as e:
                # Keep going, as before, but leave a trace of what failed.
                failed_statements += 1
                logger.warning(f"Restore statement failed: {e}")

        conn.commit()

        if failed_statements:
            logger.warning(
                f"Restore of {backup_file} finished with {failed_statements} failed statements")

        return jsonify({
            'success': True,
            'message': 'Database restored successfully',
            'failed_statements': failed_statements
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
|
|
@settings_bp.route('/api/database/truncate', methods=['POST'])
def truncate_table():
    """Truncate (clear) one table of the current schema.

    Expects a JSON body ``{"table": "<name>"}``.  The name must match a
    table listed in ``information_schema`` for the active database.

    Returns 401 when not logged in, 400 for a missing name, 404 for an
    unknown table, 500 on error, otherwise a success message.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    cursor = None
    try:
        # silent=True returns None for a missing or invalid JSON body, so
        # bad input becomes a clean 400 rather than a 500.
        data = request.get_json(silent=True)
        table = data.get('table') if isinstance(data, dict) else None

        if not table or not isinstance(table, str):
            return jsonify({'error': 'Table not specified'}), 400

        conn = get_db()
        cursor = conn.cursor()

        # Validate the table name against the schema catalogue
        cursor.execute("""
            SELECT TABLE_NAME FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
        """, (table,))

        row = cursor.fetchone()
        if not row:
            return jsonify({'error': 'Table not found'}), 404

        # Identifiers cannot be bound as parameters.  Use the name as stored
        # in the catalogue and backtick-quote it (doubling any embedded
        # backtick) so unusual names cannot alter the statement.
        quoted = '`' + str(row[0]).replace('`', '``') + '`'
        cursor.execute(f'TRUNCATE TABLE {quoted}')
        conn.commit()

        return jsonify({'success': True, 'message': f'Table {table} cleared successfully'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
|
|
@settings_bp.route('/api/database/import', methods=['POST'])
def import_database():
    """Legacy import endpoint: point the user at the upload section.

    Performs no import.  A logged-in user gets an informational flash and
    is sent to the database management page; anyone else goes to login.
    """
    if 'user_id' in session:
        flash('Please use the Upload Backup File section instead', 'info')
        target = 'settings.database_management'
    else:
        target = 'main.login'
    return redirect(url_for(target))
|
|
|
|
|
|
@settings_bp.route('/api/backup/upload', methods=['POST'])
def upload_backup_file():
    """Store an uploaded ``.sql`` file in the backups directory.

    The multipart field ``file`` must carry a name ending in ``.sql``.  The
    stored name is the sanitised original name with ``_uploaded_<timestamp>``
    inserted before the extension, to avoid clashes.

    Returns 401 when not logged in, 400 for a missing or invalid file, 500
    on error, otherwise ``success``/``filename``/``size``.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']

        # ``not file.filename`` also covers None, which would otherwise
        # fail on .lower().
        if not file.filename or not file.filename.lower().endswith('.sql'):
            return jsonify({'error': 'Invalid file. Please upload a .sql file'}), 400

        from werkzeug.utils import secure_filename

        # Use a sanitised filename and add a timestamp to avoid conflicts
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        safe_name = secure_filename(file.filename)
        if not safe_name:
            safe_name = 'backup.sql'

        # Insert the timestamp before the extension
        name_parts = safe_name.rsplit('.', 1)
        if len(name_parts) == 2:
            filename = f"{name_parts[0]}_uploaded_{timestamp}.{name_parts[1]}"
        else:
            filename = f"{safe_name}_uploaded_{timestamp}.sql"

        # Save file to backups directory
        backups_dir = '/app/data/backups'
        os.makedirs(backups_dir, exist_ok=True)

        filepath = os.path.join(backups_dir, filename)
        file.save(filepath)

        file_size = os.path.getsize(filepath)
        logger.info(f"Backup file uploaded: {filename} ({file_size} bytes)")

        return jsonify({
            'success': True,
            'message': 'Backup file uploaded successfully',
            'filename': filename,
            'size': file_size
        })

    # A second, identical ``except Exception`` clause followed this one; it
    # could never run and has been removed.
    except Exception as e:
        logger.error(f"Error uploading backup file: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# Backup Retention Routes
|
|
|
|
@settings_bp.route('/api/backup-retention/save', methods=['POST'])
def save_backup_retention():
    """Persist the backup retention policy from the posted form.

    ``retention_days`` falls back to 30 when it is not a number or is below
    1, and is capped at 365.  ``auto_cleanup`` is on only when the field
    equals ``'1'``.  The outcome is flashed and the user is returned to the
    database management page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    cursor = None
    try:
        retention_days = request.form.get('retention_days', '30').strip()
        auto_cleanup = request.form.get('auto_cleanup') == '1'

        # Validate
        try:
            retention_days = int(retention_days)
            if retention_days < 1:
                retention_days = 30
            if retention_days > 365:
                retention_days = 365
        except (TypeError, ValueError):
            # Only conversion failures mean "bad input"; a bare except
            # would also swallow KeyboardInterrupt/SystemExit.
            retention_days = 30

        conn = get_db()
        cursor = conn.cursor()

        upsert_sql = """
            INSERT INTO application_settings (setting_key, setting_value, setting_type)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE setting_value = %s
        """
        cleanup_flag = '1' if auto_cleanup else '0'
        for key, value, value_type in (
            ('backup_retention_days', str(retention_days), 'integer'),
            ('backup_auto_cleanup', cleanup_flag, 'boolean'),
        ):
            cursor.execute(upsert_sql, (key, value, value_type, value))

        conn.commit()

        flash(f'Backup retention policy saved! Backups will be kept for {retention_days} days.', 'success')
        return redirect(url_for('settings.database_management'))

    except Exception as e:
        flash(f'Error saving retention policy: {str(e)}', 'error')
        return redirect(url_for('settings.database_management'))
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
|
|
@settings_bp.route('/api/backups/cleanup', methods=['POST'])
def cleanup_old_backups():
    """Delete backup files older than the stored retention period.

    Reads ``backup_retention_days`` from ``application_settings`` (30 when
    the row is missing) and removes every regular file in the backups
    directory whose modification time is older than that many days.

    Returns 401 when not logged in, 400 when the stored retention is below
    one day, 404 when the directory is missing, 500 on error, otherwise
    ``deleted_count`` and ``failed_count``.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    try:
        # Get retention days from database
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT setting_value FROM application_settings
                WHERE setting_key = 'backup_retention_days'
            """)
            result = cursor.fetchone()
        finally:
            cursor.close()

        retention_days = int(result[0]) if result else 30

        # Zero or negative retention would move the cutoff to now or the
        # future and delete every backup; refuse instead of wiping them.
        if retention_days < 1:
            return jsonify({'error': 'Invalid backup retention setting'}), 400

        # Calculate cutoff date
        cutoff_date = datetime.now() - timedelta(days=retention_days)

        backups_dir = '/app/data/backups'
        if not os.path.exists(backups_dir):
            return jsonify({'error': 'Backups directory not found'}), 404

        # Delete old backups
        deleted_count = 0
        failed_count = 0
        for filename in os.listdir(backups_dir):
            filepath = os.path.join(backups_dir, filename)
            if not os.path.isfile(filepath):
                continue
            file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            if file_mtime >= cutoff_date:
                continue
            try:
                os.remove(filepath)
                deleted_count += 1
            except OSError as e:
                # Keep going, but record which file could not be removed.
                failed_count += 1
                logger.warning(f"Could not delete old backup {filename}: {e}")

        return jsonify({
            'success': True,
            'deleted_count': deleted_count,
            'failed_count': failed_count,
            'message': f'{deleted_count} old backups deleted'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@settings_bp.route('/api/backup-schedules', methods=['GET'])
def get_backup_schedules():
    """Return all backup schedules as JSON, newest first.

    Rows are read from ``backup_schedules`` with a dict cursor and ordered by
    ``created_at`` descending. ``time_of_day`` is converted with ``str()``
    and ``last_run`` / ``next_run`` / ``created_at`` with ``isoformat()`` so
    the rows can be serialised.

    Returns:
        JSON ``{'schedules': [...]}`` on success, 401 when no user is in the
        session, or 500 with the error text on failure (the traceback is
        logged).
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        conn = get_db()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute("""
                SELECT id, schedule_name, frequency, day_of_week, time_of_day,
                       backup_type, is_active, last_run, next_run, created_at
                FROM backup_schedules
                ORDER BY created_at DESC
            """)
            schedules = cursor.fetchall()
        finally:
            cursor.close()

        # Convert datetime objects to strings for JSON serialization
        for schedule in schedules:
            # Test against None, not truthiness: a midnight value can be
            # falsy (e.g. timedelta(0)) and would otherwise be left
            # unconverted and break jsonify.
            # NOTE(review): assumes the driver returns TIME columns as
            # timedelta - confirm against the column type.
            if schedule['time_of_day'] is not None:
                schedule['time_of_day'] = str(schedule['time_of_day'])

            # Convert datetime fields
            for field in ('last_run', 'next_run', 'created_at'):
                if schedule[field]:
                    schedule[field] = schedule[field].isoformat()

        return jsonify({'schedules': schedules})

    except Exception as e:
        # logger.exception records the message and the traceback together.
        logger = logging.getLogger(__name__)
        logger.exception("Error in get_backup_schedules: %s", e)
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# Weekday names accepted for weekly schedules, mapped to datetime.weekday().
_BACKUP_WEEKDAYS = {
    'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3,
    'Friday': 4, 'Saturday': 5, 'Sunday': 6
}


def _next_backup_run(now, target_weekday, hour, minute):
    """Return the first run time not earlier than *now*.

    ``target_weekday`` is a ``datetime.weekday()`` index for weekly
    schedules, or ``None`` for daily schedules.
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target_weekday is None:
        if candidate < now:
            candidate += timedelta(days=1)
        return candidate

    days_ahead = (target_weekday - now.weekday()) % 7
    if days_ahead == 0 and candidate < now:
        # Today is the right weekday but the time has already passed.
        days_ahead = 7
    return candidate + timedelta(days=days_ahead)


@settings_bp.route('/api/backup-schedules/save', methods=['POST'])
def save_backup_schedule():
    """Create or update a backup schedule from a JSON request body.

    Expected JSON keys: ``schedule_name`` (at least 3 characters),
    ``frequency`` (``daily`` or ``weekly``, default ``daily``),
    ``day_of_week`` (weekday name, required for weekly), ``time_of_day``
    (``HH:MM``, default ``00:00``), ``backup_type`` (``full`` or
    ``data_only``, default ``full``) and optional ``schedule_id``. When
    ``schedule_id`` is truthy the matching row is updated, otherwise a new
    active row is inserted. ``next_run`` is recomputed in both cases.

    Returns:
        JSON ``{'success', 'message'}`` on success, 401 when no user is in
        the session, 400 with an ``error`` message for invalid input, or 500
        with the error text on any other failure.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        # silent=True: a missing or malformed body becomes a 400 below
        # instead of an exception that surfaces as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        schedule_name = str(data.get('schedule_name') or '').strip()
        frequency = data.get('frequency', 'daily')  # daily or weekly
        day_of_week = data.get('day_of_week', None)  # Monday, Tuesday, etc
        time_of_day = data.get('time_of_day', '00:00')  # HH:MM format
        backup_type = data.get('backup_type', 'full')  # full or data_only
        schedule_id = data.get('schedule_id', None)

        # Validate inputs
        if len(schedule_name) < 3:
            return jsonify({'error': 'Schedule name must be at least 3 characters'}), 400

        if frequency not in ('daily', 'weekly'):
            return jsonify({'error': 'Frequency must be daily or weekly'}), 400

        target_weekday = None
        if frequency == 'weekly':
            if not day_of_week:
                return jsonify({'error': 'Day of week is required for weekly schedules'}), 400
            # Reject unknown names instead of silently scheduling Monday.
            # Matching ignores case; the submitted value is stored as is.
            weekday_key = str(day_of_week).strip().capitalize()
            if weekday_key not in _BACKUP_WEEKDAYS:
                return jsonify({'error': 'Day of week must be a weekday name (Monday to Sunday)'}), 400
            target_weekday = _BACKUP_WEEKDAYS[weekday_key]

        if backup_type not in ('full', 'data_only'):
            return jsonify({'error': 'Backup type must be full or data_only'}), 400

        # Validate time format (HH:MM)
        try:
            hour_text, minute_text = time_of_day.split(':')
            hour = int(hour_text)
            minute = int(minute_text)
        except (AttributeError, TypeError, ValueError):
            return jsonify({'error': 'Invalid time format. Use HH:MM'}), 400
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            return jsonify({'error': 'Invalid time format. Use HH:MM'}), 400
        # Store the canonical zero-padded form.
        time_of_day = f'{hour:02d}:{minute:02d}'

        # Calculate next run time
        next_run = _next_backup_run(datetime.now(), target_weekday, hour, minute)

        conn = get_db()
        cursor = conn.cursor()
        try:
            if schedule_id:
                # Update existing schedule
                cursor.execute("""
                    UPDATE backup_schedules
                    SET schedule_name = %s, frequency = %s, day_of_week = %s,
                        time_of_day = %s, backup_type = %s, next_run = %s,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = %s
                """, (schedule_name, frequency, day_of_week, time_of_day, backup_type, next_run, schedule_id))
            else:
                # Create new schedule
                cursor.execute("""
                    INSERT INTO backup_schedules
                    (schedule_name, frequency, day_of_week, time_of_day, backup_type, is_active, next_run)
                    VALUES (%s, %s, %s, %s, %s, 1, %s)
                """, (schedule_name, frequency, day_of_week, time_of_day, backup_type, next_run))
            conn.commit()
        finally:
            # Release the cursor even when the statement fails.
            cursor.close()

        action = 'updated' if schedule_id else 'created'
        return jsonify({
            'success': True,
            'message': f'Backup schedule {action} successfully'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@settings_bp.route('/api/backup-schedules/<int:schedule_id>/delete', methods=['POST'])
def delete_backup_schedule(schedule_id):
    """Delete the backup schedule with the given id.

    Args:
        schedule_id: Primary key of the ``backup_schedules`` row, taken from
            the URL.

    Returns:
        JSON ``{'success', 'message'}`` when a row was deleted, 401 when no
        user is in the session, 404 when no row has that id (the same
        response ``toggle_backup_schedule`` gives), or 500 with the error
        text on failure.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM backup_schedules WHERE id = %s", (schedule_id,))
            deleted_rows = cursor.rowcount
            conn.commit()
        finally:
            # Release the cursor even when the statement fails.
            cursor.close()

        # Nothing matched: report it instead of claiming a deletion.
        if deleted_rows == 0:
            return jsonify({'error': 'Schedule not found'}), 404

        return jsonify({
            'success': True,
            'message': 'Backup schedule deleted successfully'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@settings_bp.route('/api/backup-schedules/<int:schedule_id>/toggle', methods=['POST'])
def toggle_backup_schedule(schedule_id):
    """Enable a disabled backup schedule, or disable an enabled one.

    Reads the current ``is_active`` value of the row and writes back the
    opposite (0 or 1), also refreshing ``updated_at``.

    Args:
        schedule_id: Primary key of the ``backup_schedules`` row, taken from
            the URL.

    Returns:
        JSON ``{'success', 'message', 'is_active'}`` on success, 401 when no
        user is in the session, 404 when no row has that id, or 500 with the
        error text on failure.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            # Get current state
            cursor.execute("SELECT is_active FROM backup_schedules WHERE id = %s", (schedule_id,))
            result = cursor.fetchone()

            if not result:
                return jsonify({'error': 'Schedule not found'}), 404

            new_state = 0 if result[0] else 1

            # Update state
            cursor.execute("""
                UPDATE backup_schedules
                SET is_active = %s, updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (new_state, schedule_id))

            conn.commit()
        finally:
            # Runs on the 404 early return and on errors too, so the cursor
            # is always released.
            cursor.close()

        status = 'enabled' if new_state else 'disabled'
        return jsonify({
            'success': True,
            'message': f'Backup schedule {status} successfully',
            'is_active': new_state
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================================
|
|
# Log Explorer Routes
|
|
# ============================================================================
|
|
|
|
@settings_bp.route('/logs', methods=['GET'])
def logs_explorer():
    """Render the log explorer landing page.

    Shows the available log files together with their statistics. If either
    lookup (or the render itself) fails, the error is logged and flashed and
    the page is rendered with an empty file list and empty statistics.
    Users without a session are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    template_name = 'modules/settings/logs_explorer.html'
    try:
        files = get_log_files()
        stats = get_log_statistics()
        page = render_template(template_name, log_files=files, log_stats=stats)
    except Exception as e:
        logger.error(f"Error loading logs explorer: {e}")
        flash(f"Error loading logs: {str(e)}", 'error')
        # Fall back to an empty view so the page still loads.
        page = render_template(template_name, log_files=[], log_stats={})
    return page
|
|
|
|
|
|
@settings_bp.route('/logs/view/<filename>', methods=['GET'])
def view_log(filename):
    """Render the content of one log file.

    The optional ``lines`` query parameter (integer, default 100) is passed
    to ``get_log_content``. When the returned data has no truthy ``success``
    entry, or when anything raises, an error is flashed and the user is sent
    back to the log explorer. Users without a session are redirected to the
    login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        line_count = request.args.get('lines', default=100, type=int)
        log_data = get_log_content(filename, lines=line_count)

        if log_data.get('success'):
            return render_template('modules/settings/view_log.html', log_data=log_data)

        # The reader reported a failure: show its message, or a generic one.
        flash(log_data.get('error', 'Error reading log file'), 'error')
        return redirect(url_for('settings.logs_explorer'))
    except Exception as e:
        logger.error(f"Error viewing log {filename}: {e}")
        flash(f"Error viewing log: {str(e)}", 'error')
        return redirect(url_for('settings.logs_explorer'))
|
|
|
|
|
|
@settings_bp.route('/logs/download/<filename>', methods=['GET'])
def download_log(filename):
    """Send a log file to the browser as an attachment.

    The path comes from ``get_log_file_path``; a falsy result flashes an
    error and redirects to the log explorer. The download name is the
    requested filename followed by ``_YYYYMMDD_HHMMSS`` for the current
    time. Any exception is logged, flashed and answered with the same
    redirect. Users without a session are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        filepath = get_log_file_path(filename)

        if filepath:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            return send_file(
                filepath,
                as_attachment=True,
                download_name=f"{filename}_{timestamp}"
            )

        flash('Invalid file or file not found', 'error')
        return redirect(url_for('settings.logs_explorer'))
    except Exception as e:
        logger.error(f"Error downloading log {filename}: {e}")
        flash(f"Error downloading log: {str(e)}", 'error')
        return redirect(url_for('settings.logs_explorer'))
|
|
|
|
|
|
@settings_bp.route('/logs/search', methods=['GET'])
def search_logs():
    """Render the log search page, with results when a term is given.

    Query parameters: ``q`` is the search term (whitespace-stripped) and
    ``file`` optionally names a single log file to pass to
    ``search_in_logs``. An empty term skips the search and renders an empty
    result list. On any exception the error is logged and flashed and the
    user is redirected to the log explorer. Users without a session are
    redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        search_term = request.args.get('q', '').strip()
        filename = request.args.get('file', default=None)

        # Only run the search when there is something to look for.
        results = search_in_logs(search_term, filename=filename) if search_term else []

        return render_template(
            'modules/settings/search_logs.html',
            search_term=search_term,
            results=results,
            log_files=get_log_files(),
            selected_file=filename,
        )
    except Exception as e:
        logger.error(f"Error searching logs: {e}")
        flash(f"Error searching logs: {str(e)}", 'error')
        return redirect(url_for('settings.logs_explorer'))
|
|
|