# Listing metadata: 493 lines, 18 KiB, Python.
"""Admin blueprint for user management and system settings."""
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, current_app
|
|
from flask_login import login_required, current_user
|
|
from werkzeug.utils import secure_filename
|
|
from functools import wraps
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from app.extensions import db, bcrypt
|
|
from app.models import User, Player, Group, Content, ServerLog, Playlist
|
|
from app.utils.logger import log_action
|
|
|
|
# Blueprint that groups the admin routes defined below under the /admin URL prefix.
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
|
|
|
|
|
|
def admin_required(f):
    """Wrap a view so that only authenticated users with role 'admin' reach it.

    Unauthenticated callers are flashed a warning and redirected to
    ``auth.login``. Authenticated callers whose role is not ``'admin'`` have
    the attempt logged, are flashed an error, and are redirected to
    ``main.dashboard``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # Guard 1: anonymous visitors go to the login page.
        if not current_user.is_authenticated:
            flash('Please login to access this page.', 'warning')
            return redirect(url_for('auth.login'))

        # Happy path: admins fall straight through to the wrapped view.
        if current_user.role == 'admin':
            return f(*args, **kwargs)

        # Guard 2: logged-in but not an admin -- record the attempt and bounce.
        log_action('warning', f'Unauthorized admin access attempt by {current_user.username}')
        flash('You do not have permission to access this page.', 'danger')
        return redirect(url_for('main.dashboard'))

    return guarded_view
|
|
|
|
|
|
@admin_bp.route('/')
@login_required
@admin_required
def admin_panel():
    """Render the admin landing page with a system overview.

    Collects row counts for users, players, playlists and content, the ten
    most recent server log entries, the full user list, and the combined
    size (in MB, rounded to two decimals) of all files under the configured
    ``UPLOAD_FOLDER``, then renders ``admin/admin.html``.

    Returns:
        The rendered template. If anything raises, the error is logged, an
        error message is flashed, and a redirect to ``main.dashboard`` is
        returned instead.
    """
    try:
        # Headline counters.
        total_users = User.query.count()
        total_players = Player.query.count()
        total_playlists = Playlist.query.count()
        total_content = Content.query.count()

        # Ten most recent log entries, newest first.
        recent_logs = ServerLog.query.order_by(ServerLog.timestamp.desc()).limit(10).all()

        users = User.query.all()

        # Storage usage. os.walk yields nothing for a missing folder, so no
        # separate existence check is needed.
        upload_folder = current_app.config['UPLOAD_FOLDER']
        total_size = 0
        for dirpath, _dirnames, filenames in os.walk(upload_folder):
            for filename in filenames:
                try:
                    total_size += os.path.getsize(os.path.join(dirpath, filename))
                except OSError:
                    # The file disappeared between listing and stat (or is a
                    # broken symlink / unreadable); skip it rather than fail
                    # the whole page.
                    continue

        storage_mb = round(total_size / (1024 * 1024), 2)

        return render_template('admin/admin.html',
                               total_users=total_users,
                               total_players=total_players,
                               total_playlists=total_playlists,
                               total_content=total_content,
                               storage_mb=storage_mb,
                               users=users,
                               recent_logs=recent_logs)
    except Exception as e:
        log_action('error', f'Error loading admin panel: {str(e)}')
        flash('Error loading admin panel.', 'danger')
        return redirect(url_for('main.dashboard'))
|
|
|
|
|
|
@admin_bp.route('/user/create', methods=['POST'])
@login_required
@admin_required
def create_user():
    """Create a user from the ``username``, ``password`` and ``role`` form fields.

    The submission is rejected (with a flashed message) when the username is
    shorter than 3 characters, the password is shorter than 6 characters,
    the role is neither ``'user'`` nor ``'admin'``, or the username is
    already taken. Otherwise the password is bcrypt-hashed and the new user
    is committed. Every path ends with a redirect to the admin panel.
    """
    try:
        form = request.form
        username = form.get('username', '').strip()
        password = form.get('password', '').strip()
        role = form.get('role', 'user').strip()

        # First failing check wins; ``rejection`` holds the flash arguments.
        rejection = None
        if len(username) < 3:
            rejection = ('Username must be at least 3 characters long.', 'warning')
        elif len(password) < 6:
            rejection = ('Password must be at least 6 characters long.', 'warning')
        elif role not in ('user', 'admin'):
            rejection = ('Invalid role specified.', 'danger')
        elif User.query.filter_by(username=username).first():
            rejection = (f'Username "{username}" already exists.', 'warning')

        if rejection is not None:
            flash(*rejection)
        else:
            # Store only the bcrypt hash, never the plain password.
            password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
            db.session.add(User(username=username, password=password_hash, role=role))
            db.session.commit()

            log_action('info', f'User {username} created by admin {current_user.username}')
            flash(f'User "{username}" created successfully.', 'success')

    except Exception as e:
        db.session.rollback()
        log_action('error', f'Error creating user: {str(e)}')
        flash('Error creating user. Please try again.', 'danger')

    return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
@admin_bp.route('/user/<int:user_id>/role', methods=['POST'])
@login_required
@admin_required
def change_user_role(user_id: int):
    """Change a user's role to ``'user'`` or ``'admin'``.

    The target role is read from the ``role`` form field. Values other than
    ``'user'``/``'admin'`` are rejected, and the logged-in admin cannot
    change their own role.

    Args:
        user_id: Identifier of the user to update, taken from the URL.

    Returns:
        A redirect to the admin panel for every handled outcome.

    Raises:
        werkzeug.exceptions.HTTPException: The 404 raised by ``get_or_404``
            when no user matches ``user_id``.
    """
    # Function-scope import, like the other local imports in this module.
    from werkzeug.exceptions import HTTPException

    try:
        user = User.query.get_or_404(user_id)
        new_role = request.form.get('role', '').strip()

        if new_role not in ['user', 'admin']:
            flash('Invalid role specified.', 'danger')
            return redirect(url_for('admin.admin_panel'))

        # Prevent changing own role.
        if user.id == current_user.id:
            flash('You cannot change your own role.', 'warning')
            return redirect(url_for('admin.admin_panel'))

        old_role = user.role
        user.role = new_role
        db.session.commit()

        log_action('info', f'User {user.username} role changed from {old_role} to {new_role} by {current_user.username}')
        flash(f'User "{user.username}" role changed to {new_role}.', 'success')

    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # 404 from get_or_404 would be swallowed by the handler below and
        # reported as a generic failure.
        raise
    except Exception as e:
        db.session.rollback()
        log_action('error', f'Error changing user role: {str(e)}')
        flash('Error changing user role. Please try again.', 'danger')

    return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@login_required
@admin_required
def delete_user(user_id: int):
    """Delete a user account.

    The logged-in admin cannot delete their own account.

    Args:
        user_id: Identifier of the user to delete, taken from the URL.

    Returns:
        A redirect to the admin panel for every handled outcome.

    Raises:
        werkzeug.exceptions.HTTPException: The 404 raised by ``get_or_404``
            when no user matches ``user_id``.
    """
    # Function-scope import, like the other local imports in this module.
    from werkzeug.exceptions import HTTPException

    try:
        user = User.query.get_or_404(user_id)

        # Prevent deleting own account.
        if user.id == current_user.id:
            flash('You cannot delete your own account.', 'warning')
            return redirect(url_for('admin.admin_panel'))

        # Capture the name before the row is deleted, for the messages below.
        username = user.username
        db.session.delete(user)
        db.session.commit()

        log_action('info', f'User {username} deleted by admin {current_user.username}')
        flash(f'User "{username}" deleted successfully.', 'success')

    except HTTPException:
        # Let the 404 from get_or_404 reach Flask instead of being reported
        # as a generic deletion failure by the handler below.
        raise
    except Exception as e:
        db.session.rollback()
        log_action('error', f'Error deleting user: {str(e)}')
        flash('Error deleting user. Please try again.', 'danger')

    return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
@admin_bp.route('/theme', methods=['POST'])
@login_required
@admin_required
def change_theme():
    """Handle a theme change request (``'light'`` or ``'dark'``).

    The chosen theme is read from the ``theme`` form field (default
    ``'light'``). A valid choice is only logged and acknowledged with a
    flash message; nothing is persisted here. Always redirects to the admin
    panel.
    """
    try:
        theme = request.form.get('theme', 'light').strip()

        if theme in ('light', 'dark'):
            # The preference is not stored anywhere yet; this route only
            # records the action (persisting it could be added later).
            log_action('info', f'Theme changed to {theme} by {current_user.username}')
            flash(f'Theme changed to {theme} mode.', 'success')
        else:
            flash('Invalid theme specified.', 'danger')

    except Exception as e:
        log_action('error', f'Error changing theme: {str(e)}')
        flash('Error changing theme. Please try again.', 'danger')

    return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
@admin_bp.route('/logo/upload', methods=['POST'])
@login_required
@admin_required
def upload_logo():
    """Store an uploaded image as the application logo.

    Expects a ``logo`` file field. The sanitized filename must end in one of
    png/jpg/jpeg/gif/svg. The file is written to ``logo.png`` inside the
    configured ``STATIC_FOLDER`` (default ``'app/static'``), creating the
    folder if needed. Always redirects to the admin panel.
    """
    try:
        uploaded = request.files['logo'] if 'logo' in request.files else None
        if uploaded is None:
            flash('No logo file provided.', 'warning')
            return redirect(url_for('admin.admin_panel'))

        if uploaded.filename == '':
            flash('No file selected.', 'warning')
            return redirect(url_for('admin.admin_panel'))

        # Validate the extension of the sanitized name; a name without a dot
        # is rejected as well.
        allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg'}
        _stem, dot, extension = secure_filename(uploaded.filename).rpartition('.')
        if not dot or extension.lower() not in allowed_extensions:
            flash('Invalid file type. Please upload an image file (PNG, JPG, GIF, SVG).', 'danger')
            return redirect(url_for('admin.admin_panel'))

        # NOTE(review): the upload is always saved under the name logo.png,
        # whatever its real extension was -- confirm this is intended.
        static_folder = current_app.config.get('STATIC_FOLDER', 'app/static')
        destination = os.path.join(static_folder, 'logo.png')

        os.makedirs(static_folder, exist_ok=True)
        uploaded.save(destination)

        log_action('info', f'Logo uploaded by admin {current_user.username}')
        flash('Logo uploaded successfully.', 'success')

    except Exception as e:
        log_action('error', f'Error uploading logo: {str(e)}')
        flash('Error uploading logo. Please try again.', 'danger')

    return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
@admin_bp.route('/logs/clear', methods=['POST'])
@login_required
@admin_required
def clear_logs():
    """Delete every ``ServerLog`` row, then record who cleared them.

    On failure the session is rolled back, the error is logged and an error
    message is flashed. Always redirects to the admin panel.
    """
    panel_endpoint = 'admin.admin_panel'

    try:
        # Bulk delete of all log rows in one statement.
        ServerLog.query.delete()
        db.session.commit()

        # Written after the commit, so this entry survives the wipe.
        log_action(
            'info',
            f'All logs cleared by admin {current_user.username}',
        )
        flash('All logs cleared successfully.', 'success')

    except Exception as e:
        db.session.rollback()
        log_action('error', f'Error clearing logs: {str(e)}')
        flash('Error clearing logs. Please try again.', 'danger')

    return redirect(url_for(panel_endpoint))
|
|
|
|
|
|
@admin_bp.route('/users')
@login_required
@admin_required
def user_management():
    """Render the user management page with all users, newest first.

    If the query or rendering fails, the error is logged, an error message
    is flashed, and the caller is redirected to the admin panel.
    """
    try:
        newest_first = User.created_at.desc()
        all_users = User.query.order_by(newest_first).all()
        return render_template('admin/user_management.html', users=all_users)
    except Exception as e:
        log_action('error', f'Error loading user management: {str(e)}')
        flash('Error loading user management page.', 'danger')
        return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
@admin_bp.route('/user/<int:user_id>/password', methods=['POST'])
@login_required
@admin_required
def reset_user_password(user_id: int):
    """Set a new password for another user.

    The new password is read from the ``password`` form field and must be at
    least 6 characters long. The logged-in admin cannot reset their own
    password through this route.

    Args:
        user_id: Identifier of the user whose password is reset, taken from
            the URL.

    Returns:
        A redirect to the user management page for every handled outcome.

    Raises:
        werkzeug.exceptions.HTTPException: The 404 raised by ``get_or_404``
            when no user matches ``user_id``.
    """
    # Function-scope import, like the other local imports in this module.
    from werkzeug.exceptions import HTTPException

    try:
        user = User.query.get_or_404(user_id)
        new_password = request.form.get('password', '').strip()

        if not new_password or len(new_password) < 6:
            flash('Password must be at least 6 characters long.', 'warning')
            return redirect(url_for('admin.user_management'))

        # Prevent changing own password through this route.
        if user.id == current_user.id:
            flash('Use the change password option to update your own password.', 'warning')
            return redirect(url_for('admin.user_management'))

        # Store only the bcrypt hash, never the plain password.
        user.password = bcrypt.generate_password_hash(new_password).decode('utf-8')
        db.session.commit()

        log_action('info', f'Password reset for user {user.username} by admin {current_user.username}')
        flash(f'Password reset successfully for user "{user.username}".', 'success')

    except HTTPException:
        # Let the 404 from get_or_404 reach Flask instead of being reported
        # as a generic reset failure by the handler below.
        raise
    except Exception as e:
        db.session.rollback()
        log_action('error', f'Error resetting password: {str(e)}')
        flash('Error resetting password. Please try again.', 'danger')

    return redirect(url_for('admin.user_management'))
|
|
|
|
|
|
@admin_bp.route('/system/info')
@login_required
@admin_required
def system_info():
    """Return host system information as JSON.

    The payload contains platform details, CPU count and utilisation, and
    memory and root-disk usage. Memory and disk sizes are reported in GB
    (bytes / 1024**3, rounded to two decimals).

    Returns:
        A JSON response with the collected values, or
        ``({'error': <message>}, 500)`` if collection fails (including when
        ``psutil`` cannot be imported).
    """
    try:
        # Imported inside the try so a missing psutil yields the JSON error
        # response below rather than breaking module import.
        import platform
        import psutil

        bytes_per_gb = 1024 ** 3

        cpu_count = psutil.cpu_count()
        # interval=1 makes this call block for about one second while it
        # measures CPU utilisation.
        cpu_percent = psutil.cpu_percent(interval=1)

        # Sample memory and disk once each so total/used/percent all come
        # from the same snapshot.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        info = {
            'system': platform.system(),
            'release': platform.release(),
            'version': platform.version(),
            'machine': platform.machine(),
            'processor': platform.processor(),
            'cpu_count': cpu_count,
            'cpu_percent': cpu_percent,
            'memory_total': round(memory.total / bytes_per_gb, 2),  # GB
            'memory_used': round(memory.used / bytes_per_gb, 2),  # GB
            'memory_percent': memory.percent,
            'disk_total': round(disk.total / bytes_per_gb, 2),  # GB
            'disk_used': round(disk.used / bytes_per_gb, 2),  # GB
            'disk_percent': disk.percent,
        }

        return jsonify(info)

    except Exception as e:
        log_action('error', f'Error getting system info: {str(e)}')
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@admin_bp.route('/leftover-media')
@login_required
@admin_required
def leftover_media():
    """Display media files that are not assigned to any playlist.

    Content rows whose id does not appear in the ``playlist_content``
    association table are grouped by ``content_type`` (image, video, pdf,
    pptx) and passed to ``admin/leftover_media.html`` together with the
    total count and the per-group and overall sizes in MB.

    Returns:
        The rendered template, or a redirect to the admin panel (with the
        error logged and flashed) if anything fails.
    """
    from app.models.playlist import playlist_content
    from sqlalchemy import select

    def size_mb(items):
        """Return the combined ``file_size`` of *items* in MB."""
        # A missing (None) file_size counts as 0 instead of raising TypeError.
        return sum((item.file_size or 0) for item in items) / (1024 * 1024)

    try:
        # Ids of every content item referenced by at least one playlist.
        stmt = select(playlist_content.c.content_id).distinct()
        content_in_playlists = {row[0] for row in db.session.execute(stmt)}

        # Everything not referenced by a playlist is a leftover.
        leftover_content = [
            c for c in Content.query.all() if c.id not in content_in_playlists
        ]

        # NOTE(review): this view reads ``content_type`` while the
        # delete-leftover routes in this module filter on
        # ``Content.media_type`` -- confirm which attribute the model defines.
        leftover_images = [c for c in leftover_content if c.content_type == 'image']
        leftover_videos = [c for c in leftover_content if c.content_type == 'video']
        leftover_pdfs = [c for c in leftover_content if c.content_type == 'pdf']
        leftover_pptx = [c for c in leftover_content if c.content_type == 'pptx']

        return render_template('admin/leftover_media.html',
                               leftover_images=leftover_images,
                               leftover_videos=leftover_videos,
                               leftover_pdfs=leftover_pdfs,
                               leftover_pptx=leftover_pptx,
                               total_leftover=len(leftover_content),
                               total_leftover_size_mb=size_mb(leftover_content),
                               images_size_mb=size_mb(leftover_images),
                               videos_size_mb=size_mb(leftover_videos),
                               pdfs_size_mb=size_mb(leftover_pdfs),
                               pptx_size_mb=size_mb(leftover_pptx))

    except Exception as e:
        log_action('error', f'Error loading leftover media: {str(e)}')
        flash('Error loading leftover media.', 'danger')
        return redirect(url_for('admin.admin_panel'))
|
|
|
|
|
|
def _delete_leftover_media(media_type: str, label: str):
    """Delete all content of *media_type* that no playlist references.

    For each matching ``Content`` row the physical file under
    ``UPLOAD_FOLDER`` is removed (a file that is already gone is ignored)
    and the row is deleted. A failure on one item is recorded and does not
    stop the others. Per-item errors and the overall outcome are logged.

    Args:
        media_type: Value compared against ``Content.media_type``.
        label: Plural noun used in the flash and log messages
            (e.g. ``'images'``).

    Returns:
        A redirect to the leftover media page.
    """
    from app.models.playlist import playlist_content

    try:
        # NOTE(review): this filters on ``Content.media_type`` while the
        # leftover_media view in this module reads ``content_type`` --
        # confirm which attribute the model defines.
        leftovers = db.session.query(Content).filter(
            Content.media_type == media_type,
            ~Content.id.in_(
                db.session.query(playlist_content.c.content_id)
            )
        ).all()

        deleted_count = 0
        errors = []

        for content in leftovers:
            try:
                # Delete physical file.
                if content.file_path:
                    file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], content.file_path)
                    try:
                        os.remove(file_path)
                    except FileNotFoundError:
                        # Already gone; still remove the database record.
                        pass

                # Delete database record.
                db.session.delete(content)
                deleted_count += 1
            except Exception as e:
                errors.append(f"Error deleting {content.file_path}: {str(e)}")

        db.session.commit()

        # Log only after the commit so the pending deletes are already
        # persisted before any logging touches the database.
        for message in errors:
            log_action('error', message)
        log_action('info', f'{deleted_count} leftover {label} deleted by admin {current_user.username}')

        if errors:
            flash(f'Deleted {deleted_count} {label} with {len(errors)} errors', 'warning')
        else:
            flash(f'Successfully deleted {deleted_count} leftover {label}', 'success')

    except Exception as e:
        db.session.rollback()
        log_action('error', f'Error deleting leftover {label}: {str(e)}')
        flash(f'Error deleting leftover {label}: {str(e)}', 'danger')

    return redirect(url_for('admin.leftover_media'))


@admin_bp.route('/delete-leftover-images', methods=['POST'])
@login_required
@admin_required
def delete_leftover_images():
    """Delete all leftover images that are not part of any playlist."""
    return _delete_leftover_media('image', 'images')


@admin_bp.route('/delete-leftover-videos', methods=['POST'])
@login_required
@admin_required
def delete_leftover_videos():
    """Delete all leftover videos that are not part of any playlist."""
    return _delete_leftover_media('video', 'videos')
|