230 lines
7.9 KiB
Python
230 lines
7.9 KiB
Python
"""
|
|
File upload and processing utilities
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
from flask import current_app
|
|
from werkzeug.utils import secure_filename
|
|
from app.extensions import db
|
|
from app.models.content import Content
|
|
from app.models.player import Player
|
|
from app.models.group import Group
|
|
from app.utils.logger import log_upload, log_process, log_content_added
|
|
|
|
def process_uploaded_files(app, files, duration, target_type, target_id):
    """
    Process uploaded files and add them to playlists.

    Every file is validated (usable filename, supported extension, known
    target type) *before* it is written to ``<app.static_folder>/uploads``,
    so a request that can never succeed does not leave files on disk.

    Args:
        app: Flask application instance (only ``app.static_folder`` is read)
        files: Iterable of uploaded file objects exposing ``filename`` and
            ``save(path)``
        duration: Display duration in seconds
        target_type: 'player' or 'group'
        target_id: Target ID (player id or group id, depending on target_type)

    Returns:
        dict: ``{'success': [...], 'errors': [...]}`` where ``success`` holds
        the stored filenames and ``errors`` holds one message per failed file
    """
    results = {'success': [], 'errors': []}

    upload_folder = os.path.join(app.static_folder, 'uploads')
    os.makedirs(upload_folder, exist_ok=True)

    # Loop-invariant: resolved once, but reported per file (below) so callers
    # still receive one "Invalid target type" message for each upload.
    valid_target = target_type in ('player', 'group')

    for upload in files:
        # Skip empty form slots silently.
        if not (upload and upload.filename):
            continue

        try:
            # Secure the filename
            filename = secure_filename(upload.filename)
            if not filename:
                results['errors'].append(f"Invalid filename: {upload.filename}")
                continue

            # Get file extension and determine content type
            file_ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
            content_type = get_content_type(file_ext)
            if not content_type:
                results['errors'].append(f"Unsupported file type: {file_ext}")
                continue

            # Reject unknown targets before touching the disk; previously the
            # file was saved first and then left behind unreferenced.
            if not valid_target:
                results['errors'].append(f"Invalid target type: {target_type}")
                continue

            # Save file
            file_path = os.path.join(upload_folder, filename)
            upload.save(file_path)

            # Get file size
            file_size = os.path.getsize(file_path)

            # Process based on target type
            if target_type == 'player':
                success = add_content_to_player(
                    player_id=target_id,
                    filename=filename,
                    original_name=upload.filename,
                    duration=duration,
                    content_type=content_type,
                    file_size=file_size
                )
            else:
                success = add_content_to_group(
                    group_id=target_id,
                    filename=filename,
                    original_name=upload.filename,
                    duration=duration,
                    content_type=content_type,
                    file_size=file_size
                )

            if success:
                results['success'].append(filename)
                log_upload(content_type, filename, target_type, str(target_id))
            else:
                results['errors'].append(f"Failed to add {filename} to {target_type}")

        except Exception as e:
            # One bad file must not abort the rest of the batch.
            results['errors'].append(f"Error processing {upload.filename}: {str(e)}")

    return results
|
|
|
|
def get_content_type(file_ext):
    """Map a lowercase file extension (no dot) to 'image', 'video' or 'document'.

    Returns None when the extension belongs to none of the known categories.
    The match is case-sensitive, so callers are expected to lowercase first.
    """
    categories = (
        ('image', {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'}),
        ('video', {'mp4', 'avi', 'mov', 'wmv', 'flv', 'webm', 'mkv'}),
        ('document', {'pdf', 'pptx', 'ppt'}),
    )

    for label, extensions in categories:
        if file_ext in extensions:
            return label

    return None
|
|
|
|
def add_content_to_player(player_id, filename, original_name, duration, content_type, file_size):
    """Add content to a specific player.

    Appends a new ``Content`` row at the end of the player's playlist
    (position = current maximum + 1, or 1 for an empty playlist), bumps the
    player's playlist version and commits.

    Args:
        player_id: Primary key of the target player
        filename: Stored (secured) file name
        original_name: File name as supplied by the uploader
        duration: Display duration in seconds
        content_type: Content category string (e.g. 'image', 'video')
        file_size: Size of the stored file in bytes

    Returns:
        bool: True once the row has been committed; False when the player
        does not exist or the database work failed (session is rolled back).
    """
    try:
        player = Player.query.get(player_id)
        if not player:
            return False

        # Get next position
        max_position = (
            db.session.query(db.func.max(Content.position))
            .filter_by(player_id=player_id)
            .scalar()
            or 0
        )

        # Create content entry
        content = Content(
            file_name=filename,
            original_name=original_name,
            duration=duration,
            position=max_position + 1,
            player_id=player_id,
            content_type=content_type,
            file_size=file_size
        )

        db.session.add(content)
        player.increment_playlist_version()
        db.session.commit()

    except Exception as e:
        db.session.rollback()
        print(f"Error adding content to player: {e}")
        return False

    # The row is already committed at this point, so a logging problem must
    # not be reported to the caller as a failed insert (it used to trigger a
    # rollback and return False even though the content was persisted).
    try:
        log_content_added(filename, 'player', player.username)
    except Exception as e:
        print(f"Error logging content added to player: {e}")

    return True
|
|
|
|
def add_content_to_group(group_id, filename, original_name, duration, content_type, file_size):
    """Add content to all players in a group.

    Creates one ``Content`` row per player in ``group.players``, each appended
    at the end of that player's playlist (position = current maximum + 1),
    bumps the group's playlist version and commits everything in a single
    transaction.

    Args:
        group_id: Primary key of the target group
        filename: Stored (secured) file name
        original_name: File name as supplied by the uploader
        duration: Display duration in seconds
        content_type: Content category string (e.g. 'image', 'video')
        file_size: Size of the stored file in bytes

    Returns:
        bool: True once the rows have been committed; False when the group
        does not exist or the database work failed (session is rolled back).
    """
    try:
        group = Group.query.get(group_id)
        if not group:
            return False

        # Add content to all players in the group
        for player in group.players:
            # Get next position for this player
            max_position = (
                db.session.query(db.func.max(Content.position))
                .filter_by(player_id=player.id)
                .scalar()
                or 0
            )

            # Create content entry
            content = Content(
                file_name=filename,
                original_name=original_name,
                duration=duration,
                position=max_position + 1,
                player_id=player.id,
                content_type=content_type,
                file_size=file_size
            )

            db.session.add(content)

        # Update playlist version for group
        group.increment_playlist_version()
        db.session.commit()

    except Exception as e:
        db.session.rollback()
        print(f"Error adding content to group: {e}")
        return False

    # The rows are already committed at this point, so a logging problem must
    # not be reported to the caller as a failed insert (it used to trigger a
    # rollback and return False even though the content was persisted).
    try:
        log_content_added(filename, 'group', group.name)
    except Exception as e:
        print(f"Error logging content added to group: {e}")

    return True
|
|
|
|
def allowed_file(filename, allowed_extensions=None):
    """Return True when *filename* ends in one of the permitted extensions.

    The extension is the text after the last dot, compared case-insensitively.
    When *allowed_extensions* is None, the built-in image, video and document
    extensions are used. Names without a dot are never allowed.
    """
    if allowed_extensions is None:
        allowed_extensions = {
            'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp',
            'mp4', 'avi', 'mov', 'wmv', 'flv', 'webm', 'mkv',
            'pdf', 'pptx', 'ppt',
        }

    if '.' not in filename:
        return False

    extension = filename.rsplit('.', 1)[1].lower()
    return extension in allowed_extensions
|
|
|
|
def get_file_info(file_path):
    """Return size, modification time and existence flag for *file_path*.

    On success the result is ``{'size', 'modified', 'exists': True}`` taken
    from ``os.stat``; if the path cannot be stat'ed (OSError) the result is
    just ``{'exists': False}``.
    """
    try:
        details = os.stat(file_path)
    except OSError:
        return {'exists': False}

    info = {}
    info['size'] = details.st_size
    info['modified'] = details.st_mtime
    info['exists'] = True
    return info
|
|
|
|
def cleanup_orphaned_files(upload_folder):
    """Delete files in *upload_folder* that no ``Content`` row refers to.

    A file counts as referenced when its name equals some ``Content.file_name``.
    Only regular files are removed; other directory entries are skipped.

    Returns:
        int: Number of files actually removed. 0 is returned when the folder
        does not exist or when any unexpected error occurs (the error is
        printed, not raised).
    """
    try:
        # Names still referenced by the database
        referenced = {row.file_name for row in Content.query.all()}

        if not os.path.exists(upload_folder):
            return 0

        # Everything on disk that the database does not know about
        leftovers = set(os.listdir(upload_folder)) - referenced

        deleted = 0
        for name in leftovers:
            candidate = os.path.join(upload_folder, name)
            if not os.path.isfile(candidate):
                continue
            try:
                os.remove(candidate)
            except OSError as e:
                # Keep going: one undeletable file should not stop the sweep.
                print(f"Error removing {candidate}: {e}")
            else:
                deleted += 1

        return deleted

    except Exception as e:
        print(f"Error during cleanup: {e}")
        return 0
|