421 lines
15 KiB
Python
421 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Diagnostic script to test the player edit media API endpoint.
|
||
This script simulates what a player would do when uploading edited images.
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
import sys
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
# Color codes for output
|
||
class Colors:
    """ANSI escape sequences used to colorize the script's terminal output."""

    # Reset / text attributes
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Foreground colors
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'
|
||
|
||
def print_section(title):
    """Print *title* as a bold header framed by two 60-character rules."""
    style = f"{Colors.HEADER}{Colors.BOLD}"
    rule = '=' * 60
    print(f"\n{style}{rule}{Colors.ENDC}")
    print(f"{style}{title}{Colors.ENDC}")
    print(f"{style}{rule}{Colors.ENDC}\n")
|
||
|
||
def print_success(msg):
    """Print *msg* with a check mark, wrapped in the OKGREEN color code."""
    line = f"{Colors.OKGREEN}✓ {msg}{Colors.ENDC}"
    print(line)
|
||
|
||
def print_error(msg):
    """Print *msg* with a cross mark, wrapped in the FAIL color code."""
    line = f"{Colors.FAIL}✗ {msg}{Colors.ENDC}"
    print(line)
|
||
|
||
def print_info(msg):
    """Print *msg* with an info glyph, wrapped in the OKCYAN color code."""
    line = f"{Colors.OKCYAN}ℹ {msg}{Colors.ENDC}"
    print(line)
|
||
|
||
def print_warning(msg):
    """Print *msg* with a warning glyph, wrapped in the WARNING color code."""
    line = f"{Colors.WARNING}⚠ {msg}{Colors.ENDC}"
    print(line)
|
||
|
||
def test_server_health(base_url):
    """Check that ``{base_url}/api/health`` answers with HTTP 200.

    On success the ``status`` and ``version`` fields of the JSON reply are
    printed.

    Returns:
        True when the server replied 200 and the reply was processed,
        False on any other status code or on any error.
    """
    print_section("1. Testing Server Health")

    try:
        reply = requests.get(f"{base_url}/api/health", timeout=5)

        # Guard clause: anything but 200 is reported as a failure.
        if reply.status_code != 200:
            print_error(f"Server returned status {reply.status_code}")
            return False

        print_success(f"Server is accessible at {base_url}")
        payload = reply.json()
        for label, key in (("Status", "status"), ("Version", "version")):
            print(f" {label}: {payload.get(key)}")
        return True
    except requests.exceptions.ConnectionError:
        print_error(f"Cannot connect to server at {base_url}")
        return False
    except Exception as exc:
        print_error(f"Error testing server health: {exc}")
        return False
|
||
|
||
def test_endpoint_exists(base_url):
    """Check that the ``/api/player-edit-media`` route is registered.

    The request is sent without credentials or payload on purpose: any
    answer other than 404 shows that the route exists.

    Args:
        base_url: Scheme, host and port of the server, without trailing slash.

    Returns:
        False when the server answers 404 or the request fails; True for
        every other status code (401 and 400 are the expected ones).
    """
    print_section("2. Testing Endpoint Availability")

    endpoint = f"{base_url}/api/player-edit-media"
    print_info(f"Testing endpoint: {endpoint}")

    try:
        response = requests.post(endpoint, timeout=5)
        status = response.status_code

        if status == 404:
            print_error("Endpoint NOT FOUND (404) - The endpoint doesn't exist!")
            return False

        # Error bodies are not guaranteed to be JSON (e.g. an HTML page from
        # a proxy). Fall back to the raw text so a decode error cannot turn
        # an "endpoint exists" answer into a failure.
        try:
            body = response.json()
        except ValueError:
            body = response.text

        if status == 401:
            print_success("Endpoint exists and requires authentication (401)")
            print(f" Response: {body}")
        elif status == 400:
            print_warning("Endpoint exists but got 400 (Bad Request) - likely missing data")
            print(f" Response: {body}")
        else:
            print_warning(f"Unexpected status code: {status}")
            print(f" Response: {response.text}")
        return True
    except requests.exceptions.ConnectionError:
        print_error("Cannot connect to endpoint")
        return False
    except Exception as exc:
        print_error(f"Error testing endpoint: {exc}")
        return False
|
||
|
||
def get_player_auth_code(base_url, db_path):
    """Read the first player's credentials straight from the SQLite database.

    Args:
        base_url: Unused; kept so existing callers keep working.
        db_path: Path of the SQLite database file.

    Returns:
        ``(player_id, player_name, auth_code)`` for the first row of the
        ``player`` table, or ``(None, None, None)`` when the table is empty,
        the player has no auth code, or the database cannot be read.
    """
    print_section("3. Retrieving Player Auth Code")

    try:
        import sqlite3
        from contextlib import closing

        try:
            # closing() guarantees the connection is released on every path;
            # using a sqlite3 connection directly as a context manager only
            # manages the transaction, it does not close the connection.
            with closing(sqlite3.connect(db_path)) as conn:
                row = conn.execute(
                    "SELECT id, name, auth_code, playlist_id FROM player LIMIT 1"
                ).fetchone()

                if row is None:
                    print_error("No players found in database")
                    return None, None, None

                player_id, player_name, auth_code, playlist_id = row
                print_success(f"Found player: {player_name} (ID: {player_id})")

                if not auth_code:
                    # Previously this surfaced as an opaque TypeError from
                    # slicing None.
                    print_error("Player has no auth code set")
                    return None, None, None

                # Only a prefix and suffix are echoed, not the full code.
                print_info(f"Auth code: {auth_code[:10]}...{auth_code[-5:]}")
                print_info(f"Has assigned playlist: {playlist_id is not None}")
                return player_id, player_name, auth_code
        except sqlite3.OperationalError as exc:
            print_error(f"Cannot access database at {db_path}: {exc}")
            print_warning("Make sure you're running this from the correct directory")
            return None, None, None
    except Exception as exc:
        print_error(f"Error retrieving player auth code: {exc}")
        return None, None, None
|
||
|
||
def get_sample_content(base_url, auth_code):
    """Pick a sample image from the local uploads directory.

    The file is looked up on disk under ``app/static/uploads`` (relative to
    the current working directory); the server is not contacted.

    Args:
        base_url: Unused; accepted so existing callers keep working.
        auth_code: Unused; accepted so existing callers keep working.

    Returns:
        ``(filename, path)`` of the first ``*.jpg`` (else ``*.png``) file in
        name order, or ``(None, None)`` when there is none or an error
        occurs.
    """
    print_section("4. Retrieving Sample Content")

    try:
        # The former request to /api/health was dropped: its result was never
        # used, and a network failure there aborted the filesystem lookup.
        print_warning("Getting sample content from filesystem...")

        uploads_dir = Path("app/static/uploads")
        if uploads_dir.exists():
            # Any jpg/png qualifies (edited copies are not filtered out).
            # Sorting makes the pick reproducible between runs, since glob
            # order is otherwise arbitrary.
            image_files = sorted(uploads_dir.glob("*.jpg")) + sorted(uploads_dir.glob("*.png"))

            if image_files:
                sample_file = image_files[0]
                print_success(f"Found sample image: {sample_file.name}")
                return sample_file.name, sample_file

        print_warning("No sample images found in uploads directory")
        return None, None
    except Exception as exc:
        print_error(f"Error getting sample content: {exc}")
        return None, None
|
||
|
||
def test_authentication(base_url, auth_code):
    """Check that the server accepts *auth_code* as a Bearer token.

    An empty POST is sent to ``/api/player-edit-media``. A 400 answer is the
    expected success case: the request got past authentication and was
    rejected only for its missing payload.

    Args:
        base_url: Scheme, host and port of the server, without trailing slash.
        auth_code: Player auth code sent in the ``Authorization`` header.

    Returns:
        False on 401, 404 or any error; True on 400 and on any other status.
    """
    print_section("5. Testing Authentication")

    try:
        response = requests.post(
            f"{base_url}/api/player-edit-media",
            headers={"Authorization": f"Bearer {auth_code}"},
            timeout=5,
        )
        status = response.status_code

        if status == 404:
            print_error("Endpoint not found!")
            return False

        # Error bodies are not guaranteed to be JSON; fall back to the raw
        # text so a decode error cannot turn the 400 success case into a
        # failure.
        try:
            body = response.json()
        except ValueError:
            body = response.text

        if status == 401:
            print_error("Authentication FAILED - Invalid auth code")
            print(f" Response: {body}")
            return False

        if status == 400:
            print_success("Authentication passed! (Got 400 because of missing data)")
            print(f" Response: {body}")
            return True

        print_warning(f"Unexpected status: {status}")
        print(f" Response: {response.text}")
        return True
    except Exception as exc:
        print_error(f"Error testing authentication: {exc}")
        return False
|
||
|
||
def _versioned_name(filename, version):
    """Insert ``_v<version>`` before the last extension of *filename*."""
    stem, dot, ext = filename.rpartition(".")
    if dot and stem:
        return f"{stem}_v{version}.{ext}"
    # No extension (or a dot-file such as ".hidden"): just append the tag.
    return f"{filename}_v{version}"


def test_full_upload(base_url, auth_code, content_filename, sample_file):
    """Upload *sample_file* as an edited version of *content_filename*.

    Sends a multipart POST to ``/api/player-edit-media`` with the image in
    the ``image_file`` part and a JSON document in the ``metadata`` part.

    Args:
        base_url: Scheme, host and port of the server, without trailing slash.
        auth_code: Player auth code sent as a Bearer token.
        content_filename: Name sent as ``original_name`` in the metadata.
        sample_file: Path (``str`` or ``Path``) of the image to upload.

    Returns:
        True only when the server answers 200; False for any other status,
        for missing arguments, or on any error.
    """
    print_section("6. Testing Full Media Upload")

    if not auth_code or not content_filename or not sample_file:
        print_error("Missing required parameters for upload test")
        return False

    try:
        import mimetypes
        from datetime import timezone

        sample_path = Path(sample_file)
        version = 1

        # Same "...Z" wire format as before, without the deprecated
        # datetime.utcnow().
        modified_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        metadata = {
            "time_of_modification": modified_at,
            "original_name": content_filename,
            # Split on the last dot only, so "a.b.jpg" keeps its middle part
            # and an extension-less name is not duplicated.
            "new_name": _versioned_name(content_filename, version),
            "version": version,
            "user_card_data": "test_user_123",
        }

        print_info("Preparing upload with metadata:")
        print(f" Original: {metadata['original_name']}")
        print(f" New name: {metadata['new_name']}")
        print(f" Version: {metadata['version']}")

        headers = {"Authorization": f"Bearer {auth_code}"}

        # The sample may be a PNG, so derive the content type from the file
        # name instead of always claiming JPEG.
        content_type = mimetypes.guess_type(sample_path.name)[0] or "application/octet-stream"

        with open(sample_path, 'rb') as f:
            files = {
                'image_file': (sample_path.name, f, content_type),
                'metadata': (None, json.dumps(metadata)),
            }

            print_info("Sending upload request...")
            response = requests.post(
                f"{base_url}/api/player-edit-media",
                headers=headers,
                files=files,
                timeout=10,
            )

        status = response.status_code
        print(f" Status Code: {status}")

        if status == 200:
            data = response.json()
            print_success("Upload successful!")
            print(f" Response: {json.dumps(data, indent=2)}")
            return True

        known_failures = {
            404: "Content not found - The original_name doesn't match any content in database",
            400: "Bad Request - Check metadata format",
            401: "Authentication failed",
        }
        if status in known_failures:
            print_error(known_failures[status])
            print(f" Error: {response.json()}")
        else:
            print_error(f"Upload failed with status {status}")
            print(f" Response: {response.text}")
        return False
    except Exception as exc:
        print_error(f"Error during upload: {exc}")
        import traceback
        traceback.print_exc()
        return False
|
||
|
||
def check_database_integrity(db_path):
    """Print row counts for the tables this script relies on.

    Counts the rows of ``player``, ``content`` and ``player_edit`` and lists
    up to five rows of ``content``.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        True when every query succeeded, False otherwise.
    """
    print_section("7. Database Integrity Check")

    try:
        import sqlite3
        from contextlib import closing

        count_queries = (
            ("SELECT COUNT(*) FROM player", "Players in database"),
            ("SELECT COUNT(*) FROM content", "Content items in database"),
            ("SELECT COUNT(*) FROM player_edit", "Player edits recorded"),
        )

        # closing() releases the connection even when a query raises (for
        # example when a table is missing); previously it was left open.
        with closing(sqlite3.connect(db_path)) as conn:
            cursor = conn.cursor()

            for query, label in count_queries:
                cursor.execute(query)
                print_info(f"{label}: {cursor.fetchone()[0]}")

            print_info("Sample content files:")
            cursor.execute("SELECT id, filename, content_type FROM content LIMIT 5")
            for content_id, filename, content_type in cursor.fetchall():
                print(f" - [{content_id}] {filename} ({content_type})")

        print_success("Database integrity check passed")
        return True
    except Exception as exc:
        print_error(f"Database integrity check failed: {exc}")
        return False
|
||
|
||
def _run_checks(base_url, db_path):
    """Run the checks in order and return ``(passed, failed)`` name lists."""
    passed = []
    failed = []

    def record(name, ok):
        (passed if ok else failed).append(name)
        return ok

    # Checks 1-3 are prerequisites: later checks are pointless without them.
    if not record("Server Health", test_server_health(base_url)):
        print_error("Cannot continue without server access")
        return passed, failed

    if not record("Endpoint Availability", test_endpoint_exists(base_url)):
        print_error("Cannot continue - endpoint doesn't exist!")
        return passed, failed

    _, _, auth_code = get_player_auth_code(base_url, db_path)
    if not record("Player Auth Code Retrieval", bool(auth_code)):
        print_error("Cannot continue without valid player auth code")
        return passed, failed

    # A failed authentication is reported but does not stop the run.
    if not record("Authentication", test_authentication(base_url, auth_code)):
        print_error("Authentication test failed")

    content_name, sample_file = get_sample_content(base_url, auth_code)
    if record("Sample Content Retrieval", bool(content_name and sample_file)):
        record(
            "Full Media Upload",
            test_full_upload(base_url, auth_code, content_name, sample_file),
        )

    record("Database Integrity", check_database_integrity(db_path))
    return passed, failed


def _print_summary(tests_passed, tests_failed):
    """Print the pass/fail summary followed by targeted recommendations."""
    print_section("Summary")

    if tests_passed:
        print(f"{Colors.OKGREEN}Passed Tests ({len(tests_passed)}):{Colors.ENDC}")
        for test in tests_passed:
            print(f" {Colors.OKGREEN}✓{Colors.ENDC} {test}")

    if tests_failed:
        print(f"\n{Colors.FAIL}Failed Tests ({len(tests_failed)}):{Colors.ENDC}")
        for test in tests_failed:
            print(f" {Colors.FAIL}✗{Colors.ENDC} {test}")

    print(f"\n{Colors.BOLD}Result: {len(tests_passed)}/{len(tests_passed) + len(tests_failed)} tests passed{Colors.ENDC}\n")

    print_section("Recommendations")

    if "Endpoint Availability" in tests_failed:
        print_warning("The /api/player-edit-media endpoint is not available")
        print(" 1. Check if the Flask app reloaded after code changes")
        print(" 2. Verify the endpoint is properly registered in api.py")
        print(" 3. Restart the Docker container")

    if "Full Media Upload" in tests_failed:
        print_warning("Upload test failed - check:")
        print(" 1. The original_name matches actual content filenames")
        print(" 2. Content record exists in the database")
        print(" 3. Server has permission to write to uploads directory")
        print(" 4. Check server logs for error details")

    if "Authentication" in tests_failed:
        print_warning("Authentication failed - check:")
        print(" 1. Player auth code is valid and hasn't expired")
        print(" 2. Auth header format is correct: 'Authorization: Bearer <code>'")
        print(" 3. Player record hasn't been deleted from database")


def main(base_url="http://localhost:5000", db_path="instance/digiserver.db"):
    """Run all diagnostic checks and print a summary.

    The summary and recommendations are printed even when a prerequisite
    check aborts the run. Previously an early abort returned before them,
    which made the "Endpoint Availability" recommendation unreachable.

    Args:
        base_url: Scheme, host and port of the server under test.
        db_path: Path of the server's SQLite database file.

    Returns:
        0 when no check failed, 1 otherwise (suitable for ``sys.exit``).
    """
    banner = "\n".join((
        "",
        "╔═══════════════════════════════════════════════════════════╗",
        "║ DIGISERVER EDIT MEDIA API - DIAGNOSTIC SCRIPT ║",
        "║ Testing Player Edit Upload Functionality ║",
        "╚═══════════════════════════════════════════════════════════╝",
        "",
    ))
    print(f"{Colors.BOLD}{Colors.OKCYAN}")
    print(banner)
    print(Colors.ENDC)

    print_info(f"Server URL: {base_url}")
    print_info(f"Database: {db_path}\n")

    tests_passed, tests_failed = _run_checks(base_url, db_path)
    _print_summary(tests_passed, tests_failed)

    return 1 if tests_failed else 0


if __name__ == "__main__":
    # Propagate the result so shells and CI can detect a failed run.
    sys.exit(main())
|