"""Application entry point and management CLI commands.

Creates the Flask application via ``create_app()`` and registers, on
``app.cli``, a handful of maintenance commands (schema creation, ad-hoc
SQLite migrations, GPX statistics back-fill, cover-image back-fill and
admin-user creation).  Running this file directly starts the development
server.
"""
import os

from sqlalchemy import text

from app import create_app, db
from app.models import (
    User,
    Post,
    PostImage,
    GPXFile,
    Comment,
    Like,
    PageView,
    MapRoute,
)

app = create_app()


@app.shell_context_processor
def make_shell_context():
    """Return the names pre-loaded into the interactive shell context."""
    return {
        'db': db,
        'User': User,
        'Post': Post,
        'PostImage': PostImage,
        'GPXFile': GPXFile,
        'Comment': Comment,
        'Like': Like,
        'PageView': PageView,
        'MapRoute': MapRoute,
    }


@app.cli.command()
def init_db():
    """Initialize the database."""
    db.create_all()
    print('Database initialized.')


def _existing_columns(table):
    """Return the column names SQLite reports for ``table``.

    ``table`` is interpolated into the statement, so it must only ever be a
    trusted, hard-coded identifier (as it is for every caller in this file).
    """
    result = db.session.execute(text(f'PRAGMA table_info({table})'))
    # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
    return [row[1] for row in result.fetchall()]


def _add_column(table, column, ddl_type):
    """Add ``column`` to ``table``, commit, and report the addition.

    All three arguments are interpolated into the DDL statement and must be
    trusted, hard-coded values.
    """
    db.session.execute(
        text(f'ALTER TABLE {table} ADD COLUMN {column} {ddl_type}')
    )
    db.session.commit()
    print(f'Successfully added {column} column to {table} table')


@app.cli.command()
def migrate_db():
    """Apply database migrations.

    Each step first checks whether its change is already present and is
    skipped if so.  The checks use ``PRAGMA table_info`` and
    ``sqlite_master``, i.e. they are written for SQLite.

    Raises:
        SystemExit: with status 1 if any step fails, after the error has
            been printed and the session rolled back, so that calling
            scripts can detect the failure.
    """
    gpx_stat_columns = [
        ('total_distance', 'REAL DEFAULT 0.0'),
        ('elevation_gain', 'REAL DEFAULT 0.0'),
        ('max_elevation', 'REAL DEFAULT 0.0'),
        ('min_elevation', 'REAL DEFAULT 0.0'),
        ('total_points', 'INTEGER DEFAULT 0'),
    ]

    try:
        # posts.media_folder
        if 'media_folder' not in _existing_columns('posts'):
            _add_column('posts', 'media_folder', 'VARCHAR(100)')
        else:
            print('media_folder column already exists')

        # post_images.is_cover
        if 'is_cover' not in _existing_columns('post_images'):
            _add_column('post_images', 'is_cover', 'BOOLEAN DEFAULT 0')
        else:
            print('is_cover column already exists')

        # page_views table
        result = db.session.execute(text(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='page_views'"
        ))
        if not result.fetchone():
            db.session.execute(text('''
                CREATE TABLE page_views (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    path VARCHAR(255) NOT NULL,
                    user_agent VARCHAR(500),
                    ip_address VARCHAR(45),
                    referer VARCHAR(500),
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    user_id INTEGER,
                    post_id INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users(id),
                    FOREIGN KEY (post_id) REFERENCES posts(id)
                )
            '''))
            db.session.commit()
            print('Successfully created page_views table')
        else:
            print('page_views table already exists')

        # GPX statistics columns; the column list is read once, up front.
        gpx_columns = _existing_columns('gpx_files')
        for column_name, column_type in gpx_stat_columns:
            if column_name not in gpx_columns:
                _add_column('gpx_files', column_name, column_type)
            else:
                print(f'{column_name} column already exists in gpx_files table')

        print('Database schema is up to date')

    except Exception as e:
        print(f'Migration error: {e}')
        db.session.rollback()
        # Report the failure through the exit status instead of returning
        # normally, which would look like success to the calling shell.
        raise SystemExit(1)


@app.cli.command()
def process_gpx_files():
    """Process existing GPX files to extract statistics.

    Calls ``process_gpx_file`` on every ``GPXFile`` row, reports the outcome
    per file, and commits once after all files have been attempted.  An
    exception raised for one file is printed and does not stop the loop.
    """
    from app.utils.gpx_processor import process_gpx_file

    gpx_files = GPXFile.query.all()
    processed = 0

    for gpx_file in gpx_files:
        try:
            if process_gpx_file(gpx_file):
                processed += 1
                print(f'Processed: {gpx_file.original_name}')
            else:
                print(f'Failed to process: {gpx_file.original_name}')
        except Exception as e:
            print(f'Error processing {gpx_file.original_name}: {e}')

    db.session.commit()
    print(f'Processed {processed}/{len(gpx_files)} GPX files')


@app.cli.command()
def set_cover_images():
    """Set cover images for posts that don't have them.

    For each post without an image flagged ``is_cover``, the first image
    returned by ``post.images.first()`` (if any) is flagged as the cover.
    All changes are committed together at the end.
    """
    updated = 0

    for post in Post.query.all():
        if post.images.filter_by(is_cover=True).first():
            continue  # already has a cover

        first_image = post.images.first()
        if not first_image:
            continue  # no images to choose from

        first_image.is_cover = True
        updated += 1
        print(f'Set cover image for post: {post.title}')

    db.session.commit()
    print(f'Updated {updated} posts with cover images')


@app.cli.command()
def create_admin():
    """Create an admin user.

    The e-mail and password are read from the ``ADMIN_EMAIL`` and
    ``ADMIN_PASSWORD`` environment variables, falling back to built-in
    defaults.  Nothing is changed if a user with that e-mail already exists.
    A warning is printed when the built-in default password was used.
    """
    admin_email = os.environ.get('ADMIN_EMAIL', 'admin@moto-adv.com')
    using_default_password = 'ADMIN_PASSWORD' not in os.environ
    admin_password = os.environ.get('ADMIN_PASSWORD', 'admin123')

    admin = User.query.filter_by(email=admin_email).first()
    if admin:
        print(f'Admin user {admin_email} already exists.')
        return

    admin = User(
        nickname='admin',
        email=admin_email,
        is_admin=True,
    )
    admin.set_password(admin_password)

    db.session.add(admin)
    db.session.commit()

    print(f'Admin user created: {admin_email}')
    if using_default_password:
        # The fallback password is hard-coded in this file, so make sure the
        # operator knows the account is not yet secured.
        print(
            'WARNING: ADMIN_PASSWORD is not set; the admin user was created '
            'with the built-in default password. Change it immediately.'
        )


if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on unconditionally.  Debug mode is opt-in through FLASK_DEBUG
    # (any value other than empty/"0"/"false"/"no" enables it).
    debug_value = os.environ.get('FLASK_DEBUG', '')
    debug = bool(debug_value) and debug_value.lower() not in {'0', 'false', 'no'}
    app.run(host='0.0.0.0', port=5000, debug=debug)