Files
Ske_Signage/app/routes/admin.py
2025-07-16 08:03:57 +03:00

246 lines
7.9 KiB
Python

"""
Admin routes
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from functools import wraps
from app.models.user import User
from app.extensions import db
from app.utils.logger import log_user_created, log_user_deleted, log_action
import os
bp = Blueprint('admin', __name__)
def admin_required(f):
"""Decorator to require admin role"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
flash('Admin access required.', 'danger')
return redirect(url_for('dashboard.index'))
return f(*args, **kwargs)
return decorated_function
@bp.route('/')
@login_required
@admin_required
def index():
"""Admin dashboard"""
from flask import current_app
# Check if assets exist
logo_path = os.path.join(current_app.static_folder, 'assets', 'logo.png')
login_picture_path = os.path.join(current_app.static_folder, 'assets', 'login_picture.png')
logo_exists = os.path.exists(logo_path)
login_picture_exists = os.path.exists(login_picture_path)
# Get all users
users = User.query.order_by(User.username).all()
return render_template(
'admin/index.html',
users=users,
logo_exists=logo_exists,
login_picture_exists=login_picture_exists
)
@bp.route('/create_user', methods=['POST'])
@login_required
@admin_required
def create_user():
"""Create a new user"""
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
role = request.form.get('role', 'user')
# Validation
if not username or not password:
flash('Username and password are required.', 'danger')
return redirect(url_for('admin.index'))
if len(password) < 6:
flash('Password must be at least 6 characters long.', 'danger')
return redirect(url_for('admin.index'))
if role not in ['user', 'admin']:
flash('Invalid role specified.', 'danger')
return redirect(url_for('admin.index'))
# Check if user already exists
if User.query.filter_by(username=username).first():
flash(f'User "{username}" already exists.', 'danger')
return redirect(url_for('admin.index'))
try:
# Create new user
user = User(username=username, role=role)
user.set_password(password)
db.session.add(user)
db.session.commit()
log_user_created(username, role)
flash(f'User "{username}" created successfully.', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error creating user: {str(e)}', 'danger')
return redirect(url_for('admin.index'))
@bp.route('/delete_user/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def delete_user(user_id):
"""Delete a user"""
# Prevent self-deletion
if user_id == current_user.id:
flash('You cannot delete your own account.', 'danger')
return redirect(url_for('admin.index'))
user = User.query.get_or_404(user_id)
username = user.username
try:
db.session.delete(user)
db.session.commit()
log_user_deleted(username)
flash(f'User "{username}" deleted successfully.', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error deleting user: {str(e)}', 'danger')
return redirect(url_for('admin.index'))
@bp.route('/change_role/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def change_role(user_id):
"""Change user role"""
# Prevent changing own role
if user_id == current_user.id:
flash('You cannot change your own role.', 'danger')
return redirect(url_for('admin.index'))
user = User.query.get_or_404(user_id)
new_role = request.form.get('role')
if new_role not in ['user', 'admin']:
flash('Invalid role specified.', 'danger')
return redirect(url_for('admin.index'))
try:
old_role = user.role
user.role = new_role
db.session.commit()
log_action(f"User '{user.username}' role changed from '{old_role}' to '{new_role}'")
flash(f'User "{user.username}" role changed to "{new_role}".', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error changing user role: {str(e)}', 'danger')
return redirect(url_for('admin.index'))
@bp.route('/change_theme', methods=['POST'])
@login_required
def change_theme():
"""Change user theme"""
theme = request.form.get('theme', 'light')
if theme not in ['light', 'dark']:
flash('Invalid theme specified.', 'danger')
return redirect(request.referrer or url_for('admin.index'))
try:
current_user.theme = theme
db.session.commit()
flash(f'Theme changed to "{theme}".', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error changing theme: {str(e)}', 'danger')
return redirect(request.referrer or url_for('admin.index'))
@bp.route('/upload_assets', methods=['POST'])
@login_required
@admin_required
def upload_assets():
"""Upload logo and login picture"""
from flask import current_app
from werkzeug.utils import secure_filename
assets_folder = os.path.join(current_app.static_folder, 'assets')
os.makedirs(assets_folder, exist_ok=True)
# Handle logo upload
logo_file = request.files.get('logo')
if logo_file and logo_file.filename:
try:
logo_path = os.path.join(assets_folder, 'logo.png')
logo_file.save(logo_path)
flash('Logo uploaded successfully.', 'success')
log_action('Logo uploaded')
except Exception as e:
flash(f'Error uploading logo: {str(e)}', 'danger')
# Handle login picture upload
login_picture_file = request.files.get('login_picture')
if login_picture_file and login_picture_file.filename:
try:
login_picture_path = os.path.join(assets_folder, 'login_picture.png')
login_picture_file.save(login_picture_path)
flash('Login picture uploaded successfully.', 'success')
log_action('Login picture uploaded')
except Exception as e:
flash(f'Error uploading login picture: {str(e)}', 'danger')
return redirect(url_for('admin.index'))
@bp.route('/clean_unused_files', methods=['POST'])
@login_required
@admin_required
def clean_unused_files():
"""Clean unused files from uploads folder"""
from flask import current_app
from app.models.content import Content
try:
upload_folder = os.path.join(current_app.static_folder, 'uploads')
# Get all file names from database
content_files = {content.file_name for content in Content.query.all()}
# Get all files in upload folder
if os.path.exists(upload_folder):
all_files = set(os.listdir(upload_folder))
# Find unused files
unused_files = all_files - content_files
# Delete unused files
deleted_count = 0
for file_name in unused_files:
file_path = os.path.join(upload_folder, file_name)
if os.path.isfile(file_path):
try:
os.remove(file_path)
deleted_count += 1
except Exception as e:
print(f"Error deleting {file_path}: {e}")
flash(f'Cleaned {deleted_count} unused files.', 'success')
log_action(f'Cleaned {deleted_count} unused files')
else:
flash('Upload folder does not exist.', 'info')
except Exception as e:
flash(f'Error cleaning files: {str(e)}', 'danger')
return redirect(url_for('admin.index'))