""" Player Authentication Module for Kiwy-Signage Handles authentication with DigiServer v2 and secure config storage """ import configparser import os import requests from typing import Optional, Dict, Tuple import json class PlayerAuth: """Handle player authentication and configuration management.""" def __init__(self, config_path: str = 'player_config.ini'): """Initialize player authentication. Args: config_path: Path to configuration file """ self.config_path = config_path self.config = configparser.ConfigParser() self.load_config() def load_config(self) -> None: """Load configuration from file.""" if os.path.exists(self.config_path): self.config.read(self.config_path) else: # Create default config self._create_default_config() def _create_default_config(self) -> None: """Create default configuration file.""" self.config['server'] = { 'server_url': 'http://localhost:5000' } self.config['player'] = { 'hostname': '', 'auth_code': '', 'player_id': '', 'group_id': '' } self.config['display'] = { 'orientation': 'Landscape', 'resolution': '1920x1080' } self.config['security'] = { 'verify_ssl': 'true', 'timeout': '30' } self.config['cache'] = { 'cache_dir': './cache', 'max_cache_size': '1024' } self.config['logging'] = { 'enabled': 'true', 'log_level': 'INFO', 'log_file': './player.log' } self.save_config() def save_config(self) -> None: """Save configuration to file.""" with open(self.config_path, 'w') as f: self.config.write(f) def get_server_url(self) -> str: """Get server URL from config.""" return self.config.get('server', 'server_url', fallback='http://localhost:5000') def get_hostname(self) -> str: """Get player hostname from config.""" return self.config.get('player', 'hostname', fallback='') def get_auth_code(self) -> str: """Get saved auth code from config.""" return self.config.get('player', 'auth_code', fallback='') def is_authenticated(self) -> bool: """Check if player has valid authentication.""" return bool(self.get_hostname() and self.get_auth_code()) def authenticate(self, hostname: str, password: str = None, quickconnect_code: str = None) -> Tuple[bool, Optional[str]]: """Authenticate with server and save credentials. Args: hostname: Player hostname/identifier password: Player password (optional if using quickconnect) quickconnect_code: Quick connect code (optional if using password) Returns: Tuple of (success: bool, error_message: Optional[str]) """ if not password and not quickconnect_code: return False, "Password or quick connect code required" server_url = self.get_server_url() try: # Make authentication request response = requests.post( f"{server_url}/api/auth/player", json={ 'hostname': hostname, 'password': password, 'quickconnect_code': quickconnect_code }, timeout=int(self.config.get('security', 'timeout', fallback='30')), verify=self.config.getboolean('security', 'verify_ssl', fallback=True) ) if response.status_code == 200: data = response.json() # Save authentication data self.config['player']['hostname'] = hostname self.config['player']['auth_code'] = data.get('auth_code', '') self.config['player']['player_id'] = str(data.get('player_id', '')) self.config['player']['group_id'] = str(data.get('group_id', '')) self.config['display']['orientation'] = data.get('orientation', 'Landscape') self.save_config() return True, None else: error_data = response.json() return False, error_data.get('error', 'Authentication failed') except requests.exceptions.ConnectionError: return False, "Cannot connect to server" except requests.exceptions.Timeout: return False, "Connection timeout" except Exception as e: return False, f"Error: {str(e)}" def verify_auth(self) -> Tuple[bool, Optional[Dict]]: """Verify current auth code with server. Returns: Tuple of (valid: bool, player_info: Optional[Dict]) """ auth_code = self.get_auth_code() if not auth_code: return False, None server_url = self.get_server_url() try: response = requests.post( f"{server_url}/api/auth/verify", json={'auth_code': auth_code}, timeout=int(self.config.get('security', 'timeout', fallback='30')), verify=self.config.getboolean('security', 'verify_ssl', fallback=True) ) if response.status_code == 200: data = response.json() return data.get('valid', False), data return False, None except Exception: return False, None def get_playlist(self) -> Optional[Dict]: """Get playlist for this player from server. Returns: Playlist data or None if failed """ auth_code = self.get_auth_code() player_id = self.config.get('player', 'player_id', fallback='') if not auth_code or not player_id: return None server_url = self.get_server_url() try: response = requests.get( f"{server_url}/api/playlists/{player_id}", headers={'Authorization': f'Bearer {auth_code}'}, timeout=int(self.config.get('security', 'timeout', fallback='30')), verify=self.config.getboolean('security', 'verify_ssl', fallback=True) ) if response.status_code == 200: return response.json() return None except Exception: return None def send_heartbeat(self, status: str = 'online') -> bool: """Send heartbeat to server. Args: status: Player status Returns: True if successful, False otherwise """ auth_code = self.get_auth_code() player_id = self.config.get('player', 'player_id', fallback='') if not auth_code or not player_id: return False server_url = self.get_server_url() try: response = requests.post( f"{server_url}/api/players/{player_id}/heartbeat", headers={'Authorization': f'Bearer {auth_code}'}, json={'status': status}, timeout=int(self.config.get('security', 'timeout', fallback='30')), verify=self.config.getboolean('security', 'verify_ssl', fallback=True) ) return response.status_code == 200 except Exception: return False def clear_auth(self) -> None: """Clear saved authentication data.""" self.config['player']['auth_code'] = '' self.config['player']['player_id'] = '' self.config['player']['group_id'] = '' self.save_config() # Example usage if __name__ == '__main__': auth = PlayerAuth() # Check if already authenticated if auth.is_authenticated(): print(f"Already authenticated as: {auth.get_hostname()}") # Verify authentication valid, info = auth.verify_auth() if valid: print(f"Authentication valid: {info}") else: print("Authentication expired or invalid") else: print("Not authenticated. Please run authentication:") print("auth.authenticate(hostname='player-001', password='your_password')")