Files
moto-adv-website/app/routes/admin.py

509 lines
18 KiB
Python

from flask import Blueprint, render_template, request, flash, redirect, url_for, jsonify, current_app
from flask_mail import Message
from flask_login import login_required, current_user
from functools import wraps
from datetime import datetime, timedelta
from sqlalchemy import func, desc
import secrets
from app.routes.mail import mail
from app.extensions import db
from app.models import User, Post, PostImage, GPXFile, Comment, Like, PageView, MailSettings, SentEmail
# Blueprint that groups the admin views of this module; routes registered on
# it are served under the "/admin" URL prefix.
admin = Blueprint('admin', __name__, url_prefix='/admin')
def admin_required(f):
    """Wrap a view so that only authenticated admin users can reach it.

    Anyone else gets an 'Admin access required.' flash message and is
    redirected to the ``auth.login`` endpoint; the wrapped view is not called.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        has_admin_access = current_user.is_authenticated and current_user.is_admin
        if has_admin_access:
            return f(*args, **kwargs)
        flash('Admin access required.', 'error')
        return redirect(url_for('auth.login'))
    return guarded_view
@admin.route('/mail-settings', methods=['GET', 'POST'])
@login_required
@admin_required
def mail_settings():
    """Show and update the outgoing-mail configuration.

    GET renders the first ``MailSettings`` row (``None`` when no row exists)
    together with the 50 most recent ``SentEmail`` records.

    POST reads the settings from the submitted form, creates the
    ``MailSettings`` row on first save, commits, and renders the same page.
    A port that is not an integer in the range 0-65535 is rejected with an
    error flash and nothing is saved.  A failed commit is rolled back, logged
    and reported with an error flash.
    """
    settings = MailSettings.query.first()
    if request.method == 'POST':
        form = request.form
        raw_port = (form.get('port') or '').strip()
        try:
            # An empty field keeps its existing meaning of "port 0".
            port = int(raw_port) if raw_port else 0
        except ValueError:
            port = None

        if port is None or not 0 <= port <= 65535:
            flash('Mail settings not saved: port must be a number between 0 and 65535.', 'error')
        else:
            if not settings:
                settings = MailSettings()
                db.session.add(settings)
            # Checkbox-style fields: any non-empty submitted value means "on".
            settings.enabled = bool(form.get('enabled'))
            settings.provider = form.get('provider')
            settings.server = form.get('server')
            settings.port = port
            settings.use_tls = bool(form.get('use_tls'))
            settings.username = form.get('username')
            # NOTE(review): the password is stored exactly as submitted, so a
            # form posted with an empty password field overwrites the stored
            # value -- confirm this is intended.
            settings.password = form.get('password')
            settings.default_sender = form.get('default_sender')
            try:
                db.session.commit()
            except Exception as e:
                # Leave the session usable for the queries that follow.
                db.session.rollback()
                current_app.logger.error(f'Error saving mail settings: {str(e)}')
                flash(f'Error saving mail settings: {str(e)}', 'error')
            else:
                flash('Mail settings updated.', 'success')

    sent_emails = SentEmail.query.order_by(SentEmail.sent_at.desc()).limit(50).all()
    return render_template('admin/mail_settings.html', settings=settings, sent_emails=sent_emails)
# Duplicate imports and Blueprint definitions that used to be here have been removed.
# Password reset token generator (simple, for demonstration)
def generate_reset_token(user):
    """Return a new random URL-safe token string for a password reset.

    ``user`` is accepted but not used: the token is 32 random bytes from
    ``secrets.token_urlsafe`` and is not tied to the account in this function.
    """
    # In production, use itsdangerous or Flask-Security for secure tokens
    token_bytes = 32
    return secrets.token_urlsafe(nbytes=token_bytes)
# Admin: Send password reset email to user
@admin.route('/users/<int:user_id>/reset-password', methods=['POST'])
@login_required
@admin_required
def reset_user_password(user_id):
    """Email a password-reset link to a user, then return to their detail page.

    Responds with 404 when no user has ``user_id``.  The outcome of sending
    the mail is reported through a flash message; a send failure is also
    written to the application log.
    """
    user = User.query.get_or_404(user_id)
    # In production, save token to DB or cache, and validate on reset
    # (this view only embeds the generated token in the link).
    reset_url = url_for(
        'auth.reset_password',
        token=generate_reset_token(user),
        _external=True,
    )
    body_text = (
        f"Hello {user.nickname},\n\n"
        "An admin has requested a password reset for your account. "
        "Click the link below to reset your password:\n"
        f"{reset_url}\n\n"
        "If you did not request this, please ignore this email."
    )
    msg = Message(
        subject="Password Reset Request",
        recipients=[user.email],
        body=body_text,
    )
    try:
        mail.send(msg)
    except Exception as exc:
        current_app.logger.error(f"Error sending reset email: {exc}")
        flash(f"Failed to send password reset email: {exc}", "danger")
    else:
        flash(f"Password reset email sent to {user.email}.", "success")
    return redirect(url_for('admin.user_detail', user_id=user.id))
def admin_required(f):
    """Restrict a view to authenticated users whose ``is_admin`` flag is set.

    Other visitors are shown an 'Admin access required.' flash message and
    redirected to the ``auth.login`` endpoint.
    """
    # NOTE(review): an identical ``admin_required`` is defined earlier in this
    # module; this definition rebinds the name for the views declared below.
    @wraps(f)
    def admin_only(*args, **kwargs):
        allowed = current_user.is_authenticated and current_user.is_admin
        if not allowed:
            flash('Admin access required.', 'error')
            return redirect(url_for('auth.login'))
        return f(*args, **kwargs)
    return admin_only
@admin.route('/')
@login_required
@admin_required
def dashboard():
    """Render the admin dashboard with site-wide statistics.

    Collects entity counts, the number of users with a post or comment in the
    last 30 days, the five newest posts and users, page-view counts for
    today / yesterday / the last seven days, and the ten most viewed posts
    and paths, then passes them to ``admin/dashboard.html``.
    """
    # Basic counts
    context = {
        'total_users': User.query.count(),
        'total_posts': Post.query.count(),
        'published_posts': Post.query.filter_by(published=True).count(),
        'pending_posts': Post.query.filter_by(published=False).count(),
        'total_comments': Comment.query.count(),
        'total_likes': Like.query.count(),
    }

    # Active users: wrote a post or a comment within the last 30 days
    cutoff = datetime.utcnow() - timedelta(days=30)
    recently_active = db.or_(
        User.posts.any(Post.created_at >= cutoff),
        User.comments.any(Comment.created_at >= cutoff),
    )
    context['active_users'] = (
        db.session.query(User.id).filter(recently_active).distinct().count()
    )

    # Newest content and accounts
    context['recent_posts'] = Post.query.order_by(Post.created_at.desc()).limit(5).all()
    context['recent_users'] = User.query.order_by(User.created_at.desc()).limit(5).all()

    # Page-view counters
    today = datetime.utcnow().date()
    yesterday = today - timedelta(days=1)
    week_ago = today - timedelta(days=7)

    def views_on(day):
        """Count page views whose creation date equals ``day``."""
        return PageView.query.filter(func.date(PageView.created_at) == day).count()

    context['views_today'] = views_on(today)
    context['views_yesterday'] = views_on(yesterday)
    context['views_this_week'] = PageView.query.filter(
        PageView.created_at >= week_ago
    ).count()

    # Ten most viewed posts
    view_total = func.count(PageView.id).label('view_count')
    context['most_viewed_posts'] = (
        db.session.query(Post.id, Post.title, view_total)
        .join(PageView, Post.id == PageView.post_id)
        .group_by(Post.id, Post.title)
        .order_by(desc('view_count'))
        .limit(10)
        .all()
    )

    # Ten most viewed paths
    path_total = func.count(PageView.id).label('view_count')
    context['most_viewed_pages'] = (
        db.session.query(PageView.path, path_total)
        .group_by(PageView.path)
        .order_by(desc('view_count'))
        .limit(10)
        .all()
    )

    return render_template('admin/dashboard.html', **context)
@admin.route('/posts')
@login_required
@admin_required
def posts():
    """List posts for review, newest first, 20 per page.

    Query parameters: ``page`` (default 1) and ``status`` -- ``pending``,
    ``published`` or anything else (default ``all``) for no filtering.
    """
    page = request.args.get('page', 1, type=int)
    status = request.args.get('status', 'all')  # pending, published, all

    # Map the status keyword to the ``published`` flag; other values show everything.
    published_flag = {'pending': False, 'published': True}.get(status)
    query = Post.query
    if published_flag is not None:
        query = query.filter_by(published=published_flag)

    listing = query.order_by(Post.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False
    )
    return render_template('admin/posts.html', posts=listing, status=status)
@admin.route('/posts/<int:post_id>')
@login_required
@admin_required
def post_detail(post_id):
    """Render the review page for one post; responds 404 if it does not exist."""
    return render_template(
        'admin/post_detail.html',
        post=Post.query.get_or_404(post_id),
    )
@admin.route('/posts/<int:post_id>/publish', methods=['POST'])
@login_required
@admin_required
def publish_post(post_id):
    """Publish a post, then create map routes for its GPX files.

    Responds 404 when the post does not exist.  Publishing and map-route
    creation are handled as two separate steps:

    * if the commit that publishes the post fails, it is rolled back and an
      error is flashed;
    * if the post is published but ``process_post_approval`` returns a falsy
      value or raises, the post stays published and a warning is flashed
      instead of a misleading "Error publishing post".

    Always redirects back to the admin post list.
    """
    post = Post.query.get_or_404(post_id)
    # Read the title up front so the flash messages need no reload later.
    title = post.title
    post.published = True
    post.updated_at = datetime.utcnow()
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f'Error publishing post {post_id}: {str(e)}')
        flash(f'Error publishing post: {str(e)}', 'error')
        return redirect(url_for('admin.posts'))

    # The post is published from here on; route creation is a follow-up step
    # whose failure must not be reported as a failed publish.
    try:
        # Create map routes for GPX files when post is published
        from app.utils.gpx_processor import process_post_approval
        routes_created = process_post_approval(post_id)
    except Exception as e:
        # Discard whatever the processor may have left pending in the session.
        db.session.rollback()
        current_app.logger.error(f'Error creating map routes for post {post_id}: {str(e)}')
        routes_created = False

    if routes_created:
        flash(f'Post "{title}" has been published and map routes created.', 'success')
    else:
        flash(f'Post "{title}" has been published. No GPX files found or error creating map routes.', 'warning')
    return redirect(url_for('admin.posts'))
@admin.route('/posts/<int:post_id>/unpublish', methods=['POST'])
@login_required
@admin_required
def unpublish_post(post_id):
    """Mark a post as unpublished and return to the admin post list.

    Responds 404 when the post does not exist.  A failed commit is rolled
    back, logged and reported with an error flash.
    """
    target = Post.query.get_or_404(post_id)
    target.published = False
    target.updated_at = datetime.utcnow()
    try:
        # Note: We keep the MapRoute data for potential re-publishing
        # Only the API will filter by published status
        db.session.commit()
        flash(f'Post "{target.title}" has been unpublished.', 'success')
    except Exception as exc:
        db.session.rollback()
        reason = str(exc)
        current_app.logger.error(f'Error unpublishing post {post_id}: {reason}')
        flash(f'Error unpublishing post: {reason}', 'error')
    return redirect(url_for('admin.posts'))
@admin.route('/posts/<int:post_id>/delete', methods=['POST'])
@login_required
@admin_required
def delete_post(post_id):
    """Delete a post and its map route, then clean up orphaned media.

    Responds 404 when the post does not exist.  The database deletion and the
    follow-up housekeeping are handled as two separate steps:

    * if deleting/committing fails, the transaction is rolled back and an
      error is flashed;
    * if the post is deleted but the orphan-media cleanup (or the diagnostic
      logging after it) fails, the failure is only logged -- the deletion is
      still reported as successful, because it is.

    Always redirects back to the admin post list.
    """
    logger = current_app.logger
    logger.info(f'Admin {current_user.id} attempting to delete post {post_id}')
    # Debugging aid: fetch only the id column instead of loading every Post.
    ids_before = [row.id for row in db.session.query(Post.id).all()]
    logger.info(f'Posts before deletion: {ids_before}')
    post = Post.query.get_or_404(post_id)
    title = post.title
    logger.info(f'Found post to delete: ID={post.id}, Title="{title}"')
    try:
        # Delete associated map route if exists
        from app.models import MapRoute
        map_route = MapRoute.query.filter_by(post_id=post.id).first()
        if map_route:
            db.session.delete(map_route)
            logger.info(f'Deleted MapRoute for post {post.id}')
        # Delete associated files and records
        db.session.delete(post)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f'Error deleting post {post_id}: {str(e)}')
        flash(f'Error deleting post: {str(e)}', 'error')
        return redirect(url_for('admin.posts'))

    # The deletion is committed; everything below is best-effort housekeeping
    # and must not turn a successful delete into an error message.
    try:
        # Clean up orphaned media folders
        from app.utils.clean_orphan_media import clean_orphan_post_media
        clean_orphan_post_media()
        ids_after = [row.id for row in db.session.query(Post.id).all()]
        logger.info(f'Posts after deletion: {ids_after}')
    except Exception as e:
        db.session.rollback()
        logger.error(f'Post {post_id} deleted, but post-deletion cleanup failed: {str(e)}')

    logger.info(f'Successfully deleted post {post_id}: "{title}"')
    flash(f'Post "{title}" has been deleted.', 'success')
    return redirect(url_for('admin.posts'))
@admin.route('/users')
@login_required
@admin_required
def users():
    """List user accounts, newest first, 50 per page (``page`` query parameter)."""
    page_number = request.args.get('page', 1, type=int)
    newest_first = User.query.order_by(User.created_at.desc())
    listing = newest_first.paginate(page=page_number, per_page=50, error_out=False)
    return render_template('admin/users.html', users=listing)
@admin.route('/users/<int:user_id>')
@login_required
@admin_required
def user_detail(user_id):
    """Render one user's detail page with all their posts and comments.

    Responds 404 when the user does not exist.  Posts and comments are listed
    newest first.
    """
    account = User.query.get_or_404(user_id)
    authored_posts = (
        Post.query.filter_by(author_id=user_id)
        .order_by(Post.created_at.desc())
        .all()
    )
    authored_comments = (
        Comment.query.filter_by(author_id=user_id)
        .order_by(Comment.created_at.desc())
        .all()
    )
    return render_template(
        'admin/user_detail.html',
        user=account,
        user_posts=authored_posts,
        user_comments=authored_comments,
    )
@admin.route('/users/<int:user_id>/toggle-status', methods=['POST'])
@login_required
@admin_required
def toggle_user_status(user_id):
    """Flip a user's ``is_active`` flag and report the result as JSON.

    Responds 404 when the user does not exist.  An admin cannot toggle their
    own account.  The JSON body always carries ``success``; on success it also
    carries ``message`` and the new ``is_active`` value, otherwise ``error``.
    """
    user = User.query.get_or_404(user_id)

    # Prevent self-modification if current user is admin
    targets_own_admin_account = user.is_admin and user.id == current_user.id
    if targets_own_admin_account:
        return jsonify({'success': False, 'error': 'Cannot deactivate your own admin account'})

    # Toggle status
    user.is_active = not user.is_active
    status = {True: "activated", False: "deactivated"}[bool(user.is_active)]

    try:
        db.session.commit()
        payload = {
            'success': True,
            'message': f'User {user.nickname} has been {status}',
            'is_active': user.is_active,
        }
        return jsonify(payload)
    except Exception as exc:
        db.session.rollback()
        return jsonify({'success': False, 'error': f'Failed to update user status: {str(exc)}'})
@admin.route('/users/<int:user_id>/delete', methods=['DELETE', 'POST'])
@login_required
@admin_required
def delete_user(user_id):
    """Delete a non-admin user, handing their posts and comments to the caller.

    Responds 404 when the user does not exist.  Refuses to delete the calling
    admin's own account or any admin account.  The user's likes and page
    views are deleted; posts and comments are re-assigned to the current
    admin.  The result is reported as JSON (``success`` plus ``message`` or
    ``error``); on failure the transaction is rolled back.
    """
    user = User.query.get_or_404(user_id)

    # Prevent self-deletion
    if user.id == current_user.id:
        return jsonify({'success': False, 'error': 'Cannot delete your own account'})
    # Prevent deletion of other admins
    if user.is_admin:
        return jsonify({'success': False, 'error': 'Cannot delete admin accounts'})

    try:
        new_owner_id = current_user.id

        # Transfer all user's posts to current admin
        owned_posts = Post.query.filter_by(author_id=user.id).all()
        for owned_post in owned_posts:
            owned_post.author_id = new_owner_id

        # Transfer all user's comments to current admin
        owned_comments = Comment.query.filter_by(author_id=user.id).all()
        for owned_comment in owned_comments:
            owned_comment.author_id = new_owner_id

        # Delete user's likes (they will be orphaned)
        Like.query.filter_by(user_id=user.id).delete()
        # Delete user's page views
        PageView.query.filter_by(user_id=user.id).delete()

        # Capture what the response needs before the row is gone.
        username = user.nickname
        transferred = len(owned_posts)

        # Delete the user
        db.session.delete(user)
        db.session.commit()

        summary = f'User {username} has been deleted. {transferred} posts transferred to {current_user.nickname}.'
        return jsonify({'success': True, 'message': summary})
    except Exception as exc:
        db.session.rollback()
        return jsonify({'success': False, 'error': f'Failed to delete user: {str(exc)}'})
@admin.route('/analytics')
@login_required
@admin_required
def analytics():
    """Render the detailed analytics page.

    Query parameters:
        days: size of the reporting window in days (default 30).  The window
            limits the daily-views chart and the top-posts list; the other
            figures cover all recorded page views or fixed periods.  A value
            so large (in either direction) that the start date cannot be
            represented falls back to the default instead of raising.

    The collected figures are passed to ``admin/analytics.html``.
    """
    default_days = 30
    # Get date range from query params
    days = request.args.get('days', default_days, type=int)
    today = datetime.utcnow().date()
    end_date = today
    try:
        start_date = end_date - timedelta(days=days)
    except OverflowError:
        # e.g. ?days=99999999 -- outside the range date/timedelta can express.
        days = default_days
        start_date = end_date - timedelta(days=days)

    # Daily views for the chart
    daily_views = (
        db.session.query(
            func.date(PageView.created_at).label('date'),
            func.count(PageView.id).label('views'),
        )
        .filter(func.date(PageView.created_at) >= start_date)
        .group_by(func.date(PageView.created_at))
        .order_by('date')
        .all()
    )

    # Top posts by views
    # NOTE(review): Post.author is selected but is not part of the GROUP BY;
    # if it is a relationship rather than a plain column this query may not
    # behave as intended -- verify against the Post model.
    top_posts = (
        db.session.query(
            Post.id, Post.title, Post.author, func.count(PageView.id).label('view_count')
        )
        .join(PageView, Post.id == PageView.post_id)
        .filter(PageView.created_at >= start_date)
        .group_by(Post.id, Post.title)
        .order_by(desc('view_count'))
        .limit(20)
        .all()
    )

    # User activity - separate queries to avoid cartesian product
    user_posts = (
        db.session.query(
            User.id,
            User.nickname,
            func.count(Post.id).label('post_count'),
        )
        .outerjoin(Post, User.id == Post.author_id)
        .group_by(User.id, User.nickname)
        .subquery()
    )
    user_comments = (
        db.session.query(
            User.id,
            func.count(Comment.id).label('comment_count'),
        )
        .outerjoin(Comment, User.id == Comment.author_id)
        .group_by(User.id)
        .subquery()
    )
    user_activity = (
        db.session.query(
            user_posts.c.nickname,
            user_posts.c.post_count,
            func.coalesce(user_comments.c.comment_count, 0).label('comment_count'),
        )
        .outerjoin(user_comments, user_posts.c.id == user_comments.c.id)
        .order_by(desc(user_posts.c.post_count))
        .limit(20)
        .all()
    )

    # Get overall statistics
    total_views = PageView.query.count()
    unique_visitors = db.session.query(PageView.ip_address).distinct().count()
    today_views = PageView.query.filter(
        func.date(PageView.created_at) == today
    ).count()
    week_start = today - timedelta(days=7)
    week_views = PageView.query.filter(
        func.date(PageView.created_at) >= week_start
    ).count()

    # Popular pages
    popular_pages = (
        db.session.query(
            PageView.path,
            func.count(PageView.id).label('view_count'),
        )
        .group_by(PageView.path)
        .order_by(desc('view_count'))
        .limit(10)
        .all()
    )

    # Recent activity
    recent_views = (
        PageView.query.join(User, PageView.user_id == User.id, isouter=True)
        .order_by(desc(PageView.created_at))
        .limit(20)
        .all()
    )

    # Browser statistics: grouped by the first 50 characters of the user agent
    browser_stats = (
        db.session.query(
            func.substr(PageView.user_agent, 1, 50).label('browser'),
            func.count(PageView.id).label('view_count'),
        )
        .filter(PageView.user_agent.isnot(None))
        .group_by(func.substr(PageView.user_agent, 1, 50))
        .order_by(desc('view_count'))
        .limit(10)
        .all()
    )

    return render_template(
        'admin/analytics.html',
        total_views=total_views,
        unique_visitors=unique_visitors,
        today_views=today_views,
        week_views=week_views,
        popular_pages=popular_pages,
        recent_views=recent_views,
        browser_stats=browser_stats,
        daily_views=daily_views,
        top_posts=top_posts,
        user_activity=user_activity,
        days=days,
    )
# API endpoints for AJAX requests
@admin.route('/api/quick-stats')
@login_required
@admin_required
def api_quick_stats():
    """Return the dashboard widget counters as JSON.

    The response holds ``pending_posts`` (unpublished posts) and
    ``today_views`` (page views created on the current UTC date).
    """
    unpublished = Post.query.filter_by(published=False)
    pending_count = unpublished.count()

    current_day = datetime.utcnow().date()
    viewed_today = PageView.query.filter(func.date(PageView.created_at) == current_day)

    stats = {
        'pending_posts': pending_count,
        'today_views': viewed_today.count(),
    }
    return jsonify(stats)