Files
quality_app/py_app/app/database_backup.py
2025-11-05 21:25:02 +02:00

934 lines
35 KiB
Python

"""
Database Backup Management Module
Quality Recticel Application
This module provides functionality for backing up and restoring the MariaDB database,
including scheduled backups, manual backups, and backup file management.
"""
import os
import subprocess
import json
from datetime import datetime, timedelta
from pathlib import Path
import configparser
from flask import current_app
import mariadb
class DatabaseBackupManager:
"""Manages database backup operations"""
def __init__(self):
    """Prepare the manager for use.

    Reads the database connection settings, resolves the directory that
    holds backup files, and finally makes sure that directory exists.
    The directory step runs last because it reads ``self.backup_path``.
    """
    # Connection settings (a dict, or None when loading failed)
    self.config = self._load_database_config()

    # Folder in which every backup and its metadata are stored
    self.backup_path = self._get_backup_path()

    self._ensure_backup_directory()
def _load_database_config(self):
    """Read the connection settings stored in external_server.conf

    The file lives in the Flask instance folder and holds ``key=value``
    lines. Lines without an ``=`` are ignored. Missing keys fall back to
    built-in defaults, so a missing file yields an all-defaults result.

    Returns:
        dict or None: Mapping with the keys host, port, database, user and
        password, or None when reading the settings raised an error.
    """
    # (result key, key in the settings file, fallback value)
    field_map = (
        ('host', 'server_domain', 'localhost'),
        ('port', 'port', '3306'),
        ('database', 'database_name', 'trasabilitate'),
        ('user', 'username', 'trasabilitate'),
        ('password', 'password', ''),
    )
    try:
        conf_path = os.path.join(current_app.instance_path, 'external_server.conf')
        raw = {}
        if os.path.exists(conf_path):
            with open(conf_path, 'r') as handle:
                for entry in handle:
                    if '=' not in entry:
                        continue
                    # Split on the first '=' only, so values may contain '='
                    name, _, val = entry.strip().partition('=')
                    raw[name] = val
        return {target: raw.get(source, fallback) for target, source, fallback in field_map}
    except Exception as e:
        print(f"Error loading database config: {e}")
        return None
def _get_backup_path(self):
    """Work out which directory backups are stored in

    The BACKUP_PATH environment variable (or the built-in default when it
    is unset) supplies the starting value. The first ``backup_path=`` line
    found in external_server.conf, if any, replaces it.

    Returns:
        str: Directory path to use for backups
    """
    resolved = os.environ.get('BACKUP_PATH', '/srv/quality_app/backups')
    try:
        conf_path = os.path.join(current_app.instance_path, 'external_server.conf')
        if os.path.exists(conf_path):
            with open(conf_path, 'r') as handle:
                # Only the first matching line counts
                override = next(
                    (row for row in handle if row.startswith('backup_path=')),
                    None,
                )
            if override is not None:
                resolved = override.strip().split('=', 1)[1]
    except Exception as e:
        print(f"Error reading backup path from config: {e}")
    return resolved
def _ensure_backup_directory(self):
    """Create the backup directory (and missing parents) when absent

    Failures are reported on stdout and otherwise ignored, so a problem
    here does not stop the manager from being constructed.
    """
    target_dir = Path(self.backup_path)
    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        print(f"Backup directory ensured: {self.backup_path}")
    except Exception as e:
        print(f"Error creating backup directory: {e}")
def create_backup(self, backup_name=None):
    """
    Create a complete backup of the database

    Runs ``mysqldump`` for the configured database (routines, triggers and
    events included) and stores the output in a timestamped ``.sql`` file
    inside ``self.backup_path``. If the dump fails for any reason, the
    partially written file is removed so it can never be listed or
    restored as if it were a valid backup.

    Args:
        backup_name (str, optional): Custom name for the backup file. Must be
            a plain name; values containing '..' or a path separator are
            rejected.

    Returns:
        dict: Result with success status, message, and backup file path
    """
    backup_file = None
    completed = False
    try:
        if not self.config:
            return {
                'success': False,
                'message': 'Database configuration not loaded'
            }

        # Security: a custom name must not be able to escape the backup directory
        if backup_name and any(token in backup_name for token in ('..', '/', '\\')):
            return {
                'success': False,
                'message': 'Invalid backup name'
            }

        # Generate backup filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if backup_name:
            filename = f"{backup_name}_{timestamp}.sql"
        else:
            filename = f"backup_{self.config['database']}_{timestamp}.sql"
        backup_file = os.path.join(self.backup_path, filename)

        # Build mysqldump command
        # Note: --skip-lock-tables and --force help with views that have permission issues
        # NOTE(review): the password is passed on the command line and is therefore
        # visible in the process list; consider an option file instead.
        cmd = [
            'mysqldump',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            '--single-transaction',
            '--skip-lock-tables',
            '--force',
            '--routines',
            '--triggers',
            '--events',
            '--add-drop-database',
            '--databases',
            self.config['database']
        ]

        # Execute mysqldump and save to file
        with open(backup_file, 'w') as f:
            result = subprocess.run(
                cmd,
                stdout=f,
                stderr=subprocess.PIPE,
                text=True
            )

        if result.returncode != 0:
            error_msg = result.stderr
            print(f"Backup error: {error_msg}")
            return {
                'success': False,
                'message': f'Backup failed: {error_msg}'
            }

        # From here on the dump is valid and must be kept
        completed = True

        # Get file size
        file_size = os.path.getsize(backup_file)
        file_size_mb = file_size / (1024 * 1024)

        # Save backup metadata
        self._save_backup_metadata(filename, file_size)

        return {
            'success': True,
            'message': 'Backup created successfully',
            'filename': filename,
            'file_path': backup_file,
            'size': f"{file_size_mb:.2f} MB",
            'timestamp': timestamp
        }
    except Exception as e:
        print(f"Exception during backup: {e}")
        return {
            'success': False,
            'message': f'Backup failed: {str(e)}'
        }
    finally:
        # Drop the empty/partial file left behind by a failed dump
        if not completed and backup_file and os.path.exists(backup_file):
            try:
                os.remove(backup_file)
            except OSError as cleanup_error:
                print(f"Warning: could not remove incomplete backup {backup_file}: {cleanup_error}")
def _save_backup_metadata(self, filename, file_size):
    """Save metadata about the backup

    Appends one entry to ``backups_metadata.json`` in the backup directory.
    An index that cannot be parsed, or that does not hold a list, is moved
    aside to ``backups_metadata.json.corrupt`` and a fresh index is started;
    otherwise one damaged file would silently block every later save. The
    index is written to a temporary file and then renamed into place, so an
    interrupted write cannot truncate the existing index.

    Args:
        filename (str): Name of the backup file the entry describes
        file_size (int): Size of the backup file in bytes

    Errors are reported on stdout and never raised to the caller.
    """
    try:
        metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')

        # Load existing metadata
        metadata = []
        if os.path.exists(metadata_file):
            try:
                with open(metadata_file, 'r') as f:
                    loaded = json.load(f)
            except ValueError as parse_error:
                # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors
                print(f"Warning: backup metadata is unreadable ({parse_error})")
                loaded = None
            if isinstance(loaded, list):
                metadata = loaded
            else:
                # Keep the damaged file for inspection and start a new index
                os.replace(metadata_file, metadata_file + '.corrupt')
                print("Warning: backup metadata was invalid; starting a new index")

        # Add new backup metadata
        metadata.append({
            'filename': filename,
            'size': file_size,
            'timestamp': datetime.now().isoformat(),
            'database': self.config['database']
        })

        # Save updated metadata atomically (write aside, then rename)
        temp_file = metadata_file + '.tmp'
        with open(temp_file, 'w') as f:
            json.dump(metadata, f, indent=2)
        os.replace(temp_file, metadata_file)
    except Exception as e:
        print(f"Error saving backup metadata: {e}")
def list_backups(self):
    """
    Describe every ``.sql`` file found in the backup directory

    Each entry carries the file name, its size in bytes and in megabytes,
    and the file's ``st_ctime`` both formatted and as a raw number.

    Returns:
        list: Backup information dictionaries, newest first; an empty list
        when the directory is missing or an error occurred
    """
    try:
        found = []
        if os.path.exists(self.backup_path):
            sql_names = [name for name in os.listdir(self.backup_path) if name.endswith('.sql')]
            for name in sql_names:
                info = os.stat(os.path.join(self.backup_path, name))
                changed_at = datetime.fromtimestamp(info.st_ctime)
                found.append({
                    'filename': name,
                    'size': info.st_size,
                    'size_mb': f"{info.st_size / (1024 * 1024):.2f}",
                    'created': changed_at.strftime('%Y-%m-%d %H:%M:%S'),
                    'timestamp': info.st_ctime
                })
        # Newest first
        return sorted(found, key=lambda entry: entry['timestamp'], reverse=True)
    except Exception as e:
        print(f"Error listing backups: {e}")
        return []
def delete_backup(self, filename):
    """
    Remove one backup file and its metadata entry

    Args:
        filename (str): Name of the backup file to delete

    Returns:
        dict: Result with success status and message
    """
    try:
        # Security: refuse anything that could point outside the backup folder
        if '/' in filename or '..' in filename:
            return {'success': False, 'message': 'Invalid filename'}

        target = os.path.join(self.backup_path, filename)
        if not os.path.exists(target):
            return {'success': False, 'message': 'Backup file not found'}

        os.remove(target)
        # Keep the metadata index in step with the files on disk
        self._remove_backup_metadata(filename)
        return {
            'success': True,
            'message': f'Backup {filename} deleted successfully'
        }
    except Exception as e:
        print(f"Error deleting backup: {e}")
        return {
            'success': False,
            'message': f'Failed to delete backup: {str(e)}'
        }
def _remove_backup_metadata(self, filename):
    """Drop the metadata entries that belong to a deleted backup

    Rewrites ``backups_metadata.json`` without the entries whose
    ``filename`` equals the given name. Does nothing when the index file
    does not exist. Errors are printed and not raised.

    Args:
        filename (str): Name of the backup file that was deleted
    """
    try:
        index_path = os.path.join(self.backup_path, 'backups_metadata.json')
        if not os.path.exists(index_path):
            return

        with open(index_path, 'r') as source:
            entries = json.load(source)

        # Everything except the deleted backup survives
        remaining = list(filter(lambda entry: entry['filename'] != filename, entries))

        with open(index_path, 'w') as sink:
            json.dump(remaining, sink, indent=2)
    except Exception as e:
        print(f"Error removing backup metadata: {e}")
def create_data_only_backup(self, backup_name=None):
    """
    Create a data-only backup (no schema, triggers, or structure)
    Only exports INSERT statements for existing tables

    The dump is written to a timestamped ``data_only_*.sql`` file inside
    ``self.backup_path``. If the dump fails for any reason, the partially
    written file is removed so it can never be listed or restored as if it
    were a valid backup.

    Args:
        backup_name (str, optional): Custom name for the backup file. Must be
            a plain name; values containing '..' or a path separator are
            rejected.

    Returns:
        dict: Result with success status, message, and backup file path
    """
    backup_file = None
    completed = False
    try:
        if not self.config:
            return {
                'success': False,
                'message': 'Database configuration not loaded'
            }

        # Security: a custom name must not be able to escape the backup directory
        if backup_name and any(token in backup_name for token in ('..', '/', '\\')):
            return {
                'success': False,
                'message': 'Invalid backup name'
            }

        # Generate backup filename with data_only prefix
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if backup_name:
            filename = f"data_only_{backup_name}_{timestamp}.sql"
        else:
            filename = f"data_only_{self.config['database']}_{timestamp}.sql"
        backup_file = os.path.join(self.backup_path, filename)

        # Build mysqldump command for data only
        # --no-create-info: Skip CREATE TABLE statements
        # --skip-triggers: Skip trigger definitions
        # --no-create-db: Skip CREATE DATABASE statement
        # --complete-insert: Include column names in INSERT (more reliable)
        # --extended-insert: Use multi-row INSERT for efficiency
        # NOTE(review): the password is passed on the command line and is therefore
        # visible in the process list; consider an option file instead.
        cmd = [
            'mysqldump',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            '--no-create-info',  # Skip table structure
            '--skip-triggers',  # Skip triggers
            '--no-create-db',  # Skip database creation
            '--complete-insert',  # Include column names
            '--extended-insert',  # Multi-row INSERTs
            '--single-transaction',
            '--skip-lock-tables',
            self.config['database']
        ]

        # Execute mysqldump and save to file
        with open(backup_file, 'w') as f:
            result = subprocess.run(
                cmd,
                stdout=f,
                stderr=subprocess.PIPE,
                text=True
            )

        if result.returncode != 0:
            error_msg = result.stderr
            print(f"Data backup error: {error_msg}")
            return {
                'success': False,
                'message': f'Data backup failed: {error_msg}'
            }

        # From here on the dump is valid and must be kept
        completed = True

        # Get file size
        file_size = os.path.getsize(backup_file)
        file_size_mb = file_size / (1024 * 1024)

        # Save backup metadata
        self._save_backup_metadata(filename, file_size)

        return {
            'success': True,
            'message': 'Data-only backup created successfully',
            'filename': filename,
            'file_path': backup_file,
            'size': f"{file_size_mb:.2f} MB",
            'timestamp': timestamp
        }
    except Exception as e:
        print(f"Exception during data backup: {e}")
        return {
            'success': False,
            'message': f'Data backup failed: {str(e)}'
        }
    finally:
        # Drop the empty/partial file left behind by a failed dump
        if not completed and backup_file and os.path.exists(backup_file):
            try:
                os.remove(backup_file)
            except OSError as cleanup_error:
                print(f"Warning: could not remove incomplete backup {backup_file}: {cleanup_error}")
def restore_backup(self, filename):
    """
    Restore database from a backup file

    Feeds the chosen ``.sql`` file to the ``mysql`` client on stdin. No
    database name is passed on the command line, so the backup file itself
    must select the database it restores into.

    Args:
        filename (str): Name of the backup file to restore

    Returns:
        dict: Result with success status and message
    """
    try:
        # Without connection settings the command cannot be built; report that
        # clearly instead of failing later with an obscure TypeError
        if not self.config:
            return {
                'success': False,
                'message': 'Database configuration not loaded'
            }

        # Security: ensure filename doesn't contain path traversal
        if '..' in filename or '/' in filename:
            return {
                'success': False,
                'message': 'Invalid filename'
            }

        file_path = os.path.join(self.backup_path, filename)
        if not os.path.exists(file_path):
            return {
                'success': False,
                'message': 'Backup file not found'
            }

        # Build mysql restore command
        # NOTE(review): the password is passed on the command line and is therefore
        # visible in the process list; consider an option file instead.
        cmd = [
            'mysql',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}"
        ]

        # Execute mysql restore
        with open(file_path, 'r') as f:
            result = subprocess.run(
                cmd,
                stdin=f,
                stderr=subprocess.PIPE,
                text=True
            )

        if result.returncode == 0:
            return {
                'success': True,
                'message': f'Database restored successfully from {filename}'
            }

        error_msg = result.stderr
        print(f"Restore error: {error_msg}")
        return {
            'success': False,
            'message': f'Restore failed: {error_msg}'
        }
    except Exception as e:
        print(f"Exception during restore: {e}")
        return {
            'success': False,
            'message': f'Restore failed: {str(e)}'
        }
def restore_data_only(self, filename):
    """
    Restore data from a data-only backup file
    Assumes database schema already exists
    Truncates tables before inserting data to avoid duplicates

    The truncation step is best-effort: if it fails, a warning is printed
    and the import is still attempted.

    Args:
        filename (str): Name of the data-only backup file to restore

    Returns:
        dict: Result with success status and message
    """
    try:
        # Without connection settings nothing below can work; report that
        # clearly instead of failing later with an obscure TypeError
        if not self.config:
            return {
                'success': False,
                'message': 'Database configuration not loaded'
            }

        # Security: ensure filename doesn't contain path traversal
        if '..' in filename or '/' in filename:
            return {
                'success': False,
                'message': 'Invalid filename'
            }

        file_path = os.path.join(self.backup_path, filename)
        if not os.path.exists(file_path):
            return {
                'success': False,
                'message': 'Backup file not found'
            }

        # First, disable foreign key checks and truncate all tables
        # This ensures clean data import without constraint violations
        conn = None
        try:
            conn = mariadb.connect(
                host=self.config['host'],
                port=int(self.config['port']),
                user=self.config['user'],
                password=self.config['password'],
                database=self.config['database']
            )
            cursor = conn.cursor()

            # Disable foreign key checks (affects this session only)
            cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")

            # Get list of all tables in the database
            cursor.execute("SHOW TABLES;")
            tables = cursor.fetchall()

            # Truncate each table (except system tables)
            for (table_name,) in tables:
                # Skip metadata and system tables
                if table_name in ('backups_metadata', 'backup_schedule'):
                    continue
                try:
                    # Double any backtick so the name stays a single quoted identifier
                    quoted_name = table_name.replace('`', '``')
                    cursor.execute(f"TRUNCATE TABLE `{quoted_name}`;")
                    print(f"Truncated table: {table_name}")
                except Exception as e:
                    print(f"Warning: Could not truncate {table_name}: {e}")

            conn.commit()
            cursor.close()
        except Exception as e:
            print(f"Warning during table truncation: {e}")
            # Continue anyway - the restore might still work
        finally:
            # Always release the connection, even when truncation failed midway
            if conn is not None:
                try:
                    conn.close()
                except Exception as close_error:
                    print(f"Warning: could not close database connection: {close_error}")

        # Build mysql restore command for data
        # NOTE(review): the password is passed on the command line and is therefore
        # visible in the process list; consider an option file instead.
        cmd = [
            'mysql',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            self.config['database']
        ]

        # Execute mysql restore
        with open(file_path, 'r') as f:
            result = subprocess.run(
                cmd,
                stdin=f,
                stderr=subprocess.PIPE,
                text=True
            )

        # No step is needed to switch foreign key checks back on: the
        # "SET FOREIGN_KEY_CHECKS = 0" above changed only the session that
        # issued it, and that session ended when its connection was closed.

        if result.returncode == 0:
            return {
                'success': True,
                'message': f'Data restored successfully from {filename}'
            }

        error_msg = result.stderr
        print(f"Data restore error: {error_msg}")
        return {
            'success': False,
            'message': f'Data restore failed: {error_msg}'
        }
    except Exception as e:
        print(f"Exception during data restore: {e}")
        return {
            'success': False,
            'message': f'Data restore failed: {str(e)}'
        }
def get_backup_schedule(self):
    """Return the stored backup schedule, or the defaults when none is stored

    The schedule is read from ``backup_schedule.json`` in the backup
    directory. A stored schedule without a ``backup_type`` entry gets
    ``'full'`` filled in.

    Returns:
        dict or None: Schedule configuration, or None when loading failed
    """
    try:
        schedule_path = os.path.join(self.backup_path, 'backup_schedule.json')

        if not os.path.exists(schedule_path):
            # Nothing saved yet: hand back the default schedule
            return {
                'enabled': False,
                'time': '02:00',  # 2 AM
                'frequency': 'daily',  # daily, weekly, monthly
                'backup_type': 'full',  # full or data-only
                'retention_days': 30  # Keep backups for 30 days
            }

        with open(schedule_path, 'r') as handle:
            stored = json.load(handle)

        # Schedules saved without a backup type default to a full backup
        if 'backup_type' not in stored:
            stored['backup_type'] = 'full'
        return stored
    except Exception as e:
        print(f"Error loading backup schedule: {e}")
        return None
def save_backup_schedule(self, schedule):
    """
    Persist the backup schedule to ``backup_schedule.json``

    The file is written inside the backup directory and replaces any
    schedule stored earlier.

    Args:
        schedule (dict): Schedule configuration

    Returns:
        dict: Result with success status and message
    """
    destination = os.path.join(self.backup_path, 'backup_schedule.json')
    try:
        with open(destination, 'w') as sink:
            json.dump(schedule, sink, indent=2)
    except Exception as e:
        print(f"Error saving backup schedule: {e}")
        return {
            'success': False,
            'message': f'Failed to save schedule: {str(e)}'
        }
    else:
        return {
            'success': True,
            'message': 'Backup schedule saved successfully'
        }
def validate_backup_file(self, filename):
    """
    Validate uploaded backup file for integrity and compatibility

    Checks:
    - File exists and is readable
    - File looks like an SQL dump (CREATE/INSERT statements are searched for)
    - File contains expected database structure (users table, etc.)
    - File size is reasonable
    - No suspicious commands (unconditional DROP DATABASE, TRUNCATE TABLE)

    Files over 2GB are accepted without a content scan. Files between
    100MB and 2GB are scanned only for their first 10MB.

    Very long lines are treated as possible binary data unless they are
    INSERT statements: multi-row INSERTs in SQL dumps are routinely far
    longer than the 50,000 character limit and must not fail validation.
    Each kind of suspicious finding is reported once, however often it occurs.

    Args:
        filename (str): Name of the backup file to validate

    Returns:
        dict: Validation result with success status, message, and details
    """
    try:
        # Security: ensure filename doesn't contain path traversal
        if '..' in filename or '/' in filename:
            return {
                'success': False,
                'message': 'Invalid filename - potential security issue',
                'details': {}
            }

        file_path = os.path.join(self.backup_path, filename)

        # Check if file exists
        if not os.path.exists(file_path):
            return {
                'success': False,
                'message': 'Backup file not found',
                'details': {}
            }

        # Check file size (warn if too small or too large)
        file_size = os.path.getsize(file_path)
        size_mb = round(file_size / (1024 * 1024), 2)

        if file_size < 1024:  # Less than 1KB is suspicious
            return {
                'success': False,
                'message': 'File too small - may be empty or corrupted',
                'details': {'size_mb': size_mb}
            }

        # For very large files (>2GB), skip detailed validation to avoid timeouts
        # Just do basic checks
        if file_size > 2 * 1024 * 1024 * 1024:  # Over 2GB
            return {
                'success': True,
                'message': f'Large backup file accepted ({size_mb:.2f} MB) - detailed validation skipped for performance',
                'details': {
                    'size_mb': size_mb,
                    'validation_skipped': True,
                    'reason': 'File too large for line-by-line validation'
                },
                'warnings': ['Detailed content validation skipped due to large file size']
            }

        # Read and validate SQL content (only for files < 2GB)
        validation_details = {
            'size_mb': size_mb,
            'has_create_database': False,
            'has_users_table': False,
            'has_insert_statements': False,
            'suspicious_commands': [],
            'line_count': 0
        }
        suspicious = validation_details['suspicious_commands']

        def flag(finding):
            """Record a suspicious finding once, keeping first-seen order."""
            if finding not in suspicious:
                suspicious.append(finding)

        # For large files (100MB - 2GB), only read first 10MB for validation
        max_bytes_to_read = 10 * 1024 * 1024 if file_size > 100 * 1024 * 1024 else None
        bytes_read = 0

        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content_preview = []
            line_count = 0

            for line in f:
                line_count += 1
                bytes_read += len(line.encode('utf-8'))

                # Stop reading after max_bytes for large files
                if max_bytes_to_read and bytes_read > max_bytes_to_read:
                    validation_details['partial_validation'] = True
                    validation_details['bytes_validated'] = f'{bytes_read / (1024*1024):.2f} MB'
                    break

                line_upper = line.strip().upper()
                is_insert = line_upper.startswith('INSERT INTO')

                # Store first 10 non-comment lines for preview
                if len(content_preview) < 10 and line_upper and not line_upper.startswith(('--', '/*')):
                    content_preview.append(line.strip()[:100])  # First 100 chars

                # Check for expected SQL commands
                if 'CREATE DATABASE' in line_upper or 'CREATE SCHEMA' in line_upper:
                    validation_details['has_create_database'] = True
                if 'CREATE TABLE' in line_upper and 'USERS' in line_upper:
                    validation_details['has_users_table'] = True
                if is_insert:
                    validation_details['has_insert_statements'] = True

                # Check for potentially dangerous commands (outside of normal backup context)
                if 'DROP DATABASE' in line_upper and 'IF EXISTS' not in line_upper:
                    flag('Unconditional DROP DATABASE found')
                if 'TRUNCATE TABLE' in line_upper:
                    flag('TRUNCATE TABLE found')

                # Check for very long lines (potential binary data); long
                # INSERT statements are normal in dumps and are not flagged
                if len(line) > 50000 and not is_insert:
                    flag('Very long lines detected (possible binary data)')
                    break

            validation_details['line_count'] = line_count
            validation_details['preview'] = content_preview[:5]  # First 5 lines

        # Evaluate validation results
        issues = []
        warnings = []

        if not validation_details['has_insert_statements']:
            warnings.append('No INSERT statements found - backup may be empty')
        if not validation_details['has_users_table']:
            warnings.append('Users table not found - may not be compatible with this application')
        if suspicious:
            issues.extend(suspicious)
        if validation_details['line_count'] < 10:
            issues.append('Too few lines - file may be incomplete')

        # Final validation decision
        if issues:
            return {
                'success': False,
                'message': f'Validation failed: {"; ".join(issues)}',
                'details': validation_details,
                'warnings': warnings
            }

        if warnings:
            return {
                'success': True,
                'message': 'Validation passed with warnings',
                'details': validation_details,
                'warnings': warnings
            }

        return {
            'success': True,
            'message': 'Backup file validated successfully',
            'details': validation_details,
            'warnings': []
        }
    except UnicodeDecodeError as e:
        # The file is opened with errors='ignore', so decoding should not
        # raise; kept as a safety net
        return {
            'success': False,
            'message': 'File contains invalid characters - may be corrupted or not a text file',
            'details': {'error': str(e)}
        }
    except Exception as e:
        print(f"Error validating backup file: {e}")
        return {
            'success': False,
            'message': f'Validation error: {str(e)}',
            'details': {}
        }
def cleanup_old_backups(self, retention_days=30):
    """
    Remove ``.sql`` backups whose ``ctime`` is older than the retention window

    The metadata entry of every removed file is dropped as well.

    Args:
        retention_days (int): Number of days to keep backups

    Returns:
        dict: Result with count of deleted backups
    """
    try:
        removed = 0
        threshold = datetime.now() - timedelta(days=retention_days)

        if os.path.exists(self.backup_path):
            for name in os.listdir(self.backup_path):
                if not name.endswith('.sql'):
                    continue

                candidate = os.path.join(self.backup_path, name)
                changed_at = datetime.fromtimestamp(os.path.getctime(candidate))
                if changed_at >= threshold:
                    # Still inside the retention window
                    continue

                os.remove(candidate)
                self._remove_backup_metadata(name)
                removed += 1
                print(f"Deleted old backup: {name}")

        return {
            'success': True,
            'deleted_count': removed,
            'message': f'Cleaned up {removed} old backup(s)'
        }
    except Exception as e:
        print(f"Error cleaning up old backups: {e}")
        return {
            'success': False,
            'message': f'Cleanup failed: {str(e)}'
        }
def upload_backup(self, uploaded_file):
    """
    Store an externally supplied backup file and validate it

    Only ``.sql`` uploads are accepted. The file is saved under a
    timestamped name, validated with ``validate_backup_file``, and deleted
    again when validation fails. Accepted files are recorded in the
    metadata index.

    Args:
        uploaded_file: Werkzeug FileStorage object from request.files

    Returns:
        dict: Result with success status, filename, and validation details
    """
    try:
        from werkzeug.utils import secure_filename
        from pathlib import Path

        # Validate file extension
        if not uploaded_file.filename.lower().endswith('.sql'):
            return {
                'success': False,
                'message': 'Invalid file format. Only .sql files are allowed.'
            }

        # Work with a Path object and make sure the folder is there
        target_dir = Path(self.backup_path)
        target_dir.mkdir(parents=True, exist_ok=True)

        # Sanitised name plus a timestamp, so uploads never overwrite each other
        safe_name = secure_filename(uploaded_file.filename)
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if safe_name.startswith('backup_'):
            # Already carries the "backup_" prefix: keep it, append the stamp
            stem = safe_name.rsplit('.', 1)[0]
            stored_name = f"{stem}_{stamp}.sql"
        else:
            stored_name = f"backup_uploaded_{stamp}_{safe_name}"

        # Save file to backup directory
        destination = target_dir / stored_name
        uploaded_file.save(str(destination))

        byte_count = destination.stat().st_size
        size_mb = round(byte_count / (1024 * 1024), 2)

        # Check the stored file for integrity and compatibility
        verdict = self.validate_backup_file(stored_name)
        if not verdict['success']:
            # Rejected uploads must not stay in the backup folder
            destination.unlink()
            return {
                'success': False,
                'message': f'Validation failed: {verdict["message"]}',
                'validation_details': verdict.get('details', {}),
                'warnings': verdict.get('warnings', [])
            }

        outcome = {
            'success': True,
            'message': 'Backup file uploaded and validated successfully',
            'filename': stored_name,
            'size': f'{size_mb} MB',
            'path': str(destination),
            'validation': {
                'status': 'passed',
                'message': verdict['message'],
                'details': verdict.get('details', {}),
                'warnings': verdict.get('warnings', [])
            }
        }

        # Surface validation warnings in the top-level message
        if verdict.get('warnings'):
            outcome['message'] = f'Backup uploaded with warnings: {"; ".join(verdict["warnings"])}'

        # Save metadata
        self._save_backup_metadata(stored_name, byte_count)
        return outcome
    except Exception as e:
        print(f"Error uploading backup: {e}")
        return {
            'success': False,
            'message': f'Upload failed: {str(e)}'
        }
def get_backup_file_path(self, filename):
    """
    Resolve a backup file name to its full path inside the backup directory

    Args:
        filename (str): Name of the backup file

    Returns:
        str or None: Full file path when the name is acceptable and the file
        exists; None when the name contains '..' or '/', or no such file exists
    """
    # Security: never resolve names that could leave the backup directory
    if '/' in filename or '..' in filename:
        return None

    candidate = os.path.join(self.backup_path, filename)
    return candidate if os.path.exists(candidate) else None