Files
quality_app-v2/app/modules/settings/routes.py
Quality App Developer e1f3302c6b Implement boxes management module with auto-numbered box creation
- Add boxes_crates database table with BIGINT IDs and 8-digit auto-numbered box_numbers
- Implement boxes CRUD operations (add, edit, update, delete, delete_multiple)
- Create boxes route handlers with POST actions for all operations
- Add boxes.html template with 3-panel layout matching warehouse locations module
- Implement barcode generation and printing with JsBarcode and QZ Tray integration
- Add browser print fallback for when QZ Tray is not available
- Simplify create box form to single button with auto-generation
- Fix JavaScript null reference errors with proper element validation
- Convert tuple data to dictionaries for Jinja2 template compatibility
- Register boxes blueprint in Flask app initialization
2026-01-26 22:08:31 +02:00

1375 lines
47 KiB
Python

"""
Settings Module Routes
"""
from flask import Blueprint, render_template, session, redirect, url_for, request, flash, jsonify, send_file
import pymysql
import hashlib
import secrets
from datetime import datetime, timedelta
from app.database import get_db
from app.modules.settings.stats import get_all_stats
from app.modules.settings.logs import get_log_files, get_log_content, get_log_file_path, get_log_statistics, search_in_logs
import subprocess
import os
import json
from pathlib import Path
import io
import logging
# Blueprint that every route in this module registers on; all of its URLs
# are served under the /settings prefix.
settings_bp = Blueprint('settings', __name__, url_prefix='/settings')
@settings_bp.route('/', methods=['GET'])
def settings_index():
    """Render the settings landing page with an overview of app statistics.

    Anonymous visitors are redirected to the login page. If collecting the
    statistics fails, the error is logged and the page is rendered with
    zeroed placeholder values instead of failing the request.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        stats = get_all_stats()
    except Exception as e:
        # Resolve the logger locally: the other handlers in this module do
        # the same, and relying on a module-level ``logger`` that may not
        # exist would turn this fallback path into a NameError.
        logging.getLogger(__name__).error(
            f"Error getting stats in settings_index: {e}", exc_info=True)
        stats = {
            'user_count': 0,
            'database_size_mb': 0,
            'logs_size_mb': 0,
            'database_count': 0,
            'backup_count': 0,
            'printer_keys_count': 0,
            'app_key_availability': {
                'available': False,
                'count': 0,
                'status': 'Error loading data',
            },
        }

    return render_template('modules/settings/index.html', stats=stats)
@settings_bp.route('/general', methods=['GET', 'POST'])
def general_settings():
    """Show (GET) or save (POST) the general application settings.

    GET reads ``app_name``, ``app_version`` and ``session_timeout`` from the
    ``application_settings`` table and renders them, using built-in defaults
    for keys that are missing. POST validates the submitted ``app_name`` and
    ``session_timeout``, upserts both values and re-renders the form with a
    success or error message. Anonymous visitors are redirected to login.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    conn = get_db()
    cursor = conn.cursor()

    if request.method == 'GET':
        cursor.execute("""
            SELECT setting_key, setting_value FROM application_settings
            WHERE setting_key IN ('app_name', 'app_version', 'session_timeout')
        """)
        settings = {row[0]: row[1] for row in cursor.fetchall()}
        cursor.close()
        return render_template('modules/settings/general.html',
                               app_name=settings.get('app_name', 'Quality App v2'),
                               app_version=settings.get('app_version', '2.0.0'),
                               session_timeout=settings.get('session_timeout', '480'))

    # POST: validate and persist. The cursor opened above is reused so that
    # no second cursor is created while the first is still open.
    upsert_sql = """
        INSERT INTO application_settings (setting_key, setting_value, setting_type)
        VALUES (%s, %s, %s)
        ON DUPLICATE KEY UPDATE setting_value = %s
    """
    try:
        app_name = request.form.get('app_name', 'Quality App v2').strip()
        session_timeout = request.form.get('session_timeout', '480').strip()

        if not app_name:
            app_name = 'Quality App v2'

        # Anything that is not a positive integer falls back to 480.
        try:
            timeout = int(session_timeout)
            if timeout < 1:
                timeout = 480
        except ValueError:
            timeout = 480

        cursor.execute(upsert_sql, ('app_name', app_name, 'string', app_name))
        cursor.execute(upsert_sql,
                       ('session_timeout', str(timeout), 'integer', str(timeout)))
        conn.commit()

        return render_template('modules/settings/general.html',
                               app_name=app_name,
                               app_version='2.0.0',
                               session_timeout=timeout,
                               success='Settings saved successfully!')
    except Exception as e:
        # Drop a half-applied save so a later commit on this connection
        # cannot persist only one of the two values.
        try:
            conn.rollback()
        except Exception:
            logging.getLogger(__name__).exception(
                "Rollback failed in general_settings")
        return render_template('modules/settings/general.html',
                               app_name='Quality App v2',
                               app_version='2.0.0',
                               session_timeout='480',
                               error=f'Error saving settings: {str(e)}')
    finally:
        cursor.close()
@settings_bp.route('/users', methods=['GET'])
def user_management():
    """List all user accounts, ordered by username, on the users page.

    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    # Column order mirrors the SELECT list below.
    field_names = ('id', 'username', 'email', 'full_name', 'role', 'is_active')

    db = get_db()
    cur = db.cursor()
    cur.execute("SELECT id, username, email, full_name, role, is_active FROM users ORDER BY username")
    users = [dict(zip(field_names, record)) for record in cur.fetchall()]
    cur.close()

    return render_template('modules/settings/users.html', users=users)
@settings_bp.route('/database', methods=['GET'])
def database_settings():
    """Render the database configuration page for a logged-in user.

    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' in session:
        return render_template('modules/settings/database.html')
    return redirect(url_for('main.login'))
def _load_role_options():
    """Return all roles as dicts (id, name, level, description), highest level first."""
    cursor = get_db().cursor()
    try:
        cursor.execute("SELECT id, name, level, description FROM roles ORDER BY level DESC")
        return [{'id': r[0], 'name': r[1], 'level': r[2], 'description': r[3]}
                for r in cursor.fetchall()]
    finally:
        cursor.close()


def _render_new_user_form(roles, error=None):
    """Render the empty "create user" form, optionally with an error message."""
    context = {
        'user': None,
        'roles': roles,
        'available_modules': ['quality', 'settings'],
        'user_modules': [],
    }
    if error is not None:
        context['error'] = error
    return render_template('modules/settings/user_form.html', **context)


@settings_bp.route('/users/create', methods=['GET', 'POST'])
def create_user():
    """Show the "create user" form (GET) or create the user (POST).

    POST validates the submitted fields, rejects duplicate usernames, then
    inserts the user row, its password hash and its module grants in one
    transaction. On success the browser is redirected to the user list; on
    any failure the form is re-rendered with an error message and the role
    list still populated. Anonymous visitors are redirected to login.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    if request.method == 'GET':
        return _render_new_user_form(_load_role_options())

    conn = None
    cursor = None
    try:
        username = request.form.get('username', '').strip()
        email = request.form.get('email', '').strip()
        full_name = request.form.get('full_name', '').strip()
        role = request.form.get('role', '').strip()
        password = request.form.get('password', '').strip()
        confirm_password = request.form.get('confirm_password', '').strip()
        is_active = request.form.get('is_active') == 'on'
        modules = request.form.getlist('modules')

        errors = []
        if not username:
            errors.append("Username is required")
        if not full_name:
            errors.append("Full name is required")
        if not role:
            errors.append("Role is required")
        if not password:
            errors.append("Password is required")
        if password != confirm_password:
            errors.append("Passwords do not match")
        # Only report the length rule for a non-empty password; an empty one
        # is already covered by "Password is required".
        if password and len(password) < 8:
            errors.append("Password must be at least 8 characters")
        if not modules:
            errors.append("Select at least one module")

        if errors:
            return _render_new_user_form(_load_role_options(), "; ".join(errors))

        conn = get_db()
        cursor = conn.cursor()

        cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
        if cursor.fetchone():
            return _render_new_user_form(_load_role_options(), "Username already exists")

        cursor.execute(
            "INSERT INTO users (username, email, full_name, role, is_active) VALUES (%s, %s, %s, %s, %s)",
            (username, email, full_name, role, is_active)
        )

        cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
        user_id = cursor.fetchone()[0]

        # NOTE(review): unsalted SHA-256 is weak for password storage. It is
        # kept as-is because the login check lives outside this block and
        # must use the same scheme; migrate both together.
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        cursor.execute(
            "INSERT INTO user_credentials (user_id, password_hash) VALUES (%s, %s)",
            (user_id, password_hash)
        )

        for module in modules:
            cursor.execute(
                "INSERT IGNORE INTO user_modules (user_id, module_name) VALUES (%s, %s)",
                (user_id, module)
            )

        conn.commit()
        return redirect(url_for('settings.user_management'))

    except Exception as e:
        # Undo a partially created user (row without credentials/modules).
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                logging.getLogger(__name__).exception(
                    "Rollback failed in create_user")
        # Best effort: keep the role dropdown usable on the error page.
        try:
            roles = _load_role_options()
        except Exception:
            roles = []
        return _render_new_user_form(roles, f"Error creating user: {str(e)}")
    finally:
        if cursor is not None:
            cursor.close()
@settings_bp.route('/users/<int:user_id>/edit', methods=['GET', 'POST'])
def edit_user(user_id):
    """Show the edit form for a user (GET) or apply the changes (POST).

    Args:
        user_id: Primary key of the user being edited (from the URL).

    An unknown ``user_id`` redirects to the user list. POST validates the
    form, updates the user row, optionally replaces the password hash and
    rewrites the module grants in one transaction; on failure the
    transaction is rolled back and the form is re-rendered with the error.
    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    template = 'modules/settings/user_form.html'
    conn = get_db()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT id, username, email, full_name, role, is_active
            FROM users WHERE id = %s
        """, (user_id,))
        user_row = cursor.fetchone()
        if not user_row:
            return redirect(url_for('settings.user_management'))

        user = {
            'id': user_row[0],
            'username': user_row[1],
            'email': user_row[2],
            'full_name': user_row[3],
            'role': user_row[4],
            'is_active': user_row[5],
        }

        cursor.execute("SELECT module_name FROM user_modules WHERE user_id = %s", (user_id,))
        user_modules = [row[0] for row in cursor.fetchall()]

        cursor.execute("SELECT id, name, level, description FROM roles ORDER BY level DESC")
        roles = [{'id': r[0], 'name': r[1], 'level': r[2], 'description': r[3]}
                 for r in cursor.fetchall()]

        # Shared by every render of the form below.
        form_context = {
            'user': user,
            'roles': roles,
            'available_modules': ['quality', 'settings'],
            'user_modules': user_modules,
        }

        if request.method == 'GET':
            return render_template(template, **form_context)

        try:
            email = request.form.get('email', '').strip()
            full_name = request.form.get('full_name', '').strip()
            role = request.form.get('role', '').strip()
            password = request.form.get('password', '').strip()
            confirm_password = request.form.get('confirm_password', '').strip()
            is_active = request.form.get('is_active') == 'on'
            modules = request.form.getlist('modules')

            errors = []
            if not full_name:
                errors.append("Full name is required")
            if not role:
                errors.append("Role is required")
            # The password is optional on edit; it is checked only if given.
            if password and password != confirm_password:
                errors.append("Passwords do not match")
            if password and len(password) < 8:
                errors.append("Password must be at least 8 characters")
            if not modules:
                errors.append("Select at least one module")

            if errors:
                return render_template(template, error="; ".join(errors), **form_context)

            cursor.execute(
                "UPDATE users SET email = %s, full_name = %s, role = %s, is_active = %s WHERE id = %s",
                (email, full_name, role, is_active, user_id)
            )

            if password:
                # NOTE(review): unsalted SHA-256 is weak for passwords; kept
                # because the login check outside this block must match it.
                password_hash = hashlib.sha256(password.encode()).hexdigest()
                cursor.execute(
                    "UPDATE user_credentials SET password_hash = %s WHERE user_id = %s",
                    (password_hash, user_id)
                )

            # Replace the module grants wholesale. INSERT IGNORE (as in
            # create_user) tolerates a module listed twice in the form.
            cursor.execute("DELETE FROM user_modules WHERE user_id = %s", (user_id,))
            for module in modules:
                cursor.execute(
                    "INSERT IGNORE INTO user_modules (user_id, module_name) VALUES (%s, %s)",
                    (user_id, module)
                )

            conn.commit()
            return redirect(url_for('settings.user_management'))

        except Exception as e:
            # Without a rollback the already-executed statements (e.g. the
            # DELETE of module grants) would stay pending on the connection.
            try:
                conn.rollback()
            except Exception:
                logging.getLogger(__name__).exception(
                    "Rollback failed in edit_user")
            return render_template(template,
                                   error=f"Error updating user: {str(e)}",
                                   **form_context)
    finally:
        cursor.close()
@settings_bp.route('/users/<int:user_id>/delete', methods=['POST'])
def delete_user(user_id):
    """Delete the user with the given id and return to the user list.

    Args:
        user_id: Primary key of the user to delete (from the URL).

    A failure is logged and flashed to the user rather than silently
    ignored; the redirect to the user list happens in either case.
    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
            conn.commit()
        finally:
            cursor.close()
    except Exception as e:
        logging.getLogger(__name__).error(
            f"Error deleting user {user_id}: {e}", exc_info=True)
        flash(f'Error deleting user: {str(e)}', 'error')

    return redirect(url_for('settings.user_management'))
@settings_bp.route('/app-keys', methods=['GET'])
def app_keys():
    """List QZ Tray pairing keys and active API keys.

    Each pairing key carries a formatted expiry date and the number of days
    left (never negative). Anonymous visitors are redirected to login.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    conn = get_db()
    cursor = conn.cursor()

    cursor.execute("""
        SELECT id, printer_name, pairing_key, valid_until
        FROM qz_pairing_keys
        ORDER BY created_at DESC
    """)

    today = datetime.now().date()
    pairing_keys = []
    for row in cursor.fetchall():
        valid_until = row[3]
        if valid_until:
            valid_until_text = valid_until.strftime('%Y-%m-%d')
            days_remaining = max(0, (valid_until - today).days)
        else:
            # A NULL expiry cannot be subtracted from a date; show 'N/A'
            # and report 0 days so the value stays numeric for the template.
            valid_until_text = 'N/A'
            days_remaining = 0
        pairing_keys.append({
            'id': row[0],
            'printer_name': row[1],
            'pairing_key': row[2],
            'valid_until': valid_until_text,
            'days_remaining': days_remaining,
        })

    cursor.execute("""
        SELECT id, key_name, key_type, api_key, created_at
        FROM api_keys
        WHERE is_active = 1
        ORDER BY created_at DESC
    """)
    api_keys = [
        {
            'id': row[0],
            'key_name': row[1],
            'key_type': row[2],
            'api_key': row[3],
            'created_at': row[4],
        }
        for row in cursor.fetchall()
    ]

    cursor.close()

    return render_template('modules/settings/app_keys.html',
                           pairing_keys=pairing_keys,
                           api_keys=api_keys)
@settings_bp.route('/app-keys/pairing/generate', methods=['POST'])
def generate_pairing_key():
    """Generate and store a new QZ Tray pairing key for a printer.

    Reads ``printer_name`` and ``validity_days`` (default 90) from the form.
    The key is a random URL-safe token and expires ``validity_days`` days
    from today. Validation problems and database errors re-render the key
    page with an error message; success redirects to the key page.
    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        printer_name = request.form.get('printer_name', '').strip()
        if not printer_name:
            return redirect_with_error('app_keys', 'Printer name is required')

        # Reject non-numeric input with a readable message, and refuse
        # zero/negative values that would create an already-expired key.
        try:
            validity_days = int(request.form.get('validity_days', '90'))
        except ValueError:
            return redirect_with_error('app_keys', 'Validity must be a whole number of days')
        if validity_days < 1:
            return redirect_with_error('app_keys', 'Validity must be at least 1 day')

        pairing_key = secrets.token_urlsafe(32)
        valid_until = (datetime.now() + timedelta(days=validity_days)).date()

        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                INSERT INTO qz_pairing_keys (printer_name, pairing_key, valid_until)
                VALUES (%s, %s, %s)
            """, (printer_name, pairing_key, valid_until))
            conn.commit()
        finally:
            cursor.close()

        return redirect(url_for('settings.app_keys'))

    except Exception as e:
        return redirect_with_error('app_keys', f'Error generating pairing key: {str(e)}')
@settings_bp.route('/app-keys/pairing/<int:key_id>/delete', methods=['POST'])
def delete_pairing_key(key_id):
    """Delete a QZ Tray pairing key and return to the key page.

    Args:
        key_id: Primary key of the pairing key to delete (from the URL).

    A failure is logged and flashed instead of being silently discarded;
    the redirect happens in either case. Anonymous visitors are redirected
    to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM qz_pairing_keys WHERE id = %s", (key_id,))
            conn.commit()
        finally:
            cursor.close()
    except Exception as e:
        logging.getLogger(__name__).error(
            f"Error deleting pairing key {key_id}: {e}", exc_info=True)
        flash(f'Error deleting pairing key: {str(e)}', 'error')

    return redirect(url_for('settings.app_keys'))
@settings_bp.route('/app-keys/api/generate', methods=['POST'])
def generate_api_key():
    """Create a new active API key from the submitted name and type.

    ``key_name`` is mandatory; ``key_type`` defaults to ``app_key``. The key
    value is a random URL-safe token. Errors re-render the key page with a
    message; success redirects to it. Anonymous visitors go to login.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    insert_sql = (
        "\n"
        "INSERT INTO api_keys (key_name, key_type, api_key, is_active)\n"
        "VALUES (%s, %s, %s, 1)\n"
    )

    try:
        form = request.form
        name = form.get('key_name', '').strip()
        kind = form.get('key_type', 'app_key').strip()

        if name == '':
            return redirect_with_error('app_keys', 'Key name is required')

        # Random token that serves as the key value.
        token = secrets.token_urlsafe(32)

        db = get_db()
        cur = db.cursor()
        cur.execute(insert_sql, (name, kind, token))
        db.commit()
        cur.close()

        return redirect(url_for('settings.app_keys'))
    except Exception as exc:
        return redirect_with_error('app_keys', f'Error generating API key: {str(exc)}')
@settings_bp.route('/app-keys/api/<int:key_id>/delete', methods=['POST'])
def delete_api_key(key_id):
    """Delete an API key and return to the key page.

    Args:
        key_id: Primary key of the API key to delete (from the URL).

    A failure is logged and flashed instead of being silently discarded;
    the redirect happens in either case. Anonymous visitors are redirected
    to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM api_keys WHERE id = %s", (key_id,))
            conn.commit()
        finally:
            cursor.close()
    except Exception as e:
        logging.getLogger(__name__).error(
            f"Error deleting API key {key_id}: {e}", exc_info=True)
        flash(f'Error deleting API key: {str(e)}', 'error')

    return redirect(url_for('settings.app_keys'))
def redirect_with_error(endpoint, error_message):
    """Render the app-keys page with an error message.

    Despite its name this helper does not issue an HTTP redirect: it reloads
    the pairing keys and active API keys and renders the key page directly.

    Args:
        endpoint: Accepted for call-site compatibility; not used here.
        error_message: Text passed to the template as ``error``.

    Returns:
        The rendered ``modules/settings/app_keys.html`` response.
    """
    conn = get_db()
    cursor = conn.cursor()

    cursor.execute("""
        SELECT id, printer_name, pairing_key, valid_until
        FROM qz_pairing_keys
        ORDER BY created_at DESC
    """)

    today = datetime.now().date()
    pairing_keys = []
    for row in cursor.fetchall():
        valid_until = row[3]
        if valid_until:
            valid_until_text = valid_until.strftime('%Y-%m-%d')
            days_remaining = max(0, (valid_until - today).days)
        else:
            # A NULL expiry cannot be subtracted from a date; show 'N/A'
            # and report 0 days so the value stays numeric for the template.
            valid_until_text = 'N/A'
            days_remaining = 0
        pairing_keys.append({
            'id': row[0],
            'printer_name': row[1],
            'pairing_key': row[2],
            'valid_until': valid_until_text,
            'days_remaining': days_remaining,
        })

    cursor.execute("""
        SELECT id, key_name, key_type, api_key, created_at
        FROM api_keys
        WHERE is_active = 1
        ORDER BY created_at DESC
    """)
    api_keys = [
        {
            'id': row[0],
            'key_name': row[1],
            'key_type': row[2],
            'api_key': row[3],
            'created_at': row[4],
        }
        for row in cursor.fetchall()
    ]

    cursor.close()

    return render_template('modules/settings/app_keys.html',
                           pairing_keys=pairing_keys,
                           api_keys=api_keys,
                           error=error_message)
# Database Management Routes
@settings_bp.route('/database-management', methods=['GET'])
def database_management():
    """Render the database management page.

    Shows each table of the current schema with the row count reported by
    ``information_schema`` plus the stored backup retention settings. Any
    error is flashed and the user is sent back to the settings index.
    Anonymous visitors are redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    tables_sql = (
        "\n"
        "SELECT TABLE_NAME, TABLE_ROWS\n"
        "FROM information_schema.TABLES\n"
        "WHERE TABLE_SCHEMA = DATABASE()\n"
        "ORDER BY TABLE_NAME\n"
    )
    retention_sql = (
        "\n"
        "SELECT setting_key, setting_value FROM application_settings\n"
        "WHERE setting_key IN ('backup_retention_days', 'backup_auto_cleanup')\n"
    )

    try:
        db = get_db()
        cur = db.cursor()

        cur.execute(tables_sql)
        tables = []
        for record in cur.fetchall():
            tables.append({'name': record[0], 'rows': record[1]})

        cur.execute(retention_sql)
        retention_settings = {}
        for record in cur.fetchall():
            retention_settings[record[0]] = record[1]

        cur.close()

        # Defaults: keep backups 30 days; auto cleanup on only when stored as '1'.
        keep_days = retention_settings.get('backup_retention_days', '30')
        cleanup_enabled = retention_settings.get('backup_auto_cleanup') == '1'

        return render_template('modules/settings/database_management.html',
                               tables=tables,
                               backup_retention_days=keep_days,
                               auto_cleanup=cleanup_enabled)
    except Exception as exc:
        flash(f'Error loading database management: {str(exc)}', 'error')
        return redirect(url_for('settings.settings_index'))
@settings_bp.route('/api/backups', methods=['GET'])
def get_backups_list():
    """Return up to ten backup files and the database size as JSON.

    Files in ``/app/data/backups`` are listed in reverse filename order. Each
    entry carries its name, size in MB, modification time and a type
    (``data`` when the name contains ``data-only``, otherwise ``full``).

    Returns:
        JSON ``{'backups': [...], 'db_size': '<n> MB'}``; 401 when not
        logged in; 500 with ``{'error': ...}`` on failure.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        backups_dir = '/app/data/backups'
        # exist_ok avoids a failure when the directory appears between a
        # separate existence check and the creation.
        os.makedirs(backups_dir, exist_ok=True)

        backups = []
        for filename in sorted(os.listdir(backups_dir), reverse=True):
            filepath = os.path.join(backups_dir, filename)
            # Skip non-files before counting, so sub-directories do not use
            # up slots in the ten-entry limit.
            if not os.path.isfile(filepath):
                continue

            size_mb = os.path.getsize(filepath) / (1024 * 1024)
            backup_type = 'data' if 'data-only' in filename else 'full'
            backups.append({
                'name': filename,
                'size': f'{size_mb:.2f} MB',
                'date': datetime.fromtimestamp(os.path.getmtime(filepath)).strftime('%Y-%m-%d %H:%M:%S'),
                'type': backup_type,
            })
            if len(backups) == 10:
                break

        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2)
                FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = DATABASE()
            """)
            db_size = cursor.fetchone()[0]
        finally:
            cursor.close()

        return jsonify({
            'backups': backups,
            # Only a NULL sum is unknown; a size of 0 is a valid value.
            'db_size': f'{db_size} MB' if db_size is not None else 'Unknown',
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/backup', methods=['POST'])
def create_backup():
    """Create a database dump with ``mysqldump`` in ``/app/data/backups``.

    Form fields:
        backup_type: ``full`` (default); ``data`` or ``data_only`` dumps
            rows without CREATE statements.
        backup_name: Optional file name prefix. It is sanitised before use
            so it cannot point outside the backups directory.

    Returns:
        JSON with ``success``, ``message``, ``file`` and ``size`` on
        success; 401 when not logged in; 500 with an ``error`` otherwise.
        A failed or empty dump file is removed.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    try:
        from werkzeug.utils import secure_filename

        backup_type = request.form.get('backup_type', 'full')
        # The name comes straight from the client; secure_filename strips
        # path separators and leading dots ("../../x" -> "x").
        backup_name = secure_filename(request.form.get('backup_name', '').strip())

        backups_dir = '/app/data/backups'
        os.makedirs(backups_dir, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if backup_name:
            filename = f"{backup_name}_{timestamp}.sql"
        else:
            filename = f"backup_{timestamp}.sql" if backup_type == 'full' else f"backup_data-only_{timestamp}.sql"
        filepath = os.path.join(backups_dir, filename)

        db_host = os.getenv('DB_HOST', 'mariadb')
        db_user = os.getenv('DB_USER', 'quality_user')
        db_password = os.getenv('DB_PASSWORD', 'quality_pass')
        db_name = os.getenv('DB_NAME', 'quality_db')

        # NOTE(review): passing the password via -p exposes it in the
        # process list; consider a defaults file or MYSQL_PWD.
        cmd = [
            'mysqldump',
            '-h', db_host,
            '-u', db_user,
            f'-p{db_password}',
            '--skip-ssl',
        ]
        # Accept the schedule vocabulary ('data_only') as well as 'data'.
        if backup_type in ('data', 'data_only'):
            cmd.append('--no-create-info')
        # Database name last, after all options.
        cmd.append(db_name)

        # Only the first four items are logged, which excludes the password.
        logger.info(f"Creating backup with command: {' '.join(cmd[:4])}...")

        with open(filepath, 'w') as f:
            result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)

        if result.returncode != 0:
            error_msg = result.stderr if result.stderr else f"mysqldump returned code {result.returncode}"
            logger.error(f"Backup failed: {error_msg}")
            if os.path.exists(filepath):
                os.remove(filepath)
            return jsonify({'error': f'Backup failed: {error_msg}'}), 500

        file_size = os.path.getsize(filepath)
        if file_size == 0:
            logger.warning(f"Warning: Backup file is empty: {filename}")
            # Do not leave a useless empty file in the backup list.
            os.remove(filepath)
            return jsonify({
                'success': False,
                'error': 'Backup file is empty - the database may not have any data or mysqldump failed silently'
            }), 500

        logger.info(f"Backup created successfully: {filename} ({file_size} bytes)")
        return jsonify({
            'success': True,
            'message': 'Backup created successfully',
            'file': filename,
            'size': file_size
        })

    except Exception as e:
        logger.error(f"Exception in create_backup: {str(e)}")
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/backup/download', methods=['GET'])
def download_backup():
    """Send a backup file from ``/app/data/backups`` as a download.

    Query parameters:
        file: Name of the backup file to download.

    Missing, invalid or unknown files flash an error and redirect to the
    database management page. Anonymous visitors are redirected to login.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        filename = request.args.get('file')
        if not filename:
            flash('Invalid backup file', 'error')
            return redirect(url_for('settings.database_management'))

        backups_dir = os.path.realpath('/app/data/backups')
        filepath = os.path.realpath(os.path.join(backups_dir, filename))

        # Require the resolved path to be strictly inside the directory. A
        # bare prefix test would also accept siblings such as
        # /app/data/backups_old/..., and realpath() defeats symlink escapes.
        if not filepath.startswith(backups_dir + os.sep):
            flash('Invalid backup file', 'error')
            return redirect(url_for('settings.database_management'))

        if not os.path.isfile(filepath):
            flash('Backup file not found', 'error')
            return redirect(url_for('settings.database_management'))

        return send_file(filepath, as_attachment=True,
                         download_name=os.path.basename(filepath))

    except Exception as e:
        flash(f'Error downloading backup: {str(e)}', 'error')
        return redirect(url_for('settings.database_management'))
@settings_bp.route('/api/database/restore', methods=['POST'])
def restore_database():
    """Replay the SQL statements of a stored backup file.

    Expects a JSON body ``{"backup": "<file name>"}`` naming a file inside
    ``/app/data/backups``. The file is split on ``;`` and each statement is
    executed in turn; statements that fail are logged and counted but do not
    abort the restore.

    Returns:
        JSON with ``success``, ``message`` and ``failed_statements``; 400
        for a missing or invalid file name, 401 when not logged in, 404 for
        an unknown file, 500 with ``error`` on other failures.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    try:
        # silent=True yields None for a missing or malformed body instead
        # of raising, so it becomes a clean 400 below.
        data = request.get_json(silent=True)
        backup_file = data.get('backup') if isinstance(data, dict) else None

        if not backup_file or not isinstance(backup_file, str):
            return jsonify({'error': 'Backup file not specified'}), 400

        backups_dir = os.path.realpath('/app/data/backups')
        filepath = os.path.realpath(os.path.join(backups_dir, backup_file))

        # Strict containment check: a bare prefix test would also accept
        # sibling directories such as /app/data/backups_old.
        if not filepath.startswith(backups_dir + os.sep):
            return jsonify({'error': 'Invalid backup file'}), 400

        if not os.path.isfile(filepath):
            return jsonify({'error': 'Backup file not found'}), 404

        with open(filepath, 'r') as f:
            sql_content = f.read()

        conn = get_db()
        cursor = conn.cursor()
        failed_statements = 0
        try:
            # NOTE(review): splitting on ';' also cuts statements whose
            # string data contains a semicolon; those show up as failures.
            for statement in sql_content.split(';'):
                statement = statement.strip()
                if not statement:
                    continue
                try:
                    cursor.execute(statement)
                except Exception as e:
                    # Keep going (best-effort restore) but leave a trace.
                    failed_statements += 1
                    logger.warning(
                        f"Restore statement failed ({e}): {statement[:80]}")
            conn.commit()
        finally:
            cursor.close()

        message = 'Database restored successfully'
        if failed_statements:
            message = f'Database restored with {failed_statements} failed statement(s)'
            logger.warning(f"Restore of {backup_file}: {failed_statements} statement(s) failed")

        return jsonify({
            'success': True,
            'message': message,
            'failed_statements': failed_statements
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/database/truncate', methods=['POST'])
def truncate_table():
    """Remove all rows from one table of the current database.

    Expects a JSON body ``{"table": "<table name>"}``. The name must match
    a table listed in ``information_schema`` for the current schema.

    Returns:
        JSON ``{'success': True, 'message': ...}``; 400 when no table is
        given, 401 when not logged in, 404 for an unknown table, 500 with
        ``error`` on other failures.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        # silent=True turns a missing or malformed body into a clean 400.
        data = request.get_json(silent=True)
        table = data.get('table') if isinstance(data, dict) else None

        if not table or not isinstance(table, str):
            return jsonify({'error': 'Table not specified'}), 400

        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT TABLE_NAME FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
            """, (table,))
            match = cursor.fetchone()
            if not match:
                return jsonify({'error': 'Table not found'}), 404

            # Identifiers cannot be bound as parameters, so use the name as
            # the database reported it and backtick-quote it (doubling any
            # embedded backtick) rather than interpolating raw client input.
            table_name = match[0]
            quoted_name = '`' + table_name.replace('`', '``') + '`'
            cursor.execute(f'TRUNCATE TABLE {quoted_name}')
            conn.commit()
        finally:
            cursor.close()

        return jsonify({'success': True, 'message': f'Table {table_name} cleared successfully'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/database/import', methods=['POST'])
def import_database():
    """Legacy import endpoint that only points users to the upload section.

    Logged-in users get an informational flash message and are sent to the
    database management page; anonymous visitors are sent to login.
    """
    if 'user_id' in session:
        flash('Please use the Upload Backup File section instead', 'info')
        destination = 'settings.database_management'
    else:
        destination = 'main.login'
    return redirect(url_for(destination))
@settings_bp.route('/api/backup/upload', methods=['POST'])
def upload_backup_file():
    """Store an uploaded ``.sql`` file in ``/app/data/backups``.

    The multipart field ``file`` must carry a name ending in ``.sql``. The
    stored name is the sanitised original name with ``_uploaded_<timestamp>``
    inserted before the extension.

    Returns:
        JSON with ``success``, ``message``, ``filename`` and ``size``; 400
        for a missing or non-SQL file, 401 when not logged in, 500 with
        ``error`` on other failures.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    try:
        from werkzeug.utils import secure_filename

        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        # The filename may be absent; treat that like an empty name.
        original_name = file.filename or ''

        if original_name == '' or not original_name.lower().endswith('.sql'):
            return jsonify({'error': 'Invalid file. Please upload a .sql file'}), 400

        # Sanitise the client-supplied name and add a timestamp so uploads
        # never overwrite one another.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        safe_name = secure_filename(original_name)
        if not safe_name:
            safe_name = 'backup.sql'

        name_parts = safe_name.rsplit('.', 1)
        if len(name_parts) == 2:
            filename = f"{name_parts[0]}_uploaded_{timestamp}.{name_parts[1]}"
        else:
            filename = f"{safe_name}_uploaded_{timestamp}.sql"

        backups_dir = '/app/data/backups'
        os.makedirs(backups_dir, exist_ok=True)

        filepath = os.path.join(backups_dir, filename)
        file.save(filepath)

        file_size = os.path.getsize(filepath)
        logger.info(f"Backup file uploaded: {filename} ({file_size} bytes)")

        return jsonify({
            'success': True,
            'message': 'Backup file uploaded successfully',
            'filename': filename,
            'size': file_size
        })

    except Exception as e:
        logger.error(f"Error uploading backup file: {str(e)}")
        return jsonify({'error': str(e)}), 500
# Backup Retention Routes
@settings_bp.route('/api/backup-retention/save', methods=['POST'])
def save_backup_retention():
    """Persist the backup retention policy from the submitted form.

    Form fields:
        retention_days: Days to keep backups. Non-numeric input or values
            below 1 become 30; values above 365 are capped at 365.
        auto_cleanup: ``'1'`` enables automatic cleanup.

    The outcome is flashed and the user is redirected to the database
    management page. Anonymous visitors are redirected to login.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    upsert_sql = """
        INSERT INTO application_settings (setting_key, setting_value, setting_type)
        VALUES (%s, %s, %s)
        ON DUPLICATE KEY UPDATE setting_value = %s
    """
    try:
        retention_days = request.form.get('retention_days', '30').strip()
        auto_cleanup = request.form.get('auto_cleanup') == '1'

        # Only a failed numeric conversion triggers the fallback; other
        # errors are no longer hidden by a bare except.
        try:
            retention_days = int(retention_days)
            if retention_days < 1:
                retention_days = 30
            if retention_days > 365:
                retention_days = 365
        except ValueError:
            retention_days = 30

        cleanup_flag = '1' if auto_cleanup else '0'

        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute(upsert_sql,
                           ('backup_retention_days', str(retention_days),
                            'integer', str(retention_days)))
            cursor.execute(upsert_sql,
                           ('backup_auto_cleanup', cleanup_flag,
                            'boolean', cleanup_flag))
            conn.commit()
        finally:
            cursor.close()

        flash(f'Backup retention policy saved! Backups will be kept for {retention_days} days.', 'success')
        return redirect(url_for('settings.database_management'))

    except Exception as e:
        flash(f'Error saving retention policy: {str(e)}', 'error')
        return redirect(url_for('settings.database_management'))
@settings_bp.route('/api/backups/cleanup', methods=['POST'])
def cleanup_old_backups():
    """Delete backup files older than the stored retention period.

    The period is read from the ``backup_retention_days`` setting; a
    missing, non-numeric or non-positive value falls back to 30 days. Files
    in ``/app/data/backups`` whose modification time is older than the
    cutoff are removed.

    Returns:
        JSON with ``success``, ``deleted_count``, ``failed_count`` and
        ``message``; 401 when not logged in, 404 when the directory does
        not exist, 500 with ``error`` on other failures.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    logger = logging.getLogger(__name__)
    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT setting_value FROM application_settings
                WHERE setting_key = 'backup_retention_days'
            """)
            result = cursor.fetchone()
        finally:
            cursor.close()

        try:
            retention_days = int(result[0]) if result else 30
        except (TypeError, ValueError):
            logger.warning("Invalid backup_retention_days setting; using 30")
            retention_days = 30
        # A period below one day would put the cutoff at or after "now" and
        # delete every backup, so treat it as invalid.
        if retention_days < 1:
            retention_days = 30

        cutoff_date = datetime.now() - timedelta(days=retention_days)

        backups_dir = '/app/data/backups'
        if not os.path.exists(backups_dir):
            return jsonify({'error': 'Backups directory not found'}), 404

        deleted_count = 0
        failed_count = 0
        for filename in os.listdir(backups_dir):
            filepath = os.path.join(backups_dir, filename)
            if not os.path.isfile(filepath):
                continue
            file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            if file_mtime < cutoff_date:
                try:
                    os.remove(filepath)
                    deleted_count += 1
                except OSError as e:
                    # Keep cleaning the remaining files, but record it.
                    failed_count += 1
                    logger.warning(f"Could not delete old backup {filename}: {e}")

        return jsonify({
            'success': True,
            'deleted_count': deleted_count,
            'failed_count': failed_count,
            'message': f'{deleted_count} old backups deleted'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/backup-schedules', methods=['GET'])
def get_backup_schedules():
    """Return all backup schedules, newest first, as JSON.

    ``time_of_day`` is converted with ``str()`` and the ``last_run``,
    ``next_run`` and ``created_at`` values with ``isoformat()`` so the rows
    can be serialised.

    Returns:
        JSON ``{'schedules': [...]}``; 401 when not logged in; 500 with
        ``error`` on failure.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        conn = get_db()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute("""
                SELECT id, schedule_name, frequency, day_of_week, time_of_day,
                       backup_type, is_active, last_run, next_run, created_at
                FROM backup_schedules
                ORDER BY created_at DESC
            """)
            schedules = cursor.fetchall()
        finally:
            cursor.close()

        for schedule in schedules:
            # Compare against None rather than truthiness: a midnight value
            # that arrives as timedelta(0) is falsy and would otherwise be
            # left unconverted and break JSON serialisation.
            # (presumably a TIME column, which PyMySQL maps to timedelta)
            if schedule['time_of_day'] is not None:
                schedule['time_of_day'] = str(schedule['time_of_day'])

            for field in ('last_run', 'next_run', 'created_at'):
                if schedule[field]:
                    schedule[field] = schedule[field].isoformat()

        return jsonify({'schedules': schedules})

    except Exception as e:
        import traceback
        logger = logging.getLogger(__name__)
        logger.error(f"Error in get_backup_schedules: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
# Weekday names accepted for weekly schedules, mapped to datetime.weekday().
_BACKUP_SCHEDULE_WEEKDAYS = {
    'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3,
    'Friday': 4, 'Saturday': 5, 'Sunday': 6,
}


def _next_backup_run(frequency, weekday_index, hour, minute, now):
    """Return the next run at hour:minute for a daily or weekly schedule.

    ``weekday_index`` (0 = Monday) is only used for non-daily schedules. A
    time earlier than ``now`` today rolls over to the next day (daily) or
    the next week (weekly on the same weekday).
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if frequency == 'daily':
        if candidate < now:
            candidate += timedelta(days=1)
        return candidate

    days_ahead = (weekday_index - now.weekday()) % 7
    if days_ahead == 0 and candidate < now:
        days_ahead = 7
    return candidate + timedelta(days=days_ahead)


@settings_bp.route('/api/backup-schedules/save', methods=['POST'])
def save_backup_schedule():
    """Create a backup schedule, or update one when ``schedule_id`` is given.

    JSON body fields:
        schedule_name: At least 3 characters.
        frequency: ``daily`` (default) or ``weekly``.
        day_of_week: English weekday name; required for weekly schedules.
        time_of_day: ``HH:MM`` (default ``00:00``).
        backup_type: ``full`` (default) or ``data_only``.
        schedule_id: Optional id of the schedule to update.

    Returns:
        JSON ``{'success': True, 'message': ...}``; 400 with ``error`` for
        invalid input, 401 when not logged in, 500 on other failures.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        # silent=True yields None for a missing or malformed body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid JSON payload'}), 400

        schedule_name = str(data.get('schedule_name') or '').strip()
        frequency = data.get('frequency', 'daily')
        day_of_week = data.get('day_of_week', None)
        time_of_day = data.get('time_of_day', '00:00')
        backup_type = data.get('backup_type', 'full')
        schedule_id = data.get('schedule_id', None)

        if not schedule_name or len(schedule_name) < 3:
            return jsonify({'error': 'Schedule name must be at least 3 characters'}), 400

        if frequency not in ['daily', 'weekly']:
            return jsonify({'error': 'Frequency must be daily or weekly'}), 400

        weekday_index = 0
        if frequency == 'weekly':
            if not day_of_week:
                return jsonify({'error': 'Day of week is required for weekly schedules'}), 400
            # An unknown name used to fall back to Monday silently; reject
            # it instead. Case is normalised for the lookup only.
            weekday_index = _BACKUP_SCHEDULE_WEEKDAYS.get(str(day_of_week).strip().capitalize())
            if weekday_index is None:
                return jsonify({'error': 'Day of week must be a weekday name (Monday-Sunday)'}), 400

        if backup_type not in ['full', 'data_only']:
            return jsonify({'error': 'Backup type must be full or data_only'}), 400

        # Parse HH:MM once; the parsed numbers are reused for next_run.
        try:
            time_parts = time_of_day.split(':')
            if len(time_parts) != 2:
                raise ValueError()
            hour = int(time_parts[0])
            minute = int(time_parts[1])
            if hour < 0 or hour > 23 or minute < 0 or minute > 59:
                raise ValueError()
        except (AttributeError, TypeError, ValueError):
            return jsonify({'error': 'Invalid time format. Use HH:MM'}), 400

        next_run = _next_backup_run(frequency, weekday_index, hour, minute, datetime.now())

        conn = get_db()
        cursor = conn.cursor()
        try:
            if schedule_id:
                cursor.execute("""
                    UPDATE backup_schedules
                    SET schedule_name = %s, frequency = %s, day_of_week = %s,
                        time_of_day = %s, backup_type = %s, next_run = %s,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = %s
                """, (schedule_name, frequency, day_of_week, time_of_day, backup_type, next_run, schedule_id))
            else:
                cursor.execute("""
                    INSERT INTO backup_schedules
                    (schedule_name, frequency, day_of_week, time_of_day, backup_type, is_active, next_run)
                    VALUES (%s, %s, %s, %s, %s, 1, %s)
                """, (schedule_name, frequency, day_of_week, time_of_day, backup_type, next_run))
            conn.commit()
        finally:
            cursor.close()

        action = 'updated' if schedule_id else 'created'
        return jsonify({
            'success': True,
            'message': f'Backup schedule {action} successfully'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/backup-schedules/<int:schedule_id>/delete', methods=['POST'])
def delete_backup_schedule(schedule_id):
    """Delete a backup schedule.

    Args:
        schedule_id: ``id`` of the ``backup_schedules`` row to remove
            (taken from the URL).

    Returns:
        A JSON response: 401 when no user is logged in, a success payload
        once the DELETE has been committed, or 500 carrying the exception
        text if anything fails.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401
    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM backup_schedules WHERE id = %s", (schedule_id,))
            conn.commit()
        finally:
            # Release the cursor even when execute/commit raises; previously
            # it was only closed on the success path.
            cursor.close()
        return jsonify({
            'success': True,
            'message': 'Backup schedule deleted successfully'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@settings_bp.route('/api/backup-schedules/<int:schedule_id>/toggle', methods=['POST'])
def toggle_backup_schedule(schedule_id):
    """Enable or disable a backup schedule.

    Reads the current ``is_active`` flag of the schedule and writes back the
    opposite value.

    Args:
        schedule_id: ``id`` of the ``backup_schedules`` row to toggle
            (taken from the URL).

    Returns:
        A JSON response: 401 when no user is logged in, 404 when no row has
        that id, a success payload including the new ``is_active`` value, or
        500 carrying the exception text if anything fails.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401
    try:
        conn = get_db()
        cursor = conn.cursor()
        try:
            # Get current state
            cursor.execute("SELECT is_active FROM backup_schedules WHERE id = %s", (schedule_id,))
            result = cursor.fetchone()
            if not result:
                return jsonify({'error': 'Schedule not found'}), 404
            current_state = result[0]
            new_state = 0 if current_state else 1
            # Update state
            cursor.execute("""
                UPDATE backup_schedules
                SET is_active = %s, updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (new_state, schedule_id))
            conn.commit()
        finally:
            # Close the cursor on every exit path; the 404 return and any
            # exception used to leave it open.
            cursor.close()
        status = 'enabled' if new_state else 'disabled'
        return jsonify({
            'success': True,
            'message': f'Backup schedule {status} successfully',
            'is_active': new_state
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ============================================================================
# Log Explorer Routes
# ============================================================================
@settings_bp.route('/logs', methods=['GET'])
def logs_explorer():
    """Render the log explorer landing page.

    Redirects to the login page when no user is in the session. Otherwise
    renders the explorer template with the available log files and their
    statistics; if gathering or rendering them fails, the error is logged
    and flashed and the template is rendered with empty data instead.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    template_name = 'modules/settings/logs_explorer.html'
    try:
        context = {
            'log_files': get_log_files(),
            'log_stats': get_log_statistics(),
        }
        return render_template(template_name, **context)
    except Exception as e:
        logger.error(f"Error loading logs explorer: {e}")
        flash(f"Error loading logs: {str(e)}", 'error')
        # Fall back to an empty page rather than failing the request.
        return render_template(template_name, log_files=[], log_stats={})
@settings_bp.route('/logs/view/<filename>', methods=['GET'])
def view_log(filename):
    """Show the content of a single log file.

    The optional ``lines`` query parameter (integer, default 100) is passed
    through to ``get_log_content``. When that helper does not report
    success, or when anything raises, an error is flashed and the user is
    sent back to the log explorer. Unauthenticated requests are redirected
    to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    explorer_endpoint = 'settings.logs_explorer'
    try:
        line_count = request.args.get('lines', default=100, type=int)
        log_data = get_log_content(filename, lines=line_count)
        if log_data.get('success'):
            return render_template('modules/settings/view_log.html', log_data=log_data)
        # The helper reported a failure: surface its message (or a generic one).
        flash(log_data.get('error', 'Error reading log file'), 'error')
        return redirect(url_for(explorer_endpoint))
    except Exception as e:
        logger.error(f"Error viewing log {filename}: {e}")
        flash(f"Error viewing log: {str(e)}", 'error')
        return redirect(url_for(explorer_endpoint))
@settings_bp.route('/logs/download/<filename>', methods=['GET'])
def download_log(filename):
    """Download a log file as an attachment.

    The requested name is resolved through ``get_log_file_path``; a falsy
    result is treated as "invalid or missing" and the user is sent back to
    the log explorer with a flash message. The attachment name carries a
    download timestamp placed *before* the file extension (for example
    ``app_20260126_220831.log``) so the saved file keeps its extension.

    Args:
        filename: Log file name taken from the URL.

    Returns:
        The file as an attachment, or a redirect to the login page
        (unauthenticated) or to the log explorer (invalid file or error).
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))
    try:
        filepath = get_log_file_path(filename)
        if not filepath:
            flash('Invalid file or file not found', 'error')
            return redirect(url_for('settings.logs_explorer'))
        # Previously the timestamp was appended after the extension
        # ("app.log_20260126_220831"), which stripped the usable extension
        # from the downloaded file.
        stem, extension = os.path.splitext(filename)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            filepath,
            as_attachment=True,
            download_name=f"{stem}_{timestamp}{extension}"
        )
    except Exception as e:
        logger.error(f"Error downloading log {filename}: {e}")
        flash(f"Error downloading log: {str(e)}", 'error')
        return redirect(url_for('settings.logs_explorer'))
@settings_bp.route('/logs/search', methods=['GET'])
def search_logs():
    """Search the log files for a term and render the results page.

    Query parameters:
        q: Search term; surrounding whitespace is stripped. When empty, no
            search is run and the results list is empty.
        file: Optional file name forwarded to ``search_in_logs`` as
            ``filename``.

    Unauthenticated requests are redirected to the login page. Any
    exception is logged and flashed, and the user is redirected to the log
    explorer.
    """
    if 'user_id' not in session:
        return redirect(url_for('main.login'))

    try:
        query = request.args.get('q', '').strip()
        selected = request.args.get('file', default=None)
        # Only hit the logs when there is actually something to look for.
        matches = search_in_logs(query, filename=selected) if query else []
        available_files = get_log_files()
        return render_template(
            'modules/settings/search_logs.html',
            search_term=query,
            results=matches,
            log_files=available_files,
            selected_file=selected,
        )
    except Exception as e:
        logger.error(f"Error searching logs: {e}")
        flash(f"Error searching logs: {str(e)}", 'error')
        return redirect(url_for('settings.logs_explorer'))