updated get playlist function
This commit is contained in:
332
src/get_playlists_v2.py
Normal file
332
src/get_playlists_v2.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""
|
||||
Updated get_playlists.py for Kiwy-Signage with DigiServer v2 authentication
|
||||
Uses secure auth flow: hostname → password/quickconnect → auth_code → API calls
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
import logging
|
||||
from player_auth import PlayerAuth
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global auth instance
|
||||
_auth_instance = None
|
||||
|
||||
|
||||
def get_auth_instance(config_file='player_auth.json'):
|
||||
"""Get or create global auth instance."""
|
||||
global _auth_instance
|
||||
if _auth_instance is None:
|
||||
_auth_instance = PlayerAuth(config_file)
|
||||
return _auth_instance
|
||||
|
||||
|
||||
def ensure_authenticated(config):
|
||||
"""Ensure player is authenticated, authenticate if needed.
|
||||
|
||||
Args:
|
||||
config: Legacy config dict with server_ip, screen_name, quickconnect_key, port
|
||||
|
||||
Returns:
|
||||
PlayerAuth instance if authenticated, None otherwise
|
||||
"""
|
||||
auth = get_auth_instance()
|
||||
|
||||
# If already authenticated and valid, return auth instance
|
||||
if auth.is_authenticated():
|
||||
valid, _ = auth.verify_auth()
|
||||
if valid:
|
||||
logger.info("✅ Using existing authentication")
|
||||
return auth
|
||||
else:
|
||||
logger.warning("⚠️ Auth expired, re-authenticating...")
|
||||
|
||||
# Need to authenticate
|
||||
server_ip = config.get("server_ip", "")
|
||||
hostname = config.get("screen_name", "")
|
||||
quickconnect_key = config.get("quickconnect_key", "")
|
||||
port = config.get("port", "")
|
||||
|
||||
if not all([server_ip, hostname, quickconnect_key]):
|
||||
logger.error("❌ Missing configuration: server_ip, screen_name, or quickconnect_key")
|
||||
return None
|
||||
|
||||
# Build server URL
|
||||
import re
|
||||
ip_pattern = r'^\d+\.\d+\.\d+\.\d+$'
|
||||
if re.match(ip_pattern, server_ip):
|
||||
server_url = f'http://{server_ip}:{port}'
|
||||
else:
|
||||
server_url = f'http://{server_ip}'
|
||||
|
||||
# Authenticate using quickconnect code
|
||||
logger.info(f"🔐 Authenticating player: {hostname}")
|
||||
success, error = auth.authenticate(
|
||||
server_url=server_url,
|
||||
hostname=hostname,
|
||||
quickconnect_code=quickconnect_key
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info(f"✅ Authentication successful: {auth.get_player_name()}")
|
||||
return auth
|
||||
else:
|
||||
logger.error(f"❌ Authentication failed: {error}")
|
||||
return None
|
||||
|
||||
|
||||
def send_player_feedback(config, message, status="active", playlist_version=None, error_details=None):
|
||||
"""Send feedback to the server about player status.
|
||||
|
||||
Args:
|
||||
config (dict): Configuration containing server details
|
||||
message (str): Main feedback message
|
||||
status (str): Player status - "active", "playing", "error", "restarting"
|
||||
playlist_version (int, optional): Current playlist version being played
|
||||
error_details (str, optional): Error details if status is "error"
|
||||
|
||||
Returns:
|
||||
bool: True if feedback sent successfully, False otherwise
|
||||
"""
|
||||
auth = ensure_authenticated(config)
|
||||
if not auth:
|
||||
logger.warning("Cannot send feedback - not authenticated")
|
||||
return False
|
||||
|
||||
return auth.send_feedback(
|
||||
message=message,
|
||||
status=status,
|
||||
playlist_version=playlist_version,
|
||||
error_details=error_details
|
||||
)
|
||||
|
||||
|
||||
def send_playlist_check_feedback(config, playlist_version=None):
|
||||
"""Send feedback when playlist is checked for updates."""
|
||||
player_name = config.get("screen_name", "unknown")
|
||||
version_info = f"v{playlist_version}" if playlist_version else "unknown"
|
||||
message = f"player {player_name}, is active, Playing {version_info}"
|
||||
|
||||
return send_player_feedback(
|
||||
config=config,
|
||||
message=message,
|
||||
status="active",
|
||||
playlist_version=playlist_version
|
||||
)
|
||||
|
||||
|
||||
def send_playlist_restart_feedback(config, playlist_version=None):
|
||||
"""Send feedback when playlist loop ends and restarts."""
|
||||
player_name = config.get("screen_name", "unknown")
|
||||
version_info = f"v{playlist_version}" if playlist_version else "unknown"
|
||||
message = f"player {player_name}, playlist loop completed, restarting {version_info}"
|
||||
|
||||
return send_player_feedback(
|
||||
config=config,
|
||||
message=message,
|
||||
status="restarting",
|
||||
playlist_version=playlist_version
|
||||
)
|
||||
|
||||
|
||||
def send_player_error_feedback(config, error_message, playlist_version=None):
|
||||
"""Send feedback when an error occurs in the player."""
|
||||
player_name = config.get("screen_name", "unknown")
|
||||
message = f"player {player_name}, error occurred"
|
||||
|
||||
return send_player_feedback(
|
||||
config=config,
|
||||
message=message,
|
||||
status="error",
|
||||
playlist_version=playlist_version,
|
||||
error_details=error_message
|
||||
)
|
||||
|
||||
|
||||
def send_playing_status_feedback(config, playlist_version=None, current_media=None):
|
||||
"""Send feedback about current playing status."""
|
||||
player_name = config.get("screen_name", "unknown")
|
||||
version_info = f"v{playlist_version}" if playlist_version else "unknown"
|
||||
media_info = f" - {current_media}" if current_media else ""
|
||||
message = f"player {player_name}, is active, Playing {version_info}{media_info}"
|
||||
|
||||
return send_player_feedback(
|
||||
config=config,
|
||||
message=message,
|
||||
status="playing",
|
||||
playlist_version=playlist_version
|
||||
)
|
||||
|
||||
|
||||
def fetch_server_playlist(config):
|
||||
"""Fetch the updated playlist from the server using authenticated API.
|
||||
|
||||
Args:
|
||||
config: Legacy config dict
|
||||
|
||||
Returns:
|
||||
dict: {'playlist': [...], 'version': int}
|
||||
"""
|
||||
auth = ensure_authenticated(config)
|
||||
if not auth:
|
||||
logger.error("❌ Cannot fetch playlist - authentication failed")
|
||||
return {'playlist': [], 'version': 0}
|
||||
|
||||
# Get playlist using auth code
|
||||
playlist_data = auth.get_playlist()
|
||||
|
||||
if playlist_data:
|
||||
return {
|
||||
'playlist': playlist_data.get('playlist', []),
|
||||
'version': playlist_data.get('playlist_version', 0)
|
||||
}
|
||||
else:
|
||||
logger.error("❌ Failed to fetch playlist")
|
||||
return {'playlist': [], 'version': 0}
|
||||
|
||||
|
||||
def save_playlist_with_version(playlist_data, playlist_dir):
|
||||
"""Save playlist to file with version number."""
|
||||
version = playlist_data.get('version', 0)
|
||||
playlist_file = os.path.join(playlist_dir, f'server_playlist_v{version}.json')
|
||||
|
||||
# Ensure directory exists
|
||||
os.makedirs(playlist_dir, exist_ok=True)
|
||||
|
||||
with open(playlist_file, 'w') as f:
|
||||
json.dump(playlist_data, f, indent=2)
|
||||
|
||||
logger.info(f"✅ Playlist saved to {playlist_file}")
|
||||
return playlist_file
|
||||
|
||||
|
||||
def download_media_files(playlist, media_dir):
|
||||
"""Download media files from the server and save them to media_dir."""
|
||||
if not os.path.exists(media_dir):
|
||||
os.makedirs(media_dir)
|
||||
logger.info(f"📁 Created directory {media_dir} for media files")
|
||||
|
||||
updated_playlist = []
|
||||
for media in playlist:
|
||||
file_name = media.get('file_name', '')
|
||||
file_url = media.get('url', '')
|
||||
duration = media.get('duration', 10)
|
||||
local_path = os.path.join(media_dir, file_name)
|
||||
|
||||
logger.info(f"📥 Preparing to download {file_name}...")
|
||||
|
||||
if os.path.exists(local_path):
|
||||
logger.info(f"✓ File {file_name} already exists. Skipping download.")
|
||||
else:
|
||||
try:
|
||||
response = requests.get(file_url, timeout=30)
|
||||
if response.status_code == 200:
|
||||
with open(local_path, 'wb') as file:
|
||||
file.write(response.content)
|
||||
logger.info(f"✅ Successfully downloaded {file_name}")
|
||||
else:
|
||||
logger.error(f"❌ Failed to download {file_name}. Status: {response.status_code}")
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"❌ Error downloading {file_name}: {e}")
|
||||
continue
|
||||
|
||||
updated_media = {
|
||||
'file_name': file_name,
|
||||
'url': os.path.relpath(local_path, os.path.dirname(media_dir)),
|
||||
'duration': duration
|
||||
}
|
||||
updated_playlist.append(updated_media)
|
||||
|
||||
return updated_playlist
|
||||
|
||||
|
||||
def delete_old_playlists_and_media(current_version, playlist_dir, media_dir, keep_versions=1):
|
||||
"""Delete old playlist files and media files not referenced by the latest playlist version."""
|
||||
try:
|
||||
# Find all playlist files
|
||||
playlist_files = [f for f in os.listdir(playlist_dir)
|
||||
if f.startswith('server_playlist_v') and f.endswith('.json')]
|
||||
|
||||
# Extract versions and sort
|
||||
versions = []
|
||||
for f in playlist_files:
|
||||
try:
|
||||
version = int(f.replace('server_playlist_v', '').replace('.json', ''))
|
||||
versions.append((version, f))
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
versions.sort(reverse=True)
|
||||
|
||||
# Keep only the latest N versions
|
||||
files_to_delete = [f for v, f in versions[keep_versions:]]
|
||||
|
||||
for f in files_to_delete:
|
||||
filepath = os.path.join(playlist_dir, f)
|
||||
os.remove(filepath)
|
||||
logger.info(f"🗑️ Deleted old playlist: {f}")
|
||||
|
||||
# TODO: Clean up unused media files
|
||||
logger.info(f"✅ Cleanup complete (kept {keep_versions} latest versions)")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error during cleanup: {e}")
|
||||
|
||||
|
||||
def update_playlist_if_needed(config, playlist_dir, media_dir):
|
||||
"""Check for and download updated playlist if available."""
|
||||
try:
|
||||
# Fetch latest playlist from server
|
||||
server_data = fetch_server_playlist(config)
|
||||
server_version = server_data.get('version', 0)
|
||||
|
||||
if server_version == 0:
|
||||
logger.warning("⚠️ No valid playlist received from server")
|
||||
return None
|
||||
|
||||
# Check local version
|
||||
local_version = 0
|
||||
local_playlist_file = None
|
||||
|
||||
if os.path.exists(playlist_dir):
|
||||
playlist_files = [f for f in os.listdir(playlist_dir)
|
||||
if f.startswith('server_playlist_v') and f.endswith('.json')]
|
||||
|
||||
for f in playlist_files:
|
||||
try:
|
||||
version = int(f.replace('server_playlist_v', '').replace('.json', ''))
|
||||
if version > local_version:
|
||||
local_version = version
|
||||
local_playlist_file = os.path.join(playlist_dir, f)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
logger.info(f"📊 Playlist versions - Server: v{server_version}, Local: v{local_version}")
|
||||
|
||||
# Update if needed
|
||||
if server_version > local_version:
|
||||
logger.info(f"🔄 Updating playlist from v{local_version} to v{server_version}")
|
||||
|
||||
# Download media files
|
||||
updated_playlist = download_media_files(server_data['playlist'], media_dir)
|
||||
server_data['playlist'] = updated_playlist
|
||||
|
||||
# Save new playlist
|
||||
playlist_file = save_playlist_with_version(server_data, playlist_dir)
|
||||
|
||||
# Clean up old versions
|
||||
delete_old_playlists_and_media(server_version, playlist_dir, media_dir)
|
||||
|
||||
logger.info(f"✅ Playlist updated successfully to v{server_version}")
|
||||
return playlist_file
|
||||
else:
|
||||
logger.info("✓ Playlist is up to date")
|
||||
return local_playlist_file
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error updating playlist: {e}")
|
||||
return None
|
||||
10
src/player_auth.json
Normal file
10
src/player_auth.json
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"hostname": "rpi-tvholba1",
|
||||
"auth_code": "aDHIMS2yx_HhfR0dWKy9VHaM_h0CKemfcsqv4Zgp0IY",
|
||||
"player_id": 2,
|
||||
"player_name": "Tv-Anunturi",
|
||||
"group_id": null,
|
||||
"orientation": "Landscape",
|
||||
"authenticated": true,
|
||||
"server_url": "http://172.18.0.1:5000"
|
||||
}
|
||||
350
src/player_auth.py
Normal file
350
src/player_auth.py
Normal file
@@ -0,0 +1,350 @@
|
||||
"""
|
||||
Player Authentication Module for Kiwy-Signage
|
||||
Handles secure authentication with DigiServer v2
|
||||
Uses: hostname → password/quickconnect → get auth_code → use auth_code for API calls
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
import logging
|
||||
from typing import Optional, Dict, Tuple
|
||||
|
||||
# Set up logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PlayerAuth:
|
||||
"""Handle player authentication with DigiServer v2."""
|
||||
|
||||
def __init__(self, config_file: str = 'player_auth.json'):
|
||||
"""Initialize player authentication.
|
||||
|
||||
Args:
|
||||
config_file: Path to authentication config file
|
||||
"""
|
||||
self.config_file = config_file
|
||||
self.auth_data = self._load_auth_data()
|
||||
|
||||
def _load_auth_data(self) -> Dict:
|
||||
"""Load authentication data from file."""
|
||||
if os.path.exists(self.config_file):
|
||||
try:
|
||||
with open(self.config_file, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load auth data: {e}")
|
||||
|
||||
# Return default empty auth data
|
||||
return {
|
||||
'hostname': '',
|
||||
'auth_code': '',
|
||||
'player_id': None,
|
||||
'player_name': '',
|
||||
'group_id': None,
|
||||
'orientation': 'Landscape',
|
||||
'authenticated': False
|
||||
}
|
||||
|
||||
def _save_auth_data(self) -> None:
|
||||
"""Save authentication data to file."""
|
||||
try:
|
||||
with open(self.config_file, 'w') as f:
|
||||
json.dump(self.auth_data, f, indent=2)
|
||||
logger.info(f"Auth data saved to {self.config_file}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save auth data: {e}")
|
||||
|
||||
def is_authenticated(self) -> bool:
|
||||
"""Check if player is authenticated."""
|
||||
return (self.auth_data.get('authenticated', False) and
|
||||
bool(self.auth_data.get('auth_code')))
|
||||
|
||||
def authenticate(self, server_url: str, hostname: str,
|
||||
password: str = None, quickconnect_code: str = None,
|
||||
timeout: int = 30) -> Tuple[bool, Optional[str]]:
|
||||
"""Authenticate with DigiServer v2.
|
||||
|
||||
Args:
|
||||
server_url: Server URL (e.g., 'http://server:5000')
|
||||
hostname: Player hostname/identifier
|
||||
password: Player password (optional if using quickconnect)
|
||||
quickconnect_code: Quick connect code (optional if using password)
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, error_message: Optional[str])
|
||||
"""
|
||||
if not password and not quickconnect_code:
|
||||
return False, "Password or quick connect code required"
|
||||
|
||||
# Prepare authentication request
|
||||
auth_url = f"{server_url}/api/auth/player"
|
||||
payload = {
|
||||
'hostname': hostname,
|
||||
'password': password,
|
||||
'quickconnect_code': quickconnect_code
|
||||
}
|
||||
|
||||
try:
|
||||
logger.info(f"Authenticating with server: {auth_url}")
|
||||
response = requests.post(auth_url, json=payload, timeout=timeout)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
# Save authentication data
|
||||
self.auth_data = {
|
||||
'hostname': hostname,
|
||||
'auth_code': data.get('auth_code'),
|
||||
'player_id': data.get('player_id'),
|
||||
'player_name': data.get('player_name'),
|
||||
'group_id': data.get('group_id'),
|
||||
'orientation': data.get('orientation', 'Landscape'),
|
||||
'authenticated': True,
|
||||
'server_url': server_url
|
||||
}
|
||||
|
||||
self._save_auth_data()
|
||||
|
||||
logger.info(f"✅ Authentication successful for player: {data.get('player_name')}")
|
||||
return True, None
|
||||
|
||||
elif response.status_code == 401:
|
||||
error_msg = response.json().get('error', 'Invalid credentials')
|
||||
logger.warning(f"Authentication failed: {error_msg}")
|
||||
return False, error_msg
|
||||
|
||||
else:
|
||||
error_msg = f"Server error: {response.status_code}"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
error_msg = "Cannot connect to server"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
error_msg = "Connection timeout"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Authentication error: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
def verify_auth(self, timeout: int = 10) -> Tuple[bool, Optional[Dict]]:
|
||||
"""Verify current auth code with server.
|
||||
|
||||
Args:
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
Tuple of (valid: bool, player_info: Optional[Dict])
|
||||
"""
|
||||
if not self.is_authenticated():
|
||||
return False, None
|
||||
|
||||
server_url = self.auth_data.get('server_url')
|
||||
if not server_url:
|
||||
return False, None
|
||||
|
||||
verify_url = f"{server_url}/api/auth/verify"
|
||||
payload = {'auth_code': self.auth_data.get('auth_code')}
|
||||
|
||||
try:
|
||||
response = requests.post(verify_url, json=payload, timeout=timeout)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data.get('valid'):
|
||||
logger.info("✅ Auth code verified")
|
||||
return True, data
|
||||
|
||||
logger.warning("❌ Auth code invalid or expired")
|
||||
return False, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to verify auth: {e}")
|
||||
return False, None
|
||||
|
||||
def get_playlist(self, timeout: int = 30) -> Optional[Dict]:
|
||||
"""Get playlist from server using auth code.
|
||||
|
||||
Args:
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
Playlist data or None if failed
|
||||
"""
|
||||
if not self.is_authenticated():
|
||||
logger.error("Not authenticated. Call authenticate() first.")
|
||||
return None
|
||||
|
||||
server_url = self.auth_data.get('server_url')
|
||||
player_id = self.auth_data.get('player_id')
|
||||
auth_code = self.auth_data.get('auth_code')
|
||||
|
||||
if not all([server_url, player_id, auth_code]):
|
||||
logger.error("Missing authentication data")
|
||||
return None
|
||||
|
||||
playlist_url = f"{server_url}/api/playlists/{player_id}"
|
||||
headers = {'Authorization': f'Bearer {auth_code}'}
|
||||
|
||||
try:
|
||||
logger.info(f"Fetching playlist from: {playlist_url}")
|
||||
response = requests.get(playlist_url, headers=headers, timeout=timeout)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
logger.info(f"✅ Playlist received (version: {data.get('playlist_version')})")
|
||||
return data
|
||||
|
||||
elif response.status_code == 401 or response.status_code == 403:
|
||||
logger.warning("❌ Authentication failed. Need to re-authenticate.")
|
||||
self.clear_auth()
|
||||
return None
|
||||
|
||||
else:
|
||||
logger.error(f"Failed to get playlist: {response.status_code}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching playlist: {e}")
|
||||
return None
|
||||
|
||||
def send_heartbeat(self, status: str = 'online', timeout: int = 10) -> bool:
|
||||
"""Send heartbeat to server.
|
||||
|
||||
Args:
|
||||
status: Player status (online, playing, error, etc.)
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not self.is_authenticated():
|
||||
return False
|
||||
|
||||
server_url = self.auth_data.get('server_url')
|
||||
player_id = self.auth_data.get('player_id')
|
||||
auth_code = self.auth_data.get('auth_code')
|
||||
|
||||
if not all([server_url, player_id, auth_code]):
|
||||
return False
|
||||
|
||||
heartbeat_url = f"{server_url}/api/players/{player_id}/heartbeat"
|
||||
headers = {'Authorization': f'Bearer {auth_code}'}
|
||||
payload = {'status': status}
|
||||
|
||||
try:
|
||||
response = requests.post(heartbeat_url, headers=headers,
|
||||
json=payload, timeout=timeout)
|
||||
return response.status_code == 200
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Heartbeat failed: {e}")
|
||||
return False
|
||||
|
||||
def send_feedback(self, message: str, status: str = 'active',
|
||||
playlist_version: int = None, error_details: str = None,
|
||||
timeout: int = 10) -> bool:
|
||||
"""Send feedback to server.
|
||||
|
||||
Args:
|
||||
message: Feedback message
|
||||
status: Player status
|
||||
playlist_version: Current playlist version
|
||||
error_details: Error details if applicable
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not self.is_authenticated():
|
||||
return False
|
||||
|
||||
server_url = self.auth_data.get('server_url')
|
||||
auth_code = self.auth_data.get('auth_code')
|
||||
|
||||
if not all([server_url, auth_code]):
|
||||
return False
|
||||
|
||||
feedback_url = f"{server_url}/api/player-feedback"
|
||||
headers = {'Authorization': f'Bearer {auth_code}'}
|
||||
payload = {
|
||||
'message': message,
|
||||
'status': status,
|
||||
'playlist_version': playlist_version,
|
||||
'error_details': error_details
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(feedback_url, headers=headers,
|
||||
json=payload, timeout=timeout)
|
||||
return response.status_code == 200
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Feedback failed: {e}")
|
||||
return False
|
||||
|
||||
def clear_auth(self) -> None:
|
||||
"""Clear saved authentication data."""
|
||||
self.auth_data = {
|
||||
'hostname': self.auth_data.get('hostname', ''),
|
||||
'auth_code': '',
|
||||
'player_id': None,
|
||||
'player_name': '',
|
||||
'group_id': None,
|
||||
'orientation': 'Landscape',
|
||||
'authenticated': False
|
||||
}
|
||||
self._save_auth_data()
|
||||
logger.info("Authentication data cleared")
|
||||
|
||||
def get_hostname(self) -> str:
|
||||
"""Get saved hostname."""
|
||||
return self.auth_data.get('hostname', '')
|
||||
|
||||
def get_player_id(self) -> Optional[int]:
|
||||
"""Get player ID."""
|
||||
return self.auth_data.get('player_id')
|
||||
|
||||
def get_player_name(self) -> str:
|
||||
"""Get player name."""
|
||||
return self.auth_data.get('player_name', '')
|
||||
|
||||
def get_orientation(self) -> str:
|
||||
"""Get display orientation."""
|
||||
return self.auth_data.get('orientation', 'Landscape')
|
||||
|
||||
|
||||
# Example usage
|
||||
if __name__ == '__main__':
|
||||
# Initialize auth
|
||||
auth = PlayerAuth()
|
||||
|
||||
# Check if already authenticated
|
||||
if auth.is_authenticated():
|
||||
print(f"✅ Already authenticated as: {auth.get_player_name()}")
|
||||
|
||||
# Verify authentication
|
||||
valid, info = auth.verify_auth()
|
||||
if valid:
|
||||
print(f"✅ Authentication valid")
|
||||
print(f" Player ID: {info['player_id']}")
|
||||
print(f" Group ID: {info.get('group_id', 'None')}")
|
||||
else:
|
||||
print("❌ Authentication expired, need to re-authenticate")
|
||||
else:
|
||||
print("❌ Not authenticated")
|
||||
print("\nTo authenticate, run:")
|
||||
print("auth.authenticate(")
|
||||
print(" server_url='http://your-server:5000',")
|
||||
print(" hostname='player-001',")
|
||||
print(" password='your_password'")
|
||||
print(" # OR")
|
||||
print(" quickconnect_code='QUICK123'")
|
||||
print(")")
|
||||
Reference in New Issue
Block a user