diff --git a/py_app/app/routes.py b/py_app/app/routes.py index 2491c93..e9e6209 100755 --- a/py_app/app/routes.py +++ b/py_app/app/routes.py @@ -4523,3 +4523,716 @@ def api_backup_schedule_add(): 'message': f'Failed to add schedule: {str(e)}' }), 500 + +# ===================== Maintenance & Log Cleanup Routes ===================== + +@bp.route('/api/maintenance/log-settings', methods=['GET']) +@admin_plus +def get_log_settings(): + """Get current log cleanup settings""" + try: + import os + import json + + config_path = os.path.join(current_app.instance_path, 'maintenance_config.json') + + # Default settings + default_settings = { + 'retention_days': 30, + 'enabled': True, + 'last_cleanup': None + } + + if os.path.exists(config_path): + with open(config_path, 'r') as f: + settings = json.load(f) + return jsonify({ + 'success': True, + 'settings': settings + }) + else: + return jsonify({ + 'success': True, + 'settings': default_settings + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to load settings: {str(e)}' + }), 500 + + +@bp.route('/api/maintenance/log-settings', methods=['POST']) +@admin_plus +def save_log_settings(): + """Save log cleanup settings""" + try: + import os + import json + + data = request.json + retention_days = data.get('retention_days', 30) + + # Validate retention days + if retention_days != 0: # 0 means disabled + if not isinstance(retention_days, int) or retention_days < 7: + return jsonify({ + 'success': False, + 'message': 'Retention days must be at least 7 or 0 to disable' + }), 400 + + config_path = os.path.join(current_app.instance_path, 'maintenance_config.json') + + # Load existing config or create new + if os.path.exists(config_path): + with open(config_path, 'r') as f: + config = json.load(f) + else: + config = {} + + # Update settings + config['retention_days'] = retention_days + config['enabled'] = retention_days > 0 + + # Ensure instance directory exists + os.makedirs(current_app.instance_path, exist_ok=True) + + # Save config + with open(config_path, 'w') as f: + json.dump(config, f, indent=4) + + return jsonify({ + 'success': True, + 'message': f'Log cleanup settings saved. Logs older than {retention_days} days will be deleted.' if retention_days > 0 else 'Log cleanup disabled.' + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to save settings: {str(e)}' + }), 500 + + +@bp.route('/api/maintenance/cleanup-logs', methods=['POST']) +@admin_plus +def cleanup_logs(): + """Execute immediate log cleanup based on retention settings""" + try: + import os + import json + import glob + from datetime import datetime, timedelta + + # Load settings + config_path = os.path.join(current_app.instance_path, 'maintenance_config.json') + + if not os.path.exists(config_path): + return jsonify({ + 'success': False, + 'message': 'No cleanup settings configured. Please configure retention days first.' + }), 400 + + with open(config_path, 'r') as f: + config = json.load(f) + + retention_days = config.get('retention_days', 30) + + if retention_days == 0: + return jsonify({ + 'success': False, + 'message': 'Log cleanup is disabled. Please enable it in settings first.' + }), 400 + + # Get log directory + log_dir = os.path.join(os.path.dirname(current_app.instance_path), 'logs') + + if not os.path.exists(log_dir): + return jsonify({ + 'success': False, + 'message': f'Log directory not found: {log_dir}' + }), 404 + + # Find log files + log_files = glob.glob(os.path.join(log_dir, '*.log')) + old_log_files = glob.glob(os.path.join(log_dir, '*.log.*')) # Rotated logs + all_log_files = log_files + old_log_files + + if not all_log_files: + return jsonify({ + 'success': True, + 'message': 'No log files found to clean up.', + 'files_deleted': 0 + }) + + # Calculate cutoff date + cutoff_date = datetime.now() - timedelta(days=retention_days) + + deleted_count = 0 + deleted_files = [] + total_size_deleted = 0 + + # Check each log file + for log_file in all_log_files: + try: + # Skip current main log files (only delete rotated/old ones) + if log_file.endswith('.log') and '.' not in os.path.basename(log_file)[:-4]: + # This is a main log file (access.log, error.log), skip it + continue + + file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file)) + + if file_mtime < cutoff_date: + file_size = os.path.getsize(log_file) + os.remove(log_file) + deleted_count += 1 + deleted_files.append(os.path.basename(log_file)) + total_size_deleted += file_size + + except Exception as e: + print(f"Error deleting {log_file}: {str(e)}") + continue + + # Update last cleanup time + config['last_cleanup'] = datetime.now().isoformat() + with open(config_path, 'w') as f: + json.dump(config, f, indent=4) + + # Format size in human readable format + if total_size_deleted > 1024 * 1024: + size_str = f"{total_size_deleted / (1024 * 1024):.2f} MB" + elif total_size_deleted > 1024: + size_str = f"{total_size_deleted / 1024:.2f} KB" + else: + size_str = f"{total_size_deleted} bytes" + + if deleted_count > 0: + message = f'Successfully deleted {deleted_count} old log file(s), freed {size_str}.' + else: + message = f'No log files older than {retention_days} days found.' + + return jsonify({ + 'success': True, + 'message': message, + 'files_deleted': deleted_count, + 'size_freed': total_size_deleted, + 'deleted_files': deleted_files + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to cleanup logs: {str(e)}' + }), 500 + + +@bp.route('/api/maintenance/storage-info', methods=['GET']) +@admin_plus +def get_storage_info(): + """Get storage information for logs, database, and backups""" + try: + import os + import glob + + def format_size(size_bytes): + """Format bytes to human readable size""" + if size_bytes >= 1024 * 1024 * 1024: + return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB" + elif size_bytes >= 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.2f} MB" + elif size_bytes >= 1024: + return f"{size_bytes / 1024:.2f} KB" + else: + return f"{size_bytes} bytes" + + def get_directory_size(path): + """Calculate total size of all files in directory""" + total_size = 0 + if os.path.exists(path): + for dirpath, dirnames, filenames in os.walk(path): + for filename in filenames: + filepath = os.path.join(dirpath, filename) + try: + total_size += os.path.getsize(filepath) + except: + continue + return total_size + + # Get logs directory size + log_dir = '/srv/quality_app/logs' + logs_size = get_directory_size(log_dir) + + # Get database size + db_size = 0 + try: + # Load database config directly + settings_file = os.path.join(current_app.instance_path, 'external_server.conf') + config = {} + with open(settings_file, 'r') as f: + for line in f: + if '=' in line: + key, value = line.strip().split('=', 1) + config[key] = value + + conn = mariadb.connect( + host=config.get('server_domain', 'localhost'), + port=int(config.get('port', '3306')), + user=config.get('username', 'root'), + password=config.get('password', ''), + database=config.get('database_name', 'trasabilitate') + ) + cursor = conn.cursor() + + # Query to get database size + cursor.execute(""" + SELECT + SUM(data_length + index_length) as size + FROM information_schema.TABLES + WHERE table_schema = %s + """, (config.get('database_name', 'trasabilitate'),)) + + result = cursor.fetchone() + if result and result[0]: + db_size = int(result[0]) + + cursor.close() + conn.close() + except Exception as e: + print(f"Error getting database size: {str(e)}") + import traceback + traceback.print_exc() + db_size = 0 + + # Get backups directory size + backups_dir = '/srv/quality_app/backups' + backups_size = get_directory_size(backups_dir) + + return jsonify({ + 'success': True, + 'logs_size': format_size(logs_size), + 'database_size': format_size(db_size) if db_size > 0 else 'N/A', + 'backups_size': format_size(backups_size), + 'logs_size_bytes': logs_size, + 'database_size_bytes': db_size, + 'backups_size_bytes': backups_size + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to get storage info: {str(e)}' + }), 500 + + +@bp.route('/api/maintenance/database-tables', methods=['GET']) +@admin_plus +def get_all_database_tables(): + """Get list of all tables in the database with their info""" + try: + # Load database config directly + settings_file = os.path.join(current_app.instance_path, 'external_server.conf') + config = {} + with open(settings_file, 'r') as f: + for line in f: + if '=' in line: + key, value = line.strip().split('=', 1) + config[key] = value + + conn = mariadb.connect( + host=config.get('server_domain', 'localhost'), + port=int(config.get('port', '3306')), + user=config.get('username', 'root'), + password=config.get('password', ''), + database=config.get('database_name', 'trasabilitate') + ) + cursor = conn.cursor(dictionary=True) + + # Get all tables with their size and row count + cursor.execute(""" + SELECT + TABLE_NAME as name, + TABLE_ROWS as `rows`, + ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) as size_mb, + ROUND((DATA_LENGTH + INDEX_LENGTH), 0) as size_bytes + FROM information_schema.TABLES + WHERE TABLE_SCHEMA = %s + ORDER BY TABLE_NAME + """, (config.get('database_name', 'trasabilitate'),)) + + tables = cursor.fetchall() + + # Format the data + formatted_tables = [] + for table in tables: + size_mb = float(table['size_mb']) if table['size_mb'] else 0 + if size_mb >= 1: + size_str = f"{size_mb:.2f} MB" + else: + size_kb = size_mb * 1024 + if size_kb >= 1: + size_str = f"{size_kb:.2f} KB" + else: + size_str = f"{table['size_bytes']} bytes" + + formatted_tables.append({ + 'name': table['name'], + 'rows': f"{table['rows']:,}" if table['rows'] else '0', + 'size': size_str, + 'size_bytes': int(table['size_bytes']) if table['size_bytes'] else 0 + }) + + cursor.close() + conn.close() + + return jsonify({ + 'success': True, + 'tables': formatted_tables, + 'total_tables': len(formatted_tables) + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to get database tables: {str(e)}' + }), 500 + + +@bp.route('/api/maintenance/drop-table', methods=['POST']) +@admin_plus +def drop_table(): + """Drop a database table - DANGEROUS operation""" + try: + data = request.json + table_name = data.get('table_name', '').strip() + + if not table_name: + return jsonify({ + 'success': False, + 'message': 'Table name is required' + }), 400 + + # Validate table name to prevent SQL injection + if not table_name.replace('_', '').isalnum(): + return jsonify({ + 'success': False, + 'message': 'Invalid table name format' + }), 400 + + # Load database config directly + settings_file = os.path.join(current_app.instance_path, 'external_server.conf') + config = {} + with open(settings_file, 'r') as f: + for line in f: + if '=' in line: + key, value = line.strip().split('=', 1) + config[key] = value + + conn = mariadb.connect( + host=config.get('server_domain', 'localhost'), + port=int(config.get('port', '3306')), + user=config.get('username', 'root'), + password=config.get('password', ''), + database=config.get('database_name', 'trasabilitate') + ) + cursor = conn.cursor() + + # Verify table exists + cursor.execute(""" + SELECT COUNT(*) as count + FROM information_schema.TABLES + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + """, (config.get('database_name', 'trasabilitate'), table_name)) + + result = cursor.fetchone() + if not result or result[0] == 0: + cursor.close() + conn.close() + return jsonify({ + 'success': False, + 'message': f'Table "{table_name}" does not exist' + }), 404 + + # Drop the table + cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`") + conn.commit() + + cursor.close() + conn.close() + + return jsonify({ + 'success': True, + 'message': f'Table "{table_name}" has been dropped successfully' + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to drop table: {str(e)}' + }), 500 + + +@bp.route('/api/backup/table', methods=['POST']) +@admin_plus +def backup_single_table(): + """Create a backup of a single table""" + try: + import os + from datetime import datetime + + data = request.json + table_name = data.get('table_name', '').strip() + + if not table_name: + return jsonify({ + 'success': False, + 'message': 'Table name is required' + }), 400 + + # Validate table name + if not table_name.replace('_', '').isalnum(): + return jsonify({ + 'success': False, + 'message': 'Invalid table name format' + }), 400 + + # Load database config directly + settings_file = os.path.join(current_app.instance_path, 'external_server.conf') + config = {} + with open(settings_file, 'r') as f: + for line in f: + if '=' in line: + key, value = line.strip().split('=', 1) + config[key] = value + + # Verify table exists + conn = mariadb.connect( + host=config.get('server_domain', 'localhost'), + port=int(config.get('port', '3306')), + user=config.get('username', 'root'), + password=config.get('password', ''), + database=config.get('database_name', 'trasabilitate') + ) + cursor = conn.cursor() + + cursor.execute(""" + SELECT COUNT(*) as count + FROM information_schema.TABLES + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + """, (config.get('database_name', 'trasabilitate'), table_name)) + + result = cursor.fetchone() + if not result or result[0] == 0: + cursor.close() + conn.close() + return jsonify({ + 'success': False, + 'message': f'Table "{table_name}" does not exist' + }), 404 + + cursor.close() + conn.close() + + # Create backup directory if not exists + backup_dir = os.path.join(os.path.dirname(current_app.instance_path), 'backups') + os.makedirs(backup_dir, exist_ok=True) + + # Generate backup filename + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + backup_filename = f'table_{table_name}_{timestamp}.sql' + backup_path = os.path.join(backup_dir, backup_filename) + + # Create backup using mysqldump + import subprocess + + mysqldump_cmd = [ + 'mysqldump', + f'--host={config.get("host", "localhost")}', + f'--user={config.get("user", "root")}', + f'--password={config.get("password", "")}', + '--single-transaction', + '--no-create-db', + config.get('database', 'trasabilitate'), + table_name + ] + + with open(backup_path, 'w') as f: + result = subprocess.run( + mysqldump_cmd, + stdout=f, + stderr=subprocess.PIPE, + text=True + ) + + if result.returncode != 0: + return jsonify({ + 'success': False, + 'message': f'Backup failed: {result.stderr}' + }), 500 + + # Get file size + file_size = os.path.getsize(backup_path) + if file_size >= 1024 * 1024: + size_str = f"{file_size / (1024 * 1024):.2f} MB" + elif file_size >= 1024: + size_str = f"{file_size / 1024:.2f} KB" + else: + size_str = f"{file_size} bytes" + + return jsonify({ + 'success': True, + 'message': f'Table "{table_name}" backed up successfully', + 'filename': backup_filename, + 'size': size_str + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to backup table: {str(e)}' + }), 500 + + +@bp.route('/api/backup/table-backups', methods=['GET']) +@admin_plus +def list_table_backups(): + """List all table-specific backups""" + try: + import os + import glob + from datetime import datetime + + backup_dir = os.path.join(os.path.dirname(current_app.instance_path), 'backups') + + if not os.path.exists(backup_dir): + return jsonify({ + 'success': True, + 'backups': [] + }) + + # Find all table backup files (pattern: table_*_timestamp.sql) + table_backups = glob.glob(os.path.join(backup_dir, 'table_*.sql')) + + backups = [] + for backup_path in table_backups: + filename = os.path.basename(backup_path) + + # Extract table name from filename (table_TABLENAME_timestamp.sql) + parts = filename.replace('.sql', '').split('_') + if len(parts) >= 3: + table_name = '_'.join(parts[1:-2]) if len(parts) > 3 else parts[1] + + file_size = os.path.getsize(backup_path) + if file_size >= 1024 * 1024: + size_str = f"{file_size / (1024 * 1024):.2f} MB" + elif file_size >= 1024: + size_str = f"{file_size / 1024:.2f} KB" + else: + size_str = f"{file_size} bytes" + + mtime = os.path.getmtime(backup_path) + created = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M') + + backups.append({ + 'filename': filename, + 'table_name': table_name, + 'size': size_str, + 'created': created, + 'timestamp': mtime + }) + + # Sort by timestamp descending + backups.sort(key=lambda x: x['timestamp'], reverse=True) + + return jsonify({ + 'success': True, + 'backups': backups + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to list table backups: {str(e)}' + }), 500 + + +@bp.route('/api/restore/table', methods=['POST']) +@admin_plus +def restore_single_table(): + """Restore a single table from backup""" + try: + import os + import subprocess + + data = request.json + backup_file = data.get('backup_file', '').strip() + + if not backup_file: + return jsonify({ + 'success': False, + 'message': 'Backup file is required' + }), 400 + + # Security: validate filename + if not backup_file.endswith('.sql') or not backup_file.startswith('table_'): + return jsonify({ + 'success': False, + 'message': 'Invalid backup file format' + }), 400 + + backup_dir = os.path.join(os.path.dirname(current_app.instance_path), 'backups') + backup_path = os.path.join(backup_dir, backup_file) + + if not os.path.exists(backup_path): + return jsonify({ + 'success': False, + 'message': 'Backup file not found' + }), 404 + + # Extract table name from filename + parts = backup_file.replace('.sql', '').split('_') + table_name = '_'.join(parts[1:-2]) if len(parts) > 3 else parts[1] + + # Load database config directly + settings_file = os.path.join(current_app.instance_path, 'external_server.conf') + config = {} + with open(settings_file, 'r') as f: + for line in f: + if '=' in line: + key, value = line.strip().split('=', 1) + config[key] = value + + # Restore using mysql command + mysql_cmd = [ + 'mysql', + f'--host={config.get("server_domain", "localhost")}', + f'--user={config.get("username", "root")}', + f'--password={config.get("password", "")}', + config.get('database_name', 'trasabilitate') + ] + + with open(backup_path, 'r') as f: + result = subprocess.run( + mysql_cmd, + stdin=f, + stderr=subprocess.PIPE, + text=True + ) + + if result.returncode != 0: + return jsonify({ + 'success': False, + 'message': f'Restore failed: {result.stderr}' + }), 500 + + return jsonify({ + 'success': True, + 'message': f'Table "{table_name}" restored successfully from {backup_file}' + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'message': f'Failed to restore table: {str(e)}' + }), 500 + + diff --git a/py_app/app/templates/settings.html b/py_app/app/templates/settings.html index cda553a..5e3a6dc 100755 --- a/py_app/app/templates/settings.html +++ b/py_app/app/templates/settings.html @@ -68,217 +68,380 @@ {% endif %} {% if session.role in ['superadmin', 'admin'] %} -
Automated Backup System: Schedule and manage database backups
- {% if session.role in ['superadmin', 'admin'] %} -+ Dropping tables will permanently delete all data in the selected table. This action cannot be undone. Always create a backup before dropping tables! +
+