501 lines
18 KiB
Python
501 lines
18 KiB
Python
|
|
|
|
from flask import Blueprint, render_template, request, flash, redirect, url_for, jsonify, current_app
|
|
from flask_mail import Message
|
|
from flask_login import login_required, current_user
|
|
from functools import wraps
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy import func, desc
|
|
import secrets
|
|
from app.routes.mail import mail
|
|
from app.extensions import db
|
|
from app.models import User, Post, PostImage, GPXFile, Comment, Like, PageView, MailSettings, SentEmail
|
|
|
|
admin = Blueprint('admin', __name__, url_prefix='/admin')
|
|
|
|
def admin_required(f):
|
|
"""Decorator to require admin access"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated or not current_user.is_admin:
|
|
flash('Admin access required.', 'error')
|
|
return redirect(url_for('auth.login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@admin.route('/mail-settings', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def mail_settings():
|
|
settings = MailSettings.query.first()
|
|
if request.method == 'POST':
|
|
enabled = bool(request.form.get('enabled'))
|
|
provider = request.form.get('provider')
|
|
server = request.form.get('server')
|
|
port = int(request.form.get('port') or 0)
|
|
use_tls = bool(request.form.get('use_tls'))
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
default_sender = request.form.get('default_sender')
|
|
if not settings:
|
|
settings = MailSettings()
|
|
db.session.add(settings)
|
|
settings.enabled = enabled
|
|
settings.provider = provider
|
|
settings.server = server
|
|
settings.port = port
|
|
settings.use_tls = use_tls
|
|
settings.username = username
|
|
settings.password = password
|
|
settings.default_sender = default_sender
|
|
db.session.commit()
|
|
flash('Mail settings updated.', 'success')
|
|
sent_emails = SentEmail.query.order_by(SentEmail.sent_at.desc()).limit(50).all()
|
|
return render_template('admin/mail_settings.html', settings=settings, sent_emails=sent_emails)
|
|
|
|
|
|
## Duplicate imports and Blueprint definitions removed
|
|
|
|
# Password reset token generator (simple, for demonstration)
|
|
def generate_reset_token(user):
|
|
# In production, use itsdangerous or Flask-Security for secure tokens
|
|
return secrets.token_urlsafe(32)
|
|
|
|
# Admin: Send password reset email to user
|
|
@admin.route('/users/<int:user_id>/reset-password', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def reset_user_password(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
token = generate_reset_token(user)
|
|
# In production, save token to DB or cache, and validate on reset
|
|
reset_url = url_for('auth.reset_password', token=token, _external=True)
|
|
msg = Message(
|
|
subject="Password Reset Request",
|
|
recipients=[user.email],
|
|
body=f"Hello {user.nickname},\n\nAn admin has requested a password reset for your account. Click the link below to reset your password:\n{reset_url}\n\nIf you did not request this, please ignore this email."
|
|
)
|
|
try:
|
|
mail.send(msg)
|
|
flash(f"Password reset email sent to {user.email}.", "success")
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error sending reset email: {e}")
|
|
flash(f"Failed to send password reset email: {e}", "danger")
|
|
return redirect(url_for('admin.user_detail', user_id=user.id))
|
|
|
|
def admin_required(f):
|
|
"""Decorator to require admin access"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated or not current_user.is_admin:
|
|
flash('Admin access required.', 'error')
|
|
return redirect(url_for('auth.login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@admin.route('/')
|
|
@login_required
|
|
@admin_required
|
|
def dashboard():
|
|
"""Admin dashboard with site statistics"""
|
|
# Get basic statistics
|
|
total_users = User.query.count()
|
|
total_posts = Post.query.count()
|
|
published_posts = Post.query.filter_by(published=True).count()
|
|
pending_posts = Post.query.filter_by(published=False).count()
|
|
total_comments = Comment.query.count()
|
|
total_likes = Like.query.count()
|
|
|
|
# Active users (users who created posts or comments in the last 30 days)
|
|
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
|
|
active_users = db.session.query(User.id).filter(
|
|
db.or_(
|
|
User.posts.any(Post.created_at >= thirty_days_ago),
|
|
User.comments.any(Comment.created_at >= thirty_days_ago)
|
|
)
|
|
).distinct().count()
|
|
|
|
# Recent activity
|
|
recent_posts = Post.query.order_by(Post.created_at.desc()).limit(5).all()
|
|
recent_users = User.query.order_by(User.created_at.desc()).limit(5).all()
|
|
|
|
# Page views statistics
|
|
today = datetime.utcnow().date()
|
|
yesterday = today - timedelta(days=1)
|
|
week_ago = today - timedelta(days=7)
|
|
|
|
views_today = PageView.query.filter(
|
|
func.date(PageView.created_at) == today
|
|
).count()
|
|
|
|
views_yesterday = PageView.query.filter(
|
|
func.date(PageView.created_at) == yesterday
|
|
).count()
|
|
|
|
views_this_week = PageView.query.filter(
|
|
PageView.created_at >= week_ago
|
|
).count()
|
|
|
|
# Most viewed posts
|
|
most_viewed_posts = db.session.query(
|
|
Post.id, Post.title, func.count(PageView.id).label('view_count')
|
|
).join(PageView, Post.id == PageView.post_id)\
|
|
.group_by(Post.id, Post.title)\
|
|
.order_by(desc('view_count'))\
|
|
.limit(10).all()
|
|
|
|
# Most viewed pages
|
|
most_viewed_pages = db.session.query(
|
|
PageView.path, func.count(PageView.id).label('view_count')
|
|
).group_by(PageView.path)\
|
|
.order_by(desc('view_count'))\
|
|
.limit(10).all()
|
|
|
|
return render_template('admin/dashboard.html',
|
|
total_users=total_users,
|
|
total_posts=total_posts,
|
|
published_posts=published_posts,
|
|
pending_posts=pending_posts,
|
|
total_comments=total_comments,
|
|
total_likes=total_likes,
|
|
active_users=active_users,
|
|
recent_posts=recent_posts,
|
|
recent_users=recent_users,
|
|
views_today=views_today,
|
|
views_yesterday=views_yesterday,
|
|
views_this_week=views_this_week,
|
|
most_viewed_posts=most_viewed_posts,
|
|
most_viewed_pages=most_viewed_pages)
|
|
|
|
@admin.route('/posts')
|
|
@login_required
|
|
@admin_required
|
|
def posts():
|
|
"""Admin post management - review posts"""
|
|
page = request.args.get('page', 1, type=int)
|
|
status = request.args.get('status', 'all') # pending, published, all
|
|
|
|
query = Post.query
|
|
|
|
if status == 'pending':
|
|
query = query.filter_by(published=False)
|
|
elif status == 'published':
|
|
query = query.filter_by(published=True)
|
|
# 'all' shows all posts
|
|
|
|
posts = query.order_by(Post.created_at.desc()).paginate(
|
|
page=page, per_page=20, error_out=False
|
|
)
|
|
|
|
return render_template('admin/posts.html', posts=posts, status=status)
|
|
|
|
@admin.route('/posts/<int:post_id>')
|
|
@login_required
|
|
@admin_required
|
|
def post_detail(post_id):
|
|
"""View post details for review"""
|
|
post = Post.query.get_or_404(post_id)
|
|
return render_template('admin/post_detail.html', post=post)
|
|
|
|
@admin.route('/posts/<int:post_id>/publish', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def publish_post(post_id):
|
|
"""Publish a post and create map routes if GPX files exist"""
|
|
post = Post.query.get_or_404(post_id)
|
|
post.published = True
|
|
post.updated_at = datetime.utcnow()
|
|
|
|
try:
|
|
db.session.commit()
|
|
|
|
# Create map routes for GPX files when post is published
|
|
from app.utils.gpx_processor import process_post_approval
|
|
success = process_post_approval(post_id)
|
|
|
|
if success:
|
|
flash(f'Post "{post.title}" has been published and map routes created.', 'success')
|
|
else:
|
|
flash(f'Post "{post.title}" has been published. No GPX files found or error creating map routes.', 'warning')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
current_app.logger.error(f'Error publishing post {post_id}: {str(e)}')
|
|
flash(f'Error publishing post: {str(e)}', 'error')
|
|
|
|
return redirect(url_for('admin.posts'))
|
|
|
|
@admin.route('/posts/<int:post_id>/unpublish', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def unpublish_post(post_id):
|
|
"""Unpublish a post and remove from map"""
|
|
post = Post.query.get_or_404(post_id)
|
|
post.published = False
|
|
post.updated_at = datetime.utcnow()
|
|
|
|
try:
|
|
# Note: We keep the MapRoute data for potential re-publishing
|
|
# Only the API will filter by published status
|
|
db.session.commit()
|
|
flash(f'Post "{post.title}" has been unpublished.', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
current_app.logger.error(f'Error unpublishing post {post_id}: {str(e)}')
|
|
flash(f'Error unpublishing post: {str(e)}', 'error')
|
|
|
|
return redirect(url_for('admin.posts'))
|
|
|
|
@admin.route('/posts/<int:post_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_post(post_id):
|
|
"""Delete a post"""
|
|
current_app.logger.info(f'Admin {current_user.id} attempting to delete post {post_id}')
|
|
|
|
# Get all posts before deletion for debugging
|
|
all_posts_before = Post.query.all()
|
|
current_app.logger.info(f'Posts before deletion: {[p.id for p in all_posts_before]}')
|
|
|
|
post = Post.query.get_or_404(post_id)
|
|
title = post.title
|
|
|
|
current_app.logger.info(f'Found post to delete: ID={post.id}, Title="{title}"')
|
|
|
|
try:
|
|
# Delete associated files and records
|
|
db.session.delete(post)
|
|
db.session.commit()
|
|
|
|
# Clean up orphaned media folders
|
|
from app.utils.clean_orphan_media import clean_orphan_post_media
|
|
clean_orphan_post_media()
|
|
|
|
# Check posts after deletion
|
|
all_posts_after = Post.query.all()
|
|
current_app.logger.info(f'Posts after deletion: {[p.id for p in all_posts_after]}')
|
|
|
|
current_app.logger.info(f'Successfully deleted post {post_id}: "{title}"')
|
|
flash(f'Post "{title}" has been deleted.', 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
current_app.logger.error(f'Error deleting post {post_id}: {str(e)}')
|
|
flash(f'Error deleting post: {str(e)}', 'error')
|
|
|
|
return redirect(url_for('admin.posts'))
|
|
|
|
@admin.route('/users')
|
|
@login_required
|
|
@admin_required
|
|
def users():
|
|
"""Admin user management"""
|
|
page = request.args.get('page', 1, type=int)
|
|
|
|
users = User.query.order_by(User.created_at.desc()).paginate(
|
|
page=page, per_page=50, error_out=False
|
|
)
|
|
|
|
return render_template('admin/users.html', users=users)
|
|
|
|
@admin.route('/users/<int:user_id>')
|
|
@login_required
|
|
@admin_required
|
|
def user_detail(user_id):
|
|
"""View user details"""
|
|
user = User.query.get_or_404(user_id)
|
|
user_posts = Post.query.filter_by(author_id=user_id).order_by(Post.created_at.desc()).all()
|
|
user_comments = Comment.query.filter_by(author_id=user_id).order_by(Comment.created_at.desc()).all()
|
|
|
|
return render_template('admin/user_detail.html',
|
|
user=user,
|
|
user_posts=user_posts,
|
|
user_comments=user_comments)
|
|
|
|
@admin.route('/users/<int:user_id>/toggle-status', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def toggle_user_status(user_id):
|
|
"""Toggle user active/inactive status"""
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Prevent self-modification if current user is admin
|
|
if user.is_admin and user.id == current_user.id:
|
|
return jsonify({'success': False, 'error': 'Cannot deactivate your own admin account'})
|
|
|
|
# Toggle status
|
|
user.is_active = not user.is_active
|
|
status = "activated" if user.is_active else "deactivated"
|
|
|
|
try:
|
|
db.session.commit()
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'User {user.nickname} has been {status}',
|
|
'is_active': user.is_active
|
|
})
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'success': False, 'error': f'Failed to update user status: {str(e)}'})
|
|
|
|
@admin.route('/users/<int:user_id>/delete', methods=['DELETE', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_user(user_id):
|
|
"""Delete user and transfer their posts to admin"""
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Prevent self-deletion
|
|
if user.id == current_user.id:
|
|
return jsonify({'success': False, 'error': 'Cannot delete your own account'})
|
|
|
|
# Prevent deletion of other admins
|
|
if user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Cannot delete admin accounts'})
|
|
|
|
try:
|
|
# Transfer all user's posts to current admin
|
|
user_posts = Post.query.filter_by(author_id=user.id).all()
|
|
for post in user_posts:
|
|
post.author_id = current_user.id
|
|
|
|
# Transfer all user's comments to current admin
|
|
user_comments = Comment.query.filter_by(author_id=user.id).all()
|
|
for comment in user_comments:
|
|
comment.author_id = current_user.id
|
|
|
|
# Delete user's likes (they will be orphaned)
|
|
Like.query.filter_by(user_id=user.id).delete()
|
|
|
|
# Delete user's page views
|
|
PageView.query.filter_by(user_id=user.id).delete()
|
|
|
|
username = user.nickname
|
|
user_posts_count = len(user_posts)
|
|
|
|
# Delete the user
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'User {username} has been deleted. {user_posts_count} posts transferred to {current_user.nickname}.'
|
|
})
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'success': False, 'error': f'Failed to delete user: {str(e)}'})
|
|
|
|
@admin.route('/analytics')
|
|
@login_required
|
|
@admin_required
|
|
def analytics():
|
|
"""Detailed analytics page"""
|
|
# Get date range from query params
|
|
days = request.args.get('days', 30, type=int)
|
|
end_date = datetime.utcnow().date()
|
|
start_date = end_date - timedelta(days=days)
|
|
|
|
# Daily views for the chart
|
|
daily_views = db.session.query(
|
|
func.date(PageView.created_at).label('date'),
|
|
func.count(PageView.id).label('views')
|
|
).filter(
|
|
func.date(PageView.created_at) >= start_date
|
|
).group_by(func.date(PageView.created_at))\
|
|
.order_by('date').all()
|
|
|
|
# Top posts by views
|
|
top_posts = db.session.query(
|
|
Post.id, Post.title, Post.author, func.count(PageView.id).label('view_count')
|
|
).join(PageView, Post.id == PageView.post_id)\
|
|
.filter(PageView.created_at >= start_date)\
|
|
.group_by(Post.id, Post.title)\
|
|
.order_by(desc('view_count'))\
|
|
.limit(20).all()
|
|
|
|
# User activity - separate queries to avoid cartesian product
|
|
user_posts = db.session.query(
|
|
User.id,
|
|
User.nickname,
|
|
func.count(Post.id).label('post_count')
|
|
).outerjoin(Post, User.id == Post.author_id)\
|
|
.group_by(User.id, User.nickname).subquery()
|
|
|
|
user_comments = db.session.query(
|
|
User.id,
|
|
func.count(Comment.id).label('comment_count')
|
|
).outerjoin(Comment, User.id == Comment.author_id)\
|
|
.group_by(User.id).subquery()
|
|
|
|
user_activity = db.session.query(
|
|
user_posts.c.nickname,
|
|
user_posts.c.post_count,
|
|
func.coalesce(user_comments.c.comment_count, 0).label('comment_count')
|
|
).outerjoin(user_comments, user_posts.c.id == user_comments.c.id)\
|
|
.order_by(desc(user_posts.c.post_count))\
|
|
.limit(20).all()
|
|
|
|
# Get overall statistics
|
|
total_views = PageView.query.count()
|
|
unique_visitors = db.session.query(PageView.ip_address).distinct().count()
|
|
today_views = PageView.query.filter(
|
|
func.date(PageView.created_at) == datetime.utcnow().date()
|
|
).count()
|
|
week_start = datetime.utcnow().date() - timedelta(days=7)
|
|
week_views = PageView.query.filter(
|
|
func.date(PageView.created_at) >= week_start
|
|
).count()
|
|
|
|
# Popular pages
|
|
popular_pages = db.session.query(
|
|
PageView.path,
|
|
func.count(PageView.id).label('view_count')
|
|
).group_by(PageView.path)\
|
|
.order_by(desc('view_count'))\
|
|
.limit(10).all()
|
|
|
|
# Recent activity
|
|
recent_views = PageView.query.join(User, PageView.user_id == User.id, isouter=True)\
|
|
.order_by(desc(PageView.created_at))\
|
|
.limit(20).all()
|
|
|
|
# Browser statistics (extract from user agent)
|
|
browser_stats = db.session.query(
|
|
func.substr(PageView.user_agent, 1, 50).label('browser'),
|
|
func.count(PageView.id).label('view_count')
|
|
).filter(PageView.user_agent.isnot(None))\
|
|
.group_by(func.substr(PageView.user_agent, 1, 50))\
|
|
.order_by(desc('view_count'))\
|
|
.limit(10).all()
|
|
|
|
return render_template('admin/analytics.html',
|
|
total_views=total_views,
|
|
unique_visitors=unique_visitors,
|
|
today_views=today_views,
|
|
week_views=week_views,
|
|
popular_pages=popular_pages,
|
|
recent_views=recent_views,
|
|
browser_stats=browser_stats,
|
|
daily_views=daily_views,
|
|
top_posts=top_posts,
|
|
user_activity=user_activity,
|
|
days=days)
|
|
|
|
# API endpoints for AJAX requests
|
|
@admin.route('/api/quick-stats')
|
|
@login_required
|
|
@admin_required
|
|
def api_quick_stats():
|
|
"""Quick stats for dashboard widgets"""
|
|
pending_count = Post.query.filter_by(published=False).count()
|
|
today_views = PageView.query.filter(
|
|
func.date(PageView.created_at) == datetime.utcnow().date()
|
|
).count()
|
|
|
|
return jsonify({
|
|
'pending_posts': pending_count,
|
|
'today_views': today_views
|
|
})
|