Improve maintenance & backup UI with per-table operations

- Enhanced maintenance card with dark mode support
- Added system storage information display (logs, database, backups)
- Implemented per-table backup and restore functionality
- Added database table management with drop capability
- Restructured backup management UI with split layout:
  - Quick action buttons for full/data-only backups
  - Collapsible per-table backup/restore section
  - Split schedule creation (1/3) and active schedules list (2/3)
- Fixed database config loading to use mariadb module
- Fixed SQL syntax for reserved 'rows' keyword
- Removed System Information card
- All database operations use correct config keys from external_server.conf
This commit is contained in:
ske087
2025-11-29 20:23:40 +02:00
parent 7912885046
commit 41f9caa6ba
2 changed files with 1615 additions and 172 deletions

View File

@@ -4523,3 +4523,716 @@ def api_backup_schedule_add():
'message': f'Failed to add schedule: {str(e)}'
}), 500
# ===================== Maintenance & Log Cleanup Routes =====================
@bp.route('/api/maintenance/log-settings', methods=['GET'])
@admin_plus
def get_log_settings():
    """Return current log cleanup settings as JSON.

    Reads ``maintenance_config.json`` from the instance directory and merges
    it over the built-in defaults, so config files written by older versions
    (missing newer keys) still yield a complete settings payload.
    """
    try:
        import os
        import json
        config_path = os.path.join(current_app.instance_path, 'maintenance_config.json')
        # Defaults — also used verbatim when no config file exists yet.
        settings = {
            'retention_days': 30,
            'enabled': True,
            'last_cleanup': None
        }
        if os.path.exists(config_path):
            with open(config_path, 'r') as f:
                # Overlay stored values on the defaults instead of replacing
                # them, so missing keys fall back to sane defaults.
                settings.update(json.load(f))
        return jsonify({
            'success': True,
            'settings': settings
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to load settings: {str(e)}'
        }), 500
@bp.route('/api/maintenance/log-settings', methods=['POST'])
@admin_plus
def save_log_settings():
    """Persist log cleanup settings.

    Expects a JSON body with ``retention_days`` (int >= 7, or 0 to disable
    cleanup). Writes ``maintenance_config.json`` in the instance directory,
    preserving any other keys already stored there (e.g. ``last_cleanup``).
    """
    try:
        import os
        import json
        # Guard: a missing/invalid JSON body would otherwise surface as an
        # AttributeError and a misleading 500; return a clean 400 instead.
        data = request.get_json(silent=True)
        if data is None:
            return jsonify({
                'success': False,
                'message': 'Invalid or missing JSON body'
            }), 400
        retention_days = data.get('retention_days', 30)
        # Validate retention days (0 means disabled). Note: bool is a
        # subclass of int, so True/False must be rejected explicitly.
        if retention_days != 0:
            if isinstance(retention_days, bool) or not isinstance(retention_days, int) or retention_days < 7:
                return jsonify({
                    'success': False,
                    'message': 'Retention days must be at least 7 or 0 to disable'
                }), 400
        config_path = os.path.join(current_app.instance_path, 'maintenance_config.json')
        # Load existing config (to keep unrelated keys) or start fresh.
        if os.path.exists(config_path):
            with open(config_path, 'r') as f:
                config = json.load(f)
        else:
            config = {}
        # Update settings; 'enabled' is derived from the retention value.
        config['retention_days'] = retention_days
        config['enabled'] = retention_days > 0
        # Ensure instance directory exists before writing.
        os.makedirs(current_app.instance_path, exist_ok=True)
        with open(config_path, 'w') as f:
            json.dump(config, f, indent=4)
        return jsonify({
            'success': True,
            'message': f'Log cleanup settings saved. Logs older than {retention_days} days will be deleted.' if retention_days > 0 else 'Log cleanup disabled.'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to save settings: {str(e)}'
        }), 500
@bp.route('/api/maintenance/cleanup-logs', methods=['POST'])
@admin_plus
def cleanup_logs():
    """Execute an immediate log cleanup based on the retention settings.

    Deletes only rotated log files (``*.log.*`` or dotted ``*.log`` names)
    older than the configured retention period — active main logs such as
    ``access.log``/``error.log`` are never touched. Records the cleanup
    timestamp back into ``maintenance_config.json``.
    """
    try:
        import os
        import json
        import glob
        from datetime import datetime, timedelta
        # Load settings; refuse to run without an explicit configuration.
        config_path = os.path.join(current_app.instance_path, 'maintenance_config.json')
        if not os.path.exists(config_path):
            return jsonify({
                'success': False,
                'message': 'No cleanup settings configured. Please configure retention days first.'
            }), 400
        with open(config_path, 'r') as f:
            config = json.load(f)
        retention_days = config.get('retention_days', 30)
        if retention_days == 0:
            return jsonify({
                'success': False,
                'message': 'Log cleanup is disabled. Please enable it in settings first.'
            }), 400
        # The log directory sits next to the instance directory.
        log_dir = os.path.join(os.path.dirname(current_app.instance_path), 'logs')
        if not os.path.exists(log_dir):
            return jsonify({
                'success': False,
                'message': f'Log directory not found: {log_dir}'
            }), 404
        # Collect both current and rotated log files.
        log_files = glob.glob(os.path.join(log_dir, '*.log'))
        old_log_files = glob.glob(os.path.join(log_dir, '*.log.*'))  # Rotated logs
        all_log_files = log_files + old_log_files
        if not all_log_files:
            return jsonify({
                'success': True,
                'message': 'No log files found to clean up.',
                'files_deleted': 0
            })
        # Anything last modified before this date is eligible for deletion.
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        deleted_count = 0
        deleted_files = []
        total_size_deleted = 0
        for log_file in all_log_files:
            try:
                # Skip current main log files (no dot in the stem, e.g.
                # access.log) — only rotated/old ones may be deleted.
                if log_file.endswith('.log') and '.' not in os.path.basename(log_file)[:-4]:
                    continue
                file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file))
                if file_mtime < cutoff_date:
                    file_size = os.path.getsize(log_file)
                    os.remove(log_file)
                    deleted_count += 1
                    deleted_files.append(os.path.basename(log_file))
                    total_size_deleted += file_size
            except Exception as e:
                # Best effort: one undeletable file must not abort the run.
                # Use the app logger instead of print so the failure is
                # captured in the application logs.
                current_app.logger.warning("Error deleting %s: %s", log_file, e)
                continue
        # Record when the cleanup last ran.
        config['last_cleanup'] = datetime.now().isoformat()
        with open(config_path, 'w') as f:
            json.dump(config, f, indent=4)
        # Human-readable size of the space reclaimed.
        if total_size_deleted > 1024 * 1024:
            size_str = f"{total_size_deleted / (1024 * 1024):.2f} MB"
        elif total_size_deleted > 1024:
            size_str = f"{total_size_deleted / 1024:.2f} KB"
        else:
            size_str = f"{total_size_deleted} bytes"
        if deleted_count > 0:
            message = f'Successfully deleted {deleted_count} old log file(s), freed {size_str}.'
        else:
            message = f'No log files older than {retention_days} days found.'
        return jsonify({
            'success': True,
            'message': message,
            'files_deleted': deleted_count,
            'size_freed': total_size_deleted,
            'deleted_files': deleted_files
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to cleanup logs: {str(e)}'
        }), 500
@bp.route('/api/maintenance/storage-info', methods=['GET'])
@admin_plus
def get_storage_info():
    """Report storage usage for logs, the database, and backups.

    Directory sizes are computed by walking the filesystem; the database
    size comes from ``information_schema.TABLES``. Database errors are
    non-fatal — the size is reported as 'N/A' instead.

    NOTE(review): the logs/backups paths are hardcoded to /srv/quality_app
    here, while other routes derive them from ``current_app.instance_path``
    — confirm these always refer to the same locations.
    """
    try:
        import os
        import glob

        def format_size(size_bytes):
            """Format bytes as the largest unit >= 1 (GB/MB/KB/bytes)."""
            if size_bytes >= 1024 * 1024 * 1024:
                return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
            elif size_bytes >= 1024 * 1024:
                return f"{size_bytes / (1024 * 1024):.2f} MB"
            elif size_bytes >= 1024:
                return f"{size_bytes / 1024:.2f} KB"
            else:
                return f"{size_bytes} bytes"

        def get_directory_size(path):
            """Sum the sizes of all files under *path* (0 if absent)."""
            total_size = 0
            if os.path.exists(path):
                for dirpath, dirnames, filenames in os.walk(path):
                    for filename in filenames:
                        filepath = os.path.join(dirpath, filename)
                        try:
                            total_size += os.path.getsize(filepath)
                        except OSError:
                            # File vanished or is unreadable — skip it rather
                            # than failing the whole report. (Was a bare
                            # except, which also swallowed KeyboardInterrupt.)
                            continue
            return total_size

        # Logs directory size.
        log_dir = '/srv/quality_app/logs'
        logs_size = get_directory_size(log_dir)
        # Database size (best effort; falls back to 0 -> 'N/A').
        db_size = 0
        try:
            # Load database config directly from external_server.conf.
            settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
            config = {}
            with open(settings_file, 'r') as f:
                for line in f:
                    if '=' in line:
                        key, value = line.strip().split('=', 1)
                        config[key] = value
            conn = mariadb.connect(
                host=config.get('server_domain', 'localhost'),
                port=int(config.get('port', '3306')),
                user=config.get('username', 'root'),
                password=config.get('password', ''),
                database=config.get('database_name', 'trasabilitate')
            )
            cursor = conn.cursor()
            # Total of data + index length across all tables in the schema.
            cursor.execute("""
                SELECT
                    SUM(data_length + index_length) as size
                FROM information_schema.TABLES
                WHERE table_schema = %s
            """, (config.get('database_name', 'trasabilitate'),))
            result = cursor.fetchone()
            if result and result[0]:
                db_size = int(result[0])
            cursor.close()
            conn.close()
        except Exception as e:
            current_app.logger.warning("Error getting database size: %s", e, exc_info=True)
            db_size = 0
        # Backups directory size.
        backups_dir = '/srv/quality_app/backups'
        backups_size = get_directory_size(backups_dir)
        return jsonify({
            'success': True,
            'logs_size': format_size(logs_size),
            'database_size': format_size(db_size) if db_size > 0 else 'N/A',
            'backups_size': format_size(backups_size),
            'logs_size_bytes': logs_size,
            'database_size_bytes': db_size,
            'backups_size_bytes': backups_size
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to get storage info: {str(e)}'
        }), 500
@bp.route('/api/maintenance/database-tables', methods=['GET'])
@admin_plus
def get_all_database_tables():
    """List every table in the configured database with row count and size."""
    try:
        # Read connection parameters from external_server.conf (key=value lines).
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        config = {}
        with open(settings_file, 'r') as conf_file:
            for raw_line in conf_file:
                if '=' not in raw_line:
                    continue
                key, value = raw_line.strip().split('=', 1)
                config[key] = value

        conn = mariadb.connect(
            host=config.get('server_domain', 'localhost'),
            port=int(config.get('port', '3306')),
            user=config.get('username', 'root'),
            password=config.get('password', ''),
            database=config.get('database_name', 'trasabilitate')
        )
        cursor = conn.cursor(dictionary=True)
        # `rows` is backtick-quoted because it is a reserved keyword.
        cursor.execute("""
            SELECT
                TABLE_NAME as name,
                TABLE_ROWS as `rows`,
                ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) as size_mb,
                ROUND((DATA_LENGTH + INDEX_LENGTH), 0) as size_bytes
            FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = %s
            ORDER BY TABLE_NAME
        """, (config.get('database_name', 'trasabilitate'),))
        raw_tables = cursor.fetchall()
        cursor.close()
        conn.close()

        def pretty_size(entry):
            """Render the table size in the largest unit that is >= 1."""
            mb = float(entry['size_mb']) if entry['size_mb'] else 0
            if mb >= 1:
                return f"{mb:.2f} MB"
            kb = mb * 1024
            if kb >= 1:
                return f"{kb:.2f} KB"
            return f"{entry['size_bytes']} bytes"

        formatted_tables = [
            {
                'name': entry['name'],
                'rows': f"{entry['rows']:,}" if entry['rows'] else '0',
                'size': pretty_size(entry),
                'size_bytes': int(entry['size_bytes']) if entry['size_bytes'] else 0
            }
            for entry in raw_tables
        ]
        return jsonify({
            'success': True,
            'tables': formatted_tables,
            'total_tables': len(formatted_tables)
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to get database tables: {str(e)}'
        }), 500
@bp.route('/api/maintenance/drop-table', methods=['POST'])
@admin_plus
def drop_table():
    """Drop a database table — DANGEROUS, irreversible operation.

    Expects a JSON body with ``table_name``. The name is validated against
    a strict character whitelist, checked for existence, and then dropped.
    """
    try:
        # Guard: a missing JSON body would otherwise raise AttributeError.
        data = request.get_json(silent=True) or {}
        table_name = data.get('table_name', '').strip()
        if not table_name:
            return jsonify({
                'success': False,
                'message': 'Table name is required'
            }), 400
        # Validate table name to prevent SQL injection: identifiers cannot be
        # bound as parameters, so only allow alphanumerics and underscores.
        if not table_name.replace('_', '').isalnum():
            return jsonify({
                'success': False,
                'message': 'Invalid table name format'
            }), 400
        # Load database config directly from external_server.conf.
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        config = {}
        with open(settings_file, 'r') as f:
            for line in f:
                if '=' in line:
                    key, value = line.strip().split('=', 1)
                    config[key] = value
        conn = mariadb.connect(
            host=config.get('server_domain', 'localhost'),
            port=int(config.get('port', '3306')),
            user=config.get('username', 'root'),
            password=config.get('password', ''),
            database=config.get('database_name', 'trasabilitate')
        )
        cursor = conn.cursor()
        # Verify the table exists before attempting to drop it.
        cursor.execute("""
            SELECT COUNT(*) as count
            FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        """, (config.get('database_name', 'trasabilitate'), table_name))
        result = cursor.fetchone()
        if not result or result[0] == 0:
            cursor.close()
            conn.close()
            return jsonify({
                'success': False,
                'message': f'Table "{table_name}" does not exist'
            }), 404
        # Drop the table (name validated above; backticks quote the identifier).
        cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")
        conn.commit()
        cursor.close()
        conn.close()
        return jsonify({
            'success': True,
            'message': f'Table "{table_name}" has been dropped successfully'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to drop table: {str(e)}'
        }), 500
@bp.route('/api/backup/table', methods=['POST'])
@admin_plus
def backup_single_table():
    """Create a mysqldump backup of a single table.

    Expects a JSON body with ``table_name``. The dump is written to
    ``<app_root>/backups/table_<name>_<timestamp>.sql``.

    Bug fix: the mysqldump command previously read the wrong config keys
    (``host``/``user``/``database``) — the conf file and every other route
    use ``server_domain``/``username``/``database_name``, so the dump always
    ran against the defaults regardless of configuration. Also adds --port.
    """
    try:
        import os
        from datetime import datetime
        # Guard: a missing JSON body would otherwise raise AttributeError.
        data = request.get_json(silent=True) or {}
        table_name = data.get('table_name', '').strip()
        if not table_name:
            return jsonify({
                'success': False,
                'message': 'Table name is required'
            }), 400
        # Validate table name: only alphanumerics and underscores allowed.
        if not table_name.replace('_', '').isalnum():
            return jsonify({
                'success': False,
                'message': 'Invalid table name format'
            }), 400
        # Load database config directly from external_server.conf.
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        config = {}
        with open(settings_file, 'r') as f:
            for line in f:
                if '=' in line:
                    key, value = line.strip().split('=', 1)
                    config[key] = value
        # Verify the table exists before dumping.
        conn = mariadb.connect(
            host=config.get('server_domain', 'localhost'),
            port=int(config.get('port', '3306')),
            user=config.get('username', 'root'),
            password=config.get('password', ''),
            database=config.get('database_name', 'trasabilitate')
        )
        cursor = conn.cursor()
        cursor.execute("""
            SELECT COUNT(*) as count
            FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        """, (config.get('database_name', 'trasabilitate'), table_name))
        result = cursor.fetchone()
        if not result or result[0] == 0:
            cursor.close()
            conn.close()
            return jsonify({
                'success': False,
                'message': f'Table "{table_name}" does not exist'
            }), 404
        cursor.close()
        conn.close()
        # Create backup directory if it does not exist.
        backup_dir = os.path.join(os.path.dirname(current_app.instance_path), 'backups')
        os.makedirs(backup_dir, exist_ok=True)
        # Generate backup filename: table_<name>_<timestamp>.sql.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_filename = f'table_{table_name}_{timestamp}.sql'
        backup_path = os.path.join(backup_dir, backup_filename)
        # Create backup using mysqldump with the SAME config keys used for
        # the mariadb connection above (server_domain/username/database_name).
        import subprocess
        mysqldump_cmd = [
            'mysqldump',
            f'--host={config.get("server_domain", "localhost")}',
            f'--port={config.get("port", "3306")}',
            f'--user={config.get("username", "root")}',
            f'--password={config.get("password", "")}',
            '--single-transaction',
            '--no-create-db',
            config.get('database_name', 'trasabilitate'),
            table_name
        ]
        with open(backup_path, 'w') as f:
            result = subprocess.run(
                mysqldump_cmd,
                stdout=f,
                stderr=subprocess.PIPE,
                text=True
            )
        if result.returncode != 0:
            return jsonify({
                'success': False,
                'message': f'Backup failed: {result.stderr}'
            }), 500
        # Report the dump size in human-readable form.
        file_size = os.path.getsize(backup_path)
        if file_size >= 1024 * 1024:
            size_str = f"{file_size / (1024 * 1024):.2f} MB"
        elif file_size >= 1024:
            size_str = f"{file_size / 1024:.2f} KB"
        else:
            size_str = f"{file_size} bytes"
        return jsonify({
            'success': True,
            'message': f'Table "{table_name}" backed up successfully',
            'filename': backup_filename,
            'size': size_str
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to backup table: {str(e)}'
        }), 500
@bp.route('/api/backup/table-backups', methods=['GET'])
@admin_plus
def list_table_backups():
    """List all per-table backup files, newest first."""
    try:
        import os
        import glob
        from datetime import datetime

        backup_dir = os.path.join(os.path.dirname(current_app.instance_path), 'backups')
        if not os.path.exists(backup_dir):
            return jsonify({
                'success': True,
                'backups': []
            })

        backups = []
        # Per-table backups follow the pattern table_<name>_<timestamp>.sql.
        for backup_path in glob.glob(os.path.join(backup_dir, 'table_*.sql')):
            filename = os.path.basename(backup_path)
            parts = filename.replace('.sql', '').split('_')
            if len(parts) < 3:
                # Unexpected filename shape — not one of our backups.
                continue
            # Recover the table name; names containing underscores span
            # everything between the 'table' prefix and the two date parts.
            table_name = '_'.join(parts[1:-2]) if len(parts) > 3 else parts[1]
            byte_count = os.path.getsize(backup_path)
            if byte_count >= 1024 * 1024:
                size_str = f"{byte_count / (1024 * 1024):.2f} MB"
            elif byte_count >= 1024:
                size_str = f"{byte_count / 1024:.2f} KB"
            else:
                size_str = f"{byte_count} bytes"
            mtime = os.path.getmtime(backup_path)
            backups.append({
                'filename': filename,
                'table_name': table_name,
                'size': size_str,
                'created': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M'),
                'timestamp': mtime
            })

        # Newest backups first.
        backups.sort(key=lambda entry: entry['timestamp'], reverse=True)
        return jsonify({
            'success': True,
            'backups': backups
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to list table backups: {str(e)}'
        }), 500
@bp.route('/api/restore/table', methods=['POST'])
@admin_plus
def restore_single_table():
    """Restore a single table from a previously created backup file.

    Expects a JSON body with ``backup_file`` (a filename produced by the
    table backup route). The file is piped into the ``mysql`` client.

    Security fix: the filename is now required to be a bare basename —
    previously a value like ``table_/../../x.sql`` passed the
    startswith/endswith checks and allowed path traversal out of the
    backups directory.
    """
    try:
        import os
        import subprocess
        # Guard: a missing JSON body would otherwise raise AttributeError.
        data = request.get_json(silent=True) or {}
        backup_file = data.get('backup_file', '').strip()
        if not backup_file:
            return jsonify({
                'success': False,
                'message': 'Backup file is required'
            }), 400
        # Security: the value must be a plain filename (no path separators)
        # matching our table backup naming pattern.
        if (backup_file != os.path.basename(backup_file)
                or not backup_file.endswith('.sql')
                or not backup_file.startswith('table_')):
            return jsonify({
                'success': False,
                'message': 'Invalid backup file format'
            }), 400
        backup_dir = os.path.join(os.path.dirname(current_app.instance_path), 'backups')
        backup_path = os.path.join(backup_dir, backup_file)
        if not os.path.exists(backup_path):
            return jsonify({
                'success': False,
                'message': 'Backup file not found'
            }), 404
        # Extract the table name from table_<name>_<timestamp>.sql.
        parts = backup_file.replace('.sql', '').split('_')
        table_name = '_'.join(parts[1:-2]) if len(parts) > 3 else parts[1]
        # Load database config directly from external_server.conf.
        settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
        config = {}
        with open(settings_file, 'r') as f:
            for line in f:
                if '=' in line:
                    key, value = line.strip().split('=', 1)
                    config[key] = value
        # Restore by piping the dump into the mysql client.
        mysql_cmd = [
            'mysql',
            f'--host={config.get("server_domain", "localhost")}',
            f'--user={config.get("username", "root")}',
            f'--password={config.get("password", "")}',
            config.get('database_name', 'trasabilitate')
        ]
        with open(backup_path, 'r') as f:
            result = subprocess.run(
                mysql_cmd,
                stdin=f,
                stderr=subprocess.PIPE,
                text=True
            )
        if result.returncode != 0:
            return jsonify({
                'success': False,
                'message': f'Restore failed: {result.stderr}'
            }), 500
        return jsonify({
            'success': True,
            'message': f'Table "{table_name}" restored successfully from {backup_file}'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'Failed to restore table: {str(e)}'
        }), 500

File diff suppressed because it is too large Load Diff