""" Admin routes """ from flask import Blueprint, render_template, request, redirect, url_for, flash from flask_login import login_required, current_user from functools import wraps from app.models.user import User from app.extensions import db from app.utils.logger import log_user_created, log_user_deleted, log_action import os bp = Blueprint('admin', __name__) def admin_required(f): """Decorator to require admin or super admin role""" @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated or not current_user.has_admin_access: flash('Admin access required.', 'danger') return redirect(url_for('dashboard.index')) return f(*args, **kwargs) return decorated_function def super_admin_required(f): """Decorator to require super admin role only""" @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated or not current_user.is_super_admin: flash('Super admin access required.', 'danger') return redirect(url_for('dashboard.index')) return f(*args, **kwargs) return decorated_function @bp.route('/') @login_required @admin_required def index(): """Admin dashboard""" from flask import current_app # Check if assets exist logo_path = os.path.join(current_app.static_folder, 'assets', 'logo.png') login_picture_path = os.path.join(current_app.static_folder, 'assets', 'login_picture.png') logo_exists = os.path.exists(logo_path) login_picture_exists = os.path.exists(login_picture_path) # Get all users users = User.query.order_by(User.username).all() return render_template( 'admin/index.html', users=users, logo_exists=logo_exists, login_picture_exists=login_picture_exists ) @bp.route('/create_user', methods=['POST']) @login_required @admin_required def create_user(): """Create a new user""" username = request.form.get('username', '').strip() password = request.form.get('password', '') role = request.form.get('role', 'user') # Validation if not username or not password: flash('Username and password are required.', 'danger') return redirect(url_for('admin.index')) if len(password) < 6: flash('Password must be at least 6 characters long.', 'danger') return redirect(url_for('admin.index')) if role not in ['user', 'admin', 'sadmin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.index')) # Prevent creating sadmin users - sadmin only exists from deployment if role == 'sadmin': flash('Super admin users cannot be created through the interface.', 'danger') return redirect(url_for('admin.index')) # Check if user already exists if User.query.filter_by(username=username).first(): flash(f'User "{username}" already exists.', 'danger') return redirect(url_for('admin.index')) try: # Create new user user = User(username=username, role=role) user.set_password(password) db.session.add(user) db.session.commit() log_user_created(username, role) flash(f'User "{username}" created successfully.', 'success') except Exception as e: db.session.rollback() flash(f'Error creating user: {str(e)}', 'danger') return redirect(url_for('admin.index')) @bp.route('/delete_user', methods=['POST']) @login_required @super_admin_required def delete_user(): """Delete a user using POST form data""" user_id = request.form.get('user_id') if not user_id: flash('User ID is required.', 'danger') return redirect(url_for('admin.index')) # Prevent self-deletion if int(user_id) == current_user.id: flash('You cannot delete your own account.', 'danger') return redirect(url_for('admin.index')) user = User.query.get_or_404(user_id) username = user.username # Prevent deletion of sadmin users - they are permanent if user.role == 'sadmin': flash('Super admin users cannot be deleted.', 'danger') return redirect(url_for('admin.index')) try: db.session.delete(user) db.session.commit() log_user_deleted(username) flash(f'User "{username}" deleted successfully.', 'success') except Exception as e: db.session.rollback() flash(f'Error deleting user: {str(e)}', 'danger') return redirect(url_for('admin.index')) @bp.route('/change_role/', methods=['POST']) @login_required @super_admin_required def change_role(user_id): """Change user role - restricted to super admin""" # Prevent changing own role if user_id == current_user.id: flash('You cannot change your own role.', 'danger') return redirect(url_for('admin.index')) user = User.query.get_or_404(user_id) new_role = request.form.get('role') if new_role not in ['user', 'admin', 'sadmin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.index')) # Prevent any changes to sadmin users - they are permanent if user.role == 'sadmin': flash('Super admin users cannot have their role changed.', 'danger') return redirect(url_for('admin.index')) # Prevent assigning sadmin role - sadmin only exists from deployment if new_role == 'sadmin': flash('Super admin role cannot be assigned through the interface.', 'danger') return redirect(url_for('admin.index')) try: old_role = user.role user.role = new_role db.session.commit() log_action(f"User '{user.username}' role changed from '{old_role}' to '{new_role}'") flash(f'User "{user.username}" role changed to "{new_role}".', 'success') except Exception as e: db.session.rollback() flash(f'Error changing user role: {str(e)}', 'danger') return redirect(url_for('admin.index')) @bp.route('/change_theme', methods=['POST']) @login_required def change_theme(): """Change user theme""" theme = request.form.get('theme', 'light') if theme not in ['light', 'dark']: flash('Invalid theme specified.', 'danger') return redirect(request.referrer or url_for('admin.index')) try: current_user.theme = theme db.session.commit() flash(f'Theme changed to "{theme}".', 'success') except Exception as e: db.session.rollback() flash(f'Error changing theme: {str(e)}', 'danger') return redirect(request.referrer or url_for('admin.index')) @bp.route('/upload_assets', methods=['POST']) @login_required @admin_required def upload_assets(): """Upload logo and login picture""" from flask import current_app from werkzeug.utils import secure_filename assets_folder = os.path.join(current_app.static_folder, 'assets') os.makedirs(assets_folder, exist_ok=True) # Handle logo upload logo_file = request.files.get('logo') if logo_file and logo_file.filename: try: logo_path = os.path.join(assets_folder, 'logo.png') logo_file.save(logo_path) flash('Logo uploaded successfully.', 'success') log_action('Logo uploaded') except Exception as e: flash(f'Error uploading logo: {str(e)}', 'danger') # Handle login picture upload login_picture_file = request.files.get('login_picture') if login_picture_file and login_picture_file.filename: try: login_picture_path = os.path.join(assets_folder, 'login_picture.png') login_picture_file.save(login_picture_path) flash('Login picture uploaded successfully.', 'success') log_action('Login picture uploaded') except Exception as e: flash(f'Error uploading login picture: {str(e)}', 'danger') return redirect(url_for('admin.index')) @bp.route('/clean_unused_files', methods=['POST']) @login_required @admin_required def clean_unused_files(): """Clean unused files from uploads folder - API endpoint""" from flask import current_app, jsonify from app.models.content import Content try: upload_folder = os.path.join(current_app.static_folder, 'uploads') # Get all file names from database content_files = {content.file_name for content in Content.query.all()} # Get all files in upload folder deleted_count = 0 if os.path.exists(upload_folder): all_files = set(os.listdir(upload_folder)) # Find unused files unused_files = all_files - content_files # Delete unused files for file_name in unused_files: file_path = os.path.join(upload_folder, file_name) if os.path.isfile(file_path): try: os.remove(file_path) deleted_count += 1 except Exception as e: print(f"Error deleting {file_path}: {e}") log_action(f'Cleaned {deleted_count} unused files') return jsonify({'success': True, 'deleted_count': deleted_count}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @bp.route('/optimize_database', methods=['POST']) @login_required @admin_required def optimize_database(): """Optimize database performance""" from flask import jsonify try: # SQLite optimization commands db.session.execute(db.text('VACUUM;')) db.session.execute(db.text('ANALYZE;')) db.session.commit() log_action('Database optimized') return jsonify({'success': True, 'message': 'Database optimized successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @bp.route('/scheduled_tasks', methods=['GET']) @login_required @admin_required def get_scheduled_tasks(): """Get all scheduled tasks""" from flask import jsonify from app.models.scheduled_task import ScheduledTask try: tasks = ScheduledTask.query.order_by(ScheduledTask.created_at.desc()).all() tasks_data = [] for task in tasks: task_dict = task.to_dict() # Format next run time if task.next_run: task_dict['next_run'] = task.next_run.strftime('%Y-%m-%d %H:%M') else: task_dict['next_run'] = calculate_next_run(task.schedule) tasks_data.append(task_dict) return jsonify({'success': True, 'tasks': tasks_data}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @bp.route('/create_scheduled_task', methods=['POST']) @login_required @admin_required def create_scheduled_task(): """Create a new scheduled task""" from app.models.scheduled_task import ScheduledTask name = request.form.get('name', '').strip() task_type = request.form.get('task_type', '').strip() schedule = request.form.get('schedule', '').strip() enabled = 'enabled' in request.form or request.form.get('enabled') == 'true' # Handle time and frequency form data for quick setup time_str = request.form.get('time') frequency = request.form.get('frequency') if time_str and frequency: # Convert time and frequency to cron expression hour, minute = time_str.split(':') if frequency == 'daily': schedule = f"{minute} {hour} * * *" elif frequency == 'weekly': schedule = f"{minute} {hour} * * 0" # Sunday elif frequency == 'monthly': schedule = f"{minute} {hour} 1 * *" # 1st of month # Generate name if not provided if not name: name = f"{task_type.replace('_', ' ').title()} - {frequency.title()}" # Validation if not task_type or not schedule: flash('Task type and schedule are required.', 'danger') return redirect(url_for('admin.index')) if not name: name = f"{task_type.replace('_', ' ').title()} Task" try: # Create new scheduled task task = ScheduledTask( name=name, task_type=task_type, schedule=schedule, enabled=enabled ) # Calculate next run time task.next_run = calculate_next_run_datetime(schedule) db.session.add(task) db.session.commit() log_action(f"Scheduled task '{name}' created") flash(f'Scheduled task "{name}" created successfully.', 'success') except Exception as e: db.session.rollback() flash(f'Error creating scheduled task: {str(e)}', 'danger') return redirect(url_for('admin.index')) @bp.route('/toggle_task/', methods=['POST']) @login_required @admin_required def toggle_task(task_id): """Toggle a scheduled task on/off""" from flask import jsonify from app.models.scheduled_task import ScheduledTask try: task = ScheduledTask.query.get_or_404(task_id) task.enabled = not task.enabled db.session.commit() status = 'enabled' if task.enabled else 'disabled' log_action(f"Scheduled task '{task.name}' {status}") return jsonify({'success': True, 'enabled': task.enabled}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @bp.route('/delete_task/', methods=['DELETE']) @login_required @admin_required def delete_task(task_id): """Delete a scheduled task""" from flask import jsonify from app.models.scheduled_task import ScheduledTask try: task = ScheduledTask.query.get_or_404(task_id) task_name = task.name db.session.delete(task) db.session.commit() log_action(f"Scheduled task '{task_name}' deleted") return jsonify({'success': True}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @bp.route('/edit_user', methods=['POST']) @login_required @admin_required def edit_user(): """Edit a user""" user_id = request.form.get('user_id') if not user_id: flash('User ID is required.', 'danger') return redirect(url_for('admin.index')) # Prevent self-editing if int(user_id) == current_user.id: flash('You cannot edit your own account.', 'danger') return redirect(url_for('admin.index')) user = User.query.get_or_404(user_id) # Get form data username = request.form.get('username', '').strip() role = request.form.get('role', 'user') is_active = 'is_active' in request.form password = request.form.get('password', '').strip() if not username: flash('Username cannot be empty.', 'danger') return redirect(url_for('admin.index')) if role not in ['user', 'admin', 'sadmin']: flash('Invalid role specified.', 'danger') return redirect(url_for('admin.index')) # Prevent changing sadmin users - they are permanent if user.role == 'sadmin': flash('Super admin users cannot be modified.', 'danger') return redirect(url_for('admin.index')) # Prevent assigning sadmin role - sadmin only exists from deployment if role == 'sadmin': flash('Super admin role cannot be assigned through the interface.', 'danger') return redirect(url_for('admin.index')) # Check if username is taken by another user if username != user.username: existing_user = User.query.filter_by(username=username).first() if existing_user: flash('Username already exists.', 'danger') return redirect(url_for('admin.index')) try: # Update user old_username = user.username user.username = username user.role = role user.is_active_user = is_active # Update password if provided if password: if len(password) < 6: flash('Password must be at least 6 characters long.', 'danger') return redirect(url_for('admin.index')) user.set_password(password) db.session.commit() log_action(f"User '{old_username}' updated - Username: {username}, Role: {role}, Active: {is_active}" + (", Password changed" if password else "")) flash(f'User "{username}" updated successfully.', 'success') except Exception as e: db.session.rollback() flash(f'Error updating user: {str(e)}', 'danger') return redirect(url_for('admin.index')) def calculate_next_run(cron_expression): """Calculate next run time from cron expression (simplified)""" try: parts = cron_expression.split() if len(parts) >= 2: minute, hour = parts[0], parts[1] return f"Next run: {hour}:{minute.zfill(2)}" return "Invalid schedule" except: return "Invalid schedule" def calculate_next_run_datetime(cron_expression): """Calculate next run datetime from cron expression (basic implementation)""" from datetime import datetime, timedelta try: parts = cron_expression.split() if len(parts) >= 2: minute = int(parts[0]) hour = int(parts[1]) now = datetime.now() next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) # If the time has passed today, schedule for tomorrow if next_run <= now: next_run += timedelta(days=1) return next_run except: pass return None