"""SSH deployment utilities for player provisioning.""" import subprocess import logging import os import json from typing import Tuple, Dict, Any, Optional from datetime import datetime logger = logging.getLogger(__name__) # Pre-staged player code location in container LOCAL_PLAYER_CODE_DIR = '/app/data/player' def get_local_player_code_status() -> Dict[str, Any]: """ Check status of pre-staged player code. Returns: Dict with availability, version, and path info """ try: if not os.path.isdir(LOCAL_PLAYER_CODE_DIR): return { 'available': False, 'reason': 'Directory not found', 'path': LOCAL_PLAYER_CODE_DIR } # Check if git repository git_dir = os.path.join(LOCAL_PLAYER_CODE_DIR, '.git') if not os.path.isdir(git_dir): return { 'available': False, 'reason': 'Not a git repository', 'path': LOCAL_PLAYER_CODE_DIR } # Get current git version try: result = subprocess.run( ['git', '-C', LOCAL_PLAYER_CODE_DIR, 'rev-parse', '--short', 'HEAD'], capture_output=True, text=True, timeout=5 ) version = result.stdout.strip() if result.returncode == 0 else 'unknown' except: version = 'unknown' # Get directory size try: result = subprocess.run( ['du', '-sh', LOCAL_PLAYER_CODE_DIR], capture_output=True, text=True, timeout=5 ) size = result.stdout.split()[0] if result.returncode == 0 else 'unknown' except: size = 'unknown' return { 'available': True, 'path': LOCAL_PLAYER_CODE_DIR, 'version': version, 'size': size, 'updated': os.path.getmtime(git_dir), 'reason': 'Pre-staged code ready for deployment' } except Exception as e: logger.warning(f'Error checking player code status: {str(e)}') return { 'available': False, 'reason': f'Status check failed: {str(e)}', 'path': LOCAL_PLAYER_CODE_DIR } def test_ssh_connection(hostname: str, username: str, password: str, port: int = 22) -> Dict[str, Any]: """ Test SSH connection to a remote host. Args: hostname: Target hostname or IP username: SSH username password: SSH password port: SSH port (default 22) Returns: Dict with status, message, and timestamp """ try: # Use sshpass to test connection without interactive prompt cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'ConnectTimeout=10', '-p', str(port), f'{username}@{hostname}', 'echo "SSH connection successful"' ] result = subprocess.run( cmd, capture_output=True, text=True, timeout=15 ) if result.returncode == 0: return { 'success': True, 'message': f'SSH connection successful to {hostname}', 'timestamp': datetime.now().isoformat(), 'output': result.stdout.strip() } else: error_msg = result.stderr.strip() or result.stdout.strip() return { 'success': False, 'message': f'SSH connection failed: {error_msg}', 'timestamp': datetime.now().isoformat(), 'error': error_msg } except subprocess.TimeoutExpired: return { 'success': False, 'message': f'SSH connection timeout to {hostname}', 'timestamp': datetime.now().isoformat(), 'error': 'Connection timeout (10s)' } except Exception as e: logger.error(f'SSH test error: {str(e)}') return { 'success': False, 'message': f'SSH connection error: {str(e)}', 'timestamp': datetime.now().isoformat(), 'error': str(e) } def generate_player_config( player_name: str, server_url: str, api_key: str, player_id: str = None, location: str = None ) -> str: """ Generate player configuration JSON for connecting to DigiServer. Args: player_name: Name of the player server_url: DigiServer base URL (e.g., http://localhost/digiserver) api_key: API authentication key player_id: Optional player ID (defaults to player_name) location: Optional player location/description Returns: JSON configuration string """ config = { "player": { "name": player_name, "id": player_id or player_name, "location": location or "", "version": "2.0" }, "server": { "url": server_url, "api_endpoint": f"{server_url}/api", "authentication": { "type": "api_key", "key": api_key }, "endpoints": { "playlists": f"{server_url}/api/playlists", "content": f"{server_url}/api/content", "schedule": f"{server_url}/api/schedule", "heartbeat": f"{server_url}/api/player/heartbeat", "logs": f"{server_url}/api/player/logs" } }, "playback": { "audio_enabled": True, "video_enabled": True, "max_resolution": "4K", "refresh_interval": 60, "rotation": "0" }, "networking": { "timeout": 30, "retry_count": 3, "retry_delay": 5 } } return json.dumps(config, indent=2) def deploy_player_to_host( hostname: str, username: str, password: str, player_name: str, repo_url: str = 'https://gitea.moto-adv.com/ske087/Kiwy-Signage.git', deploy_path: str = None, # Default: /home/[user]/kiwy-signage port: int = 22, server_url: str = None, # DigiServer URL for player to connect to server_api_key: str = None # API key for player authentication ) -> Dict[str, Any]: """ Deploy player code to remote host. Args: hostname: Target hostname or IP username: SSH username password: SSH password player_name: Name for the player instance repo_url: Git repository URL deploy_path: Path where to deploy on remote host (default: /home/[user]/kiwy-signage) port: SSH port (default 22) server_url: DigiServer URL for player connection server_api_key: API key for player authentication Returns: Dict with deployment status and output """ # Set default deployment path to user's home directory if deploy_path is None: deploy_path = f'/home/{username}/kiwy-signage' try: # Step 1: Verify host accessibility test_result = test_ssh_connection(hostname, username, password, port) if not test_result['success']: return { 'success': False, 'message': 'Cannot deploy: SSH connection failed', 'timestamp': datetime.now().isoformat(), 'error': test_result['message'], 'steps': [] } steps = [ { 'step': 'SSH Connection Test', 'status': 'completed', 'message': 'SSH connection successful', 'timestamp': datetime.now().isoformat() } ] # Step 2: Create deployment directory try: mkdir_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'mkdir -p {deploy_path}' ] result = subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=30) steps.append({ 'step': 'Create Deploy Directory', 'status': 'completed' if result.returncode == 0 else 'failed', 'message': f'Directory {deploy_path} created', 'timestamp': datetime.now().isoformat() }) except Exception as e: steps.append({ 'step': 'Create Deploy Directory', 'status': 'failed', 'message': f'Failed: {str(e)}', 'timestamp': datetime.now().isoformat() }) return { 'success': False, 'message': f'Deployment failed at step: Create Deploy Directory', 'timestamp': datetime.now().isoformat(), 'error': str(e), 'steps': steps } # Step 3: Deploy code (use local if available, otherwise clone from git) try: code_status = get_local_player_code_status() if code_status['available']: # Use pre-staged player code via rsync logger.info(f'Using pre-staged player code (version: {code_status.get("version", "unknown")})') rsync_cmd = [ 'sshpass', '-p', password, 'rsync', '-avz', '--delete', '-e', f'ssh -o StrictHostKeyChecking=no -p {port}', f'{LOCAL_PLAYER_CODE_DIR}/', f'{username}@{hostname}:{deploy_path}/' ] result = subprocess.run(rsync_cmd, capture_output=True, text=True, timeout=300) steps.append({ 'step': 'Deploy Code', 'status': 'completed' if result.returncode == 0 else 'failed', 'message': f'Code deployed via rsync (version: {code_status.get("version", "local")})', 'timestamp': datetime.now().isoformat() }) if result.returncode != 0: logger.warning(f'Rsync failed, falling back to git clone: {result.stderr}') # Fall back to git clone raise Exception('Rsync failed, retrying with git') else: # No local code, clone from repository logger.info(f'No pre-staged code available ({code_status.get("reason", "unknown")}), cloning from repository') raise Exception('Local code not available') except Exception as rsync_error: # Fallback: Clone or pull repository try: logger.info(f'Deploying via git: {rsync_error}') # Check if repo already exists check_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'[ -d {deploy_path}/.git ]' ] result = subprocess.run(check_cmd, capture_output=True, text=True, timeout=10) if result.returncode == 0: # Repo exists, pull latest git_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'cd {deploy_path} && git pull origin main 2>&1' ] git_msg = 'Pull latest code' else: # Clone repository git_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'git clone {repo_url} {deploy_path} 2>&1' ] git_msg = 'Clone repository' result = subprocess.run(git_cmd, capture_output=True, text=True, timeout=120) steps.append({ 'step': 'Deploy Code', 'status': 'completed' if result.returncode == 0 else 'failed', 'message': f'{git_msg}: {result.stdout.split(chr(10))[0][:100]}', 'timestamp': datetime.now().isoformat() }) if result.returncode != 0: return { 'success': False, 'message': f'Deployment failed at step: Deploy Code', 'timestamp': datetime.now().isoformat(), 'error': result.stderr or result.stdout, 'steps': steps } except Exception as e: steps.append({ 'step': 'Deploy Code', 'status': 'failed', 'message': f'Failed: {str(e)}', 'timestamp': datetime.now().isoformat() }) return { 'success': False, 'message': f'Deployment failed at step: Deploy Code', 'timestamp': datetime.now().isoformat(), 'error': str(e), 'steps': steps } # Step 3.5: Generate player configuration try: if server_url and server_api_key: config_content = generate_player_config( player_name=player_name, server_url=server_url, api_key=server_api_key ) # Write config file to remote host write_config_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'cat > {deploy_path}/config.json << \'EOF\'\n{config_content}\nEOF' ] result = subprocess.run(write_config_cmd, capture_output=True, text=True, timeout=30) steps.append({ 'step': 'Configure Player', 'status': 'completed' if result.returncode == 0 else 'warning', 'message': f'Player configuration created', 'timestamp': datetime.now().isoformat() }) except Exception as e: logger.warning(f'Failed to create player config: {str(e)}') # Step 4: Run installation script try: # First check if install script exists check_script = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'[ -f {deploy_path}/install.sh ] || [ -f {deploy_path}/setup.sh ] || [ -f {deploy_path}/install_player.sh ]' ] script_check = subprocess.run(check_script, capture_output=True, text=True, timeout=10) if script_check.returncode == 0: # Find the install script find_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'ls {deploy_path}/install*.sh {deploy_path}/setup.sh {deploy_path}/*.sh 2>/dev/null | head -1' ] find_result = subprocess.run(find_cmd, capture_output=True, text=True, timeout=10) install_script = find_result.stdout.strip().split('\n')[0] if install_script: # Run the install script install_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'cd {deploy_path} && bash {install_script} 2>&1' ] result = subprocess.run(install_cmd, capture_output=True, text=True, timeout=300) steps.append({ 'step': 'Run Installation Script', 'status': 'completed' if result.returncode == 0 else 'completed_with_warnings', 'message': f'Installation script executed: {install_script}', 'timestamp': datetime.now().isoformat() }) else: steps.append({ 'step': 'Run Installation Script', 'status': 'skipped', 'message': 'No installation script found (manual setup may be required)', 'timestamp': datetime.now().isoformat() }) else: steps.append({ 'step': 'Run Installation Script', 'status': 'skipped', 'message': 'No installation script found', 'timestamp': datetime.now().isoformat() }) except Exception as e: steps.append({ 'step': 'Run Installation Script', 'status': 'error', 'message': f'Error running installation: {str(e)}', 'timestamp': datetime.now().isoformat() }) logger.error(f'Installation script error: {str(e)}') # Step 5: Start player service (execute start.sh) try: # Check if start.sh exists check_start = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'[ -f {deploy_path}/start.sh ]' ] start_check = subprocess.run(check_start, capture_output=True, text=True, timeout=10) if start_check.returncode == 0: # Make sure start.sh is executable and run it start_cmd = [ 'sshpass', '-p', password, 'ssh', '-o', 'StrictHostKeyChecking=no', '-p', str(port), f'{username}@{hostname}', f'cd {deploy_path} && chmod +x start.sh && bash start.sh 2>&1' ] result = subprocess.run(start_cmd, capture_output=True, text=True, timeout=300) # Capture first line of output for feedback output_msg = result.stdout.split('\n')[0][:100] if result.stdout else 'Started' steps.append({ 'step': 'Start Player Service', 'status': 'completed' if result.returncode == 0 else 'completed_with_warnings', 'message': f'Player service started: {output_msg}', 'timestamp': datetime.now().isoformat() }) logger.info(f'Player service started on {hostname} at {deploy_path}') else: steps.append({ 'step': 'Start Player Service', 'status': 'warning', 'message': 'start.sh not found - player may require manual startup', 'timestamp': datetime.now().isoformat() }) logger.warning(f'start.sh not found at {deploy_path}/start.sh on {hostname}') except Exception as e: steps.append({ 'step': 'Start Player Service', 'status': 'error', 'message': f'Error starting player service: {str(e)}', 'timestamp': datetime.now().isoformat() }) logger.error(f'Failed to start player service: {str(e)}') return { 'success': True, 'message': f'Player "{player_name}" deployed successfully to {hostname}', 'timestamp': datetime.now().isoformat(), 'deploy_path': deploy_path, 'steps': steps } except Exception as e: logger.error(f'Deployment error: {str(e)}') return { 'success': False, 'message': f'Unexpected deployment error: {str(e)}', 'timestamp': datetime.now().isoformat(), 'error': str(e), 'steps': [] }