"""Upload utilities for handling file uploads and processing.""" import os import subprocess import shutil import tempfile from typing import Optional, Dict, Tuple from werkzeug.utils import secure_filename from werkzeug.datastructures import FileStorage from app.utils.logger import log_action # In-memory storage for upload progress (use Redis in production) _upload_progress: Dict[str, Dict] = {} def get_upload_progress(upload_id: str) -> Dict: """Get upload progress for a specific upload ID. Args: upload_id: Unique upload identifier Returns: Progress dictionary with status, progress, and message """ return _upload_progress.get(upload_id, { 'status': 'unknown', 'progress': 0, 'message': 'Upload not found' }) def set_upload_progress(upload_id: str, progress: int, message: str, status: str = 'processing') -> None: """Set upload progress for a specific upload ID. Args: upload_id: Unique upload identifier progress: Progress percentage (0-100) message: Status message status: Status string (uploading, processing, complete, error) """ _upload_progress[upload_id] = { 'status': status, 'progress': progress, 'message': message } def clear_upload_progress(upload_id: str) -> None: """Clear upload progress for a specific upload ID. Args: upload_id: Unique upload identifier """ if upload_id in _upload_progress: del _upload_progress[upload_id] def save_uploaded_file(file: FileStorage, upload_folder: str, filename: Optional[str] = None) -> Tuple[bool, str, str]: """Save an uploaded file to the upload folder. Args: file: FileStorage object from request.files upload_folder: Path to upload directory filename: Optional custom filename (will be secured) Returns: Tuple of (success, filepath, message) """ try: if not filename: filename = secure_filename(file.filename) else: filename = secure_filename(filename) # Ensure upload folder exists os.makedirs(upload_folder, exist_ok=True) filepath = os.path.join(upload_folder, filename) # Save file file.save(filepath) log_action('info', f'File saved: {filename}') return True, filepath, 'File saved successfully' except Exception as e: log_action('error', f'Error saving file: {str(e)}') return False, '', f'Error saving file: {str(e)}' def process_video_file(filepath: str, upload_id: Optional[str] = None) -> Tuple[bool, str]: """Process video file for optimization (H.264, 30fps, max 1080p). Args: filepath: Path to video file upload_id: Optional upload ID for progress tracking Returns: Tuple of (success, message) """ try: if upload_id: set_upload_progress(upload_id, 60, 'Converting video to optimized format...') # Prepare temp output file temp_dir = tempfile.gettempdir() temp_output = os.path.join(temp_dir, f"optimized_{os.path.basename(filepath)}") # ffmpeg command for Raspberry Pi optimization ffmpeg_cmd = [ 'ffmpeg', '-y', '-i', filepath, '-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'main', '-crf', '23', '-maxrate', '8M', '-bufsize', '12M', '-vf', 'scale=\'min(1920,iw)\':\'min(1080,ih)\':force_original_aspect_ratio=decrease,fps=30', '-r', '30', '-c:a', 'aac', '-b:a', '128k', '-movflags', '+faststart', temp_output ] if upload_id: set_upload_progress(upload_id, 70, 'Processing video (this may take a few minutes)...') # Run ffmpeg result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, timeout=1800) if result.returncode != 0: error_msg = f'Video conversion failed: {result.stderr[:200]}' log_action('error', error_msg) if upload_id: set_upload_progress(upload_id, 0, error_msg, 'error') # Remove original file if conversion failed if os.path.exists(filepath): os.remove(filepath) return False, error_msg if upload_id: set_upload_progress(upload_id, 85, 'Replacing with optimized video...') # Replace original with optimized version shutil.move(temp_output, filepath) log_action('info', f'Video optimized successfully: {os.path.basename(filepath)}') return True, 'Video optimized successfully' except subprocess.TimeoutExpired: error_msg = 'Video conversion timed out (30 minutes)' log_action('error', error_msg) if upload_id: set_upload_progress(upload_id, 0, error_msg, 'error') return False, error_msg except Exception as e: error_msg = f'Error processing video: {str(e)}' log_action('error', error_msg) if upload_id: set_upload_progress(upload_id, 0, error_msg, 'error') return False, error_msg def process_pdf_file(filepath: str, upload_id: Optional[str] = None) -> Tuple[bool, str]: """Process PDF file (convert to images for display). Args: filepath: Path to PDF file upload_id: Optional upload ID for progress tracking Returns: Tuple of (success, message) """ try: if upload_id: set_upload_progress(upload_id, 60, 'Converting PDF to images...') # This would use pdf2image or similar # For now, just log the action log_action('info', f'PDF processing requested for: {os.path.basename(filepath)}') if upload_id: set_upload_progress(upload_id, 85, 'PDF processed successfully') return True, 'PDF processed successfully' except Exception as e: error_msg = f'Error processing PDF: {str(e)}' log_action('error', error_msg) if upload_id: set_upload_progress(upload_id, 0, error_msg, 'error') return False, error_msg def get_file_size(filepath: str) -> int: """Get file size in bytes. Args: filepath: Path to file Returns: File size in bytes, or 0 if file doesn't exist """ try: return os.path.getsize(filepath) except Exception: return 0 def delete_file(filepath: str) -> Tuple[bool, str]: """Delete a file from disk. Args: filepath: Path to file Returns: Tuple of (success, message) """ try: if os.path.exists(filepath): os.remove(filepath) log_action('info', f'File deleted: {os.path.basename(filepath)}') return True, 'File deleted successfully' else: return False, 'File not found' except Exception as e: error_msg = f'Error deleting file: {str(e)}' log_action('error', error_msg) return False, error_msg