""" Database Backup Management Module Quality Recticel Application This module provides functionality for backing up and restoring the MariaDB database, including scheduled backups, manual backups, and backup file management. """ import os import subprocess import json from datetime import datetime, timedelta from pathlib import Path import configparser from flask import current_app import mariadb class DatabaseBackupManager: """Manages database backup operations""" def __init__(self): """Initialize the backup manager with configuration from external_server.conf""" self.config = self._load_database_config() self.backup_path = self._get_backup_path() self._ensure_backup_directory() def _load_database_config(self): """Load database configuration from external_server.conf""" try: settings_file = os.path.join(current_app.instance_path, 'external_server.conf') config = {} if os.path.exists(settings_file): with open(settings_file, 'r') as f: for line in f: if '=' in line: key, value = line.strip().split('=', 1) config[key] = value return { 'host': config.get('server_domain', 'localhost'), 'port': config.get('port', '3306'), 'database': config.get('database_name', 'trasabilitate'), 'user': config.get('username', 'trasabilitate'), 'password': config.get('password', '') } except Exception as e: print(f"Error loading database config: {e}") return None def _get_backup_path(self): """Get backup path from environment or use default""" # Check environment variable (set in docker-compose) backup_path = os.environ.get('BACKUP_PATH', '/srv/quality_app/backups') # Check if custom path is set in config try: settings_file = os.path.join(current_app.instance_path, 'external_server.conf') if os.path.exists(settings_file): with open(settings_file, 'r') as f: for line in f: if line.startswith('backup_path='): backup_path = line.strip().split('=', 1)[1] break except Exception as e: print(f"Error reading backup path from config: {e}") return backup_path def _ensure_backup_directory(self): """Ensure backup directory exists""" try: Path(self.backup_path).mkdir(parents=True, exist_ok=True) print(f"Backup directory ensured: {self.backup_path}") except Exception as e: print(f"Error creating backup directory: {e}") def create_backup(self, backup_name=None): """ Create a complete backup of the database Args: backup_name (str, optional): Custom name for the backup file Returns: dict: Result with success status, message, and backup file path """ try: if not self.config: return { 'success': False, 'message': 'Database configuration not loaded' } # Generate backup filename timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if backup_name: filename = f"{backup_name}_{timestamp}.sql" else: filename = f"backup_{self.config['database']}_{timestamp}.sql" backup_file = os.path.join(self.backup_path, filename) # Build mysqldump command # Note: --skip-lock-tables and --force help with views that have permission issues cmd = [ 'mysqldump', f"--host={self.config['host']}", f"--port={self.config['port']}", f"--user={self.config['user']}", f"--password={self.config['password']}", '--single-transaction', '--skip-lock-tables', '--force', '--routines', '--triggers', '--events', '--add-drop-database', '--databases', self.config['database'] ] # Execute mysqldump and save to file with open(backup_file, 'w') as f: result = subprocess.run( cmd, stdout=f, stderr=subprocess.PIPE, text=True ) if result.returncode == 0: # Get file size file_size = os.path.getsize(backup_file) file_size_mb = file_size / (1024 * 1024) # Save backup metadata 
                self._save_backup_metadata(filename, file_size)

                return {
                    'success': True,
                    'message': 'Backup created successfully',
                    'filename': filename,
                    'file_path': backup_file,
                    'size': f"{file_size_mb:.2f} MB",
                    'timestamp': timestamp
                }
            else:
                error_msg = result.stderr
                print(f"Backup error: {error_msg}")
                # Remove the partial dump so a failed run doesn't leave a
                # corrupt .sql file that list_backups() would pick up
                if os.path.exists(backup_file):
                    os.remove(backup_file)
                return {
                    'success': False,
                    'message': f'Backup failed: {error_msg}'
                }

        except Exception as e:
            print(f"Exception during backup: {e}")
            return {
                'success': False,
                'message': f'Backup failed: {str(e)}'
            }

    def _save_backup_metadata(self, filename, file_size):
        """Save metadata about the backup."""
        try:
            metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')

            # Load existing metadata
            metadata = []
            if os.path.exists(metadata_file):
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)

            # Add the new backup's metadata
            metadata.append({
                'filename': filename,
                'size': file_size,
                'timestamp': datetime.now().isoformat(),
                'database': self.config['database']
            })

            # Save the updated metadata
            with open(metadata_file, 'w') as f:
                json.dump(metadata, f, indent=2)

        except Exception as e:
            print(f"Error saving backup metadata: {e}")

    def list_backups(self):
        """
        List all available backups.

        Returns:
            list: List of backup information dictionaries
        """
        try:
            backups = []

            # Collect all .sql files in the backup directory
            if os.path.exists(self.backup_path):
                for filename in os.listdir(self.backup_path):
                    if filename.endswith('.sql'):
                        file_path = os.path.join(self.backup_path, filename)
                        file_stat = os.stat(file_path)

                        backups.append({
                            'filename': filename,
                            'size': file_stat.st_size,
                            'size_mb': f"{file_stat.st_size / (1024 * 1024):.2f}",
                            # st_ctime is inode change time on Linux, which is
                            # close enough to creation time for fresh dumps
                            'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
                            'timestamp': file_stat.st_ctime
                        })

            # Sort by timestamp (newest first)
            backups.sort(key=lambda x: x['timestamp'], reverse=True)

            return backups

        except Exception as e:
            print(f"Error listing backups: {e}")
            return []

    def delete_backup(self, filename):
        """
        Delete a backup file.

        Args:
            filename (str): Name of the backup file to delete

        Returns:
            dict: Result with success status and message
        """
        try:
            # Security: ensure the filename doesn't contain path traversal
            if '..' in filename or '/' in filename:
                return {
                    'success': False,
                    'message': 'Invalid filename'
                }

            file_path = os.path.join(self.backup_path, filename)

            if os.path.exists(file_path):
                os.remove(file_path)

                # Update the metadata
                self._remove_backup_metadata(filename)

                return {
                    'success': True,
                    'message': f'Backup {filename} deleted successfully'
                }
            else:
                return {
                    'success': False,
                    'message': 'Backup file not found'
                }

        except Exception as e:
            print(f"Error deleting backup: {e}")
            return {
                'success': False,
                'message': f'Failed to delete backup: {str(e)}'
            }

    def _remove_backup_metadata(self, filename):
        """Remove the metadata entry for a deleted backup."""
        try:
            metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')

            if os.path.exists(metadata_file):
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)

                # Filter out the deleted backup
                metadata = [m for m in metadata if m['filename'] != filename]

                with open(metadata_file, 'w') as f:
                    json.dump(metadata, f, indent=2)

        except Exception as e:
            print(f"Error removing backup metadata: {e}")

    def restore_backup(self, filename):
        """
        Restore the database from a backup file.

        Args:
            filename (str): Name of the backup file to restore

        Returns:
            dict: Result with success status and message
        """
        try:
            if not self.config:
                return {
                    'success': False,
                    'message': 'Database configuration not loaded'
                }

            # Security: ensure the filename doesn't contain path traversal
            if '..' in filename or '/' in filename:
                return {
                    'success': False,
                    'message': 'Invalid filename'
                }

            file_path = os.path.join(self.backup_path, filename)

            if not os.path.exists(file_path):
                return {
                    'success': False,
                    'message': 'Backup file not found'
                }

            # Build the mysql restore command
            cmd = [
                'mysql',
                f"--host={self.config['host']}",
                f"--port={self.config['port']}",
                f"--user={self.config['user']}",
                f"--password={self.config['password']}"
            ]

            # Execute the restore, feeding the dump file to mysql on stdin
            with open(file_path, 'r') as f:
                result = subprocess.run(
                    cmd,
                    stdin=f,
                    stderr=subprocess.PIPE,
                    text=True
                )

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': f'Database restored successfully from {filename}'
                }
            else:
                error_msg = result.stderr
                print(f"Restore error: {error_msg}")
                return {
                    'success': False,
                    'message': f'Restore failed: {error_msg}'
                }

        except Exception as e:
            print(f"Exception during restore: {e}")
            return {
                'success': False,
                'message': f'Restore failed: {str(e)}'
            }

    def get_backup_schedule(self):
        """Get the current backup schedule configuration."""
        try:
            schedule_file = os.path.join(self.backup_path, 'backup_schedule.json')

            if os.path.exists(schedule_file):
                with open(schedule_file, 'r') as f:
                    return json.load(f)

            # Default schedule
            return {
                'enabled': False,
                'time': '02:00',        # 2 AM
                'frequency': 'daily',   # daily, weekly, monthly
                'retention_days': 30    # Keep backups for 30 days
            }

        except Exception as e:
            print(f"Error loading backup schedule: {e}")
            return None

    def save_backup_schedule(self, schedule):
        """
        Save the backup schedule configuration.

        Args:
            schedule (dict): Schedule configuration

        Returns:
            dict: Result with success status and message
        """
        try:
            schedule_file = os.path.join(self.backup_path, 'backup_schedule.json')

            with open(schedule_file, 'w') as f:
                json.dump(schedule, f, indent=2)

            return {
                'success': True,
                'message': 'Backup schedule saved successfully'
            }

        except Exception as e:
            print(f"Error saving backup schedule: {e}")
            return {
                'success': False,
                'message': f'Failed to save schedule: {str(e)}'
            }

    def validate_backup_file(self, filename):
        """
        Validate an uploaded backup file for integrity and compatibility.

        Checks:
        - File exists and is readable
        - File contains valid SQL syntax
        - File contains the expected database structure (users table, etc.)
        - File size is reasonable
        - No malicious commands (DROP statements outside of backup context)

        Args:
            filename (str): Name of the backup file to validate

        Returns:
            dict: Validation result with success status, message, and details
        """
        try:
            # Security: ensure the filename doesn't contain path traversal
            if '..' in filename or '/' in filename:
                return {
                    'success': False,
                    'message': 'Invalid filename - potential security issue',
                    'details': {}
                }

            file_path = os.path.join(self.backup_path, filename)

            # Check that the file exists
            if not os.path.exists(file_path):
                return {
                    'success': False,
                    'message': 'Backup file not found',
                    'details': {}
                }

            # Check the file size (reject tiny files, fast-path huge ones)
            file_size = os.path.getsize(file_path)
            size_mb = round(file_size / (1024 * 1024), 2)

            if file_size < 1024:  # Less than 1 KB is suspicious
                return {
                    'success': False,
                    'message': 'File too small - may be empty or corrupted',
                    'details': {'size_mb': size_mb}
                }

            # For very large files (> 2 GB), skip detailed validation to avoid
            # timeouts and do only the basic checks above
            if file_size > 2 * 1024 * 1024 * 1024:
                return {
                    'success': True,
                    'message': f'Large backup file accepted ({size_mb:.2f} MB) - detailed validation skipped for performance',
                    'details': {
                        'size_mb': size_mb,
                        'validation_skipped': True,
                        'reason': 'File too large for line-by-line validation'
                    },
                    'warnings': ['Detailed content validation skipped due to large file size']
                }

            # Read and validate the SQL content (only for files < 2 GB)
            validation_details = {
                'size_mb': size_mb,
                'has_create_database': False,
                'has_users_table': False,
                'has_insert_statements': False,
                'suspicious_commands': [],
                'line_count': 0
            }

            # For large files (100 MB - 2 GB), only read the first 10 MB
            max_bytes_to_read = 10 * 1024 * 1024 if file_size > 100 * 1024 * 1024 else None
            bytes_read = 0

            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content_preview = []
                line_count = 0

                for line in f:
                    line_count += 1
                    bytes_read += len(line.encode('utf-8'))

                    # Stop reading after max_bytes_to_read for large files
                    if max_bytes_to_read and bytes_read > max_bytes_to_read:
                        validation_details['partial_validation'] = True
                        validation_details['bytes_validated'] = f'{bytes_read / (1024 * 1024):.2f} MB'
                        break

                    line_upper = line.strip().upper()

                    # Store the first 10 non-comment lines for the preview
                    if len(content_preview) < 10 and line_upper and not line_upper.startswith('--') and not line_upper.startswith('/*'):
                        content_preview.append(line.strip()[:100])  # First 100 chars

                    # Check for expected SQL commands
                    if 'CREATE DATABASE' in line_upper or 'CREATE SCHEMA' in line_upper:
                        validation_details['has_create_database'] = True
                    if 'CREATE TABLE' in line_upper and 'USERS' in line_upper:
                        validation_details['has_users_table'] = True
                    if line_upper.startswith('INSERT INTO'):
                        validation_details['has_insert_statements'] = True

                    # Check for potentially dangerous commands (outside of the
                    # normal backup context)
                    if 'DROP DATABASE' in line_upper and 'IF EXISTS' not in line_upper:
                        validation_details['suspicious_commands'].append('Unconditional DROP DATABASE found')
                    if 'TRUNCATE TABLE' in line_upper:
                        validation_details['suspicious_commands'].append('TRUNCATE TABLE found')

                    # Check for very long lines (potential binary data)
                    if len(line) > 50000:
                        validation_details['suspicious_commands'].append('Very long lines detected (possible binary data)')
                        break

            validation_details['line_count'] = line_count
            validation_details['preview'] = content_preview[:5]  # First 5 lines

            # Evaluate the validation results
            issues = []
            warnings = []

            if not validation_details['has_insert_statements']:
                warnings.append('No INSERT statements found - backup may be empty')

            if not validation_details['has_users_table']:
                warnings.append('Users table not found - may not be compatible with this application')

            if validation_details['suspicious_commands']:
                issues.extend(validation_details['suspicious_commands'])

            if validation_details['line_count'] < 10:
                issues.append('Too few lines - file may be incomplete')

            # Final validation decision
            if issues:
                return {
                    'success': False,
                    'message': f'Validation failed: {"; ".join(issues)}',
                    'details': validation_details,
                    'warnings': warnings
                }

            if warnings:
                return {
                    'success': True,
                    'message': 'Validation passed with warnings',
                    'details': validation_details,
                    'warnings': warnings
                }

            return {
                'success': True,
                'message': 'Backup file validated successfully',
                'details': validation_details,
                'warnings': []
            }

        except UnicodeDecodeError as e:
            return {
                'success': False,
                'message': 'File contains invalid characters - may be corrupted or not a text file',
                'details': {'error': str(e)}
            }
        except Exception as e:
            print(f"Error validating backup file: {e}")
            return {
                'success': False,
                'message': f'Validation error: {str(e)}',
                'details': {}
            }

    def cleanup_old_backups(self, retention_days=30):
        """
        Delete backups older than retention_days.

        Args:
            retention_days (int): Number of days to keep backups

        Returns:
            dict: Result with the count of deleted backups
        """
        try:
            deleted_count = 0
            cutoff_time = datetime.now() - timedelta(days=retention_days)

            if os.path.exists(self.backup_path):
                for filename in os.listdir(self.backup_path):
                    if filename.endswith('.sql'):
                        file_path = os.path.join(self.backup_path, filename)
                        file_time = datetime.fromtimestamp(os.path.getctime(file_path))

                        if file_time < cutoff_time:
                            os.remove(file_path)
                            self._remove_backup_metadata(filename)
                            deleted_count += 1
                            print(f"Deleted old backup: {filename}")

            return {
                'success': True,
                'deleted_count': deleted_count,
                'message': f'Cleaned up {deleted_count} old backup(s)'
            }

        except Exception as e:
            print(f"Error cleaning up old backups: {e}")
            return {
                'success': False,
                'message': f'Cleanup failed: {str(e)}'
            }
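
# --- Usage sketch ---
# DatabaseBackupManager resolves its configuration through flask.current_app,
# so it must be instantiated inside an application context. A minimal sketch,
# assuming the Flask application object is importable as `app` (the module and
# names below are illustrative, not part of this module):
#
#   from app import app
#
#   with app.app_context():
#       manager = DatabaseBackupManager()
#
#       # Manual backup with an optional custom name prefix
#       result = manager.create_backup('pre_migration')
#       if result['success']:
#           print(f"Created {result['filename']} ({result['size']})")
#
#       # Inspect existing backups, validate one, and prune old files
#       for backup in manager.list_backups():
#           print(backup['filename'], backup['size_mb'], 'MB')
#       print(manager.validate_backup_file(result['filename']))
#       manager.cleanup_old_backups(retention_days=30)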