255 lines
8.5 KiB
Python
255 lines
8.5 KiB
Python
"""
|
|
Player Authentication Module for Kiwy-Signage
|
|
Handles authentication with DigiServer v2 and secure config storage
|
|
"""
|
|
import configparser
|
|
import os
|
|
import requests
|
|
from typing import Optional, Dict, Tuple
|
|
import json
|
|
|
|
|
|
class PlayerAuth:
|
|
"""Handle player authentication and configuration management."""
|
|
|
|
def __init__(self, config_path: str = 'player_config.ini'):
|
|
"""Initialize player authentication.
|
|
|
|
Args:
|
|
config_path: Path to configuration file
|
|
"""
|
|
self.config_path = config_path
|
|
self.config = configparser.ConfigParser()
|
|
self.load_config()
|
|
|
|
def load_config(self) -> None:
|
|
"""Load configuration from file."""
|
|
if os.path.exists(self.config_path):
|
|
self.config.read(self.config_path)
|
|
else:
|
|
# Create default config
|
|
self._create_default_config()
|
|
|
|
def _create_default_config(self) -> None:
|
|
"""Create default configuration file."""
|
|
self.config['server'] = {
|
|
'server_url': 'http://localhost:5000'
|
|
}
|
|
self.config['player'] = {
|
|
'hostname': '',
|
|
'auth_code': '',
|
|
'player_id': '',
|
|
'group_id': ''
|
|
}
|
|
self.config['display'] = {
|
|
'orientation': 'Landscape',
|
|
'resolution': '1920x1080'
|
|
}
|
|
self.config['security'] = {
|
|
'verify_ssl': 'true',
|
|
'timeout': '30'
|
|
}
|
|
self.config['cache'] = {
|
|
'cache_dir': './cache',
|
|
'max_cache_size': '1024'
|
|
}
|
|
self.config['logging'] = {
|
|
'enabled': 'true',
|
|
'log_level': 'INFO',
|
|
'log_file': './player.log'
|
|
}
|
|
self.save_config()
|
|
|
|
def save_config(self) -> None:
|
|
"""Save configuration to file."""
|
|
with open(self.config_path, 'w') as f:
|
|
self.config.write(f)
|
|
|
|
def get_server_url(self) -> str:
|
|
"""Get server URL from config."""
|
|
return self.config.get('server', 'server_url', fallback='http://localhost:5000')
|
|
|
|
def get_hostname(self) -> str:
|
|
"""Get player hostname from config."""
|
|
return self.config.get('player', 'hostname', fallback='')
|
|
|
|
def get_auth_code(self) -> str:
|
|
"""Get saved auth code from config."""
|
|
return self.config.get('player', 'auth_code', fallback='')
|
|
|
|
def is_authenticated(self) -> bool:
|
|
"""Check if player has valid authentication."""
|
|
return bool(self.get_hostname() and self.get_auth_code())
|
|
|
|
def authenticate(self, hostname: str, password: str = None,
|
|
quickconnect_code: str = None) -> Tuple[bool, Optional[str]]:
|
|
"""Authenticate with server and save credentials.
|
|
|
|
Args:
|
|
hostname: Player hostname/identifier
|
|
password: Player password (optional if using quickconnect)
|
|
quickconnect_code: Quick connect code (optional if using password)
|
|
|
|
Returns:
|
|
Tuple of (success: bool, error_message: Optional[str])
|
|
"""
|
|
if not password and not quickconnect_code:
|
|
return False, "Password or quick connect code required"
|
|
|
|
server_url = self.get_server_url()
|
|
|
|
try:
|
|
# Make authentication request
|
|
response = requests.post(
|
|
f"{server_url}/api/auth/player",
|
|
json={
|
|
'hostname': hostname,
|
|
'password': password,
|
|
'quickconnect_code': quickconnect_code
|
|
},
|
|
timeout=int(self.config.get('security', 'timeout', fallback='30')),
|
|
verify=self.config.getboolean('security', 'verify_ssl', fallback=True)
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
|
|
# Save authentication data
|
|
self.config['player']['hostname'] = hostname
|
|
self.config['player']['auth_code'] = data.get('auth_code', '')
|
|
self.config['player']['player_id'] = str(data.get('player_id', ''))
|
|
self.config['player']['group_id'] = str(data.get('group_id', ''))
|
|
self.config['display']['orientation'] = data.get('orientation', 'Landscape')
|
|
|
|
self.save_config()
|
|
|
|
return True, None
|
|
|
|
else:
|
|
error_data = response.json()
|
|
return False, error_data.get('error', 'Authentication failed')
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
return False, "Cannot connect to server"
|
|
except requests.exceptions.Timeout:
|
|
return False, "Connection timeout"
|
|
except Exception as e:
|
|
return False, f"Error: {str(e)}"
|
|
|
|
def verify_auth(self) -> Tuple[bool, Optional[Dict]]:
|
|
"""Verify current auth code with server.
|
|
|
|
Returns:
|
|
Tuple of (valid: bool, player_info: Optional[Dict])
|
|
"""
|
|
auth_code = self.get_auth_code()
|
|
|
|
if not auth_code:
|
|
return False, None
|
|
|
|
server_url = self.get_server_url()
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{server_url}/api/auth/verify",
|
|
json={'auth_code': auth_code},
|
|
timeout=int(self.config.get('security', 'timeout', fallback='30')),
|
|
verify=self.config.getboolean('security', 'verify_ssl', fallback=True)
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get('valid', False), data
|
|
|
|
return False, None
|
|
|
|
except Exception:
|
|
return False, None
|
|
|
|
def get_playlist(self) -> Optional[Dict]:
|
|
"""Get playlist for this player from server.
|
|
|
|
Returns:
|
|
Playlist data or None if failed
|
|
"""
|
|
auth_code = self.get_auth_code()
|
|
player_id = self.config.get('player', 'player_id', fallback='')
|
|
|
|
if not auth_code or not player_id:
|
|
return None
|
|
|
|
server_url = self.get_server_url()
|
|
|
|
try:
|
|
response = requests.get(
|
|
f"{server_url}/api/playlists/{player_id}",
|
|
headers={'Authorization': f'Bearer {auth_code}'},
|
|
timeout=int(self.config.get('security', 'timeout', fallback='30')),
|
|
verify=self.config.getboolean('security', 'verify_ssl', fallback=True)
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
def send_heartbeat(self, status: str = 'online') -> bool:
|
|
"""Send heartbeat to server.
|
|
|
|
Args:
|
|
status: Player status
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
auth_code = self.get_auth_code()
|
|
player_id = self.config.get('player', 'player_id', fallback='')
|
|
|
|
if not auth_code or not player_id:
|
|
return False
|
|
|
|
server_url = self.get_server_url()
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{server_url}/api/players/{player_id}/heartbeat",
|
|
headers={'Authorization': f'Bearer {auth_code}'},
|
|
json={'status': status},
|
|
timeout=int(self.config.get('security', 'timeout', fallback='30')),
|
|
verify=self.config.getboolean('security', 'verify_ssl', fallback=True)
|
|
)
|
|
|
|
return response.status_code == 200
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
def clear_auth(self) -> None:
|
|
"""Clear saved authentication data."""
|
|
self.config['player']['auth_code'] = ''
|
|
self.config['player']['player_id'] = ''
|
|
self.config['player']['group_id'] = ''
|
|
self.save_config()
|
|
|
|
|
|
# Example usage
|
|
if __name__ == '__main__':
|
|
auth = PlayerAuth()
|
|
|
|
# Check if already authenticated
|
|
if auth.is_authenticated():
|
|
print(f"Already authenticated as: {auth.get_hostname()}")
|
|
|
|
# Verify authentication
|
|
valid, info = auth.verify_auth()
|
|
if valid:
|
|
print(f"Authentication valid: {info}")
|
|
else:
|
|
print("Authentication expired or invalid")
|
|
else:
|
|
print("Not authenticated. Please run authentication:")
|
|
print("auth.authenticate(hostname='player-001', password='your_password')")
|