"""Admin blueprint for user management and system settings.""" from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, current_app from flask_login import login_required, current_user from werkzeug.utils import secure_filename from functools import wraps import os from datetime import datetime from typing import Optional from app.extensions import db, bcrypt from app.models import User, Player, Group, Content, ServerLog, Playlist from app.utils.logger import log_action admin_bp = Blueprint('admin', __name__, url_prefix='/admin') def admin_required(f): """Decorator to require admin role for route access.""" @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: flash('Please login to access this page.', 'warning') return redirect(url_for('auth.login')) if current_user.role != 'admin': log_action('warning', f'Unauthorized admin access attempt by {current_user.username}') flash('You do not have permission to access this page.', 'danger') return redirect(url_for('main.dashboard')) return f(*args, **kwargs) return decorated_function @admin_bp.route('/') @login_required @admin_required def admin_panel(): """Display admin panel with system overview.""" try: # Get statistics total_users = User.query.count() total_players = Player.query.count() total_playlists = Playlist.query.count() total_content = Content.query.count() # Get recent logs recent_logs = ServerLog.query.order_by(ServerLog.timestamp.desc()).limit(10).all() # Get all users users = User.query.all() # Calculate storage usage upload_folder = current_app.config['UPLOAD_FOLDER'] total_size = 0 if os.path.exists(upload_folder): for dirpath, dirnames, filenames in os.walk(upload_folder): for filename in filenames: filepath = os.path.join(dirpath, filename) if os.path.exists(filepath): total_size += os.path.getsize(filepath) storage_mb = round(total_size / (1024 * 1024), 2) return render_template('admin/admin.html', total_users=total_users, total_players=total_players, total_playlists=total_playlists, total_content=total_content, storage_mb=storage_mb, users=users, recent_logs=recent_logs) except Exception as e: log_action('error', f'Error loading admin panel: {str(e)}') flash('Error loading admin panel.', 'danger') return redirect(url_for('main.dashboard')) @admin_bp.route('/user/create', methods=['POST']) @login_required @admin_required def create_user(): """Create a new user account.""" try: username = request.form.get('username', '').strip() password = request.form.get('password', '').strip() role = request.form.get('role', 'user').strip() # Validation if not username or len(username) < 3: flash('Username must be at least 3 characters long.', 'warning') return redirect(url_for('admin.admin_panel')) if not password or len(password) < 6: flash('Password must be at least 6 characters long.', 'warning') return redirect(url_for('admin.admin_panel')) if role not in ['user', 'admin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.admin_panel')) # Check if username exists existing_user = User.query.filter_by(username=username).first() if existing_user: flash(f'Username "{username}" already exists.', 'warning') return redirect(url_for('admin.admin_panel')) # Create user hashed_password = bcrypt.generate_password_hash(password).decode('utf-8') new_user = User(username=username, password=hashed_password, role=role) db.session.add(new_user) db.session.commit() log_action('info', f'User {username} created by admin {current_user.username}') flash(f'User "{username}" created successfully.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error creating user: {str(e)}') flash('Error creating user. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/user//role', methods=['POST']) @login_required @admin_required def change_user_role(user_id: int): """Change user role between user and admin.""" try: user = User.query.get_or_404(user_id) new_role = request.form.get('role', '').strip() # Validation if new_role not in ['user', 'admin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.admin_panel')) # Prevent changing own role if user.id == current_user.id: flash('You cannot change your own role.', 'warning') return redirect(url_for('admin.admin_panel')) old_role = user.role user.role = new_role db.session.commit() log_action('info', f'User {user.username} role changed from {old_role} to {new_role} by {current_user.username}') flash(f'User "{user.username}" role changed to {new_role}.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error changing user role: {str(e)}') flash('Error changing user role. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/user//delete', methods=['POST']) @login_required @admin_required def delete_user(user_id: int): """Delete a user account.""" try: user = User.query.get_or_404(user_id) # Prevent deleting own account if user.id == current_user.id: flash('You cannot delete your own account.', 'warning') return redirect(url_for('admin.admin_panel')) username = user.username db.session.delete(user) db.session.commit() log_action('info', f'User {username} deleted by admin {current_user.username}') flash(f'User "{username}" deleted successfully.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error deleting user: {str(e)}') flash('Error deleting user. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/theme', methods=['POST']) @login_required @admin_required def change_theme(): """Change application theme.""" try: theme = request.form.get('theme', 'light').strip() if theme not in ['light', 'dark']: flash('Invalid theme specified.', 'danger') return redirect(url_for('admin.admin_panel')) # Store theme preference (you can extend this to save to database) # For now, just log the action log_action('info', f'Theme changed to {theme} by {current_user.username}') flash(f'Theme changed to {theme} mode.', 'success') except Exception as e: log_action('error', f'Error changing theme: {str(e)}') flash('Error changing theme. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/logo/upload', methods=['POST']) @login_required @admin_required def upload_logo(): """Upload custom logo for application.""" try: if 'logo' not in request.files: flash('No logo file provided.', 'warning') return redirect(url_for('admin.admin_panel')) file = request.files['logo'] if file.filename == '': flash('No file selected.', 'warning') return redirect(url_for('admin.admin_panel')) # Validate file type allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg'} filename = secure_filename(file.filename) if not ('.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions): flash('Invalid file type. Please upload an image file (PNG, JPG, GIF, SVG).', 'danger') return redirect(url_for('admin.admin_panel')) # Save logo static_folder = current_app.config.get('STATIC_FOLDER', 'app/static') logo_path = os.path.join(static_folder, 'logo.png') # Create static folder if it doesn't exist os.makedirs(static_folder, exist_ok=True) file.save(logo_path) log_action('info', f'Logo uploaded by admin {current_user.username}') flash('Logo uploaded successfully.', 'success') except Exception as e: log_action('error', f'Error uploading logo: {str(e)}') flash('Error uploading logo. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/logs/clear', methods=['POST']) @login_required @admin_required def clear_logs(): """Clear all server logs.""" try: ServerLog.query.delete() db.session.commit() log_action('info', f'All logs cleared by admin {current_user.username}') flash('All logs cleared successfully.', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error clearing logs: {str(e)}') flash('Error clearing logs. Please try again.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/users') @login_required @admin_required def user_management(): """Display user management page.""" try: users = User.query.order_by(User.created_at.desc()).all() return render_template('admin/user_management.html', users=users) except Exception as e: log_action('error', f'Error loading user management: {str(e)}') flash('Error loading user management page.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/user//password', methods=['POST']) @login_required @admin_required def reset_user_password(user_id: int): """Reset user password.""" try: user = User.query.get_or_404(user_id) new_password = request.form.get('password', '').strip() # Validation if not new_password or len(new_password) < 6: flash('Password must be at least 6 characters long.', 'warning') return redirect(url_for('admin.user_management')) # Prevent changing own password through this route if user.id == current_user.id: flash('Use the change password option to update your own password.', 'warning') return redirect(url_for('admin.user_management')) # Update password hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8') user.password = hashed_password db.session.commit() log_action('info', f'Password reset for user {user.username} by admin {current_user.username}') flash(f'Password reset successfully for user "{user.username}".', 'success') except Exception as e: db.session.rollback() log_action('error', f'Error resetting password: {str(e)}') flash('Error resetting password. Please try again.', 'danger') return redirect(url_for('admin.user_management')) @admin_bp.route('/system/info') @login_required @admin_required def system_info(): """Get system information as JSON.""" try: import platform import psutil # Get system info info = { 'system': platform.system(), 'release': platform.release(), 'version': platform.version(), 'machine': platform.machine(), 'processor': platform.processor(), 'cpu_count': psutil.cpu_count(), 'cpu_percent': psutil.cpu_percent(interval=1), 'memory_total': round(psutil.virtual_memory().total / (1024**3), 2), # GB 'memory_used': round(psutil.virtual_memory().used / (1024**3), 2), # GB 'memory_percent': psutil.virtual_memory().percent, 'disk_total': round(psutil.disk_usage('/').total / (1024**3), 2), # GB 'disk_used': round(psutil.disk_usage('/').used / (1024**3), 2), # GB 'disk_percent': psutil.disk_usage('/').percent } return jsonify(info) except Exception as e: log_action('error', f'Error getting system info: {str(e)}') return jsonify({'error': str(e)}), 500 @admin_bp.route('/leftover-media') @login_required @admin_required def leftover_media(): """Display leftover media files not assigned to any playlist.""" from app.models.playlist import playlist_content from sqlalchemy import select try: # Get all content IDs that are in playlists stmt = select(playlist_content.c.content_id).distinct() content_in_playlists = set(row[0] for row in db.session.execute(stmt)) # Get all content all_content = Content.query.all() # Filter content not in any playlist leftover_content = [c for c in all_content if c.id not in content_in_playlists] # Separate by type leftover_images = [c for c in leftover_content if c.content_type == 'image'] leftover_videos = [c for c in leftover_content if c.content_type == 'video'] leftover_pdfs = [c for c in leftover_content if c.content_type == 'pdf'] leftover_pptx = [c for c in leftover_content if c.content_type == 'pptx'] # Calculate storage total_leftover_size = sum(c.file_size for c in leftover_content) images_size = sum(c.file_size for c in leftover_images) videos_size = sum(c.file_size for c in leftover_videos) pdfs_size = sum(c.file_size for c in leftover_pdfs) pptx_size = sum(c.file_size for c in leftover_pptx) return render_template('admin/leftover_media.html', leftover_images=leftover_images, leftover_videos=leftover_videos, leftover_pdfs=leftover_pdfs, leftover_pptx=leftover_pptx, total_leftover=len(leftover_content), total_leftover_size_mb=total_leftover_size / (1024 * 1024), images_size_mb=images_size / (1024 * 1024), videos_size_mb=videos_size / (1024 * 1024), pdfs_size_mb=pdfs_size / (1024 * 1024), pptx_size_mb=pptx_size / (1024 * 1024)) except Exception as e: log_action('error', f'Error loading leftover media: {str(e)}') flash('Error loading leftover media.', 'danger') return redirect(url_for('admin.admin_panel')) @admin_bp.route('/delete-leftover-images', methods=['POST']) @login_required @admin_required def delete_leftover_images(): """Delete all leftover images that are not part of any playlist""" from app.models.playlist import playlist_content try: # Find all leftover image content leftover_images = db.session.query(Content).filter( Content.content_type == 'image', ~Content.id.in_( db.session.query(playlist_content.c.content_id) ) ).all() deleted_count = 0 errors = [] for content in leftover_images: try: # Delete physical file if content.filename: file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.filename) if os.path.exists(file_path): os.remove(file_path) # Delete edited media archive folder if it exists import shutil edited_media_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], 'edited_media', str(content.id)) if os.path.exists(edited_media_dir): shutil.rmtree(edited_media_dir) # Delete associated player edit records first from app.models.player_edit import PlayerEdit PlayerEdit.query.filter_by(content_id=content.id).delete() # Delete database record db.session.delete(content) deleted_count += 1 except Exception as e: errors.append(f"Error deleting {content.filename}: {str(e)}") db.session.commit() if errors: flash(f'Deleted {deleted_count} images with {len(errors)} errors', 'warning') else: flash(f'Successfully deleted {deleted_count} leftover images', 'success') except Exception as e: db.session.rollback() flash(f'Error deleting leftover images: {str(e)}', 'danger') return redirect(url_for('admin.leftover_media')) @admin_bp.route('/delete-leftover-videos', methods=['POST']) @login_required @admin_required def delete_leftover_videos(): """Delete all leftover videos that are not part of any playlist""" from app.models.playlist import playlist_content try: # Find all leftover video content leftover_videos = db.session.query(Content).filter( Content.content_type == 'video', ~Content.id.in_( db.session.query(playlist_content.c.content_id) ) ).all() deleted_count = 0 errors = [] for content in leftover_videos: try: # Delete physical file if content.filename: file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.filename) if os.path.exists(file_path): os.remove(file_path) # Delete edited media archive folder if it exists import shutil edited_media_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], 'edited_media', str(content.id)) if os.path.exists(edited_media_dir): shutil.rmtree(edited_media_dir) # Delete associated player edit records first from app.models.player_edit import PlayerEdit PlayerEdit.query.filter_by(content_id=content.id).delete() # Delete database record db.session.delete(content) deleted_count += 1 except Exception as e: errors.append(f"Error deleting {content.filename}: {str(e)}") db.session.commit() if errors: flash(f'Deleted {deleted_count} videos with {len(errors)} errors', 'warning') else: flash(f'Successfully deleted {deleted_count} leftover videos', 'success') except Exception as e: db.session.rollback() flash(f'Error deleting leftover videos: {str(e)}', 'danger') return redirect(url_for('admin.leftover_media')) @admin_bp.route('/delete-single-leftover/', methods=['POST']) @login_required @admin_required def delete_single_leftover(content_id): """Delete a single leftover content file""" try: content = Content.query.get_or_404(content_id) # Delete physical file if content.filename: file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.filename) if os.path.exists(file_path): os.remove(file_path) # Delete edited media archive folder if it exists import shutil edited_media_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], 'edited_media', str(content.id)) if os.path.exists(edited_media_dir): shutil.rmtree(edited_media_dir) # Delete associated player edit records first from app.models.player_edit import PlayerEdit PlayerEdit.query.filter_by(content_id=content.id).delete() # Delete database record db.session.delete(content) db.session.commit() flash(f'Successfully deleted {content.filename}', 'success') except Exception as e: db.session.rollback() flash(f'Error deleting file: {str(e)}', 'danger') return redirect(url_for('admin.leftover_media')) @admin_bp.route('/dependencies') @login_required @admin_required def dependencies(): """Show system dependencies status.""" import subprocess # Check LibreOffice libreoffice_installed = False libreoffice_version = "Not installed" try: result = subprocess.run(['libreoffice', '--version'], capture_output=True, text=True, timeout=5) if result.returncode == 0: libreoffice_installed = True libreoffice_version = result.stdout.strip() except Exception: pass # Check Poppler (for PDF) poppler_installed = False poppler_version = "Not installed" try: result = subprocess.run(['pdftoppm', '-v'], capture_output=True, text=True, timeout=5) if result.returncode == 0 or 'pdftoppm' in result.stderr: poppler_installed = True poppler_version = "Installed" except Exception: pass # Check FFmpeg (for video) ffmpeg_installed = False ffmpeg_version = "Not installed" try: result = subprocess.run(['ffmpeg', '-version'], capture_output=True, text=True, timeout=5) if result.returncode == 0: ffmpeg_installed = True ffmpeg_version = result.stdout.split('\n')[0] except Exception: pass # Check Emoji Fonts emoji_installed = False emoji_version = 'Not installed' try: result = subprocess.run(['dpkg', '-l', 'fonts-noto-color-emoji'], capture_output=True, text=True, timeout=5) if result.returncode == 0 and 'ii' in result.stdout: emoji_installed = True # Get version from dpkg output lines = result.stdout.split('\n') for line in lines: if 'fonts-noto-color-emoji' in line and line.startswith('ii'): parts = line.split() if len(parts) >= 3: emoji_version = f'Noto Color Emoji {parts[2]}' break except Exception: pass return render_template('admin/dependencies.html', libreoffice_installed=libreoffice_installed, libreoffice_version=libreoffice_version, poppler_installed=poppler_installed, poppler_version=poppler_version, ffmpeg_installed=ffmpeg_installed, ffmpeg_version=ffmpeg_version, emoji_installed=emoji_installed, emoji_version=emoji_version) @admin_bp.route('/install-libreoffice', methods=['POST']) @login_required @admin_required def install_libreoffice(): """Install LibreOffice for PPTX conversion.""" import subprocess try: # Run installation script script_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'install_libreoffice.sh') if not os.path.exists(script_path): flash('Installation script not found', 'danger') return redirect(url_for('admin.dependencies')) result = subprocess.run(['sudo', '-n', script_path], capture_output=True, text=True, timeout=300) if result.returncode == 0: log_action('info', 'LibreOffice installed successfully') flash('LibreOffice installed successfully! You can now convert PPTX files.', 'success') else: log_action('error', f'LibreOffice installation failed: {result.stderr}') flash(f'Installation failed: {result.stderr}', 'danger') except subprocess.TimeoutExpired: flash('Installation timeout. Please try again.', 'warning') except Exception as e: log_action('error', f'Error installing LibreOffice: {str(e)}') flash(f'Error: {str(e)}', 'danger') return redirect(url_for('admin.dependencies')) @admin_bp.route('/install-emoji-fonts', methods=['POST']) @login_required @admin_required def install_emoji_fonts(): """Install Emoji Fonts for better UI display.""" import subprocess try: # Run installation script script_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'install_emoji_fonts.sh') if not os.path.exists(script_path): flash('Installation script not found', 'danger') return redirect(url_for('admin.dependencies')) result = subprocess.run(['sudo', '-n', script_path], capture_output=True, text=True, timeout=180) if result.returncode == 0: log_action('info', 'Emoji fonts installed successfully') flash('Emoji fonts installed successfully! Please restart your browser to see changes.', 'success') else: log_action('error', f'Emoji fonts installation failed: {result.stderr}') flash(f'Installation failed: {result.stderr}', 'danger') except subprocess.TimeoutExpired: flash('Installation timeout. Please try again.', 'warning') except Exception as e: log_action('error', f'Error installing emoji fonts: {str(e)}') flash(f'Error: {str(e)}', 'danger') return redirect(url_for('admin.dependencies')) @admin_bp.route('/customize-logos') @login_required @admin_required def customize_logos(): """Logo customization page.""" import time return render_template('admin/customize_logos.html', version=int(time.time())) @admin_bp.route('/upload-header-logo', methods=['POST']) @login_required @admin_required def upload_header_logo(): """Upload header logo.""" try: if 'header_logo' not in request.files: flash('No file selected', 'warning') return redirect(url_for('admin.customize_logos')) file = request.files['header_logo'] if file.filename == '': flash('No file selected', 'warning') return redirect(url_for('admin.customize_logos')) if file: # Save as header_logo.png filename = 'header_logo.png' filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename) file.save(filepath) log_action('info', f'Header logo uploaded: {filename}') flash('Header logo uploaded successfully!', 'success') except Exception as e: log_action('error', f'Error uploading header logo: {str(e)}') flash(f'Error uploading logo: {str(e)}', 'danger') return redirect(url_for('admin.customize_logos')) @admin_bp.route('/upload-login-logo', methods=['POST']) @login_required @admin_required def upload_login_logo(): """Upload login page logo.""" try: if 'login_logo' not in request.files: flash('No file selected', 'warning') return redirect(url_for('admin.customize_logos')) file = request.files['login_logo'] if file.filename == '': flash('No file selected', 'warning') return redirect(url_for('admin.customize_logos')) if file: # Save as login_logo.png filename = 'login_logo.png' filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename) file.save(filepath) log_action('info', f'Login logo uploaded: {filename}') flash('Login logo uploaded successfully!', 'success') except Exception as e: log_action('error', f'Error uploading login logo: {str(e)}') flash(f'Error uploading logo: {str(e)}', 'danger') return redirect(url_for('admin.customize_logos'))