"""Player authentication module for Kiwy-Signage.

Handles secure authentication with DigiServer v2.

Flow: hostname → password/quickconnect → get auth_code → use auth_code for
API calls. HTTPS support and SSL certificate management are delegated to
``ssl_utils.SSLManager``.
"""

import json
import logging
import os
from typing import Dict, Optional, Tuple

import requests

from ssl_utils import SSLManager, setup_ssl_for_requests

# Set up logging
logger = logging.getLogger(__name__)


class PlayerAuth:
    """Handle player authentication with DigiServer v2.

    Authentication state is kept in ``self.auth_data`` (a plain dict) and is
    persisted as JSON in ``self.config_file``.
    """

    def __init__(self, config_file: str = 'player_auth.json',
                 use_https: bool = True, verify_ssl: bool = True):
        """Initialize player authentication.

        Args:
            config_file: Path to authentication config file
            use_https: Whether to use HTTPS for connections
            verify_ssl: Whether to verify SSL certificates
        """
        self.config_file = config_file
        self.use_https = use_https
        self.verify_ssl = verify_ssl
        self.ssl_manager = SSLManager(verify_ssl=verify_ssl)
        self.auth_data = self._load_auth_data()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _default_auth_data(hostname: str = '') -> Dict:
        """Return a fresh, unauthenticated auth-data dict.

        Args:
            hostname: Hostname to keep in the otherwise empty state.
        """
        return {
            'hostname': hostname,
            'auth_code': '',
            'player_id': None,
            'player_name': '',
            'playlist_id': None,
            'orientation': 'Landscape',
            'authenticated': False,
        }

    @staticmethod
    def _bearer_headers(auth_code: str) -> Dict[str, str]:
        """Build the Authorization header used by the authenticated API calls."""
        return {'Authorization': f'Bearer {auth_code}'}

    @staticmethod
    def _error_from_response(response, default: str) -> str:
        """Extract the ``error`` field of a JSON error response.

        Falls back to *default* when the body is not JSON or is not a JSON
        object, so a plain-text/HTML error page does not turn into a
        confusing JSON-parsing error message.
        """
        try:
            body = response.json()
        except ValueError:
            return default
        if isinstance(body, dict):
            return body.get('error', default)
        return default

    def _load_auth_data(self) -> Dict:
        """Load authentication data from file.

        Returns:
            The stored data merged over the default (unauthenticated) state.
            The defaults alone are returned when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
        """
        defaults = self._default_auth_data()
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # No saved state yet: not an error.
            return defaults
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            logger.error(f"Failed to load auth data: {e}")
            return defaults

        if not isinstance(loaded, dict):
            # Every accessor calls ``.get`` on auth_data, so anything but a
            # JSON object would crash later; treat it as a corrupt file.
            logger.error(
                "Failed to load auth data: expected a JSON object, "
                f"got {type(loaded).__name__}"
            )
            return defaults

        # Stored values win; defaults only fill in keys the file lacks.
        return {**defaults, **loaded}

    def _save_auth_data(self) -> None:
        """Save authentication data to file.

        Failures are logged and swallowed (best effort); the in-memory state
        is left untouched.
        """
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.auth_data, f, indent=2)
            logger.info(f"Auth data saved to {self.config_file}")
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to save auth data: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_authenticated(self) -> bool:
        """Check if player is authenticated.

        Returns:
            True only when the ``authenticated`` flag is set and a non-empty
            ``auth_code`` is stored.
        """
        return (self.auth_data.get('authenticated', False) and
                bool(self.auth_data.get('auth_code')))

    def authenticate(self, server_url: str, hostname: str,
                     password: Optional[str] = None,
                     quickconnect_code: Optional[str] = None,
                     timeout: int = 30) -> Tuple[bool, Optional[str]]:
        """Authenticate with DigiServer v2.

        On success the returned player data is stored in ``self.auth_data``
        and written to the config file.

        Args:
            server_url: Server URL (e.g., 'http://server:5000' or 'https://server')
            hostname: Player hostname/identifier
            password: Player password (optional if using quickconnect)
            quickconnect_code: Quick connect code (optional if using password)
            timeout: Request timeout in seconds

        Returns:
            Tuple of (success: bool, error_message: Optional[str])
        """
        if not server_url:
            return False, "Server URL required"
        if not password and not quickconnect_code:
            return False, "Password or quick connect code required"

        # Normalize server URL to HTTPS if needed
        if self.use_https:
            server_url = self.ssl_manager.validate_url_scheme(server_url)

            # Try to download certificate if not present
            if not self.ssl_manager.has_certificate():
                logger.info("Downloading server certificate for HTTPS verification...")
                success, error = self.ssl_manager.download_server_certificate(
                    server_url, timeout=timeout
                )
                if not success:
                    logger.warning(f"⚠️ Certificate download failed: {error}")
                    if self.verify_ssl:
                        return False, error
                    # Continue with unverified connection for testing

        # A trailing slash would yield '//api/...' here and in every later
        # call that reuses the stored server_url.
        server_url = server_url.rstrip('/')

        # Prepare authentication request
        auth_url = f"{server_url}/api/auth/player"
        payload = {
            'hostname': hostname,
            'password': password,
            'quickconnect_code': quickconnect_code,
        }

        try:
            logger.info(f"Authenticating with server: {auth_url}")

            # Use SSL-configured session
            session = self.ssl_manager.get_session()
            response = session.post(auth_url, json=payload, timeout=timeout)

            if response.status_code == 200:
                data = response.json()

                if not data.get('auth_code'):
                    # Without an auth code is_authenticated() would stay
                    # False, so this must not be reported as a success.
                    error_msg = "Server response missing auth_code"
                    logger.error(error_msg)
                    return False, error_msg

                # Save authentication data
                self.auth_data = {
                    'hostname': hostname,
                    'auth_code': data.get('auth_code'),
                    'player_id': data.get('player_id'),
                    'player_name': data.get('player_name'),
                    'playlist_id': data.get('playlist_id'),
                    'orientation': data.get('orientation', 'Landscape'),
                    'authenticated': True,
                    'server_url': server_url,
                }
                self._save_auth_data()

                logger.info(
                    f"✅ Authentication successful for player: {data.get('player_name')}"
                )
                return True, None

            if response.status_code == 401:
                error_msg = self._error_from_response(response, 'Invalid credentials')
                logger.warning(f"Authentication failed: {error_msg}")
                return False, error_msg

            error_msg = f"Server error: {response.status_code}"
            logger.error(error_msg)
            return False, error_msg

        # SSLError is a subclass of ConnectionError, so it must be handled first.
        except requests.exceptions.SSLError as e:
            error_msg = f"SSL Certificate Error: {e}"
            logger.error(error_msg)
            if self.verify_ssl:
                logger.error(" This usually means the server certificate is not trusted.")
                logger.error(" Try downloading the server certificate or disabling SSL verification.")
            return False, error_msg
        except requests.exceptions.ConnectionError as e:
            error_msg = f"Connection Error: {e}"
            logger.error(error_msg)
            return False, error_msg
        except requests.exceptions.Timeout:
            error_msg = "Connection timeout"
            logger.error(error_msg)
            return False, error_msg
        except Exception as e:
            # Boundary handler: callers expect a (False, message) tuple,
            # never an exception.
            error_msg = f"Authentication error: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

    def verify_auth(self, timeout: int = 10) -> Tuple[bool, Optional[Dict]]:
        """Verify current auth code with server.

        Args:
            timeout: Request timeout in seconds

        Returns:
            Tuple of (valid: bool, player_info: Optional[Dict]).
            ``player_info`` is the server's JSON response when valid,
            otherwise None.
        """
        if not self.is_authenticated():
            return False, None

        server_url = self.auth_data.get('server_url')
        if not server_url:
            return False, None

        verify_url = f"{server_url}/api/auth/verify"
        payload = {'auth_code': self.auth_data.get('auth_code')}

        try:
            # Use SSL-configured session
            session = self.ssl_manager.get_session()
            response = session.post(verify_url, json=payload, timeout=timeout)

            if response.status_code == 200:
                data = response.json()
                if data.get('valid'):
                    logger.info("✅ Auth code verified")
                    return True, data

            logger.warning("❌ Auth code invalid or expired")
            return False, None

        except requests.exceptions.SSLError as e:
            logger.error(f"SSL Error during verification: {e}")
            return False, None
        except Exception as e:
            logger.error(f"Failed to verify auth: {e}")
            return False, None

    def get_playlist(self, timeout: int = 30) -> Optional[Dict]:
        """Get playlist from server using auth code.

        A 401/403 response clears the stored authentication via
        ``clear_auth()``.

        Args:
            timeout: Request timeout in seconds

        Returns:
            Playlist data or None if failed
        """
        if not self.is_authenticated():
            logger.error("Not authenticated. Call authenticate() first.")
            return None

        server_url = self.auth_data.get('server_url')
        player_id = self.auth_data.get('player_id')
        auth_code = self.auth_data.get('auth_code')

        # Test player_id explicitly: a truthiness check would reject ID 0.
        if not server_url or player_id in (None, '') or not auth_code:
            logger.error("Missing authentication data")
            return None

        playlist_url = f"{server_url}/api/playlists/{player_id}"
        headers = self._bearer_headers(auth_code)

        try:
            logger.info(f"Fetching playlist from: {playlist_url}")

            # Use SSL-configured session
            session = self.ssl_manager.get_session()
            response = session.get(playlist_url, headers=headers, timeout=timeout)

            if response.status_code == 200:
                data = response.json()
                logger.info(f"✅ Playlist received (version: {data.get('playlist_version')})")
                return data

            if response.status_code in (401, 403):
                logger.warning("❌ Authentication failed. Need to re-authenticate.")
                self.clear_auth()
                return None

            logger.error(f"Failed to get playlist: {response.status_code}")
            return None

        except requests.exceptions.SSLError as e:
            logger.error(f"SSL Error fetching playlist: {e}")
            return None
        except Exception as e:
            logger.error(f"Error fetching playlist: {e}")
            return None

    def send_heartbeat(self, status: str = 'online', timeout: int = 10) -> bool:
        """Send heartbeat to server.

        Args:
            status: Player status (online, playing, error, etc.)
            timeout: Request timeout in seconds

        Returns:
            True if successful, False otherwise
        """
        if not self.is_authenticated():
            return False

        server_url = self.auth_data.get('server_url')
        player_id = self.auth_data.get('player_id')
        auth_code = self.auth_data.get('auth_code')

        # Test player_id explicitly: a truthiness check would reject ID 0.
        if not server_url or player_id in (None, '') or not auth_code:
            return False

        heartbeat_url = f"{server_url}/api/players/{player_id}/heartbeat"
        headers = self._bearer_headers(auth_code)
        payload = {'status': status}

        try:
            # Use SSL-configured session
            session = self.ssl_manager.get_session()
            response = session.post(heartbeat_url, headers=headers,
                                    json=payload, timeout=timeout)
            return response.status_code == 200

        except requests.exceptions.SSLError as e:
            logger.debug(f"SSL Error in heartbeat: {e}")
            return False
        except Exception as e:
            # Heartbeats are best effort; failures are only logged at debug level.
            logger.debug(f"Heartbeat failed: {e}")
            return False

    def send_feedback(self, message: str, status: str = 'active',
                      playlist_version: Optional[int] = None,
                      error_details: Optional[str] = None,
                      timeout: int = 10) -> bool:
        """Send feedback to server.

        Args:
            message: Feedback message
            status: Player status
            playlist_version: Current playlist version
            error_details: Error details if applicable
            timeout: Request timeout in seconds

        Returns:
            True if successful, False otherwise
        """
        if not self.is_authenticated():
            return False

        server_url = self.auth_data.get('server_url')
        auth_code = self.auth_data.get('auth_code')

        if not all([server_url, auth_code]):
            return False

        feedback_url = f"{server_url}/api/player-feedback"
        headers = self._bearer_headers(auth_code)
        payload = {
            'hostname': self.auth_data.get('hostname'),
            # NOTE(review): authenticate() never stores 'quickconnect_code'
            # in auth_data, so this is None unless the config file supplies
            # it - confirm whether the server relies on this field.
            'quickconnect_code': self.auth_data.get('quickconnect_code'),
            'message': message,
            'status': status,
            'playlist_version': playlist_version,
            'error_details': error_details,
        }

        try:
            # Use SSL-configured session
            session = self.ssl_manager.get_session()
            response = session.post(feedback_url, headers=headers,
                                    json=payload, timeout=timeout)
            return response.status_code == 200

        except requests.exceptions.SSLError as e:
            logger.debug(f"SSL Error sending feedback: {e}")
            return False
        except Exception as e:
            # Feedback is best effort; failures are only logged at debug level.
            logger.debug(f"Feedback failed: {e}")
            return False

    def clear_auth(self) -> None:
        """Clear saved authentication data.

        Only the hostname is kept; every other field, including the stored
        ``server_url``, is dropped. The cleared state is written to the
        config file.
        """
        self.auth_data = self._default_auth_data(
            hostname=self.auth_data.get('hostname', '')
        )
        self._save_auth_data()
        logger.info("Authentication data cleared")

    def get_hostname(self) -> str:
        """Get saved hostname."""
        return self.auth_data.get('hostname', '')

    def get_player_id(self) -> Optional[int]:
        """Get player ID."""
        return self.auth_data.get('player_id')

    def get_player_name(self) -> str:
        """Get player name."""
        return self.auth_data.get('player_name', '')

    def get_orientation(self) -> str:
        """Get display orientation."""
        return self.auth_data.get('orientation', 'Landscape')


def main() -> None:
    """Example usage: report the stored authentication state."""
    # Initialize auth
    auth = PlayerAuth()

    # Check if already authenticated
    if auth.is_authenticated():
        print(f"✅ Already authenticated as: {auth.get_player_name()}")

        # Verify authentication
        valid, info = auth.verify_auth()
        if valid:
            print("✅ Authentication valid")
            # .get(): the verify response is not guaranteed to carry player_id.
            print(f" Player ID: {info.get('player_id')}")
            print(f" Playlist ID: {info.get('playlist_id', 'None')}")
        else:
            print("❌ Authentication expired, need to re-authenticate")
    else:
        print("❌ Not authenticated")
        print("\nTo authenticate, run:")
        print("auth.authenticate(")
        print(" server_url='http://your-server:5000',")
        print(" hostname='player-001',")
        print(" password='your_password'")
        print(" # OR")
        print(" quickconnect_code='QUICK123'")
        print(")")


# Example usage
if __name__ == '__main__':
    main()