- Fix external_server.conf parsing to skip comment lines and empty lines - Update routes.py get_db_connection() to handle comments - Update settings.py get_external_db_connection() to handle comments - Improve restore_backup() to use mariadb command instead of Python parsing - Remove SQLite database creation (MariaDB only) - Add environment detection for dump command (mariadb-dump vs mysqldump) - Add conditional SSL flag based on Docker environment - Fix database restore to handle MariaDB sandbox mode comments
1000 lines
38 KiB
Python
1000 lines
38 KiB
Python
"""
|
|
Database Backup Management Module
|
|
Quality Recticel Application
|
|
|
|
This module provides functionality for backing up and restoring the MariaDB database,
|
|
including scheduled backups, manual backups, and backup file management.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
import configparser
|
|
from flask import current_app
|
|
import mariadb
|
|
|
|
class DatabaseBackupManager:
|
|
"""Manages database backup operations"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the backup manager with configuration from external_server.conf"""
|
|
self.config = self._load_database_config()
|
|
self.backup_path = self._get_backup_path()
|
|
self._ensure_backup_directory()
|
|
self.dump_command = self._detect_dump_command()
|
|
|
|
def _detect_dump_command(self):
|
|
"""Detect which mysqldump command is available (mariadb-dump or mysqldump)"""
|
|
try:
|
|
# Try mariadb-dump first (newer MariaDB versions)
|
|
result = subprocess.run(['which', 'mariadb-dump'],
|
|
capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
return 'mariadb-dump'
|
|
|
|
# Fall back to mysqldump
|
|
result = subprocess.run(['which', 'mysqldump'],
|
|
capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
return 'mysqldump'
|
|
|
|
# Default to mariadb-dump (will error if not available)
|
|
return 'mariadb-dump'
|
|
except Exception as e:
|
|
print(f"Warning: Could not detect dump command: {e}")
|
|
return 'mysqldump' # Default fallback
|
|
|
|
def _get_ssl_args(self):
|
|
"""Get SSL arguments based on environment (Docker needs --skip-ssl)"""
|
|
# Check if running in Docker container
|
|
if os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER'):
|
|
return ['--skip-ssl']
|
|
return []
|
|
|
|
def _load_database_config(self):
|
|
"""Load database configuration from external_server.conf"""
|
|
try:
|
|
settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
|
|
config = {}
|
|
|
|
if os.path.exists(settings_file):
|
|
with open(settings_file, 'r') as f:
|
|
for line in f:
|
|
if '=' in line:
|
|
key, value = line.strip().split('=', 1)
|
|
config[key] = value
|
|
|
|
return {
|
|
'host': config.get('server_domain', 'localhost'),
|
|
'port': config.get('port', '3306'),
|
|
'database': config.get('database_name', 'trasabilitate'),
|
|
'user': config.get('username', 'trasabilitate'),
|
|
'password': config.get('password', '')
|
|
}
|
|
except Exception as e:
|
|
print(f"Error loading database config: {e}")
|
|
return None
|
|
|
|
def _get_backup_path(self):
|
|
"""Get backup path from environment or use default"""
|
|
# Check environment variable (set in docker-compose)
|
|
backup_path = os.environ.get('BACKUP_PATH', '/srv/quality_app/backups')
|
|
|
|
# Check if custom path is set in config
|
|
try:
|
|
settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
|
|
if os.path.exists(settings_file):
|
|
with open(settings_file, 'r') as f:
|
|
for line in f:
|
|
if line.startswith('backup_path='):
|
|
backup_path = line.strip().split('=', 1)[1]
|
|
break
|
|
except Exception as e:
|
|
print(f"Error reading backup path from config: {e}")
|
|
|
|
return backup_path
|
|
|
|
def _ensure_backup_directory(self):
|
|
"""Ensure backup directory exists"""
|
|
try:
|
|
Path(self.backup_path).mkdir(parents=True, exist_ok=True)
|
|
print(f"Backup directory ensured: {self.backup_path}")
|
|
except Exception as e:
|
|
print(f"Error creating backup directory: {e}")
|
|
|
|
def create_backup(self, backup_name=None):
|
|
"""
|
|
Create a complete backup of the database
|
|
|
|
Args:
|
|
backup_name (str, optional): Custom name for the backup file
|
|
|
|
Returns:
|
|
dict: Result with success status, message, and backup file path
|
|
"""
|
|
try:
|
|
if not self.config:
|
|
return {
|
|
'success': False,
|
|
'message': 'Database configuration not loaded'
|
|
}
|
|
|
|
# Generate backup filename
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
if backup_name:
|
|
filename = f"{backup_name}_{timestamp}.sql"
|
|
else:
|
|
filename = f"backup_{self.config['database']}_{timestamp}.sql"
|
|
|
|
backup_file = os.path.join(self.backup_path, filename)
|
|
|
|
# Build mysqldump command
|
|
# Note: --skip-lock-tables and --force help with views that have permission issues
|
|
cmd = [
|
|
self.dump_command,
|
|
f"--host={self.config['host']}",
|
|
f"--port={self.config['port']}",
|
|
f"--user={self.config['user']}",
|
|
f"--password={self.config['password']}",
|
|
]
|
|
|
|
# Add SSL args if needed (Docker environment)
|
|
cmd.extend(self._get_ssl_args())
|
|
|
|
# Add backup options
|
|
cmd.extend([
|
|
'--single-transaction',
|
|
'--skip-lock-tables',
|
|
'--force',
|
|
'--routines',
|
|
'--triggers',
|
|
'--events',
|
|
'--add-drop-database',
|
|
'--databases',
|
|
self.config['database']
|
|
])
|
|
|
|
# Execute mysqldump and save to file
|
|
with open(backup_file, 'w') as f:
|
|
result = subprocess.run(
|
|
cmd,
|
|
stdout=f,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# Get file size
|
|
file_size = os.path.getsize(backup_file)
|
|
file_size_mb = file_size / (1024 * 1024)
|
|
|
|
# Save backup metadata
|
|
self._save_backup_metadata(filename, file_size)
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Backup created successfully',
|
|
'filename': filename,
|
|
'file_path': backup_file,
|
|
'size': f"{file_size_mb:.2f} MB",
|
|
'timestamp': timestamp
|
|
}
|
|
else:
|
|
error_msg = result.stderr
|
|
print(f"Backup error: {error_msg}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Backup failed: {error_msg}'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Exception during backup: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Backup failed: {str(e)}'
|
|
}
|
|
|
|
def _save_backup_metadata(self, filename, file_size):
|
|
"""Save metadata about the backup"""
|
|
try:
|
|
metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')
|
|
|
|
# Load existing metadata
|
|
metadata = []
|
|
if os.path.exists(metadata_file):
|
|
with open(metadata_file, 'r') as f:
|
|
metadata = json.load(f)
|
|
|
|
# Add new backup metadata
|
|
metadata.append({
|
|
'filename': filename,
|
|
'size': file_size,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'database': self.config['database']
|
|
})
|
|
|
|
# Save updated metadata
|
|
with open(metadata_file, 'w') as f:
|
|
json.dump(metadata, f, indent=2)
|
|
|
|
except Exception as e:
|
|
print(f"Error saving backup metadata: {e}")
|
|
|
|
def list_backups(self):
|
|
"""
|
|
List all available backups
|
|
|
|
Returns:
|
|
list: List of backup information dictionaries
|
|
"""
|
|
try:
|
|
backups = []
|
|
|
|
# Get all .sql files in backup directory
|
|
if os.path.exists(self.backup_path):
|
|
for filename in os.listdir(self.backup_path):
|
|
if filename.endswith('.sql'):
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
file_stat = os.stat(file_path)
|
|
|
|
backups.append({
|
|
'filename': filename,
|
|
'size': file_stat.st_size,
|
|
'size_mb': f"{file_stat.st_size / (1024 * 1024):.2f}",
|
|
'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
|
|
'timestamp': file_stat.st_ctime
|
|
})
|
|
|
|
# Sort by timestamp (newest first)
|
|
backups.sort(key=lambda x: x['timestamp'], reverse=True)
|
|
|
|
return backups
|
|
|
|
except Exception as e:
|
|
print(f"Error listing backups: {e}")
|
|
return []
|
|
|
|
def delete_backup(self, filename):
|
|
"""
|
|
Delete a backup file
|
|
|
|
Args:
|
|
filename (str): Name of the backup file to delete
|
|
|
|
Returns:
|
|
dict: Result with success status and message
|
|
"""
|
|
try:
|
|
# Security: ensure filename doesn't contain path traversal
|
|
if '..' in filename or '/' in filename:
|
|
return {
|
|
'success': False,
|
|
'message': 'Invalid filename'
|
|
}
|
|
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
|
|
# Update metadata
|
|
self._remove_backup_metadata(filename)
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Backup {filename} deleted successfully'
|
|
}
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'message': 'Backup file not found'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error deleting backup: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Failed to delete backup: {str(e)}'
|
|
}
|
|
|
|
def _remove_backup_metadata(self, filename):
|
|
"""Remove metadata entry for deleted backup"""
|
|
try:
|
|
metadata_file = os.path.join(self.backup_path, 'backups_metadata.json')
|
|
|
|
if os.path.exists(metadata_file):
|
|
with open(metadata_file, 'r') as f:
|
|
metadata = json.load(f)
|
|
|
|
# Filter out the deleted backup
|
|
metadata = [m for m in metadata if m['filename'] != filename]
|
|
|
|
with open(metadata_file, 'w') as f:
|
|
json.dump(metadata, f, indent=2)
|
|
|
|
except Exception as e:
|
|
print(f"Error removing backup metadata: {e}")
|
|
|
|
def create_data_only_backup(self, backup_name=None):
|
|
"""
|
|
Create a data-only backup (no schema, triggers, or structure)
|
|
Only exports INSERT statements for existing tables
|
|
|
|
Args:
|
|
backup_name (str, optional): Custom name for the backup file
|
|
|
|
Returns:
|
|
dict: Result with success status, message, and backup file path
|
|
"""
|
|
try:
|
|
if not self.config:
|
|
return {
|
|
'success': False,
|
|
'message': 'Database configuration not loaded'
|
|
}
|
|
|
|
# Generate backup filename with data_only prefix
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
if backup_name:
|
|
filename = f"data_only_{backup_name}_{timestamp}.sql"
|
|
else:
|
|
filename = f"data_only_{self.config['database']}_{timestamp}.sql"
|
|
|
|
backup_file = os.path.join(self.backup_path, filename)
|
|
|
|
# Build mysqldump command for data only
|
|
# --no-create-info: Skip CREATE TABLE statements
|
|
# --skip-triggers: Skip trigger definitions
|
|
# --no-create-db: Skip CREATE DATABASE statement
|
|
# --complete-insert: Include column names in INSERT (more reliable)
|
|
# --extended-insert: Use multi-row INSERT for efficiency
|
|
cmd = [
|
|
self.dump_command,
|
|
f"--host={self.config['host']}",
|
|
f"--port={self.config['port']}",
|
|
f"--user={self.config['user']}",
|
|
f"--password={self.config['password']}",
|
|
]
|
|
|
|
# Add SSL args if needed (Docker environment)
|
|
cmd.extend(self._get_ssl_args())
|
|
|
|
# Add data-only backup options
|
|
cmd.extend([
|
|
'--no-create-info', # Skip table structure
|
|
'--skip-triggers', # Skip triggers
|
|
'--no-create-db', # Skip database creation
|
|
'--complete-insert', # Include column names
|
|
'--extended-insert', # Multi-row INSERTs
|
|
'--single-transaction',
|
|
'--skip-lock-tables',
|
|
self.config['database']
|
|
])
|
|
|
|
# Execute mysqldump and save to file
|
|
with open(backup_file, 'w') as f:
|
|
result = subprocess.run(
|
|
cmd,
|
|
stdout=f,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# Get file size
|
|
file_size = os.path.getsize(backup_file)
|
|
file_size_mb = file_size / (1024 * 1024)
|
|
|
|
# Save backup metadata
|
|
self._save_backup_metadata(filename, file_size)
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Data-only backup created successfully',
|
|
'filename': filename,
|
|
'file_path': backup_file,
|
|
'size': f"{file_size_mb:.2f} MB",
|
|
'timestamp': timestamp
|
|
}
|
|
else:
|
|
error_msg = result.stderr
|
|
print(f"Data backup error: {error_msg}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Data backup failed: {error_msg}'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Exception during data backup: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Data backup failed: {str(e)}'
|
|
}
|
|
|
|
def restore_backup(self, filename):
|
|
"""
|
|
Restore database from a backup file using mariadb command
|
|
|
|
Args:
|
|
filename (str): Name of the backup file to restore
|
|
|
|
Returns:
|
|
dict: Result with success status and message
|
|
"""
|
|
try:
|
|
# Security: ensure filename doesn't contain path traversal
|
|
if '..' in filename or '/' in filename:
|
|
return {
|
|
'success': False,
|
|
'message': 'Invalid filename'
|
|
}
|
|
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
|
|
if not os.path.exists(file_path):
|
|
return {
|
|
'success': False,
|
|
'message': 'Backup file not found'
|
|
}
|
|
|
|
# Use mariadb command to restore (more reliable than Python parsing)
|
|
# First, read the file and skip problematic first line if needed
|
|
with open(file_path, 'r') as f:
|
|
sql_content = f.read()
|
|
|
|
# Remove problematic MariaDB sandbox mode comment from first line
|
|
lines = sql_content.split('\n')
|
|
if lines and lines[0].startswith('/*M!999999'):
|
|
sql_content = '\n'.join(lines[1:])
|
|
|
|
# Build mariadb restore command
|
|
cmd = [
|
|
'mariadb',
|
|
f"--host={self.config['host']}",
|
|
f"--port={self.config['port']}",
|
|
f"--user={self.config['user']}",
|
|
f"--password={self.config['password']}",
|
|
]
|
|
|
|
# Add SSL args if needed (Docker environment)
|
|
cmd.extend(self._get_ssl_args())
|
|
|
|
# Add database name
|
|
cmd.append(self.config['database'])
|
|
|
|
# Execute restore with stdin
|
|
result = subprocess.run(
|
|
cmd,
|
|
input=sql_content,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
return {
|
|
'success': True,
|
|
'message': f'Database restored successfully from {filename}'
|
|
}
|
|
else:
|
|
error_msg = result.stderr
|
|
print(f"Restore error: {error_msg}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Restore failed: {error_msg}'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Exception during restore: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Restore failed: {str(e)}'
|
|
}
|
|
|
|
def restore_data_only(self, filename):
|
|
"""
|
|
Restore data from a data-only backup file
|
|
Assumes database schema already exists
|
|
Truncates tables before inserting data to avoid duplicates
|
|
|
|
Args:
|
|
filename (str): Name of the data-only backup file to restore
|
|
|
|
Returns:
|
|
dict: Result with success status and message
|
|
"""
|
|
try:
|
|
# Security: ensure filename doesn't contain path traversal
|
|
if '..' in filename or '/' in filename:
|
|
return {
|
|
'success': False,
|
|
'message': 'Invalid filename'
|
|
}
|
|
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
|
|
if not os.path.exists(file_path):
|
|
return {
|
|
'success': False,
|
|
'message': 'Backup file not found'
|
|
}
|
|
|
|
# First, disable foreign key checks and truncate all tables
|
|
# This ensures clean data import without constraint violations
|
|
try:
|
|
conn = mariadb.connect(
|
|
host=self.config['host'],
|
|
port=int(self.config['port']),
|
|
user=self.config['user'],
|
|
password=self.config['password'],
|
|
database=self.config['database']
|
|
)
|
|
cursor = conn.cursor()
|
|
|
|
# Disable foreign key checks
|
|
cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
|
|
|
|
# Get list of all tables in the database
|
|
cursor.execute("SHOW TABLES;")
|
|
tables = cursor.fetchall()
|
|
|
|
# Truncate each table (except system tables)
|
|
for (table_name,) in tables:
|
|
# Skip metadata and system tables
|
|
if table_name not in ['backups_metadata', 'backup_schedule']:
|
|
try:
|
|
cursor.execute(f"TRUNCATE TABLE `{table_name}`;")
|
|
print(f"Truncated table: {table_name}")
|
|
except Exception as e:
|
|
print(f"Warning: Could not truncate {table_name}: {e}")
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
print(f"Warning during table truncation: {e}")
|
|
# Continue anyway - the restore might still work
|
|
|
|
# Read and execute SQL file using Python mariadb library
|
|
with open(file_path, 'r') as f:
|
|
sql_content = f.read()
|
|
|
|
conn = mariadb.connect(
|
|
user=self.config['user'],
|
|
password=self.config['password'],
|
|
host=self.config['host'],
|
|
port=int(self.config['port']),
|
|
database=self.config['database']
|
|
)
|
|
|
|
cursor = conn.cursor()
|
|
statements = sql_content.split(';')
|
|
executed = 0
|
|
|
|
for statement in statements:
|
|
statement = statement.strip()
|
|
if statement:
|
|
try:
|
|
cursor.execute(statement)
|
|
executed += 1
|
|
except Exception as stmt_error:
|
|
print(f"Warning executing statement: {stmt_error}")
|
|
|
|
conn.commit()
|
|
result_success = True
|
|
result_returncode = 0
|
|
|
|
# Re-enable foreign key checks
|
|
try:
|
|
conn = mariadb.connect(
|
|
host=self.config['host'],
|
|
port=int(self.config['port']),
|
|
user=self.config['user'],
|
|
password=self.config['password'],
|
|
database=self.config['database']
|
|
)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
except Exception as e:
|
|
print(f"Warning: Could not re-enable foreign key checks: {e}")
|
|
|
|
if result_success:
|
|
return {
|
|
'success': True,
|
|
'message': f'Data restored successfully from {filename}'
|
|
}
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'message': f'Data restore failed'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Exception during data restore: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Data restore failed: {str(e)}'
|
|
}
|
|
|
|
def get_backup_schedule(self):
|
|
"""Get current backup schedule configuration"""
|
|
try:
|
|
schedule_file = os.path.join(self.backup_path, 'backup_schedule.json')
|
|
|
|
if os.path.exists(schedule_file):
|
|
with open(schedule_file, 'r') as f:
|
|
schedule = json.load(f)
|
|
# Ensure backup_type exists (for backward compatibility)
|
|
if 'backup_type' not in schedule:
|
|
schedule['backup_type'] = 'full'
|
|
return schedule
|
|
|
|
# Default schedule
|
|
return {
|
|
'enabled': False,
|
|
'time': '02:00', # 2 AM
|
|
'frequency': 'daily', # daily, weekly, monthly
|
|
'backup_type': 'full', # full or data-only
|
|
'retention_days': 30 # Keep backups for 30 days
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error loading backup schedule: {e}")
|
|
return None
|
|
|
|
def save_backup_schedule(self, schedule):
|
|
"""
|
|
Save backup schedule configuration
|
|
|
|
Args:
|
|
schedule (dict): Schedule configuration
|
|
|
|
Returns:
|
|
dict: Result with success status and message
|
|
"""
|
|
try:
|
|
schedule_file = os.path.join(self.backup_path, 'backup_schedule.json')
|
|
|
|
with open(schedule_file, 'w') as f:
|
|
json.dump(schedule, f, indent=2)
|
|
|
|
return {
|
|
'success': True,
|
|
'message': 'Backup schedule saved successfully'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error saving backup schedule: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Failed to save schedule: {str(e)}'
|
|
}
|
|
|
|
def validate_backup_file(self, filename):
|
|
"""
|
|
Validate uploaded backup file for integrity and compatibility
|
|
|
|
Checks:
|
|
- File exists and is readable
|
|
- File contains valid SQL syntax
|
|
- File contains expected database structure (users table, etc.)
|
|
- File size is reasonable
|
|
- No malicious commands (DROP statements outside of backup context)
|
|
|
|
Args:
|
|
filename (str): Name of the backup file to validate
|
|
|
|
Returns:
|
|
dict: Validation result with success status, message, and details
|
|
"""
|
|
try:
|
|
# Security: ensure filename doesn't contain path traversal
|
|
if '..' in filename or '/' in filename:
|
|
return {
|
|
'success': False,
|
|
'message': 'Invalid filename - potential security issue',
|
|
'details': {}
|
|
}
|
|
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
|
|
# Check if file exists
|
|
if not os.path.exists(file_path):
|
|
return {
|
|
'success': False,
|
|
'message': 'Backup file not found',
|
|
'details': {}
|
|
}
|
|
|
|
# Check file size (warn if too small or too large)
|
|
file_size = os.path.getsize(file_path)
|
|
size_mb = round(file_size / (1024 * 1024), 2)
|
|
|
|
if file_size < 1024: # Less than 1KB is suspicious
|
|
return {
|
|
'success': False,
|
|
'message': 'File too small - may be empty or corrupted',
|
|
'details': {'size_mb': size_mb}
|
|
}
|
|
|
|
# For very large files (>2GB), skip detailed validation to avoid timeouts
|
|
# Just do basic checks
|
|
if file_size > 2 * 1024 * 1024 * 1024: # Over 2GB
|
|
return {
|
|
'success': True,
|
|
'message': f'Large backup file accepted ({size_mb:.2f} MB) - detailed validation skipped for performance',
|
|
'details': {
|
|
'size_mb': size_mb,
|
|
'validation_skipped': True,
|
|
'reason': 'File too large for line-by-line validation'
|
|
},
|
|
'warnings': ['Detailed content validation skipped due to large file size']
|
|
}
|
|
|
|
# Read and validate SQL content (only for files < 2GB)
|
|
validation_details = {
|
|
'size_mb': size_mb,
|
|
'has_create_database': False,
|
|
'has_users_table': False,
|
|
'has_insert_statements': False,
|
|
'suspicious_commands': [],
|
|
'line_count': 0
|
|
}
|
|
|
|
# For large files (100MB - 2GB), only read first 10MB for validation
|
|
max_bytes_to_read = 10 * 1024 * 1024 if file_size > 100 * 1024 * 1024 else None
|
|
bytes_read = 0
|
|
|
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
content_preview = []
|
|
line_count = 0
|
|
|
|
for line in f:
|
|
line_count += 1
|
|
bytes_read += len(line.encode('utf-8'))
|
|
|
|
# Stop reading after max_bytes for large files
|
|
if max_bytes_to_read and bytes_read > max_bytes_to_read:
|
|
validation_details['partial_validation'] = True
|
|
validation_details['bytes_validated'] = f'{bytes_read / (1024*1024):.2f} MB'
|
|
break
|
|
|
|
line_upper = line.strip().upper()
|
|
|
|
# Store first 10 non-comment lines for preview
|
|
if len(content_preview) < 10 and line_upper and not line_upper.startswith('--') and not line_upper.startswith('/*'):
|
|
content_preview.append(line.strip()[:100]) # First 100 chars
|
|
|
|
# Check for expected SQL commands
|
|
if 'CREATE DATABASE' in line_upper or 'CREATE SCHEMA' in line_upper:
|
|
validation_details['has_create_database'] = True
|
|
|
|
if 'CREATE TABLE' in line_upper and 'USERS' in line_upper:
|
|
validation_details['has_users_table'] = True
|
|
|
|
if line_upper.startswith('INSERT INTO'):
|
|
validation_details['has_insert_statements'] = True
|
|
|
|
# Check for potentially dangerous commands (outside of normal backup context)
|
|
if 'DROP DATABASE' in line_upper and 'IF EXISTS' not in line_upper:
|
|
validation_details['suspicious_commands'].append('Unconditional DROP DATABASE found')
|
|
|
|
if 'TRUNCATE TABLE' in line_upper:
|
|
validation_details['suspicious_commands'].append('TRUNCATE TABLE found')
|
|
|
|
# Check for very long lines (potential binary data)
|
|
if len(line) > 50000:
|
|
validation_details['suspicious_commands'].append('Very long lines detected (possible binary data)')
|
|
break
|
|
|
|
validation_details['line_count'] = line_count
|
|
validation_details['preview'] = content_preview[:5] # First 5 lines
|
|
|
|
# Evaluate validation results
|
|
issues = []
|
|
warnings = []
|
|
|
|
if not validation_details['has_insert_statements']:
|
|
warnings.append('No INSERT statements found - backup may be empty')
|
|
|
|
if not validation_details['has_users_table']:
|
|
warnings.append('Users table not found - may not be compatible with this application')
|
|
|
|
if validation_details['suspicious_commands']:
|
|
issues.extend(validation_details['suspicious_commands'])
|
|
|
|
if validation_details['line_count'] < 10:
|
|
issues.append('Too few lines - file may be incomplete')
|
|
|
|
# Final validation decision
|
|
if issues:
|
|
return {
|
|
'success': False,
|
|
'message': f'Validation failed: {"; ".join(issues)}',
|
|
'details': validation_details,
|
|
'warnings': warnings
|
|
}
|
|
|
|
if warnings:
|
|
return {
|
|
'success': True,
|
|
'message': 'Validation passed with warnings',
|
|
'details': validation_details,
|
|
'warnings': warnings
|
|
}
|
|
|
|
return {
|
|
'success': True,
|
|
'message': 'Backup file validated successfully',
|
|
'details': validation_details,
|
|
'warnings': []
|
|
}
|
|
|
|
except UnicodeDecodeError as e:
|
|
return {
|
|
'success': False,
|
|
'message': 'File contains invalid characters - may be corrupted or not a text file',
|
|
'details': {'error': str(e)}
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error validating backup file: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Validation error: {str(e)}',
|
|
'details': {}
|
|
}
|
|
|
|
def cleanup_old_backups(self, retention_days=30):
|
|
"""
|
|
Delete backups older than retention_days
|
|
|
|
Args:
|
|
retention_days (int): Number of days to keep backups
|
|
|
|
Returns:
|
|
dict: Result with count of deleted backups
|
|
"""
|
|
try:
|
|
deleted_count = 0
|
|
cutoff_time = datetime.now() - timedelta(days=retention_days)
|
|
|
|
if os.path.exists(self.backup_path):
|
|
for filename in os.listdir(self.backup_path):
|
|
if filename.endswith('.sql'):
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
file_time = datetime.fromtimestamp(os.path.getctime(file_path))
|
|
|
|
if file_time < cutoff_time:
|
|
os.remove(file_path)
|
|
self._remove_backup_metadata(filename)
|
|
deleted_count += 1
|
|
print(f"Deleted old backup: {filename}")
|
|
|
|
return {
|
|
'success': True,
|
|
'deleted_count': deleted_count,
|
|
'message': f'Cleaned up {deleted_count} old backup(s)'
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error cleaning up old backups: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Cleanup failed: {str(e)}'
|
|
}
|
|
|
|
def upload_backup(self, uploaded_file):
|
|
"""
|
|
Upload and validate an external backup file
|
|
|
|
Args:
|
|
uploaded_file: Werkzeug FileStorage object from request.files
|
|
|
|
Returns:
|
|
dict: Result with success status, filename, and validation details
|
|
"""
|
|
try:
|
|
from werkzeug.utils import secure_filename
|
|
from pathlib import Path
|
|
|
|
# Validate file extension
|
|
if not uploaded_file.filename.lower().endswith('.sql'):
|
|
return {
|
|
'success': False,
|
|
'message': 'Invalid file format. Only .sql files are allowed.'
|
|
}
|
|
|
|
# Ensure backup_path is a Path object
|
|
backup_path = Path(self.backup_path)
|
|
backup_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Generate secure filename with timestamp to avoid conflicts
|
|
original_filename = secure_filename(uploaded_file.filename)
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
# If filename already starts with "backup_", keep it; otherwise add prefix
|
|
if original_filename.startswith('backup_'):
|
|
new_filename = f"{original_filename.rsplit('.', 1)[0]}_{timestamp}.sql"
|
|
else:
|
|
new_filename = f"backup_uploaded_{timestamp}_{original_filename}"
|
|
|
|
# Save file to backup directory
|
|
file_path = backup_path / new_filename
|
|
uploaded_file.save(str(file_path))
|
|
|
|
# Get file size
|
|
file_size = file_path.stat().st_size
|
|
size_mb = round(file_size / (1024 * 1024), 2)
|
|
|
|
# Validate the uploaded file for integrity and compatibility
|
|
validation_result = self.validate_backup_file(new_filename)
|
|
|
|
if not validation_result['success']:
|
|
# Validation failed - remove the uploaded file
|
|
file_path.unlink() # Delete the invalid file
|
|
return {
|
|
'success': False,
|
|
'message': f'Validation failed: {validation_result["message"]}',
|
|
'validation_details': validation_result.get('details', {}),
|
|
'warnings': validation_result.get('warnings', [])
|
|
}
|
|
|
|
# Build response with validation details
|
|
response = {
|
|
'success': True,
|
|
'message': 'Backup file uploaded and validated successfully',
|
|
'filename': new_filename,
|
|
'size': f'{size_mb} MB',
|
|
'path': str(file_path),
|
|
'validation': {
|
|
'status': 'passed',
|
|
'message': validation_result['message'],
|
|
'details': validation_result.get('details', {}),
|
|
'warnings': validation_result.get('warnings', [])
|
|
}
|
|
}
|
|
|
|
# Add warning flag if there are warnings
|
|
if validation_result.get('warnings'):
|
|
response['message'] = f'Backup uploaded with warnings: {"; ".join(validation_result["warnings"])}'
|
|
|
|
# Save metadata
|
|
self._save_backup_metadata(new_filename, file_size)
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
print(f"Error uploading backup: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'Upload failed: {str(e)}'
|
|
}
|
|
|
|
def get_backup_file_path(self, filename):
|
|
"""
|
|
Get the full path to a backup file (with security validation)
|
|
|
|
Args:
|
|
filename (str): Name of the backup file
|
|
|
|
Returns:
|
|
str or None: Full file path if valid, None if security check fails
|
|
"""
|
|
# Security: ensure filename doesn't contain path traversal
|
|
if '..' in filename or '/' in filename:
|
|
return None
|
|
|
|
file_path = os.path.join(self.backup_path, filename)
|
|
|
|
if os.path.exists(file_path):
|
|
return file_path
|
|
|
|
return None
|
|
|