"""Admin blueprint for user management and system settings.""" from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, current_app from flask_login import login_required, current_user from werkzeug.utils import secure_filename from functools import wraps import os from datetime import datetime from typing import Optional from app.extensions import db, bcrypt from app.models import User, Player, Group, Content, ServerLog from app.utils.logger import log_action admin_bp = Blueprint('admin', __name__, url_prefix='/admin') def admin_required(f): """Decorator to require admin role for route access.""" @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: flash('Please login to access this page.', 'warning') return redirect(url_for('auth.login')) if current_user.role != 'admin': log_action('warning', f'Unauthorized admin access attempt by {current_user.username}') flash('You do not have permission to access this page.', 'danger') return redirect(url_for('main.dashboard')) return f(*args, **kwargs) return decorated_function @admin_bp.route('/') @login_required @admin_required def admin_panel(): """Display admin panel with system overview.""" try: # Get statistics total_users = User.query.count() total_players = Player.query.count() total_groups = Group.query.count() total_content = Content.query.count() # Get recent logs recent_logs = ServerLog.query.order_by(ServerLog.timestamp.desc()).limit(10).all() # Get all users users = User.query.all() # Calculate storage usage upload_folder = current_app.config['UPLOAD_FOLDER'] total_size = 0 if os.path.exists(upload_folder): for dirpath, dirnames, filenames in os.walk(upload_folder): for filename in filenames: filepath = os.path.join(dirpath, filename) if os.path.exists(filepath): total_size += os.path.getsize(filepath) storage_mb = round(total_size / (1024 * 1024), 2) return render_template('admin/admin.html', total_users=total_users, total_players=total_players, total_groups=total_groups, total_content=total_content, storage_mb=storage_mb, users=users, recent_logs=recent_logs) except Exception as e: log_action('error', f'Error loading admin panel: {str(e)}') flash('Error loading admin panel.', 'danger') return redirect(url_for('main.dashboard')) @admin_bp.route('/user/create', methods=['POST']) @login_required @admin_required def create_user(): """Create a new user account.""" try: username = request.form.get('username', '').strip() password = request.form.get('password', '').strip() role = request.form.get('role', 'user').strip() # Validation if not username or len(username) < 3: flash('Username must be at least 3 characters long.', 'warning') return redirect(url_for('admin.admin_panel')) if not password or len(password) < 6: flash('Password must be at least 6 characters long.', 'warning') return redirect(url_for('admin.admin_panel')) if role not in ['user', 'admin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.admin_panel')) # Check if username exists existing_user = User.query.filter_by(username=username).first() if existing_user: flash(f'Username "{username}" already exists.', 'warning') return redirect(url_for('admin.admin_panel')) # Create user hashed_password = bcrypt.generate_password_hash(password).decode('utf-8') new_user = User(username=username, password=hashed_password, role=role) db.session.add(new_user) db.session.commit() log_action('info', f'User {username} created by admin {current_user.username}') flash(f'User "{username}" created successfully.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error creating user: {str(e)}') flash('Error creating user. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/user//role', methods=['POST']) @login_required @admin_required def change_user_role(user_id: int): """Change user role between user and admin.""" try: user = User.query.get_or_404(user_id) new_role = request.form.get('role', '').strip() # Validation if new_role not in ['user', 'admin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.admin_panel')) # Prevent changing own role if user.id == current_user.id: flash('You cannot change your own role.', 'warning') return redirect(url_for('admin.admin_panel')) old_role = user.role user.role = new_role db.session.commit() log_action('info', f'User {user.username} role changed from {old_role} to {new_role} by {current_user.username}') flash(f'User "{user.username}" role changed to {new_role}.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error changing user role: {str(e)}') flash('Error changing user role. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/user//delete', methods=['POST']) @login_required @admin_required def delete_user(user_id: int): """Delete a user account.""" try: user = User.query.get_or_404(user_id) # Prevent deleting own account if user.id == current_user.id: flash('You cannot delete your own account.', 'warning') return redirect(url_for('admin.admin_panel')) username = user.username db.session.delete(user) db.session.commit() log_action('info', f'User {username} deleted by admin {current_user.username}') flash(f'User "{username}" deleted successfully.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error deleting user: {str(e)}') flash('Error deleting user. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/theme', methods=['POST']) @login_required @admin_required def change_theme(): """Change application theme.""" try: theme = request.form.get('theme', 'light').strip() if theme not in ['light', 'dark']: flash('Invalid theme specified.', 'danger') return redirect(url_for('admin.admin_panel')) # Store theme preference (you can extend this to save to database) # For now, just log the action log_action('info', f'Theme changed to {theme} by {current_user.username}') flash(f'Theme changed to {theme} mode.', 'success') except Exception as e: log_action('error', f'Error changing theme: {str(e)}') flash('Error changing theme. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/logo/upload', methods=['POST']) @login_required @admin_required def upload_logo(): """Upload custom logo for application.""" try: if 'logo' not in request.files: flash('No logo file provided.', 'warning') return redirect(url_for('admin.admin_panel')) file = request.files['logo'] if file.filename == '': flash('No file selected.', 'warning') return redirect(url_for('admin.admin_panel')) # Validate file type allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg'} filename = secure_filename(file.filename) if not ('.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions): flash('Invalid file type. Please upload an image file (PNG, JPG, GIF, SVG).', 'danger') return redirect(url_for('admin.admin_panel')) # Save logo static_folder = current_app.config.get('STATIC_FOLDER', 'app/static') logo_path = os.path.join(static_folder, 'logo.png') # Create static folder if it doesn't exist os.makedirs(static_folder, exist_ok=True) file.save(logo_path) log_action('info', f'Logo uploaded by admin {current_user.username}') flash('Logo uploaded successfully.', 'success') except Exception as e: log_action('error', f'Error uploading logo: {str(e)}') flash('Error uploading logo. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/logs/clear', methods=['POST']) @login_required @admin_required def clear_logs(): """Clear all server logs.""" try: ServerLog.query.delete() db.session.commit() log_action('info', f'All logs cleared by admin {current_user.username}') flash('All logs cleared successfully.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error clearing logs: {str(e)}') flash('Error clearing logs. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/system/info') @login_required @admin_required def system_info(): """Get system information as JSON.""" try: import platform import psutil # Get system info info = { 'system': platform.system(), 'release': platform.release(), 'version': platform.version(), 'machine': platform.machine(), 'processor': platform.processor(), 'cpu_count': psutil.cpu_count(), 'cpu_percent': psutil.cpu_percent(interval=1), 'memory_total': round(psutil.virtual_memory().total / (1024**3), 2), # GB 'memory_used': round(psutil.virtual_memory().used / (1024**3), 2), # GB 'memory_percent': psutil.virtual_memory().percent, 'disk_total': round(psutil.disk_usage('/').total / (1024**3), 2), # GB 'disk_used': round(psutil.disk_usage('/').used / (1024**3), 2), # GB 'disk_percent': psutil.disk_usage('/').percent } return jsonify(info) except Exception as e: log_action('error', f'Error getting system info: {str(e)}') return jsonify({'error': str(e)}), 500