Files
digiserver-v2/player_auth_module.py
2025-11-12 16:07:03 +02:00

255 lines
8.5 KiB
Python

"""
Player Authentication Module for Kiwy-Signage
Handles authentication with DigiServer v2 and secure config storage
"""
import configparser
import os
import requests
from typing import Optional, Dict, Tuple
import json
class PlayerAuth:
    """Handle player authentication and configuration management.

    Settings and credentials are kept in an INI file handled by
    ``configparser``; all server communication goes through ``requests``.
    """

    def __init__(self, config_path: str = 'player_config.ini'):
        """Initialize player authentication.

        Args:
            config_path: Path to configuration file
        """
        self.config_path = config_path
        self.config = configparser.ConfigParser()
        self.load_config()

    def load_config(self) -> None:
        """Load configuration from file, creating a default file if missing."""
        if os.path.exists(self.config_path):
            self.config.read(self.config_path)
        else:
            # Create default config
            self._create_default_config()

    def _create_default_config(self) -> None:
        """Populate the in-memory config with defaults and write it to disk."""
        self.config['server'] = {
            'server_url': 'http://localhost:5000'
        }
        self.config['player'] = {
            'hostname': '',
            'auth_code': '',
            'player_id': '',
            'group_id': ''
        }
        self.config['display'] = {
            'orientation': 'Landscape',
            'resolution': '1920x1080'
        }
        self.config['security'] = {
            'verify_ssl': 'true',
            'timeout': '30'
        }
        self.config['cache'] = {
            'cache_dir': './cache',
            'max_cache_size': '1024'
        }
        self.config['logging'] = {
            'enabled': 'true',
            'log_level': 'INFO',
            'log_file': './player.log'
        }
        self.save_config()

    def save_config(self) -> None:
        """Save configuration to file.

        The file stores the auth code, so a newly created file is made
        readable/writable by the owner only (mode 0o600). The permissions of
        an already existing file are left as they are.

        Raises:
            OSError: If the file cannot be opened or written.
        """
        # O_BINARY (Windows only) keeps the descriptor out of C-level text
        # mode so the Python text layer is the only newline translator.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
        fd = os.open(self.config_path, flags, 0o600)
        with os.fdopen(fd, 'w') as f:
            self.config.write(f)

    def get_server_url(self) -> str:
        """Get server URL from config."""
        return self.config.get('server', 'server_url', fallback='http://localhost:5000')

    def get_hostname(self) -> str:
        """Get player hostname from config."""
        return self.config.get('player', 'hostname', fallback='')

    def get_auth_code(self) -> str:
        """Get saved auth code from config."""
        return self.config.get('player', 'auth_code', fallback='')

    def is_authenticated(self) -> bool:
        """Check whether a hostname and an auth code are stored locally.

        This is a local check only; use ``verify_auth`` to ask the server
        whether the stored auth code is still accepted.
        """
        return bool(self.get_hostname() and self.get_auth_code())

    def _ensure_section(self, name: str) -> None:
        """Create config section ``name`` if an existing file did not have it."""
        if not self.config.has_section(name):
            self.config.add_section(name)

    def _api_url(self, path: str) -> str:
        """Join the configured server URL with ``path`` without doubling '/'."""
        return f"{self.get_server_url().rstrip('/')}{path}"

    def _request_options(self) -> Dict:
        """Build the ``timeout``/``verify`` keyword arguments for requests.

        Raises:
            ValueError: If the stored timeout or verify_ssl value is invalid.
        """
        return {
            'timeout': int(self.config.get('security', 'timeout', fallback='30')),
            'verify': self.config.getboolean('security', 'verify_ssl', fallback=True),
        }

    @staticmethod
    def _text(value) -> str:
        """Convert a response value to a config string; ``None`` becomes ''."""
        return '' if value is None else str(value)

    def authenticate(self, hostname: str, password: Optional[str] = None,
                     quickconnect_code: Optional[str] = None) -> Tuple[bool, Optional[str]]:
        """Authenticate with server and save credentials.

        Args:
            hostname: Player hostname/identifier
            password: Player password (optional if using quickconnect)
            quickconnect_code: Quick connect code (optional if using password)

        Returns:
            Tuple of (success: bool, error_message: Optional[str])
        """
        if not password and not quickconnect_code:
            return False, "Password or quick connect code required"

        url = self._api_url('/api/auth/player')
        try:
            # Make authentication request
            response = requests.post(
                url,
                json={
                    'hostname': hostname,
                    'password': password,
                    'quickconnect_code': quickconnect_code
                },
                **self._request_options()
            )

            if response.status_code != 200:
                # The error body is not guaranteed to be JSON (e.g. a proxy
                # error page); fall back to the generic message then.
                try:
                    error_data = response.json()
                except ValueError:
                    error_data = None
                if isinstance(error_data, dict):
                    return False, error_data.get('error', 'Authentication failed')
                return False, 'Authentication failed'

            data = response.json()
            auth_code = data.get('auth_code')
            if not auth_code:
                # Without an auth code nothing usable can be stored, so do
                # not report success or overwrite the saved credentials.
                return False, "Server response did not include an auth code"

            # Save authentication data
            self._ensure_section('player')
            self._ensure_section('display')
            player = self.config['player']
            player['hostname'] = hostname
            player['auth_code'] = self._text(auth_code)
            player['player_id'] = self._text(data.get('player_id', ''))
            player['group_id'] = self._text(data.get('group_id', ''))
            self.config['display']['orientation'] = self._text(
                data.get('orientation') or 'Landscape'
            )
            self.save_config()
            return True, None
        # requests' ConnectTimeout derives from both ConnectionError and
        # Timeout, so connection-phase timeouts are reported by this handler.
        except requests.exceptions.ConnectionError:
            return False, "Cannot connect to server"
        except requests.exceptions.Timeout:
            return False, "Connection timeout"
        except Exception as e:
            return False, f"Error: {e}"

    def verify_auth(self) -> Tuple[bool, Optional[Dict]]:
        """Verify current auth code with server.

        Returns:
            Tuple of (valid: bool, player_info: Optional[Dict])
        """
        auth_code = self.get_auth_code()
        if not auth_code:
            return False, None

        url = self._api_url('/api/auth/verify')
        try:
            response = requests.post(
                url,
                json={'auth_code': auth_code},
                **self._request_options()
            )
            if response.status_code == 200:
                data = response.json()
                return bool(data.get('valid', False)), data
            return False, None
        except Exception:
            # Best-effort check: any failure is reported as "not valid".
            return False, None

    def get_playlist(self) -> Optional[Dict]:
        """Get playlist for this player from server.

        Returns:
            Playlist data or None if failed
        """
        auth_code = self.get_auth_code()
        player_id = self.config.get('player', 'player_id', fallback='')
        if not auth_code or not player_id:
            return None

        url = self._api_url(f"/api/playlists/{player_id}")
        try:
            response = requests.get(
                url,
                headers={'Authorization': f'Bearer {auth_code}'},
                **self._request_options()
            )
            if response.status_code == 200:
                return response.json()
            return None
        except Exception:
            # Best-effort fetch: any failure is reported as "no playlist".
            return None

    def send_heartbeat(self, status: str = 'online') -> bool:
        """Send heartbeat to server.

        Args:
            status: Player status

        Returns:
            True if successful, False otherwise
        """
        auth_code = self.get_auth_code()
        player_id = self.config.get('player', 'player_id', fallback='')
        if not auth_code or not player_id:
            return False

        url = self._api_url(f"/api/players/{player_id}/heartbeat")
        try:
            response = requests.post(
                url,
                headers={'Authorization': f'Bearer {auth_code}'},
                json={'status': status},
                **self._request_options()
            )
            return response.status_code == 200
        except Exception:
            # Best-effort heartbeat: any failure is reported as False.
            return False

    def clear_auth(self) -> None:
        """Clear saved authentication data (the hostname is kept)."""
        # An existing config file may lack the [player] section entirely.
        self._ensure_section('player')
        player = self.config['player']
        player['auth_code'] = ''
        player['player_id'] = ''
        player['group_id'] = ''
        self.save_config()
# Example usage
def main() -> None:
    """Report the stored authentication state of the default player config."""
    auth = PlayerAuth()

    # Nothing stored yet: show how to authenticate and stop.
    if not auth.is_authenticated():
        print("Not authenticated. Please run authentication:")
        print("auth.authenticate(hostname='player-001', password='your_password')")
        return

    print(f"Already authenticated as: {auth.get_hostname()}")

    # Ask the server whether the stored auth code is still accepted.
    is_valid, player_info = auth.verify_auth()
    if is_valid:
        print(f"Authentication valid: {player_info}")
    else:
        print("Authentication expired or invalid")


if __name__ == '__main__':
    main()