#!/usr/bin/env python3 """ Diagnostic script to test the player edit media API endpoint. This script simulates what a player would do when uploading edited images. """ import requests import json import sys from datetime import datetime from pathlib import Path # Color codes for output class Colors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' def print_section(title): """Print a section header""" print(f"\n{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}") print(f"{Colors.HEADER}{Colors.BOLD}{title}{Colors.ENDC}") print(f"{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}\n") def print_success(msg): print(f"{Colors.OKGREEN}✓ {msg}{Colors.ENDC}") def print_error(msg): print(f"{Colors.FAIL}✗ {msg}{Colors.ENDC}") def print_info(msg): print(f"{Colors.OKCYAN}ℹ {msg}{Colors.ENDC}") def print_warning(msg): print(f"{Colors.WARNING}⚠ {msg}{Colors.ENDC}") def test_server_health(base_url): """Test if server is accessible""" print_section("1. Testing Server Health") try: response = requests.get(f"{base_url}/api/health", timeout=5) if response.status_code == 200: print_success(f"Server is accessible at {base_url}") data = response.json() print(f" Status: {data.get('status')}") print(f" Version: {data.get('version')}") return True else: print_error(f"Server returned status {response.status_code}") return False except requests.exceptions.ConnectionError: print_error(f"Cannot connect to server at {base_url}") return False except Exception as e: print_error(f"Error testing server health: {str(e)}") return False def test_endpoint_exists(base_url): """Test if the endpoint is available""" print_section("2. Testing Endpoint Availability") endpoint = f"{base_url}/api/player-edit-media" print_info(f"Testing endpoint: {endpoint}") # Test without auth (should get 401) try: response = requests.post(endpoint, timeout=5) if response.status_code == 401: print_success("Endpoint exists and requires authentication (401)") print(f" Response: {response.json()}") return True elif response.status_code == 404: print_error("Endpoint NOT FOUND (404) - The endpoint doesn't exist!") return False elif response.status_code == 400: print_warning("Endpoint exists but got 400 (Bad Request) - likely missing data") print(f" Response: {response.json()}") return True else: print_warning(f"Unexpected status code: {response.status_code}") print(f" Response: {response.text}") return True except requests.exceptions.ConnectionError: print_error("Cannot connect to endpoint") return False except Exception as e: print_error(f"Error testing endpoint: {str(e)}") return False def get_player_auth_code(base_url, db_path): """Get a valid player auth code from the database""" print_section("3. Retrieving Player Auth Code") try: import sqlite3 # Try to connect to the database try: conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute("SELECT id, name, auth_code FROM player LIMIT 1") result = cursor.fetchone() if result: player_id, player_name, auth_code = result print_success(f"Found player: {player_name} (ID: {player_id})") print_info(f"Auth code: {auth_code[:10]}...{auth_code[-5:]}") # Get playlist for this player cursor.execute("SELECT playlist_id FROM player WHERE id = ?", (player_id,)) playlist_row = cursor.fetchone() has_playlist = playlist_row and playlist_row[0] is not None print_info(f"Has assigned playlist: {has_playlist}") conn.close() return player_id, player_name, auth_code else: print_error("No players found in database") conn.close() return None, None, None except sqlite3.OperationalError as e: print_error(f"Cannot access database at {db_path}") print_warning("Make sure you're running this from the correct directory") return None, None, None except Exception as e: print_error(f"Error retrieving player auth code: {str(e)}") return None, None, None def get_sample_content(base_url, auth_code): """Get a sample content file to use for testing""" print_section("4. Retrieving Sample Content") try: headers = {"Authorization": f"Bearer {auth_code}"} # Get player ID from auth player_response = requests.get( f"{base_url}/api/health", headers=headers, timeout=5 ) # Try to get a playlist # We need to query the database for this print_warning("Getting sample content from filesystem...") uploads_dir = Path("app/static/uploads") if uploads_dir.exists(): # Find a non-edited media file image_files = list(uploads_dir.glob("*.jpg")) + list(uploads_dir.glob("*.png")) if image_files: sample_file = image_files[0] print_success(f"Found sample image: {sample_file.name}") return sample_file.name, sample_file print_warning("No sample images found in uploads directory") return None, None except Exception as e: print_error(f"Error getting sample content: {str(e)}") return None, None def test_authentication(base_url, auth_code): """Test if authentication works""" print_section("5. Testing Authentication") try: headers = {"Authorization": f"Bearer {auth_code}"} response = requests.post( f"{base_url}/api/player-edit-media", headers=headers, timeout=5 ) if response.status_code == 401: print_error("Authentication FAILED - Invalid auth code") print(f" Response: {response.json()}") return False elif response.status_code == 400: print_success("Authentication passed! (Got 400 because of missing data)") print(f" Response: {response.json()}") return True elif response.status_code == 404: print_error("Endpoint not found!") return False else: print_warning(f"Unexpected status: {response.status_code}") print(f" Response: {response.text}") return True except Exception as e: print_error(f"Error testing authentication: {str(e)}") return False def test_full_upload(base_url, auth_code, content_filename, sample_file): """Test a full media upload""" print_section("6. Testing Full Media Upload") if not auth_code or not content_filename or not sample_file: print_error("Missing required parameters for upload test") return False try: # Create metadata metadata = { "time_of_modification": datetime.utcnow().isoformat() + "Z", "original_name": content_filename, "new_name": f"{content_filename.split('.')[0]}_v1.{content_filename.split('.')[-1]}", "version": 1, "user_card_data": "test_user_123" } print_info(f"Preparing upload with metadata:") print(f" Original: {metadata['original_name']}") print(f" New name: {metadata['new_name']}") print(f" Version: {metadata['version']}") # Prepare the request headers = {"Authorization": f"Bearer {auth_code}"} with open(sample_file, 'rb') as f: files = { 'image_file': (sample_file.name, f, 'image/jpeg'), 'metadata': (None, json.dumps(metadata)) } print_info("Sending upload request...") response = requests.post( f"{base_url}/api/player-edit-media", headers=headers, files=files, timeout=10 ) print(f" Status Code: {response.status_code}") if response.status_code == 200: data = response.json() print_success("Upload successful!") print(f" Response: {json.dumps(data, indent=2)}") return True elif response.status_code == 404: print_error("Content not found - The original_name doesn't match any content in database") print(f" Error: {response.json()}") return False elif response.status_code == 400: print_error("Bad Request - Check metadata format") print(f" Error: {response.json()}") return False elif response.status_code == 401: print_error("Authentication failed") print(f" Error: {response.json()}") return False else: print_error(f"Upload failed with status {response.status_code}") print(f" Response: {response.text}") return False except Exception as e: print_error(f"Error during upload: {str(e)}") import traceback traceback.print_exc() return False def check_database_integrity(db_path): """Check database tables and records""" print_section("7. Database Integrity Check") try: import sqlite3 conn = sqlite3.connect(db_path) cursor = conn.cursor() # Check player table cursor.execute("SELECT COUNT(*) FROM player") player_count = cursor.fetchone()[0] print_info(f"Players in database: {player_count}") # Check content table cursor.execute("SELECT COUNT(*) FROM content") content_count = cursor.fetchone()[0] print_info(f"Content items in database: {content_count}") # Check player_edit table cursor.execute("SELECT COUNT(*) FROM player_edit") edit_count = cursor.fetchone()[0] print_info(f"Player edits recorded: {edit_count}") # List content files print_info("Sample content files:") cursor.execute("SELECT id, filename, content_type FROM content LIMIT 5") for row in cursor.fetchall(): print(f" - [{row[0]}] {row[1]} ({row[2]})") conn.close() print_success("Database integrity check passed") return True except Exception as e: print_error(f"Database integrity check failed: {str(e)}") return False def main(): """Run all diagnostic tests""" print(f"{Colors.BOLD}{Colors.OKCYAN}") print(""" ╔═══════════════════════════════════════════════════════════╗ ║ DIGISERVER EDIT MEDIA API - DIAGNOSTIC SCRIPT ║ ║ Testing Player Edit Upload Functionality ║ ╚═══════════════════════════════════════════════════════════╝ """) print(Colors.ENDC) # Configuration base_url = "http://localhost:5000" # Change this if server is on different host db_path = "instance/digiserver.db" print_info(f"Server URL: {base_url}") print_info(f"Database: {db_path}\n") # Run tests tests_passed = [] tests_failed = [] # Test 1: Server health if test_server_health(base_url): tests_passed.append("Server Health") else: tests_failed.append("Server Health") print_error("Cannot continue without server access") return # Test 2: Endpoint exists if test_endpoint_exists(base_url): tests_passed.append("Endpoint Availability") else: tests_failed.append("Endpoint Availability") print_error("Cannot continue - endpoint doesn't exist!") return # Test 3: Get player auth code player_id, player_name, auth_code = get_player_auth_code(base_url, db_path) if auth_code: tests_passed.append("Player Auth Code Retrieval") else: tests_failed.append("Player Auth Code Retrieval") print_error("Cannot continue without valid player auth code") return # Test 4: Authentication if test_authentication(base_url, auth_code): tests_passed.append("Authentication") else: tests_failed.append("Authentication") print_error("Authentication test failed") # Test 5: Get sample content content_name, sample_file = get_sample_content(base_url, auth_code) if content_name and sample_file: tests_passed.append("Sample Content Retrieval") # Test 6: Full upload if test_full_upload(base_url, auth_code, content_name, sample_file): tests_passed.append("Full Media Upload") else: tests_failed.append("Full Media Upload") else: tests_failed.append("Sample Content Retrieval") # Test 7: Database integrity if check_database_integrity(db_path): tests_passed.append("Database Integrity") else: tests_failed.append("Database Integrity") # Summary print_section("Summary") if tests_passed: print(f"{Colors.OKGREEN}Passed Tests ({len(tests_passed)}):{Colors.ENDC}") for test in tests_passed: print(f" {Colors.OKGREEN}✓{Colors.ENDC} {test}") if tests_failed: print(f"\n{Colors.FAIL}Failed Tests ({len(tests_failed)}):{Colors.ENDC}") for test in tests_failed: print(f" {Colors.FAIL}✗{Colors.ENDC} {test}") print(f"\n{Colors.BOLD}Result: {len(tests_passed)}/{len(tests_passed) + len(tests_failed)} tests passed{Colors.ENDC}\n") # Recommendations print_section("Recommendations") if "Endpoint Availability" in tests_failed: print_warning("The /api/player-edit-media endpoint is not available") print(" 1. Check if the Flask app reloaded after code changes") print(" 2. Verify the endpoint is properly registered in api.py") print(" 3. Restart the Docker container") if "Full Media Upload" in tests_failed: print_warning("Upload test failed - check:") print(" 1. The original_name matches actual content filenames") print(" 2. Content record exists in the database") print(" 3. Server has permission to write to uploads directory") print(" 4. Check server logs for error details") if "Authentication" in tests_failed: print_warning("Authentication failed - check:") print(" 1. Player auth code is valid and hasn't expired") print(" 2. Auth header format is correct: 'Authorization: Bearer '") print(" 3. Player record hasn't been deleted from database") if __name__ == "__main__": main()