Files
moto-adv-website/run.py

169 lines
5.8 KiB
Python

import os
from app import create_app, db
from app.models import User, Post, PostImage, GPXFile, Comment, Like, PageView, MapRoute
# Build the application object through the factory imported from the `app`
# package; the shell context, CLI commands and the __main__ runner in this
# module are all attached to (or run) this instance.
app = create_app()
@app.shell_context_processor
def make_shell_context():
    """Return the names pre-loaded into the interactive application shell.

    The mapping exposes the database handle under ``db`` and each model class
    imported by this module under its own name, so none of them has to be
    imported by hand in a shell session.
    """
    shell_names = dict(
        db=db,
        User=User,
        Post=Post,
        PostImage=PostImage,
        GPXFile=GPXFile,
        Comment=Comment,
        Like=Like,
        PageView=PageView,
        MapRoute=MapRoute,
    )
    return shell_names
@app.cli.command()
def init_db():
    """Initialize the database."""
    # The docstring above doubles as the command's --help text, so extra
    # detail lives in comments instead.
    # create_all() is called on the shared `db` handle; presumably this is
    # Flask-SQLAlchemy, which creates every table that does not exist yet and
    # leaves existing tables untouched -- TODO confirm.
    db.create_all()
    confirmation = 'Database initialized.'
    print(confirmation)
def _table_columns(table_name):
    """Return the column names SQLite reports for ``table_name``.

    Reads ``PRAGMA table_info``; the column name is the second field
    (index 1) of every returned row.
    """
    from sqlalchemy import text

    rows = db.session.execute(text(f'PRAGMA table_info({table_name})'))
    return [row[1] for row in rows.fetchall()]


def _add_column(table_name, column_name, column_definition):
    """Add one column to ``table_name`` and commit the change immediately."""
    from sqlalchemy import text

    # Identifiers cannot be sent as bound parameters. Every value passed in
    # here is a literal from this module, never user input, so interpolating
    # it into the statement is safe.
    db.session.execute(
        text(f'ALTER TABLE {table_name} ADD COLUMN {column_name} {column_definition}')
    )
    db.session.commit()


def _create_page_views_table_if_missing():
    """Create the ``page_views`` table unless ``sqlite_master`` lists it.

    Returns:
        True when the table was created, False when it already existed.
    """
    from sqlalchemy import text

    existing = db.session.execute(
        text("SELECT name FROM sqlite_master WHERE type='table' AND name='page_views'")
    )
    if existing.fetchone():
        return False

    db.session.execute(text('''
        CREATE TABLE page_views (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            path VARCHAR(255) NOT NULL,
            user_agent VARCHAR(500),
            ip_address VARCHAR(45),
            referer VARCHAR(500),
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            user_id INTEGER,
            post_id INTEGER,
            FOREIGN KEY (user_id) REFERENCES users(id),
            FOREIGN KEY (post_id) REFERENCES posts(id)
        )
    '''))
    db.session.commit()
    return True


@app.cli.command()
def migrate_db():
    """Apply database migrations.

    Brings an existing SQLite database up to the current schema. Each step
    is skipped when its column or table is already present, so the command
    can be run repeatedly:

    1. posts.media_folder
    2. post_images.is_cover
    3. the page_views table
    4. the statistics columns on gpx_files

    Every step commits on its own, so steps that completed before a failure
    stay applied. On any error the message is printed and the current
    transaction is rolled back.
    """
    # The PRAGMA / sqlite_master queries used by the helpers are specific to
    # SQLite; on another backend they fail and end up in the handler below.
    try:
        if 'media_folder' in _table_columns('posts'):
            print('media_folder column already exists')
        else:
            _add_column('posts', 'media_folder', 'VARCHAR(100)')
            print('Successfully added media_folder column to posts table')

        if 'is_cover' in _table_columns('post_images'):
            print('is_cover column already exists')
        else:
            _add_column('post_images', 'is_cover', 'BOOLEAN DEFAULT 0')
            print('Successfully added is_cover column to post_images table')

        if _create_page_views_table_if_missing():
            print('Successfully created page_views table')
        else:
            print('page_views table already exists')

        # (column name, SQLite type and default) for the GPX statistics.
        gpx_stat_columns = (
            ('total_distance', 'REAL DEFAULT 0.0'),
            ('elevation_gain', 'REAL DEFAULT 0.0'),
            ('max_elevation', 'REAL DEFAULT 0.0'),
            ('min_elevation', 'REAL DEFAULT 0.0'),
            ('total_points', 'INTEGER DEFAULT 0'),
        )
        # Read the column list once; the names added in the loop are all
        # distinct, so the snapshot stays valid for the remaining checks.
        gpx_columns = _table_columns('gpx_files')
        for column_name, column_type in gpx_stat_columns:
            if column_name in gpx_columns:
                print(f'{column_name} column already exists in gpx_files table')
            else:
                _add_column('gpx_files', column_name, column_type)
                print(f'Successfully added {column_name} column to gpx_files table')

        print('Database schema is up to date')
    except Exception as e:
        # NOTE(review): the error is only reported, so the command still
        # exits with status 0 after a failed migration. Kept as-is because
        # callers may rely on it -- consider a non-zero exit code.
        print(f'Migration error: {e}')
        db.session.rollback()
@app.cli.command()
def process_gpx_files():
    """Process existing GPX files to extract statistics."""
    # The docstring above doubles as the command's --help text, so extra
    # detail lives in comments instead.
    from app.utils.gpx_processor import process_gpx_file

    tracks = GPXFile.query.all()
    succeeded = 0
    for track in tracks:
        # A failure in one file is reported and must not stop the others.
        try:
            outcome = process_gpx_file(track)
            if not outcome:
                print(f'Failed to process: {track.original_name}')
                continue
            succeeded += 1
            print(f'Processed: {track.original_name}')
        except Exception as exc:
            print(f'Error processing {track.original_name}: {exc}')

    # One commit for the whole batch, issued after every file was attempted.
    db.session.commit()
    print(f'Processed {succeeded}/{len(tracks)} GPX files')
@app.cli.command()
def set_cover_images():
    """Set cover images for posts that don't have them."""
    # The docstring above doubles as the command's --help text, so extra
    # detail lives in comments instead.
    promoted = 0
    for post in Post.query.all():
        # Posts that already have an image flagged as cover are left alone.
        if post.images.filter_by(is_cover=True).first():
            continue

        # Otherwise promote the first image the relationship returns;
        # posts without any image are skipped.
        candidate = post.images.first()
        if not candidate:
            continue

        candidate.is_cover = True
        promoted += 1
        print(f'Set cover image for post: {post.title}')

    # All flag changes are persisted together in a single commit.
    db.session.commit()
    print(f'Updated {promoted} posts with cover images')
@app.cli.command()
def create_admin():
    """Create an admin user.

    The e-mail address is read from ADMIN_EMAIL (default admin@moto-adv.com)
    and the password from ADMIN_PASSWORD. When ADMIN_PASSWORD is unset or
    empty, a random password is generated and printed once. Nothing is
    changed when a user with that e-mail address already exists.
    """
    import secrets

    admin_email = os.environ.get('ADMIN_EMAIL', 'admin@moto-adv.com')
    if User.query.filter_by(email=admin_email).first():
        print(f'Admin user {admin_email} already exists.')
        return

    # Never fall back to a fixed, publicly known password: anyone who has
    # read this file could log in as admin on a default installation.
    admin_password = os.environ.get('ADMIN_PASSWORD')
    password_was_generated = not admin_password
    if password_was_generated:
        admin_password = secrets.token_urlsafe(16)

    admin = User(
        nickname='admin',
        email=admin_email,
        is_admin=True,
    )
    admin.set_password(admin_password)
    db.session.add(admin)
    db.session.commit()
    print(f'Admin user created: {admin_email}')
    if password_was_generated:
        # This is the only place the generated password is ever shown.
        print(f'Generated admin password (shown only once): {admin_password}')
if __name__ == '__main__':
    # Entry point for `python run.py` (development server).
    # The server listens on every interface, so debug mode must be opt-in:
    # with debug enabled the interactive debugger lets anyone who can reach
    # the port execute arbitrary Python. Enable it with FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)