Files
digiserver-v2/app/blueprints/admin.py
2025-11-20 19:59:49 +02:00

745 lines
27 KiB
Python

"""Admin blueprint for user management and system settings."""
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, current_app
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from functools import wraps
import os
from datetime import datetime
from typing import Optional
from app.extensions import db, bcrypt
from app.models import User, Player, Group, Content, ServerLog, Playlist
from app.utils.logger import log_action
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
def admin_required(f):
"""Decorator to require admin role for route access."""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Please login to access this page.', 'warning')
return redirect(url_for('auth.login'))
if current_user.role != 'admin':
log_action('warning', f'Unauthorized admin access attempt by {current_user.username}')
flash('You do not have permission to access this page.', 'danger')
return redirect(url_for('main.dashboard'))
return f(*args, **kwargs)
return decorated_function
@admin_bp.route('/')
@login_required
@admin_required
def admin_panel():
"""Display admin panel with system overview."""
try:
# Get statistics
total_users = User.query.count()
total_players = Player.query.count()
total_playlists = Playlist.query.count()
total_content = Content.query.count()
# Get recent logs
recent_logs = ServerLog.query.order_by(ServerLog.timestamp.desc()).limit(10).all()
# Get all users
users = User.query.all()
# Calculate storage usage
upload_folder = current_app.config['UPLOAD_FOLDER']
total_size = 0
if os.path.exists(upload_folder):
for dirpath, dirnames, filenames in os.walk(upload_folder):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if os.path.exists(filepath):
total_size += os.path.getsize(filepath)
storage_mb = round(total_size / (1024 * 1024), 2)
return render_template('admin/admin.html',
total_users=total_users,
total_players=total_players,
total_playlists=total_playlists,
total_content=total_content,
storage_mb=storage_mb,
users=users,
recent_logs=recent_logs)
except Exception as e:
log_action('error', f'Error loading admin panel: {str(e)}')
flash('Error loading admin panel.', 'danger')
return redirect(url_for('main.dashboard'))
@admin_bp.route('/user/create', methods=['POST'])
@login_required
@admin_required
def create_user():
"""Create a new user account."""
try:
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
role = request.form.get('role', 'user').strip()
# Validation
if not username or len(username) < 3:
flash('Username must be at least 3 characters long.', 'warning')
return redirect(url_for('admin.admin_panel'))
if not password or len(password) < 6:
flash('Password must be at least 6 characters long.', 'warning')
return redirect(url_for('admin.admin_panel'))
if role not in ['user', 'admin']:
flash('Invalid role specified.', 'danger')
return redirect(url_for('admin.admin_panel'))
# Check if username exists
existing_user = User.query.filter_by(username=username).first()
if existing_user:
flash(f'Username "{username}" already exists.', 'warning')
return redirect(url_for('admin.admin_panel'))
# Create user
hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')
new_user = User(username=username, password=hashed_password, role=role)
db.session.add(new_user)
db.session.commit()
log_action('info', f'User {username} created by admin {current_user.username}')
flash(f'User "{username}" created successfully.', 'success')
except Exception as e:
db.session.rollback()
log_action('error', f'Error creating user: {str(e)}')
flash('Error creating user. Please try again.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/user/<int:user_id>/role', methods=['POST'])
@login_required
@admin_required
def change_user_role(user_id: int):
"""Change user role between user and admin."""
try:
user = User.query.get_or_404(user_id)
new_role = request.form.get('role', '').strip()
# Validation
if new_role not in ['user', 'admin']:
flash('Invalid role specified.', 'danger')
return redirect(url_for('admin.admin_panel'))
# Prevent changing own role
if user.id == current_user.id:
flash('You cannot change your own role.', 'warning')
return redirect(url_for('admin.admin_panel'))
old_role = user.role
user.role = new_role
db.session.commit()
log_action('info', f'User {user.username} role changed from {old_role} to {new_role} by {current_user.username}')
flash(f'User "{user.username}" role changed to {new_role}.', 'success')
except Exception as e:
db.session.rollback()
log_action('error', f'Error changing user role: {str(e)}')
flash('Error changing user role. Please try again.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@login_required
@admin_required
def delete_user(user_id: int):
"""Delete a user account."""
try:
user = User.query.get_or_404(user_id)
# Prevent deleting own account
if user.id == current_user.id:
flash('You cannot delete your own account.', 'warning')
return redirect(url_for('admin.admin_panel'))
username = user.username
db.session.delete(user)
db.session.commit()
log_action('info', f'User {username} deleted by admin {current_user.username}')
flash(f'User "{username}" deleted successfully.', 'success')
except Exception as e:
db.session.rollback()
log_action('error', f'Error deleting user: {str(e)}')
flash('Error deleting user. Please try again.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/theme', methods=['POST'])
@login_required
@admin_required
def change_theme():
"""Change application theme."""
try:
theme = request.form.get('theme', 'light').strip()
if theme not in ['light', 'dark']:
flash('Invalid theme specified.', 'danger')
return redirect(url_for('admin.admin_panel'))
# Store theme preference (you can extend this to save to database)
# For now, just log the action
log_action('info', f'Theme changed to {theme} by {current_user.username}')
flash(f'Theme changed to {theme} mode.', 'success')
except Exception as e:
log_action('error', f'Error changing theme: {str(e)}')
flash('Error changing theme. Please try again.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/logo/upload', methods=['POST'])
@login_required
@admin_required
def upload_logo():
"""Upload custom logo for application."""
try:
if 'logo' not in request.files:
flash('No logo file provided.', 'warning')
return redirect(url_for('admin.admin_panel'))
file = request.files['logo']
if file.filename == '':
flash('No file selected.', 'warning')
return redirect(url_for('admin.admin_panel'))
# Validate file type
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg'}
filename = secure_filename(file.filename)
if not ('.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions):
flash('Invalid file type. Please upload an image file (PNG, JPG, GIF, SVG).', 'danger')
return redirect(url_for('admin.admin_panel'))
# Save logo
static_folder = current_app.config.get('STATIC_FOLDER', 'app/static')
logo_path = os.path.join(static_folder, 'logo.png')
# Create static folder if it doesn't exist
os.makedirs(static_folder, exist_ok=True)
file.save(logo_path)
log_action('info', f'Logo uploaded by admin {current_user.username}')
flash('Logo uploaded successfully.', 'success')
except Exception as e:
log_action('error', f'Error uploading logo: {str(e)}')
flash('Error uploading logo. Please try again.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/logs/clear', methods=['POST'])
@login_required
@admin_required
def clear_logs():
"""Clear all server logs."""
try:
ServerLog.query.delete()
db.session.commit()
log_action('info', f'All logs cleared by admin {current_user.username}')
flash('All logs cleared successfully.', 'success')
except Exception as e:
db.session.rollback()
log_action('error', f'Error clearing logs: {str(e)}')
flash('Error clearing logs. Please try again.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/users')
@login_required
@admin_required
def user_management():
"""Display user management page."""
try:
users = User.query.order_by(User.created_at.desc()).all()
return render_template('admin/user_management.html', users=users)
except Exception as e:
log_action('error', f'Error loading user management: {str(e)}')
flash('Error loading user management page.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/user/<int:user_id>/password', methods=['POST'])
@login_required
@admin_required
def reset_user_password(user_id: int):
"""Reset user password."""
try:
user = User.query.get_or_404(user_id)
new_password = request.form.get('password', '').strip()
# Validation
if not new_password or len(new_password) < 6:
flash('Password must be at least 6 characters long.', 'warning')
return redirect(url_for('admin.user_management'))
# Prevent changing own password through this route
if user.id == current_user.id:
flash('Use the change password option to update your own password.', 'warning')
return redirect(url_for('admin.user_management'))
# Update password
hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
user.password = hashed_password
db.session.commit()
log_action('info', f'Password reset for user {user.username} by admin {current_user.username}')
flash(f'Password reset successfully for user "{user.username}".', 'success')
except Exception as e:
db.session.rollback()
log_action('error', f'Error resetting password: {str(e)}')
flash('Error resetting password. Please try again.', 'danger')
return redirect(url_for('admin.user_management'))
@admin_bp.route('/system/info')
@login_required
@admin_required
def system_info():
"""Get system information as JSON."""
try:
import platform
import psutil
# Get system info
info = {
'system': platform.system(),
'release': platform.release(),
'version': platform.version(),
'machine': platform.machine(),
'processor': platform.processor(),
'cpu_count': psutil.cpu_count(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_total': round(psutil.virtual_memory().total / (1024**3), 2), # GB
'memory_used': round(psutil.virtual_memory().used / (1024**3), 2), # GB
'memory_percent': psutil.virtual_memory().percent,
'disk_total': round(psutil.disk_usage('/').total / (1024**3), 2), # GB
'disk_used': round(psutil.disk_usage('/').used / (1024**3), 2), # GB
'disk_percent': psutil.disk_usage('/').percent
}
return jsonify(info)
except Exception as e:
log_action('error', f'Error getting system info: {str(e)}')
return jsonify({'error': str(e)}), 500
@admin_bp.route('/leftover-media')
@login_required
@admin_required
def leftover_media():
"""Display leftover media files not assigned to any playlist."""
from app.models.playlist import playlist_content
from sqlalchemy import select
try:
# Get all content IDs that are in playlists
stmt = select(playlist_content.c.content_id).distinct()
content_in_playlists = set(row[0] for row in db.session.execute(stmt))
# Get all content
all_content = Content.query.all()
# Filter content not in any playlist
leftover_content = [c for c in all_content if c.id not in content_in_playlists]
# Separate by type
leftover_images = [c for c in leftover_content if c.content_type == 'image']
leftover_videos = [c for c in leftover_content if c.content_type == 'video']
leftover_pdfs = [c for c in leftover_content if c.content_type == 'pdf']
leftover_pptx = [c for c in leftover_content if c.content_type == 'pptx']
# Calculate storage
total_leftover_size = sum(c.file_size for c in leftover_content)
images_size = sum(c.file_size for c in leftover_images)
videos_size = sum(c.file_size for c in leftover_videos)
pdfs_size = sum(c.file_size for c in leftover_pdfs)
pptx_size = sum(c.file_size for c in leftover_pptx)
return render_template('admin/leftover_media.html',
leftover_images=leftover_images,
leftover_videos=leftover_videos,
leftover_pdfs=leftover_pdfs,
leftover_pptx=leftover_pptx,
total_leftover=len(leftover_content),
total_leftover_size_mb=total_leftover_size / (1024 * 1024),
images_size_mb=images_size / (1024 * 1024),
videos_size_mb=videos_size / (1024 * 1024),
pdfs_size_mb=pdfs_size / (1024 * 1024),
pptx_size_mb=pptx_size / (1024 * 1024))
except Exception as e:
log_action('error', f'Error loading leftover media: {str(e)}')
flash('Error loading leftover media.', 'danger')
return redirect(url_for('admin.admin_panel'))
@admin_bp.route('/delete-leftover-images', methods=['POST'])
@login_required
@admin_required
def delete_leftover_images():
"""Delete all leftover images that are not part of any playlist"""
from app.models.playlist import playlist_content
try:
# Find all leftover image content
leftover_images = db.session.query(Content).filter(
Content.media_type == 'image',
~Content.id.in_(
db.session.query(playlist_content.c.content_id)
)
).all()
deleted_count = 0
errors = []
for content in leftover_images:
try:
# Delete physical file
if content.file_path:
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.file_path)
if os.path.exists(file_path):
os.remove(file_path)
# Delete database record
db.session.delete(content)
deleted_count += 1
except Exception as e:
errors.append(f"Error deleting {content.file_path}: {str(e)}")
db.session.commit()
if errors:
flash(f'Deleted {deleted_count} images with {len(errors)} errors', 'warning')
else:
flash(f'Successfully deleted {deleted_count} leftover images', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error deleting leftover images: {str(e)}', 'danger')
return redirect(url_for('admin.leftover_media'))
@admin_bp.route('/delete-leftover-videos', methods=['POST'])
@login_required
@admin_required
def delete_leftover_videos():
"""Delete all leftover videos that are not part of any playlist"""
from app.models.playlist import playlist_content
try:
# Find all leftover video content
leftover_videos = db.session.query(Content).filter(
Content.media_type == 'video',
~Content.id.in_(
db.session.query(playlist_content.c.content_id)
)
).all()
deleted_count = 0
errors = []
for content in leftover_videos:
try:
# Delete physical file
if content.file_path:
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.file_path)
if os.path.exists(file_path):
os.remove(file_path)
# Delete database record
db.session.delete(content)
deleted_count += 1
except Exception as e:
errors.append(f"Error deleting {content.file_path}: {str(e)}")
db.session.commit()
if errors:
flash(f'Deleted {deleted_count} videos with {len(errors)} errors', 'warning')
else:
flash(f'Successfully deleted {deleted_count} leftover videos', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error deleting leftover videos: {str(e)}', 'danger')
return redirect(url_for('admin.leftover_media'))
@admin_bp.route('/delete-single-leftover/<int:content_id>', methods=['POST'])
@login_required
@admin_required
def delete_single_leftover(content_id):
"""Delete a single leftover content file"""
try:
content = Content.query.get_or_404(content_id)
# Delete physical file
if content.file_path:
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.file_path)
if os.path.exists(file_path):
os.remove(file_path)
# Delete database record
db.session.delete(content)
db.session.commit()
flash(f'Successfully deleted {content.file_path}', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error deleting file: {str(e)}', 'danger')
return redirect(url_for('admin.leftover_media'))
@admin_bp.route('/dependencies')
@login_required
@admin_required
def dependencies():
"""Show system dependencies status."""
import subprocess
# Check LibreOffice
libreoffice_installed = False
libreoffice_version = "Not installed"
try:
result = subprocess.run(['libreoffice', '--version'],
capture_output=True,
text=True,
timeout=5)
if result.returncode == 0:
libreoffice_installed = True
libreoffice_version = result.stdout.strip()
except Exception:
pass
# Check Poppler (for PDF)
poppler_installed = False
poppler_version = "Not installed"
try:
result = subprocess.run(['pdftoppm', '-v'],
capture_output=True,
text=True,
timeout=5)
if result.returncode == 0 or 'pdftoppm' in result.stderr:
poppler_installed = True
poppler_version = "Installed"
except Exception:
pass
# Check FFmpeg (for video)
ffmpeg_installed = False
ffmpeg_version = "Not installed"
try:
result = subprocess.run(['ffmpeg', '-version'],
capture_output=True,
text=True,
timeout=5)
if result.returncode == 0:
ffmpeg_installed = True
ffmpeg_version = result.stdout.split('\n')[0]
except Exception:
pass
# Check Emoji Fonts
emoji_installed = False
emoji_version = 'Not installed'
try:
result = subprocess.run(['dpkg', '-l', 'fonts-noto-color-emoji'],
capture_output=True,
text=True,
timeout=5)
if result.returncode == 0 and 'ii' in result.stdout:
emoji_installed = True
# Get version from dpkg output
lines = result.stdout.split('\n')
for line in lines:
if 'fonts-noto-color-emoji' in line and line.startswith('ii'):
parts = line.split()
if len(parts) >= 3:
emoji_version = f'Noto Color Emoji {parts[2]}'
break
except Exception:
pass
return render_template('admin/dependencies.html',
libreoffice_installed=libreoffice_installed,
libreoffice_version=libreoffice_version,
poppler_installed=poppler_installed,
poppler_version=poppler_version,
ffmpeg_installed=ffmpeg_installed,
ffmpeg_version=ffmpeg_version,
emoji_installed=emoji_installed,
emoji_version=emoji_version)
@admin_bp.route('/install-libreoffice', methods=['POST'])
@login_required
@admin_required
def install_libreoffice():
"""Install LibreOffice for PPTX conversion."""
import subprocess
try:
# Run installation script
script_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
'install_libreoffice.sh')
if not os.path.exists(script_path):
flash('Installation script not found', 'danger')
return redirect(url_for('admin.dependencies'))
result = subprocess.run(['sudo', '-n', script_path],
capture_output=True,
text=True,
timeout=300)
if result.returncode == 0:
log_action('info', 'LibreOffice installed successfully')
flash('LibreOffice installed successfully! You can now convert PPTX files.', 'success')
else:
log_action('error', f'LibreOffice installation failed: {result.stderr}')
flash(f'Installation failed: {result.stderr}', 'danger')
except subprocess.TimeoutExpired:
flash('Installation timeout. Please try again.', 'warning')
except Exception as e:
log_action('error', f'Error installing LibreOffice: {str(e)}')
flash(f'Error: {str(e)}', 'danger')
return redirect(url_for('admin.dependencies'))
@admin_bp.route('/install-emoji-fonts', methods=['POST'])
@login_required
@admin_required
def install_emoji_fonts():
"""Install Emoji Fonts for better UI display."""
import subprocess
try:
# Run installation script
script_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
'install_emoji_fonts.sh')
if not os.path.exists(script_path):
flash('Installation script not found', 'danger')
return redirect(url_for('admin.dependencies'))
result = subprocess.run(['sudo', '-n', script_path],
capture_output=True,
text=True,
timeout=180)
if result.returncode == 0:
log_action('info', 'Emoji fonts installed successfully')
flash('Emoji fonts installed successfully! Please restart your browser to see changes.', 'success')
else:
log_action('error', f'Emoji fonts installation failed: {result.stderr}')
flash(f'Installation failed: {result.stderr}', 'danger')
except subprocess.TimeoutExpired:
flash('Installation timeout. Please try again.', 'warning')
except Exception as e:
log_action('error', f'Error installing emoji fonts: {str(e)}')
flash(f'Error: {str(e)}', 'danger')
return redirect(url_for('admin.dependencies'))
@admin_bp.route('/customize-logos')
@login_required
@admin_required
def customize_logos():
"""Logo customization page."""
import time
return render_template('admin/customize_logos.html', version=int(time.time()))
@admin_bp.route('/upload-header-logo', methods=['POST'])
@login_required
@admin_required
def upload_header_logo():
"""Upload header logo."""
try:
if 'header_logo' not in request.files:
flash('No file selected', 'warning')
return redirect(url_for('admin.customize_logos'))
file = request.files['header_logo']
if file.filename == '':
flash('No file selected', 'warning')
return redirect(url_for('admin.customize_logos'))
if file:
# Save as header_logo.png
filename = 'header_logo.png'
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
log_action('info', f'Header logo uploaded: {filename}')
flash('Header logo uploaded successfully!', 'success')
except Exception as e:
log_action('error', f'Error uploading header logo: {str(e)}')
flash(f'Error uploading logo: {str(e)}', 'danger')
return redirect(url_for('admin.customize_logos'))
@admin_bp.route('/upload-login-logo', methods=['POST'])
@login_required
@admin_required
def upload_login_logo():
"""Upload login page logo."""
try:
if 'login_logo' not in request.files:
flash('No file selected', 'warning')
return redirect(url_for('admin.customize_logos'))
file = request.files['login_logo']
if file.filename == '':
flash('No file selected', 'warning')
return redirect(url_for('admin.customize_logos'))
if file:
# Save as login_logo.png
filename = 'login_logo.png'
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
log_action('info', f'Login logo uploaded: {filename}')
flash('Login logo uploaded successfully!', 'success')
except Exception as e:
log_action('error', f'Error uploading login logo: {str(e)}')
flash(f'Error uploading logo: {str(e)}', 'danger')
return redirect(url_for('admin.customize_logos'))