169 lines
5.8 KiB
Python
169 lines
5.8 KiB
Python
import os
|
|
from app import create_app, db
|
|
from app.models import User, Post, PostImage, GPXFile, Comment, Like, PageView, MapRoute
|
|
|
|
app = create_app()
|
|
|
|
@app.shell_context_processor
|
|
def make_shell_context():
|
|
return {
|
|
'db': db,
|
|
'User': User,
|
|
'Post': Post,
|
|
'PostImage': PostImage,
|
|
'GPXFile': GPXFile,
|
|
'Comment': Comment,
|
|
'Like': Like,
|
|
'PageView': PageView,
|
|
'MapRoute': MapRoute
|
|
}
|
|
|
|
@app.cli.command()
|
|
def init_db():
|
|
"""Initialize the database."""
|
|
db.create_all()
|
|
print('Database initialized.')
|
|
|
|
@app.cli.command()
|
|
def migrate_db():
|
|
"""Apply database migrations."""
|
|
from sqlalchemy import text
|
|
try:
|
|
# Check if the media_folder column exists
|
|
result = db.session.execute(text('PRAGMA table_info(posts)'))
|
|
columns = [row[1] for row in result.fetchall()]
|
|
|
|
if 'media_folder' not in columns:
|
|
# Add the media_folder column
|
|
db.session.execute(text('ALTER TABLE posts ADD COLUMN media_folder VARCHAR(100)'))
|
|
db.session.commit()
|
|
print('Successfully added media_folder column to posts table')
|
|
else:
|
|
print('media_folder column already exists')
|
|
|
|
# Check if is_cover column exists in post_images table
|
|
result = db.session.execute(text('PRAGMA table_info(post_images)'))
|
|
columns = [row[1] for row in result.fetchall()]
|
|
|
|
if 'is_cover' not in columns:
|
|
# Add the is_cover column
|
|
db.session.execute(text('ALTER TABLE post_images ADD COLUMN is_cover BOOLEAN DEFAULT 0'))
|
|
db.session.commit()
|
|
print('Successfully added is_cover column to post_images table')
|
|
else:
|
|
print('is_cover column already exists')
|
|
|
|
# Check if page_views table exists
|
|
result = db.session.execute(text("SELECT name FROM sqlite_master WHERE type='table' AND name='page_views'"))
|
|
if not result.fetchone():
|
|
# Create page_views table
|
|
db.session.execute(text('''
|
|
CREATE TABLE page_views (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
path VARCHAR(255) NOT NULL,
|
|
user_agent VARCHAR(500),
|
|
ip_address VARCHAR(45),
|
|
referer VARCHAR(500),
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
user_id INTEGER,
|
|
post_id INTEGER,
|
|
FOREIGN KEY (user_id) REFERENCES users(id),
|
|
FOREIGN KEY (post_id) REFERENCES posts(id)
|
|
)
|
|
'''))
|
|
db.session.commit()
|
|
print('Successfully created page_views table')
|
|
else:
|
|
print('page_views table already exists')
|
|
|
|
# Check if GPX statistics columns exist
|
|
result = db.session.execute(text('PRAGMA table_info(gpx_files)'))
|
|
columns = [row[1] for row in result.fetchall()]
|
|
|
|
new_columns = [
|
|
('total_distance', 'REAL DEFAULT 0.0'),
|
|
('elevation_gain', 'REAL DEFAULT 0.0'),
|
|
('max_elevation', 'REAL DEFAULT 0.0'),
|
|
('min_elevation', 'REAL DEFAULT 0.0'),
|
|
('total_points', 'INTEGER DEFAULT 0')
|
|
]
|
|
|
|
for column_name, column_type in new_columns:
|
|
if column_name not in columns:
|
|
db.session.execute(text(f'ALTER TABLE gpx_files ADD COLUMN {column_name} {column_type}'))
|
|
db.session.commit()
|
|
print(f'Successfully added {column_name} column to gpx_files table')
|
|
else:
|
|
print(f'{column_name} column already exists in gpx_files table')
|
|
|
|
print('Database schema is up to date')
|
|
except Exception as e:
|
|
print(f'Migration error: {e}')
|
|
db.session.rollback()
|
|
|
|
@app.cli.command()
|
|
def process_gpx_files():
|
|
"""Process existing GPX files to extract statistics."""
|
|
from app.utils.gpx_processor import process_gpx_file
|
|
|
|
gpx_files = GPXFile.query.all()
|
|
processed = 0
|
|
|
|
for gpx_file in gpx_files:
|
|
try:
|
|
if process_gpx_file(gpx_file):
|
|
processed += 1
|
|
print(f'Processed: {gpx_file.original_name}')
|
|
else:
|
|
print(f'Failed to process: {gpx_file.original_name}')
|
|
except Exception as e:
|
|
print(f'Error processing {gpx_file.original_name}: {e}')
|
|
|
|
db.session.commit()
|
|
print(f'Processed {processed}/{len(gpx_files)} GPX files')
|
|
|
|
@app.cli.command()
|
|
def set_cover_images():
|
|
"""Set cover images for posts that don't have them."""
|
|
posts = Post.query.all()
|
|
updated = 0
|
|
|
|
for post in posts:
|
|
# Check if post has a cover image
|
|
cover_image = post.images.filter_by(is_cover=True).first()
|
|
if not cover_image:
|
|
# Set the first image as cover if available
|
|
first_image = post.images.first()
|
|
if first_image:
|
|
first_image.is_cover = True
|
|
updated += 1
|
|
print(f'Set cover image for post: {post.title}')
|
|
|
|
db.session.commit()
|
|
print(f'Updated {updated} posts with cover images')
|
|
|
|
@app.cli.command()
|
|
def create_admin():
|
|
"""Create an admin user."""
|
|
admin_email = os.environ.get('ADMIN_EMAIL', 'admin@moto-adv.com')
|
|
admin_password = os.environ.get('ADMIN_PASSWORD', 'admin123')
|
|
|
|
admin = User.query.filter_by(email=admin_email).first()
|
|
if admin:
|
|
print(f'Admin user {admin_email} already exists.')
|
|
return
|
|
|
|
admin = User(
|
|
nickname='admin',
|
|
email=admin_email,
|
|
is_admin=True
|
|
)
|
|
admin.set_password(admin_password)
|
|
|
|
db.session.add(admin)
|
|
db.session.commit()
|
|
print(f'Admin user created: {admin_email}')
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host='0.0.0.0', port=5000, debug=True)
|