481 lines
15 KiB
Python
481 lines
15 KiB
Python
"""
|
|
Admin routes
|
|
"""
|
|
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
|
from flask_login import login_required, current_user
|
|
from functools import wraps
|
|
from app.models.user import User
|
|
from app.extensions import db
|
|
from app.utils.logger import log_user_created, log_user_deleted, log_action
|
|
import os
|
|
|
|
bp = Blueprint('admin', __name__)
|
|
|
|
def admin_required(f):
|
|
"""Decorator to require admin role"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated or not current_user.is_admin:
|
|
flash('Admin access required.', 'danger')
|
|
return redirect(url_for('dashboard.index'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@bp.route('/')
|
|
@login_required
|
|
@admin_required
|
|
def index():
|
|
"""Admin dashboard"""
|
|
from flask import current_app
|
|
|
|
# Check if assets exist
|
|
logo_path = os.path.join(current_app.static_folder, 'assets', 'logo.png')
|
|
login_picture_path = os.path.join(current_app.static_folder, 'assets', 'login_picture.png')
|
|
|
|
logo_exists = os.path.exists(logo_path)
|
|
login_picture_exists = os.path.exists(login_picture_path)
|
|
|
|
# Get all users
|
|
users = User.query.order_by(User.username).all()
|
|
|
|
return render_template(
|
|
'admin/index.html',
|
|
users=users,
|
|
logo_exists=logo_exists,
|
|
login_picture_exists=login_picture_exists
|
|
)
|
|
|
|
@bp.route('/create_user', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def create_user():
|
|
"""Create a new user"""
|
|
username = request.form.get('username', '').strip()
|
|
password = request.form.get('password', '')
|
|
role = request.form.get('role', 'user')
|
|
|
|
# Validation
|
|
if not username or not password:
|
|
flash('Username and password are required.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
if len(password) < 6:
|
|
flash('Password must be at least 6 characters long.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
if role not in ['user', 'admin']:
|
|
flash('Invalid role specified.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
# Check if user already exists
|
|
if User.query.filter_by(username=username).first():
|
|
flash(f'User "{username}" already exists.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
try:
|
|
# Create new user
|
|
user = User(username=username, role=role)
|
|
user.set_password(password)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
log_user_created(username, role)
|
|
flash(f'User "{username}" created successfully.', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f'Error creating user: {str(e)}', 'danger')
|
|
|
|
return redirect(url_for('admin.index'))
|
|
|
|
@bp.route('/delete_user', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_user():
|
|
"""Delete a user using POST form data"""
|
|
user_id = request.form.get('user_id')
|
|
if not user_id:
|
|
flash('User ID is required.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
# Prevent self-deletion
|
|
if int(user_id) == current_user.id:
|
|
flash('You cannot delete your own account.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
user = User.query.get_or_404(user_id)
|
|
username = user.username
|
|
|
|
try:
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
log_user_deleted(username)
|
|
flash(f'User "{username}" deleted successfully.', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f'Error deleting user: {str(e)}', 'danger')
|
|
|
|
return redirect(url_for('admin.index'))
|
|
|
|
@bp.route('/change_role/<int:user_id>', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def change_role(user_id):
|
|
"""Change user role"""
|
|
# Prevent changing own role
|
|
if user_id == current_user.id:
|
|
flash('You cannot change your own role.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
user = User.query.get_or_404(user_id)
|
|
new_role = request.form.get('role')
|
|
|
|
if new_role not in ['user', 'admin']:
|
|
flash('Invalid role specified.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
try:
|
|
old_role = user.role
|
|
user.role = new_role
|
|
db.session.commit()
|
|
|
|
log_action(f"User '{user.username}' role changed from '{old_role}' to '{new_role}'")
|
|
flash(f'User "{user.username}" role changed to "{new_role}".', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f'Error changing user role: {str(e)}', 'danger')
|
|
|
|
return redirect(url_for('admin.index'))
|
|
|
|
@bp.route('/change_theme', methods=['POST'])
|
|
@login_required
|
|
def change_theme():
|
|
"""Change user theme"""
|
|
theme = request.form.get('theme', 'light')
|
|
|
|
if theme not in ['light', 'dark']:
|
|
flash('Invalid theme specified.', 'danger')
|
|
return redirect(request.referrer or url_for('admin.index'))
|
|
|
|
try:
|
|
current_user.theme = theme
|
|
db.session.commit()
|
|
flash(f'Theme changed to "{theme}".', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f'Error changing theme: {str(e)}', 'danger')
|
|
|
|
return redirect(request.referrer or url_for('admin.index'))
|
|
|
|
@bp.route('/upload_assets', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def upload_assets():
|
|
"""Upload logo and login picture"""
|
|
from flask import current_app
|
|
from werkzeug.utils import secure_filename
|
|
|
|
assets_folder = os.path.join(current_app.static_folder, 'assets')
|
|
os.makedirs(assets_folder, exist_ok=True)
|
|
|
|
# Handle logo upload
|
|
logo_file = request.files.get('logo')
|
|
if logo_file and logo_file.filename:
|
|
try:
|
|
logo_path = os.path.join(assets_folder, 'logo.png')
|
|
logo_file.save(logo_path)
|
|
flash('Logo uploaded successfully.', 'success')
|
|
log_action('Logo uploaded')
|
|
except Exception as e:
|
|
flash(f'Error uploading logo: {str(e)}', 'danger')
|
|
|
|
# Handle login picture upload
|
|
login_picture_file = request.files.get('login_picture')
|
|
if login_picture_file and login_picture_file.filename:
|
|
try:
|
|
login_picture_path = os.path.join(assets_folder, 'login_picture.png')
|
|
login_picture_file.save(login_picture_path)
|
|
flash('Login picture uploaded successfully.', 'success')
|
|
log_action('Login picture uploaded')
|
|
except Exception as e:
|
|
flash(f'Error uploading login picture: {str(e)}', 'danger')
|
|
|
|
return redirect(url_for('admin.index'))
|
|
|
|
@bp.route('/clean_unused_files', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def clean_unused_files():
|
|
"""Clean unused files from uploads folder - API endpoint"""
|
|
from flask import current_app, jsonify
|
|
from app.models.content import Content
|
|
|
|
try:
|
|
upload_folder = os.path.join(current_app.static_folder, 'uploads')
|
|
|
|
# Get all file names from database
|
|
content_files = {content.file_name for content in Content.query.all()}
|
|
|
|
# Get all files in upload folder
|
|
deleted_count = 0
|
|
if os.path.exists(upload_folder):
|
|
all_files = set(os.listdir(upload_folder))
|
|
|
|
# Find unused files
|
|
unused_files = all_files - content_files
|
|
|
|
# Delete unused files
|
|
for file_name in unused_files:
|
|
file_path = os.path.join(upload_folder, file_name)
|
|
if os.path.isfile(file_path):
|
|
try:
|
|
os.remove(file_path)
|
|
deleted_count += 1
|
|
except Exception as e:
|
|
print(f"Error deleting {file_path}: {e}")
|
|
|
|
log_action(f'Cleaned {deleted_count} unused files')
|
|
return jsonify({'success': True, 'deleted_count': deleted_count})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'message': str(e)})
|
|
|
|
@bp.route('/optimize_database', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def optimize_database():
|
|
"""Optimize database performance"""
|
|
from flask import jsonify
|
|
|
|
try:
|
|
# SQLite optimization commands
|
|
db.session.execute(db.text('VACUUM;'))
|
|
db.session.execute(db.text('ANALYZE;'))
|
|
db.session.commit()
|
|
|
|
log_action('Database optimized')
|
|
return jsonify({'success': True, 'message': 'Database optimized successfully'})
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'success': False, 'message': str(e)})
|
|
|
|
@bp.route('/scheduled_tasks', methods=['GET'])
|
|
@login_required
|
|
@admin_required
|
|
def get_scheduled_tasks():
|
|
"""Get all scheduled tasks"""
|
|
from flask import jsonify
|
|
from app.models.scheduled_task import ScheduledTask
|
|
|
|
try:
|
|
tasks = ScheduledTask.query.order_by(ScheduledTask.created_at.desc()).all()
|
|
tasks_data = []
|
|
|
|
for task in tasks:
|
|
task_dict = task.to_dict()
|
|
# Format next run time
|
|
if task.next_run:
|
|
task_dict['next_run'] = task.next_run.strftime('%Y-%m-%d %H:%M')
|
|
else:
|
|
task_dict['next_run'] = calculate_next_run(task.schedule)
|
|
|
|
tasks_data.append(task_dict)
|
|
|
|
return jsonify({'success': True, 'tasks': tasks_data})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'message': str(e)})
|
|
|
|
@bp.route('/create_scheduled_task', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def create_scheduled_task():
|
|
"""Create a new scheduled task"""
|
|
from app.models.scheduled_task import ScheduledTask
|
|
|
|
name = request.form.get('name', '').strip()
|
|
task_type = request.form.get('task_type', '').strip()
|
|
schedule = request.form.get('schedule', '').strip()
|
|
enabled = 'enabled' in request.form or request.form.get('enabled') == 'true'
|
|
|
|
# Handle time and frequency form data for quick setup
|
|
time_str = request.form.get('time')
|
|
frequency = request.form.get('frequency')
|
|
|
|
if time_str and frequency:
|
|
# Convert time and frequency to cron expression
|
|
hour, minute = time_str.split(':')
|
|
if frequency == 'daily':
|
|
schedule = f"{minute} {hour} * * *"
|
|
elif frequency == 'weekly':
|
|
schedule = f"{minute} {hour} * * 0" # Sunday
|
|
elif frequency == 'monthly':
|
|
schedule = f"{minute} {hour} 1 * *" # 1st of month
|
|
|
|
# Generate name if not provided
|
|
if not name:
|
|
name = f"{task_type.replace('_', ' ').title()} - {frequency.title()}"
|
|
|
|
# Validation
|
|
if not task_type or not schedule:
|
|
flash('Task type and schedule are required.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
if not name:
|
|
name = f"{task_type.replace('_', ' ').title()} Task"
|
|
|
|
try:
|
|
# Create new scheduled task
|
|
task = ScheduledTask(
|
|
name=name,
|
|
task_type=task_type,
|
|
schedule=schedule,
|
|
enabled=enabled
|
|
)
|
|
|
|
# Calculate next run time
|
|
task.next_run = calculate_next_run_datetime(schedule)
|
|
|
|
db.session.add(task)
|
|
db.session.commit()
|
|
|
|
log_action(f"Scheduled task '{name}' created")
|
|
flash(f'Scheduled task "{name}" created successfully.', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f'Error creating scheduled task: {str(e)}', 'danger')
|
|
|
|
return redirect(url_for('admin.index'))
|
|
|
|
@bp.route('/toggle_task/<int:task_id>', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def toggle_task(task_id):
|
|
"""Toggle a scheduled task on/off"""
|
|
from flask import jsonify
|
|
from app.models.scheduled_task import ScheduledTask
|
|
|
|
try:
|
|
task = ScheduledTask.query.get_or_404(task_id)
|
|
task.enabled = not task.enabled
|
|
db.session.commit()
|
|
|
|
status = 'enabled' if task.enabled else 'disabled'
|
|
log_action(f"Scheduled task '{task.name}' {status}")
|
|
return jsonify({'success': True, 'enabled': task.enabled})
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'success': False, 'message': str(e)})
|
|
|
|
@bp.route('/delete_task/<int:task_id>', methods=['DELETE'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_task(task_id):
|
|
"""Delete a scheduled task"""
|
|
from flask import jsonify
|
|
from app.models.scheduled_task import ScheduledTask
|
|
|
|
try:
|
|
task = ScheduledTask.query.get_or_404(task_id)
|
|
task_name = task.name
|
|
db.session.delete(task)
|
|
db.session.commit()
|
|
|
|
log_action(f"Scheduled task '{task_name}' deleted")
|
|
return jsonify({'success': True})
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'success': False, 'message': str(e)})
|
|
|
|
@bp.route('/edit_user', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def edit_user():
|
|
"""Edit a user"""
|
|
user_id = request.form.get('user_id')
|
|
if not user_id:
|
|
flash('User ID is required.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
# Prevent self-editing
|
|
if int(user_id) == current_user.id:
|
|
flash('You cannot edit your own account.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Get form data
|
|
role = request.form.get('role', 'user')
|
|
is_active = 'is_active' in request.form
|
|
password = request.form.get('password', '').strip()
|
|
|
|
if role not in ['user', 'admin']:
|
|
flash('Invalid role specified.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
|
|
try:
|
|
# Update user
|
|
user.role = role
|
|
user.is_active_user = is_active
|
|
|
|
# Update password if provided
|
|
if password:
|
|
if len(password) < 6:
|
|
flash('Password must be at least 6 characters long.', 'danger')
|
|
return redirect(url_for('admin.index'))
|
|
user.set_password(password)
|
|
|
|
db.session.commit()
|
|
|
|
log_action(f"User '{user.username}' updated - Role: {role}, Active: {is_active}")
|
|
flash(f'User "{user.username}" updated successfully.', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f'Error updating user: {str(e)}', 'danger')
|
|
|
|
return redirect(url_for('admin.index'))
|
|
|
|
def calculate_next_run(cron_expression):
|
|
"""Calculate next run time from cron expression (simplified)"""
|
|
try:
|
|
parts = cron_expression.split()
|
|
if len(parts) >= 2:
|
|
minute, hour = parts[0], parts[1]
|
|
return f"Next run: {hour}:{minute.zfill(2)}"
|
|
return "Invalid schedule"
|
|
except:
|
|
return "Invalid schedule"
|
|
|
|
def calculate_next_run_datetime(cron_expression):
|
|
"""Calculate next run datetime from cron expression (basic implementation)"""
|
|
from datetime import datetime, timedelta
|
|
|
|
try:
|
|
parts = cron_expression.split()
|
|
if len(parts) >= 2:
|
|
minute = int(parts[0])
|
|
hour = int(parts[1])
|
|
|
|
now = datetime.now()
|
|
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
|
|
# If the time has passed today, schedule for tomorrow
|
|
if next_run <= now:
|
|
next_run += timedelta(days=1)
|
|
|
|
return next_run
|
|
except:
|
|
pass
|
|
|
|
return None
|