Files
Ske_Signage/app/routes/admin.py
2025-07-17 16:17:52 +03:00

536 lines
18 KiB
Python

"""
Admin routes
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from functools import wraps
from app.models.user import User
from app.extensions import db
from app.utils.logger import log_user_created, log_user_deleted, log_action
import os
# Blueprint on which every admin route defined in this module is registered.
bp = Blueprint('admin', __name__)
def admin_required(f):
    """Wrap a view so it only runs for users with admin access.

    The wrapped view is called when ``current_user`` is authenticated and
    its ``has_admin_access`` attribute is truthy. Otherwise an error is
    flashed and the client is redirected to ``dashboard.index``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        allowed = current_user.is_authenticated and current_user.has_admin_access
        if allowed:
            return f(*args, **kwargs)
        flash('Admin access required.', 'danger')
        return redirect(url_for('dashboard.index'))

    return guarded_view
def super_admin_required(f):
    """Wrap a view so it only runs for super admin users.

    The wrapped view is called when ``current_user`` is authenticated and
    its ``is_super_admin`` attribute is truthy. Otherwise an error is
    flashed and the client is redirected to ``dashboard.index``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        allowed = current_user.is_authenticated and current_user.is_super_admin
        if allowed:
            return f(*args, **kwargs)
        flash('Super admin access required.', 'danger')
        return redirect(url_for('dashboard.index'))

    return guarded_view
@bp.route('/')
@login_required
@admin_required
def index():
    """Render the admin dashboard.

    The template receives every user ordered by username, plus two flags
    telling whether ``logo.png`` and ``login_picture.png`` exist in the
    ``assets`` directory under the application's static folder.
    """
    from flask import current_app

    assets_dir = os.path.join(current_app.static_folder, 'assets')

    def asset_present(file_name):
        """Report whether the named file exists in the assets directory."""
        return os.path.exists(os.path.join(assets_dir, file_name))

    has_logo = asset_present('logo.png')
    has_login_picture = asset_present('login_picture.png')

    all_users = User.query.order_by(User.username).all()

    context = {
        'users': all_users,
        'logo_exists': has_logo,
        'login_picture_exists': has_login_picture,
    }
    return render_template('admin/index.html', **context)
@bp.route('/create_user', methods=['POST'])
@login_required
@admin_required
def create_user():
    """Create a user from the submitted ``username``, ``password`` and ``role``.

    The first failing validation rule is flashed and nothing is created.
    On success the user is committed and the creation is logged. Every
    path ends with a redirect to ``admin.index``.
    """
    form = request.form
    username = form.get('username', '').strip()
    password = form.get('password', '')
    role = form.get('role', 'user')

    # Rules are evaluated in order; the database lookup only happens once
    # all the cheap checks have passed.
    if not (username and password):
        problem = 'Username and password are required.'
    elif len(password) < 6:
        problem = 'Password must be at least 6 characters long.'
    elif role not in ('user', 'admin', 'sadmin'):
        problem = 'Invalid role specified.'
    elif role == 'sadmin' and not current_user.is_super_admin:
        # Only a super admin may create another super admin.
        problem = 'Only super admin users can create other super admin users.'
    elif User.query.filter_by(username=username).first():
        problem = f'User "{username}" already exists.'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'danger')
        return redirect(url_for('admin.index'))

    try:
        new_user = User(username=username, role=role)
        new_user.set_password(password)
        db.session.add(new_user)
        db.session.commit()
        log_user_created(username, role)
        flash(f'User "{username}" created successfully.', 'success')
    except Exception as exc:
        db.session.rollback()
        flash(f'Error creating user: {exc!s}', 'danger')

    return redirect(url_for('admin.index'))
@bp.route('/delete_user', methods=['POST'])
@login_required
@admin_required
def delete_user():
    """Delete the user identified by the ``user_id`` form field.

    The request is refused (flash + redirect) when the id is missing or
    not an integer, when it is the caller's own id, or when the target has
    the ``sadmin`` role and the caller is not a super admin. An unknown id
    aborts with 404 via ``get_or_404``. Every other path ends with a
    redirect to ``admin.index``.
    """
    def reject(message):
        """Flash an error and send the caller back to the admin page."""
        flash(message, 'danger')
        return redirect(url_for('admin.index'))

    raw_user_id = request.form.get('user_id')
    if not raw_user_id:
        return reject('User ID is required.')

    # The id comes from untrusted form data: a non-numeric value must be
    # reported to the user instead of raising ValueError (HTTP 500).
    try:
        user_id = int(raw_user_id)
    except ValueError:
        return reject('Invalid user ID.')

    # Prevent self-deletion
    if user_id == current_user.id:
        return reject('You cannot delete your own account.')

    user = User.query.get_or_404(user_id)
    username = user.username

    # Only a super admin may delete another super admin.
    if user.role == 'sadmin' and not current_user.is_super_admin:
        return reject('Only super admin users can delete other super admin users.')

    try:
        db.session.delete(user)
        db.session.commit()
        log_user_deleted(username)
        flash(f'User "{username}" deleted successfully.', 'success')
    except Exception as e:
        db.session.rollback()
        flash(f'Error deleting user: {str(e)}', 'danger')

    return redirect(url_for('admin.index'))
@bp.route('/change_role/<int:user_id>', methods=['POST'])
@login_required
@super_admin_required
def change_role(user_id):
    """Set a user's role to the submitted ``role`` form value.

    Refused when the target is the caller, when the role is not one of
    ``user``/``admin``/``sadmin``, when the target currently has the
    ``sadmin`` role, or when the requested role is ``sadmin``. An unknown
    id aborts with 404. Every other path redirects to ``admin.index``.
    """
    def back_to_admin():
        return redirect(url_for('admin.index'))

    # A super admin may not change their own role.
    if current_user.id == user_id:
        flash('You cannot change your own role.', 'danger')
        return back_to_admin()

    target = User.query.get_or_404(user_id)
    requested_role = request.form.get('role')

    if requested_role not in ('user', 'admin', 'sadmin'):
        refusal = 'Invalid role specified.'
    elif target.role == 'sadmin':
        # Existing super admins keep their role.
        refusal = 'Super admin users cannot have their role changed.'
    elif requested_role == 'sadmin':
        # The super admin role is never handed out from this view.
        refusal = 'Super admin role cannot be assigned through the interface.'
    else:
        refusal = None

    if refusal is not None:
        flash(refusal, 'danger')
        return back_to_admin()

    try:
        previous_role = target.role
        target.role = requested_role
        db.session.commit()
        log_action(f"User '{target.username}' role changed from '{previous_role}' to '{requested_role}'")
        flash(f'User "{target.username}" role changed to "{requested_role}".', 'success')
    except Exception as exc:
        db.session.rollback()
        flash(f'Error changing user role: {exc!s}', 'danger')

    return back_to_admin()
@bp.route('/change_theme', methods=['POST'])
@login_required
def change_theme():
    """Store the submitted ``theme`` (``light`` or ``dark``) on the current user.

    Any other value is rejected with a flashed error. The response always
    redirects to the request's referrer, falling back to ``admin.index``.
    """
    chosen_theme = request.form.get('theme', 'light')

    if chosen_theme in ('light', 'dark'):
        try:
            current_user.theme = chosen_theme
            db.session.commit()
            flash(f'Theme changed to "{chosen_theme}".', 'success')
        except Exception as exc:
            db.session.rollback()
            flash(f'Error changing theme: {exc!s}', 'danger')
    else:
        flash('Invalid theme specified.', 'danger')

    destination = request.referrer or url_for('admin.index')
    return redirect(destination)
@bp.route('/upload_assets', methods=['POST'])
@login_required
@admin_required
def upload_assets():
    """Store the uploaded logo and/or login picture in the static assets folder.

    Reads the ``logo`` and ``login_picture`` file fields. Each file that
    is present is saved under a fixed name (``logo.png`` or
    ``login_picture.png``) in ``<static>/assets``, which is created when
    missing. Success or failure is flashed per file, and a failure on one
    file does not stop the other. Always redirects to ``admin.index``.
    """
    from flask import current_app

    assets_folder = os.path.join(current_app.static_folder, 'assets')
    os.makedirs(assets_folder, exist_ok=True)

    # (form field, stored file name, label used in user/log messages)
    asset_specs = (
        ('logo', 'logo.png', 'Logo'),
        ('login_picture', 'login_picture.png', 'Login picture'),
    )

    for field_name, stored_name, label in asset_specs:
        uploaded = request.files.get(field_name)
        # Skip fields that were not submitted or were submitted empty.
        if not (uploaded and uploaded.filename):
            continue
        try:
            # The destination name is fixed, so the client-supplied
            # filename never reaches the filesystem path.
            uploaded.save(os.path.join(assets_folder, stored_name))
            flash(f'{label} uploaded successfully.', 'success')
            log_action(f'{label} uploaded')
        except Exception as e:
            flash(f'Error uploading {label.lower()}: {str(e)}', 'danger')

    return redirect(url_for('admin.index'))
@bp.route('/clean_unused_files', methods=['POST'])
@login_required
@admin_required
def clean_unused_files():
    """Delete files in ``<static>/uploads`` that no Content row references.

    A file is considered unused when its name does not equal the
    ``file_name`` of any ``Content`` record. Only regular files directly
    inside the uploads folder are removed.

    Returns a JSON payload: ``{'success': True, 'deleted_count': n}`` on
    success, or ``{'success': False, 'message': ...}`` when an unexpected
    error occurs.
    """
    from flask import current_app, jsonify
    from app.models.content import Content

    try:
        upload_folder = os.path.join(current_app.static_folder, 'uploads')

        # Names of all files still referenced from the database.
        content_files = {content.file_name for content in Content.query.all()}

        deleted_count = 0
        if os.path.exists(upload_folder):
            unused_files = set(os.listdir(upload_folder)) - content_files
            for file_name in unused_files:
                file_path = os.path.join(upload_folder, file_name)
                if not os.path.isfile(file_path):
                    continue
                try:
                    os.remove(file_path)
                except OSError as exc:
                    # Best effort: record the failure in the application
                    # log and keep going with the remaining files.
                    current_app.logger.warning('Error deleting %s: %s', file_path, exc)
                else:
                    deleted_count += 1

        log_action(f'Cleaned {deleted_count} unused files')
        return jsonify({'success': True, 'deleted_count': deleted_count})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)})
@bp.route('/optimize_database', methods=['POST'])
@login_required
@admin_required
def optimize_database():
    """Run the SQLite ``VACUUM`` and ``ANALYZE`` maintenance statements.

    Returns a JSON payload with ``success`` and ``message`` keys. On
    failure the session is rolled back and the message carries the error
    text.
    """
    from flask import jsonify

    maintenance_statements = ('VACUUM;', 'ANALYZE;')
    try:
        for statement in maintenance_statements:
            db.session.execute(db.text(statement))
        db.session.commit()
        log_action('Database optimized')
        outcome = jsonify(success=True, message='Database optimized successfully')
    except Exception as exc:
        db.session.rollback()
        outcome = jsonify(success=False, message=str(exc))
    return outcome
@bp.route('/scheduled_tasks', methods=['GET'])
@login_required
@admin_required
def get_scheduled_tasks():
    """Return all scheduled tasks as JSON, newest first.

    Each task is serialised with ``to_dict()`` and its ``next_run`` entry
    is replaced: a stored ``next_run`` is formatted as
    ``YYYY-MM-DD HH:MM``, otherwise the label from ``calculate_next_run``
    for the task's schedule is used.
    """
    from flask import jsonify
    from app.models.scheduled_task import ScheduledTask

    def serialise(task):
        """Build the JSON-ready dict for one task."""
        entry = task.to_dict()
        if task.next_run:
            entry['next_run'] = task.next_run.strftime('%Y-%m-%d %H:%M')
        else:
            entry['next_run'] = calculate_next_run(task.schedule)
        return entry

    try:
        newest_first = ScheduledTask.query.order_by(ScheduledTask.created_at.desc()).all()
        return jsonify(success=True, tasks=[serialise(task) for task in newest_first])
    except Exception as exc:
        return jsonify(success=False, message=str(exc))
@bp.route('/create_scheduled_task', methods=['POST'])
@login_required
@admin_required
def create_scheduled_task():
    """Create a scheduled task from the submitted form.

    The schedule is either the raw ``schedule`` field (a cron expression)
    or, for the quick-setup form, derived from ``time`` (``HH:MM``, with
    optional ``:SS`` that is ignored) plus ``frequency`` (``daily``,
    ``weekly`` on Sunday, or ``monthly`` on the 1st). A missing ``name``
    is generated from the task type. Invalid input is flashed as an
    error. Every path ends with a redirect to ``admin.index``.
    """
    from app.models.scheduled_task import ScheduledTask

    def reject(message):
        """Flash an error and send the caller back to the admin page."""
        flash(message, 'danger')
        return redirect(url_for('admin.index'))

    name = request.form.get('name', '').strip()
    task_type = request.form.get('task_type', '').strip()
    schedule = request.form.get('schedule', '').strip()
    # NOTE(review): the first clause already covers the second, so any
    # submitted ``enabled`` value (even "false") enables the task. Kept
    # as-is to preserve checkbox semantics.
    enabled = 'enabled' in request.form or request.form.get('enabled') == 'true'

    # Handle time and frequency form data for quick setup
    time_str = request.form.get('time')
    frequency = request.form.get('frequency')

    if time_str and frequency:
        # Validate before unpacking: the value is untrusted, and both
        # "HH:MM:SS" and garbage used to crash or yield a broken cron line.
        time_parts = time_str.strip().split(':')
        time_is_valid = False
        if len(time_parts) in (2, 3):
            hour, minute = time_parts[0], time_parts[1]
            try:
                time_is_valid = (
                    hour.isdigit() and minute.isdigit()
                    and int(hour) <= 23 and int(minute) <= 59
                )
            except ValueError:
                time_is_valid = False
        if not time_is_valid:
            return reject('Invalid time format. Use HH:MM.')

        # Convert time and frequency to cron expression
        if frequency == 'daily':
            schedule = f"{minute} {hour} * * *"
        elif frequency == 'weekly':
            schedule = f"{minute} {hour} * * 0"  # Sunday
        elif frequency == 'monthly':
            schedule = f"{minute} {hour} 1 * *"  # 1st of month

        # Generate name if not provided
        if not name:
            name = f"{task_type.replace('_', ' ').title()} - {frequency.title()}"

    # Validation
    if not task_type or not schedule:
        return reject('Task type and schedule are required.')

    if not name:
        name = f"{task_type.replace('_', ' ').title()} Task"

    try:
        task = ScheduledTask(
            name=name,
            task_type=task_type,
            schedule=schedule,
            enabled=enabled
        )
        # May be None when the schedule cannot be interpreted.
        task.next_run = calculate_next_run_datetime(schedule)

        db.session.add(task)
        db.session.commit()

        log_action(f"Scheduled task '{name}' created")
        flash(f'Scheduled task "{name}" created successfully.', 'success')
    except Exception as e:
        db.session.rollback()
        flash(f'Error creating scheduled task: {str(e)}', 'danger')

    return redirect(url_for('admin.index'))
@bp.route('/toggle_task/<int:task_id>', methods=['POST'])
@login_required
@admin_required
def toggle_task(task_id):
    """Flip the ``enabled`` flag of a scheduled task and report the new state.

    Returns JSON ``{'success': True, 'enabled': <bool>}`` on success, or
    ``{'success': False, 'message': ...}`` after rolling back on error.
    """
    from flask import jsonify
    from app.models.scheduled_task import ScheduledTask

    try:
        # NOTE(review): get_or_404 sits inside the try, so its 404 is
        # presumably caught by the broad handler below and reported as a
        # JSON failure rather than an HTTP 404. Confirm this is intended.
        task = ScheduledTask.query.get_or_404(task_id)
        task.enabled = not task.enabled
        db.session.commit()

        if task.enabled:
            state_label = 'enabled'
        else:
            state_label = 'disabled'
        log_action(f"Scheduled task '{task.name}' {state_label}")

        return jsonify(success=True, enabled=task.enabled)
    except Exception as exc:
        db.session.rollback()
        return jsonify(success=False, message=str(exc))
@bp.route('/delete_task/<int:task_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_task(task_id):
    """Remove a scheduled task and log the deletion.

    Returns JSON ``{'success': True}`` on success, or
    ``{'success': False, 'message': ...}`` after rolling back on error.
    """
    from flask import jsonify
    from app.models.scheduled_task import ScheduledTask

    try:
        doomed = ScheduledTask.query.get_or_404(task_id)
        # Capture the name now; it is needed for the log line after the
        # row has been deleted.
        doomed_name = doomed.name

        db.session.delete(doomed)
        db.session.commit()

        log_action(f"Scheduled task '{doomed_name}' deleted")
        return jsonify(success=True)
    except Exception as exc:
        db.session.rollback()
        return jsonify(success=False, message=str(exc))
@bp.route('/edit_user', methods=['POST'])
@login_required
@admin_required
def edit_user():
    """Update another user's username, role, active flag and optional password.

    Form fields: ``user_id``, ``username``, ``role``, ``is_active``
    (checkbox, present means active) and ``password`` (blank keeps the
    current one). All validation runs before the user object is touched,
    so a rejected request leaves no pending changes in the session. An
    unknown id aborts with 404. Every other path ends with a redirect to
    ``admin.index``.
    """
    def reject(message):
        """Flash an error and send the caller back to the admin page."""
        flash(message, 'danger')
        return redirect(url_for('admin.index'))

    raw_user_id = request.form.get('user_id')
    if not raw_user_id:
        return reject('User ID is required.')

    # The id comes from untrusted form data: a non-numeric value must be
    # reported to the user instead of raising ValueError (HTTP 500).
    try:
        user_id = int(raw_user_id)
    except ValueError:
        return reject('Invalid user ID.')

    # Prevent self-editing
    if user_id == current_user.id:
        return reject('You cannot edit your own account.')

    user = User.query.get_or_404(user_id)

    # Get form data
    username = request.form.get('username', '').strip()
    role = request.form.get('role', 'user')
    is_active = 'is_active' in request.form
    password = request.form.get('password', '').strip()

    if not username:
        return reject('Username cannot be empty.')

    if role not in ['user', 'admin', 'sadmin']:
        return reject('Invalid role specified.')

    # Only a super admin may modify another super admin.
    if user.role == 'sadmin' and not current_user.is_super_admin:
        return reject('Only super admin users can modify other super admin users.')

    # Only a super admin may assign the super admin role.
    if role == 'sadmin' and not current_user.is_super_admin:
        return reject('Only super admin users can assign super admin role.')

    # Check if username is taken by another user
    if username != user.username:
        if User.query.filter_by(username=username).first():
            return reject('Username already exists.')

    # Validate the optional new password before mutating the user, so an
    # early return never leaves half-applied changes on the session.
    if password and len(password) < 6:
        return reject('Password must be at least 6 characters long.')

    try:
        old_username = user.username
        user.username = username
        user.role = role
        user.is_active_user = is_active
        if password:
            user.set_password(password)

        db.session.commit()

        log_action(f"User '{old_username}' updated - Username: {username}, Role: {role}, Active: {is_active}" +
                   (", Password changed" if password else ""))
        flash(f'User "{username}" updated successfully.', 'success')
    except Exception as e:
        db.session.rollback()
        flash(f'Error updating user: {str(e)}', 'danger')

    return redirect(url_for('admin.index'))
def calculate_next_run(cron_expression):
    """Build a display label from the hour and minute fields of a cron expression.

    This is a simplified formatter: it does not compute an actual date.
    It reads the first field (minute) and second field (hour) and returns
    ``"Next run: <hour>:<minute>"`` with the minute zero-padded to two
    characters. Returns ``"Invalid schedule"`` when fewer than two fields
    are present or the argument is not a string (for example ``None``).
    """
    try:
        parts = cron_expression.split()
    except AttributeError:
        # Not a string-like value, e.g. a task with no schedule stored.
        return "Invalid schedule"

    if len(parts) < 2:
        return "Invalid schedule"

    minute, hour = parts[0], parts[1]
    return f"Next run: {hour}:{minute.zfill(2)}"
def calculate_next_run_datetime(cron_expression, now=None):
    """Compute the next run time for a simple cron expression.

    Supported subset: the minute and hour fields must be plain integers.
    The day-of-month, month and day-of-week fields are honoured when they
    are plain integers (day-of-week accepts 0-7, with 0 and 7 both meaning
    Sunday). Any other token in those three fields, or a missing field,
    is treated as a wildcard. As in cron, when both day-of-month and
    day-of-week are restricted, a day matching either one qualifies.

    Args:
        cron_expression: Whitespace-separated cron fields.
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        The first matching datetime strictly after ``now``, or ``None``
        when the expression cannot be interpreted or never matches.
    """
    from datetime import datetime, timedelta

    def optional_int(token):
        """Return the integer value of a field, or None for a wildcard."""
        try:
            return int(token)
        except ValueError:
            return None

    try:
        parts = cron_expression.split()
    except AttributeError:
        return None
    if len(parts) < 2:
        return None

    try:
        minute = int(parts[0])
        hour = int(parts[1])
    except ValueError:
        return None

    # Pad so expressions with fewer than five fields default to wildcards.
    date_fields = (parts[2:5] + ['*', '*', '*'])[:3]
    day_of_month, month, day_of_week = (optional_int(token) for token in date_fields)

    # Reject integer values outside the ranges cron allows.
    if day_of_month is not None and not 1 <= day_of_month <= 31:
        return None
    if month is not None and not 1 <= month <= 12:
        return None
    if day_of_week is not None and not 0 <= day_of_week <= 7:
        return None

    if now is None:
        now = datetime.now()

    try:
        candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    except ValueError:
        # Hour or minute outside the valid clock range.
        return None

    # If the time has passed today, start looking from tomorrow.
    if candidate <= now:
        candidate += timedelta(days=1)

    def day_matches(moment):
        """Check the date part of ``moment`` against the restricted fields."""
        if month is not None and moment.month != month:
            return False
        dom_hit = day_of_month is not None and moment.day == day_of_month
        # isoweekday() is 1..7 (Mon..Sun); cron uses 0..6 with Sunday as 0.
        dow_hit = day_of_week is not None and moment.isoweekday() % 7 == day_of_week % 7
        if day_of_month is not None and day_of_week is not None:
            return dom_hit or dow_hit
        if day_of_month is not None:
            return dom_hit
        if day_of_week is not None:
            return dow_hit
        return True

    # Eight years covers the longest possible gap (Feb 29 across a
    # non-leap century year); combinations like Feb 30 end here.
    for _ in range(366 * 8):
        if day_matches(candidate):
            return candidate
        candidate += timedelta(days=1)

    return None