updated documentation folder

Quality System Admin
2025-11-03 21:17:10 +02:00
parent 8d47e6e82d
commit 1ade0b5681
29 changed files with 6113 additions and 32 deletions


@@ -0,0 +1,431 @@
"""
Database Backup Management Module
Quality Recticel Application
This module provides functionality for backing up and restoring the MariaDB database,
including scheduled backups, manual backups, and backup file management.
"""
import os
import subprocess
import json
from datetime import datetime, timedelta
from pathlib import Path
import configparser
from flask import current_app
import mariadb
class DatabaseBackupManager:
"""Manages database backup operations"""
def __init__(self):
"""Initialize the backup manager with configuration from external_server.conf"""
self.config = self._load_database_config()
self.backup_path = self._get_backup_path()
self._ensure_backup_directory()

    def _load_database_config(self):
        """Load database configuration from external_server.conf"""
        try:
            settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
            config = {}
            if os.path.exists(settings_file):
                with open(settings_file, 'r') as f:
                    for line in f:
                        if '=' in line:
                            key, value = line.strip().split('=', 1)
                            config[key] = value
            return {
                'host': config.get('server_domain', 'localhost'),
                'port': config.get('port', '3306'),
                'database': config.get('database_name', 'trasabilitate'),
                'user': config.get('username', 'trasabilitate'),
                'password': config.get('password', '')
            }
        except Exception as e:
            print(f"Error loading database config: {e}")
            return None
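
    # Illustrative sketch (editorial addition, not part of this commit): the parser
    # above expects plain key=value lines, so an external_server.conf along these
    # lines would satisfy it. The values shown are placeholders, not real settings.
    #
    #     server_domain=localhost
    #     port=3306
    #     database_name=trasabilitate
    #     username=trasabilitate
    #     password=change_me
    #     backup_path=/srv/quality_app/backups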

    def _get_backup_path(self):
        """Get backup path from environment or use default"""
        # Check environment variable (set in docker-compose)
        backup_path = os.environ.get('BACKUP_PATH', '/srv/quality_app/backups')
        # Check if custom path is set in config
        try:
            settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
            if os.path.exists(settings_file):
                with open(settings_file, 'r') as f:
                    for line in f:
                        if line.startswith('backup_path='):
                            backup_path = line.strip().split('=', 1)[1]
                            break
        except Exception as e:
            print(f"Error reading backup path from config: {e}")
        return backup_path
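
    # Sketch (assumption, not from this commit): BACKUP_PATH is expected to come
    # from the container environment, e.g. a docker-compose service entry such as
    #
    #     environment:
    #       - BACKUP_PATH=/srv/quality_app/backups
    #     volumes:
    #       - ./backups:/srv/quality_app/backups
    #
    # A backup_path= line in external_server.conf, as read above, overrides it.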

    def _ensure_backup_directory(self):
        """Ensure backup directory exists"""
        try:
            Path(self.backup_path).mkdir(parents=True, exist_ok=True)
            print(f"Backup directory ensured: {self.backup_path}")
        except Exception as e:
            print(f"Error creating backup directory: {e}")

    def create_backup(self, backup_name=None):
        """
        Create a complete backup of the database

        Args:
            backup_name (str, optional): Custom name for the backup file

        Returns:
            dict: Result with success status, message, and backup file path
        """
        try:
            if not self.config:
                return {
                    'success': False,
                    'message': 'Database configuration not loaded'
                }

            # Generate backup filename
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            if backup_name:
                filename = f"{backup_name}_{timestamp}.sql"
            else:
                filename = f"backup_{self.config['database']}_{timestamp}.sql"
            backup_file = os.path.join(self.backup_path, filename)

            # Build mysqldump command
            cmd = [
                'mysqldump',
                f"--host={self.config['host']}",
                f"--port={self.config['port']}",
                f"--user={self.config['user']}",
                f"--password={self.config['password']}",
                '--single-transaction',
                '--routines',
                '--triggers',
                '--events',
                '--add-drop-database',
                '--databases',
                self.config['database']
            ]

            # Execute mysqldump and save to file
            with open(backup_file, 'w') as f:
                result = subprocess.run(
                    cmd,
                    stdout=f,
                    stderr=subprocess.PIPE,
                    text=True
                )

            if result.returncode == 0:
                # Get file size
                file_size = os.path.getsize(backup_file)
                file_size_mb = file_size / (1024 * 1024)

                # Save backup metadata
                self._save_backup_metadata(filename, file_size)

                return {
                    'success': True,
                    'message': 'Backup created successfully',
                    'filename': filename,
                    'file_path': backup_file,
                    'size': f"{file_size_mb:.2f} MB",
                    'timestamp': timestamp
                }
            else:
                error_msg = result.stderr
                print(f"Backup error: {error_msg}")
                return {
                    'success': False,
                    'message': f'Backup failed: {error_msg}'
                }
        except Exception as e:
            print(f"Exception during backup: {e}")
            return {
                'success': False,
                'message': f'Backup failed: {str(e)}'
            }
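
    # Usage sketch (editorial addition, not part of the commit): create_backup()
    # needs a Flask application context because the config is read from
    # current_app.instance_path. The name `app` below is an assumption.
    #
    #     with app.app_context():
    #         result = DatabaseBackupManager().create_backup('pre_migration')
    #         print(result['message'], result.get('file_path'))
    #
    # With the defaults above, the executed command is equivalent to:
    #
    #     mysqldump --host=localhost --port=3306 --user=trasabilitate \
    #         --password=... --single-transaction --routines --triggers --events \
    #         --add-drop-database --databases trasabilitate > backup_<db>_<timestamp>.sql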

    def _save_backup_metadata(self, filename, file_size):
        """Save metadata about the backup"""
        try:
            metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')

            # Load existing metadata
            metadata = []
            if os.path.exists(metadata_file):
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)

            # Add new backup metadata
            metadata.append({
                'filename': filename,
                'size': file_size,
                'timestamp': datetime.now().isoformat(),
                'database': self.config['database']
            })

            # Save updated metadata
            with open(metadata_file, 'w') as f:
                json.dump(metadata, f, indent=2)
        except Exception as e:
            print(f"Error saving backup metadata: {e}")

    def list_backups(self):
        """
        List all available backups

        Returns:
            list: List of backup information dictionaries
        """
        try:
            backups = []

            # Get all .sql files in backup directory
            if os.path.exists(self.backup_path):
                for filename in os.listdir(self.backup_path):
                    if filename.endswith('.sql'):
                        file_path = os.path.join(self.backup_path, filename)
                        file_stat = os.stat(file_path)
                        backups.append({
                            'filename': filename,
                            'size': file_stat.st_size,
                            'size_mb': f"{file_stat.st_size / (1024 * 1024):.2f}",
                            'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
                            'timestamp': file_stat.st_ctime
                        })

            # Sort by timestamp (newest first)
            backups.sort(key=lambda x: x['timestamp'], reverse=True)
            return backups
        except Exception as e:
            print(f"Error listing backups: {e}")
            return []
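
    # Sketch of how a view might expose this listing; the blueprint and endpoint
    # names below are assumptions, not part of this commit:
    #
    #     @admin_bp.route('/backups')
    #     def backups():
    #         return jsonify(DatabaseBackupManager().list_backups())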

    def delete_backup(self, filename):
        """
        Delete a backup file

        Args:
            filename (str): Name of the backup file to delete

        Returns:
            dict: Result with success status and message
        """
        try:
            # Security: ensure filename doesn't contain path traversal
            if '..' in filename or '/' in filename:
                return {
                    'success': False,
                    'message': 'Invalid filename'
                }

            file_path = os.path.join(self.backup_path, filename)
            if os.path.exists(file_path):
                os.remove(file_path)

                # Update metadata
                self._remove_backup_metadata(filename)

                return {
                    'success': True,
                    'message': f'Backup {filename} deleted successfully'
                }
            else:
                return {
                    'success': False,
                    'message': 'Backup file not found'
                }
        except Exception as e:
            print(f"Error deleting backup: {e}")
            return {
                'success': False,
                'message': f'Failed to delete backup: {str(e)}'
            }
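
    # Editorial suggestion, not the module's current behaviour: an equivalent,
    # slightly stricter guard normalises the name before building the path, e.g.
    #
    #     if os.path.basename(filename) != filename or not filename.endswith('.sql'):
    #         return {'success': False, 'message': 'Invalid filename'}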

    def _remove_backup_metadata(self, filename):
        """Remove metadata entry for deleted backup"""
        try:
            metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')
            if os.path.exists(metadata_file):
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)

                # Filter out the deleted backup
                metadata = [m for m in metadata if m['filename'] != filename]

                with open(metadata_file, 'w') as f:
                    json.dump(metadata, f, indent=2)
        except Exception as e:
            print(f"Error removing backup metadata: {e}")

    def restore_backup(self, filename):
        """
        Restore database from a backup file

        Args:
            filename (str): Name of the backup file to restore

        Returns:
            dict: Result with success status and message
        """
        try:
            # Security: ensure filename doesn't contain path traversal
            if '..' in filename or '/' in filename:
                return {
                    'success': False,
                    'message': 'Invalid filename'
                }

            file_path = os.path.join(self.backup_path, filename)
            if not os.path.exists(file_path):
                return {
                    'success': False,
                    'message': 'Backup file not found'
                }

            # Build mysql restore command
            cmd = [
                'mysql',
                f"--host={self.config['host']}",
                f"--port={self.config['port']}",
                f"--user={self.config['user']}",
                f"--password={self.config['password']}"
            ]

            # Execute mysql restore
            with open(file_path, 'r') as f:
                result = subprocess.run(
                    cmd,
                    stdin=f,
                    stderr=subprocess.PIPE,
                    text=True
                )

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': f'Database restored successfully from {filename}'
                }
            else:
                error_msg = result.stderr
                print(f"Restore error: {error_msg}")
                return {
                    'success': False,
                    'message': f'Restore failed: {error_msg}'
                }
        except Exception as e:
            print(f"Exception during restore: {e}")
            return {
                'success': False,
                'message': f'Restore failed: {str(e)}'
            }
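
    # Usage sketch (editorial addition; assumes an app context is available). Because
    # the dump is taken with --add-drop-database, restoring replaces the live database,
    # so callers typically guard this behind an admin-only confirmation step:
    #
    #     with app.app_context():
    #         result = DatabaseBackupManager().restore_backup(
    #             'backup_trasabilitate_20251103_020000.sql')
    #         print(result['message'])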

    def get_backup_schedule(self):
        """Get current backup schedule configuration"""
        try:
            schedule_file = os.path.join(self.backup_path, 'backup_schedule.json')
            if os.path.exists(schedule_file):
                with open(schedule_file, 'r') as f:
                    return json.load(f)

            # Default schedule
            return {
                'enabled': False,
                'time': '02:00',  # 2 AM
                'frequency': 'daily',  # daily, weekly, monthly
                'retention_days': 30  # Keep backups for 30 days
            }
        except Exception as e:
            print(f"Error loading backup schedule: {e}")
            return None
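
    # For reference (editorial sketch): an enabled schedule stored in
    # backup_schedule.json mirrors the default shape above, for example:
    #
    #     {
    #       "enabled": true,
    #       "time": "02:00",
    #       "frequency": "daily",
    #       "retention_days": 30
    #     }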

    def save_backup_schedule(self, schedule):
        """
        Save backup schedule configuration

        Args:
            schedule (dict): Schedule configuration

        Returns:
            dict: Result with success status and message
        """
        try:
            schedule_file = os.path.join(self.backup_path, 'backup_schedule.json')
            with open(schedule_file, 'w') as f:
                json.dump(schedule, f, indent=2)
            return {
                'success': True,
                'message': 'Backup schedule saved successfully'
            }
        except Exception as e:
            print(f"Error saving backup schedule: {e}")
            return {
                'success': False,
                'message': f'Failed to save schedule: {str(e)}'
            }
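
    # Editorial note: this module only persists the schedule; the component that acts
    # on it is not shown in this excerpt. A minimal sketch of a consumer, assuming a
    # scheduler object such as APScheduler's BackgroundScheduler, could look like:
    #
    #     manager = DatabaseBackupManager()
    #     schedule = manager.get_backup_schedule()
    #     if schedule and schedule['enabled']:
    #         hour, minute = schedule['time'].split(':')
    #         scheduler.add_job(manager.create_backup, 'cron', hour=hour, minute=minute)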

    def cleanup_old_backups(self, retention_days=30):
        """
        Delete backups older than retention_days

        Args:
            retention_days (int): Number of days to keep backups

        Returns:
            dict: Result with count of deleted backups
        """
        try:
            deleted_count = 0
            cutoff_time = datetime.now() - timedelta(days=retention_days)

            if os.path.exists(self.backup_path):
                for filename in os.listdir(self.backup_path):
                    if filename.endswith('.sql'):
                        file_path = os.path.join(self.backup_path, filename)
                        file_time = datetime.fromtimestamp(os.path.getctime(file_path))
                        if file_time < cutoff_time:
                            os.remove(file_path)
                            self._remove_backup_metadata(filename)
                            deleted_count += 1
                            print(f"Deleted old backup: {filename}")

            return {
                'success': True,
                'deleted_count': deleted_count,
                'message': f'Cleaned up {deleted_count} old backup(s)'
            }
        except Exception as e:
            print(f"Error cleaning up old backups: {e}")
            return {
                'success': False,
                'message': f'Cleanup failed: {str(e)}'
            }
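
    # Usage sketch tying backup and retention together (editorial addition; the job
    # function name is hypothetical and the retention value comes from the stored
    # schedule):
    #
    #     def nightly_backup_job(app):
    #         with app.app_context():
    #             manager = DatabaseBackupManager()
    #             schedule = manager.get_backup_schedule() or {}
    #             manager.create_backup()
    #             manager.cleanup_old_backups(schedule.get('retention_days', 30))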