"""SSH and Ansible management service for remote device operations."""

import json
import logging
import os
import re
import socket
import subprocess
import tempfile
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import paramiko
import yaml

from app.models import Device, AnsibleExecution, PlaybookExecution
from config.database_config import get_db


class AnsibleService:
    """Service for managing remote devices via SSH and Ansible.

    Owns an on-disk Ansible workspace under ``ansible/`` (inventory, playbooks,
    roles, SSH keys), mirrors active ``Device`` rows into the YAML inventory and
    runs ``ansible-playbook`` synchronously or in a background thread, recording
    every run as a ``PlaybookExecution`` row.
    """

    SETTINGS_FILE = Path("data/ansible_settings.json")
    DEFAULT_SETTINGS = {
        "ssh_fallback_password": "raspberry",
        "use_password_auth": False,
    }

    # ssh option strings written into inventory host vars / extra-vars.
    _NO_HOSTKEY_SSH_ARGS = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
    _PASSWORD_SSH_ARGS = (
        '-o PubkeyAuthentication=no -o PreferredAuthentications=password '
        '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
    )
    # Placeholder used instead of the real password in any command line that is
    # stored in the DB, written to the live log, or shown in the UI.
    _REDACTED = '********'

    _GROUP_NAME_RE = re.compile(r'^[a-zA-Z0-9_-]+$')
    _HOSTNAME_RE = re.compile(r'^[a-zA-Z0-9_.-]+$')
    # Matches a PLAY RECAP line, e.g.
    # "RPI-FOO : ok=4 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0"
    _RECAP_RE = re.compile(r'ok=(\d+)\s+changed=(\d+)\s+unreachable=(\d+)\s+failed=(\d+)')

    def __init__(self):
        self.db = get_db()
        self.ansible_dir = Path("ansible")
        self.inventory_file = self.ansible_dir / "inventory" / "dynamic_inventory.yaml"
        self.playbook_dir = self.ansible_dir / "playbooks"
        self.ssh_keys_dir = self.ansible_dir / "ssh_keys"
        self.ssh_key_path = self.ssh_keys_dir / "app_key"
        self.ansible_cfg_path = self.ansible_dir / "ansible.cfg"

        # Ensure directories exist (relative to the process working directory).
        self.ansible_dir.mkdir(exist_ok=True)
        (self.ansible_dir / "inventory").mkdir(exist_ok=True)
        (self.ansible_dir / "playbooks").mkdir(exist_ok=True)
        (self.ansible_dir / "roles").mkdir(exist_ok=True)
        self.ssh_keys_dir.mkdir(mode=0o700, exist_ok=True)

    # ------------------------------------------------------------------ #
    #  Settings helpers                                                    #
    # ------------------------------------------------------------------ #

    def load_settings(self) -> Dict:
        """Return DEFAULT_SETTINGS overlaid with data/ansible_settings.json.

        A missing file yields the defaults; a read/parse error is logged and
        the defaults are returned.
        """
        settings = dict(self.DEFAULT_SETTINGS)
        if self.SETTINGS_FILE.exists():
            try:
                with open(self.SETTINGS_FILE, 'r') as f:
                    stored = json.load(f)
                settings.update(stored)
            except Exception as e:
                logging.error(f"Error reading ansible settings: {e}")
        return settings

    def save_settings(self, settings: Dict):
        """Merge ``settings`` into the stored settings and persist them as JSON."""
        self.SETTINGS_FILE.parent.mkdir(parents=True, exist_ok=True)
        current = self.load_settings()
        current.update(settings)
        with open(self.SETTINGS_FILE, 'w') as f:
            json.dump(current, f, indent=2)
        logging.info("Ansible settings saved")

    # ------------------------------------------------------------------ #
    #  Inventory file helpers                                              #
    # ------------------------------------------------------------------ #

    def _read_inventory(self) -> Dict:
        """Read the inventory YAML (safe_load) and return it normalised.

        The result always has ``data['all']`` and ``data['all']['children']``
        as dicts, so callers can index/assign into them directly. A missing or
        unreadable file yields an empty inventory.
        """
        if not self.inventory_file.exists():
            return {'all': {'children': {}}}
        try:
            with open(self.inventory_file, 'r') as f:
                data = yaml.safe_load(f) or {}
        except Exception as e:
            logging.error(f"Error reading inventory file: {e}")
            return {'all': {'children': {}}}

        if not isinstance(data, dict):
            logging.error("Error reading inventory file: top level is not a mapping")
            return {'all': {'children': {}}}

        # An empty YAML section (e.g. "all:" or "children:") loads as None;
        # normalise it so setdefault()/item assignment in callers cannot fail.
        all_section = data.get('all')
        if not isinstance(all_section, dict):
            all_section = {}
            data['all'] = all_section
        if not isinstance(all_section.get('children'), dict):
            all_section['children'] = {}
        return data

    def _write_inventory(self, data: Dict):
        """Write the inventory dict to the YAML inventory file."""
        self.inventory_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.inventory_file, 'w') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)

    def get_inventory_data(self) -> Dict:
        """Return ``{'groups': {...}, 'raw_yaml': str}`` for display.

        Each group maps to ``{'hosts': [{'hostname': ..., <host vars>}], 'vars': {...}}``.
        NOTE(review): host vars are returned verbatim, so any
        ``ansible_password`` stored in the inventory is included.
        """
        data = self._read_inventory()
        children = data.get('all', {}).get('children', {}) or {}

        groups = {}
        for group_name, group_data in children.items():
            group_data = group_data or {}
            hosts = []
            for hostname, host_vars in (group_data.get('hosts') or {}).items():
                entry = {'hostname': hostname}
                entry.update(host_vars or {})
                hosts.append(entry)
            groups[group_name] = {
                'hosts': hosts,
                'vars': group_data.get('vars', {}) or {}
            }

        raw = ''
        if self.inventory_file.exists():
            try:
                with open(self.inventory_file, 'r') as f:
                    raw = f.read()
            except Exception as e:
                # Best effort: the structured view above is still useful.
                logging.warning(f"Could not read raw inventory file: {e}")
        return {'groups': groups, 'raw_yaml': raw}

    # ------------------------------------------------------------------ #
    #  Inventory CRUD                                                      #
    # ------------------------------------------------------------------ #

    def _device_host_vars(self, device, use_password: bool, ssh_password: str,
                          key_file: str) -> Dict:
        """Build the inventory host vars for one DB device."""
        if device.device_ip == '127.0.0.1' or device.hostname == 'localhost':
            return {
                'ansible_connection': 'local',
                'ansible_host': '127.0.0.1'
            }
        if use_password and ssh_password:
            # NOTE(review): the password is written in plain text into the
            # inventory YAML file.
            return {
                'ansible_host': device.device_ip,
                'ansible_user': 'pi',
                'ansible_password': ssh_password,
                'ansible_become_password': ssh_password,
                'ansible_ssh_common_args': self._PASSWORD_SSH_ARGS
            }
        return {
            'ansible_host': device.device_ip,
            'ansible_user': 'pi',
            'ansible_ssh_private_key_file': key_file,
            'ansible_ssh_common_args': self._NO_HOSTKEY_SSH_ARGS
        }

    def sync_devices_to_inventory(self) -> Dict:
        """Rebuild the ``monitoring_devices`` group from active DB devices.

        Every other group already in the inventory is preserved. Returns
        ``{'success': True, 'synced': n, 'message': ...}`` or
        ``{'success': False, 'error': ...}``.
        """
        try:
            data = self._read_inventory()
            children = data['all'].setdefault('children', {})

            # Reset only the monitoring_devices group.
            monitoring_hosts = {}
            children['monitoring_devices'] = {'hosts': monitoring_hosts}

            # Settings and key path are the same for every device: read once.
            settings = self.load_settings()
            use_password = settings.get('use_password_auth', False)
            ssh_password = settings.get('ssh_fallback_password', '')
            key_file = str(self.ssh_key_path.resolve())

            synced = 0
            with self.db.get_session() as session:
                devices = session.query(Device).filter_by(status='active').all()
                for device in devices:
                    monitoring_hosts[device.hostname] = self._device_host_vars(
                        device, use_password, ssh_password, key_file
                    )
                    synced += 1

            self._write_inventory(data)
            return {'success': True, 'synced': synced,
                    'message': f'Synced {synced} device(s) to monitoring_devices group'}
        except Exception as e:
            logging.error(f"Error syncing devices to inventory: {e}")
            return {'success': False, 'error': str(e)}

    def add_group_to_inventory(self, group_name: str) -> Dict:
        """Add a new empty group; names are limited to ``[a-zA-Z0-9_-]``."""
        if not self._GROUP_NAME_RE.match(group_name):
            return {'success': False, 'error': 'Group name may only contain letters, numbers, underscores and hyphens'}
        try:
            data = self._read_inventory()
            children = data['all'].setdefault('children', {})
            if group_name in children:
                return {'success': False, 'error': f'Group "{group_name}" already exists'}
            children[group_name] = {'hosts': {}}
            self._write_inventory(data)
            return {'success': True, 'message': f'Group "{group_name}" created'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def remove_group_from_inventory(self, group_name: str) -> Dict:
        """Remove a custom group; ``monitoring_devices`` cannot be removed."""
        if group_name == 'monitoring_devices':
            return {'success': False, 'error': 'Cannot remove the default monitoring_devices group'}
        try:
            data = self._read_inventory()
            children = data['all'].get('children', {}) or {}
            if group_name not in children:
                return {'success': False, 'error': f'Group "{group_name}" not found'}
            del children[group_name]
            self._write_inventory(data)
            return {'success': True, 'message': f'Group "{group_name}" removed'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def add_host_to_inventory(self, group: str, hostname: str, ip: str,
                              ssh_user: str = 'pi', ssh_port: int = 22,
                              use_key: bool = True, password: str = None) -> Dict:
        """Manually add (or overwrite) a host in ``group``, creating the group if needed.

        With ``use_key`` the app's private key is used; otherwise ``password``
        (if given) is stored as ``ansible_password``.
        """
        if not self._HOSTNAME_RE.match(hostname):
            return {'success': False, 'error': 'Invalid hostname (letters, digits, dot, hyphen, underscore only)'}
        try:
            data = self._read_inventory()
            children = data['all'].setdefault('children', {})

            group_data = children.get(group)
            if group_data is None:
                group_data = {'hosts': {}}
                children[group] = group_data
            hosts = group_data.get('hosts')
            if hosts is None:
                hosts = {}
                group_data['hosts'] = hosts

            hvars = {
                'ansible_host': ip,
                'ansible_user': ssh_user,
                'ansible_port': ssh_port,
                'ansible_ssh_common_args': self._NO_HOSTKEY_SSH_ARGS
            }
            if use_key:
                # Absolute path: ansible-playbook runs with cwd=ansible_dir, so a
                # relative "ansible/ssh_keys/app_key" would point at the wrong file.
                hvars['ansible_ssh_private_key_file'] = str(self.ssh_key_path.resolve())
            elif password:
                hvars['ansible_password'] = password

            hosts[hostname] = hvars
            self._write_inventory(data)
            return {'success': True, 'message': f'Host "{hostname}" added to group "{group}"'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def remove_host_from_inventory(self, group: str, hostname: str) -> Dict:
        """Remove ``hostname`` from ``group``."""
        try:
            data = self._read_inventory()
            children = data['all'].get('children', {}) or {}
            group_data = children.get(group) or {}
            hosts = group_data.get('hosts') or {}
            if hostname not in hosts:
                return {'success': False, 'error': f'Host "{hostname}" not found in group "{group}"'}
            del hosts[hostname]
            self._write_inventory(data)
            return {'success': True, 'message': f'Host "{hostname}" removed from "{group}"'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    # ------------------------------------------------------------------ #
    #  Legacy / compatibility                                              #
    # ------------------------------------------------------------------ #

    def generate_dynamic_inventory(self) -> Dict:
        """Sync DB devices into the inventory and return the full inventory dict."""
        self.sync_devices_to_inventory()
        return self._read_inventory()

    def _write_playbook(self, filename: str, playbook_content: Dict) -> str:
        """Dump a single-play playbook to ``playbook_dir/filename``; return its path."""
        self.playbook_dir.mkdir(parents=True, exist_ok=True)
        playbook_path = self.playbook_dir / filename
        with open(playbook_path, 'w') as f:
            yaml.dump([playbook_content], f, default_flow_style=False)
        return str(playbook_path)

    def create_update_playbook(self) -> str:
        """Write update_devices.yml (apt update + dist-upgrade) and return its path."""
        playbook_content = {
            'name': 'Update monitoring devices',
            'hosts': 'all',
            'become': True,
            'gather_facts': False,
            'tasks': [
                {
                    'name': 'Update apt cache',
                    'apt': {
                        'update_cache': True,
                        'cache_valid_time': 3600
                    }
                },
                {
                    'name': 'Upgrade all packages',
                    'apt': {
                        'upgrade': 'dist',
                        'autoremove': True,
                        'autoclean': True
                    },
                    'register': 'upgrade_result'
                },
                {
                    'name': 'Show upgrade result',
                    'debug': {
                        'msg': '{{ upgrade_result.stdout_lines }}'
                    }
                },
                {
                    'name': 'Clean up apt cache',
                    'apt': {
                        'autoclean': True
                    }
                }
            ]
        }
        return self._write_playbook("update_devices.yml", playbook_content)

    def create_restart_service_playbook(self) -> str:
        """Write restart_service.yml (restart prezenta.service and report back)."""
        playbook_content = {
            'name': 'Restart monitoring service',
            'hosts': 'all',
            'become': True,
            'tasks': [
                {
                    'name': 'Stop prezenta service',
                    'systemd': {
                        'name': 'prezenta.service',
                        'state': 'stopped'
                    }
                },
                {
                    'name': 'Wait for service to stop',
                    'wait_for': {
                        'timeout': 10
                    }
                },
                {
                    'name': 'Start prezenta service',
                    'systemd': {
                        'name': 'prezenta.service',
                        'state': 'started',
                        'enabled': True
                    }
                },
                {
                    'name': 'Verify service is running',
                    'systemd': {
                        'name': 'prezenta.service'
                    },
                    'register': 'service_status'
                },
                {
                    'name': 'Report service restart',
                    'uri': {
                        'url': 'http://{{ ansible_controller_ip }}/api/service_restarted',
                        'method': 'POST',
                        'body_format': 'json',
                        'body': {
                            'hostname': '{{ inventory_hostname }}',
                            'device_ip': '{{ ansible_host }}',
                            'service_status': '{{ service_status.status.ActiveState }}'
                        }
                    }
                }
            ]
        }
        return self._write_playbook("restart_service.yml", playbook_content)

    # ------------------------------------------------------------------ #
    #  Shared playbook-run helpers                                         #
    # ------------------------------------------------------------------ #

    def _resolve_playbook_path(self, playbook_name: str) -> Optional[Path]:
        """Return the absolute path of ``<playbook_name>.yml`` inside playbook_dir.

        Returns None if the file does not exist or if the name would escape
        playbook_dir (e.g. ``../../x``).
        """
        base = Path(os.path.abspath(self.playbook_dir))
        # normpath collapses ".." lexically without following symlinks.
        candidate = Path(os.path.normpath(base / f"{playbook_name}.yml"))
        if base not in candidate.parents or not candidate.exists():
            return None
        return candidate.resolve()

    def _password_auth_vars(self, password: str) -> Dict:
        """Extra-vars that force password-only SSH auth for every host."""
        return {
            'ansible_password': password,
            'ansible_become_password': password,
            'ansible_ssh_private_key_file': '',
            'ansible_ssh_common_args': self._PASSWORD_SSH_ARGS
        }

    def _prepare_playbook_command(self, playbook_name: str,
                                  limit_hosts: Optional[List[str]],
                                  extra_vars: Optional[Dict],
                                  force_password_auth: bool = False
                                  ) -> Optional[Tuple[List[str], List[str]]]:
        """Build ``(cmd, display_cmd)`` for ansible-playbook, or None if the playbook is missing.

        ``display_cmd`` equals ``cmd`` except the SSH password is replaced by a
        placeholder; only ``display_cmd`` may be logged or stored.
        """
        playbook_path = self._resolve_playbook_path(playbook_name)
        if playbook_path is None:
            return None

        cmd = [
            'ansible-playbook',
            str(playbook_path),
            '-i', str(self.inventory_file.resolve()),
            '-v',  # Verbose output
        ]
        if limit_hosts:
            cmd.extend(['--limit', ','.join(limit_hosts)])
        if extra_vars:
            # One JSON string avoids value-quoting issues.
            cmd.extend(['--extra-vars', json.dumps(extra_vars)])

        display_cmd = list(cmd)

        # Password vars given on the command line override per-host inventory vars.
        settings = self.load_settings()
        pwd = settings.get('ssh_fallback_password')
        if (force_password_auth or settings.get('use_password_auth')) and pwd:
            cmd.extend(['--extra-vars', json.dumps(self._password_auth_vars(pwd))])
            display_cmd.extend(['--extra-vars', json.dumps(self._password_auth_vars(self._REDACTED))])
        return cmd, display_cmd

    def _ansible_env(self) -> Dict[str, str]:
        """Environment for ansible-playbook: unbuffered, no colour, our ansible.cfg."""
        env = os.environ.copy()
        # PYTHONUNBUFFERED makes ansible (Python) flush each line instead of
        # block-buffering through the pipe, which the live log view relies on.
        env['PYTHONUNBUFFERED'] = '1'
        env['ANSIBLE_FORCE_COLOR'] = '0'
        env['ANSIBLE_NOCOLOR'] = '1'
        env['ANSIBLE_CONFIG'] = str(self.ansible_cfg_path.resolve())
        return env

    def _create_execution_record(self, playbook_name: str,
                                 limit_hosts: Optional[List[str]],
                                 extra_vars: Optional[Dict],
                                 display_cmd: List[str],
                                 priority: int, max_retries: int,
                                 log_file_path: Optional[str] = None) -> Tuple[str, int]:
        """Insert a 'running' PlaybookExecution row; return ``(execution_id, db_id)``."""
        execution_id = str(uuid.uuid4())
        now = datetime.utcnow()
        fields = dict(
            execution_id=execution_id,
            playbook_name=playbook_name,
            playbook_description=self._get_playbook_description(playbook_name),
            target_hosts=json.dumps(limit_hosts or []),
            command_line=' '.join(display_cmd),
            extra_vars=json.dumps(extra_vars or {}),
            queued_at=now,
            started_at=now,
            status='running',
            priority=priority,
            max_retries=max_retries,
            total_hosts=len(limit_hosts) if limit_hosts else 0,
        )
        if log_file_path is not None:
            fields['ansible_log_file'] = log_file_path
        with self.db.get_session() as session:
            execution = PlaybookExecution(**fields)
            session.add(execution)
            session.flush()
            execution_db_id = execution.id
        return execution_id, execution_db_id

    def _apply_execution_result(self, execution: PlaybookExecution, exit_code: int,
                                output: str, max_retries: int, failure_message: str):
        """Record completion time, exit code, recap stats and final status."""
        execution.completed_at = datetime.utcnow()
        execution.exit_code = exit_code
        # Always parse recap stats regardless of exit code —
        # Ansible exits non-zero when any host fails/is unreachable.
        self._parse_ansible_results_enhanced(execution, output)
        if exit_code == 0:
            execution.status = 'completed'
            execution.summary_message = 'Playbook executed successfully'
        else:
            execution.status = 'failed'
            execution.summary_message = failure_message
            if (execution.retry_count or 0) < max_retries:
                execution.status = 'retry_pending'

    def _mark_execution_failed(self, execution_db_id: int, message: str):
        """Best-effort: flag an execution row as failed so it is not left 'running'."""
        try:
            with self.db.get_session() as session:
                execution = session.query(PlaybookExecution).get(execution_db_id)
                if execution:
                    execution.status = 'failed'
                    execution.summary_message = message
                    execution.completed_at = datetime.utcnow()
        except Exception as e:
            logging.error(f"Could not mark execution {execution_db_id} as failed: {e}")

    # ------------------------------------------------------------------ #
    #  Synchronous execution                                               #
    # ------------------------------------------------------------------ #

    def execute_playbook(self, playbook_name: str, limit_hosts: List[str] = None,
                         extra_vars: Dict = None, priority: int = 5,
                         max_retries: int = 0) -> Dict:
        """Run a playbook and block until ansible-playbook exits.

        Refreshes the inventory first, records the run as a PlaybookExecution
        row and writes stdout/stderr to a temporary ``.log`` file.

        Returns:
            ``{'success', 'execution_id', 'stdout', 'stderr', 'exit_code',
            'log_file', 'error'}`` on completion, or
            ``{'success': False, 'error': ...}`` if the run could not happen.
        """
        execution_db_id = None
        try:
            self.generate_dynamic_inventory()

            prepared = self._prepare_playbook_command(playbook_name, limit_hosts, extra_vars)
            if prepared is None:
                return {
                    'success': False,
                    'error': f'Playbook {playbook_name} not found'
                }
            cmd, display_cmd = prepared

            execution_id, execution_db_id = self._create_execution_record(
                playbook_name, limit_hosts, extra_vars, display_cmd,
                priority, max_retries,
            )

            # Persistent log file (not deleted on close).
            log_fd, log_file_path = tempfile.mkstemp(suffix='.log')
            os.close(log_fd)

            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                cwd=str(self.ansible_dir),
                env=self._ansible_env(),
            )
            stdout, stderr, exit_code = result.stdout, result.stderr, result.returncode

            with self.db.get_session() as session:
                execution = session.query(PlaybookExecution).get(execution_db_id)
                if execution:
                    execution.stdout_log = stdout
                    execution.stderr_log = stderr
                    execution.ansible_log_file = log_file_path
                    self._apply_execution_result(
                        execution, exit_code, stdout, max_retries,
                        f'Playbook failed with exit code {exit_code}',
                    )

            with open(log_file_path, 'w') as f:
                f.write(f"STDOUT:\n{stdout}\n\nSTDERR:\n{stderr}\n")

            return {
                'success': exit_code == 0,
                'execution_id': execution_id,
                'stdout': stdout,
                'stderr': stderr,
                'exit_code': exit_code,
                'log_file': log_file_path,
                'error': stderr if exit_code != 0 else None
            }

        except Exception as e:
            logging.error(f"Error executing playbook {playbook_name}: {e}")
            if execution_db_id is not None:
                # Don't leave the row stuck in 'running' (e.g. ansible-playbook missing).
                self._mark_execution_failed(execution_db_id, str(e))
            return {
                'success': False,
                'error': str(e)
            }

    # ------------------------------------------------------------------ #
    #  Async execution (background thread + live log streaming)           #
    # ------------------------------------------------------------------ #

    def execute_playbook_async(self, playbook_name: str, limit_hosts: List[str] = None,
                               extra_vars: Dict = None, priority: int = 5,
                               max_retries: int = 0,
                               force_password_auth: bool = False) -> Dict:
        """Start a playbook in a daemon thread and return immediately.

        Returns ``{'success': True, 'execution_id': ...}`` so the caller can poll
        :meth:`get_live_execution`, or ``{'success': False, 'error': ...}``.

        ``force_password_auth=True`` injects the password vars even when
        ``use_password_auth`` is off — used by distribute_ssh_keys, which must
        run before keys are deployed.
        """
        try:
            self.generate_dynamic_inventory()

            prepared = self._prepare_playbook_command(
                playbook_name, limit_hosts, extra_vars, force_password_auth
            )
            if prepared is None:
                return {'success': False, 'error': f'Playbook {playbook_name} not found'}
            cmd, display_cmd = prepared

            # Persistent log file (NOT deleted on close) that the thread streams into.
            log_fd, log_file_path = tempfile.mkstemp(suffix='.log', prefix='ansible_')
            os.close(log_fd)

            execution_id, execution_db_id = self._create_execution_record(
                playbook_name, limit_hosts, extra_vars, display_cmd,
                priority, max_retries, log_file_path=log_file_path,
            )

            thread = threading.Thread(
                target=self._run_playbook_thread,
                args=(execution_db_id, execution_id, cmd, log_file_path,
                      max_retries, display_cmd),
                daemon=True,
            )
            thread.start()

            return {'success': True, 'execution_id': execution_id}

        except Exception as e:
            logging.error(f"Error starting async playbook {playbook_name}: {e}")
            return {'success': False, 'error': str(e)}

    def _run_playbook_thread(self, execution_db_id: int, execution_id: str,
                             cmd: List[str], log_file_path: str, max_retries: int,
                             display_cmd: Optional[List[str]] = None):
        """Background worker: stream merged stdout/stderr to the log file, then update the DB.

        ``display_cmd`` (password redacted) is what gets written to the log
        header; it falls back to ``cmd`` when not supplied.
        """
        shown_cmd = display_cmd if display_cmd is not None else cmd
        try:
            with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into stdout
                text=True,
                bufsize=1,                 # line-buffered on the read side
                cwd=str(self.ansible_dir),
                env=self._ansible_env(),
            ) as process:
                try:
                    with open(log_file_path, 'w') as lf:
                        # Startup marker so the UI has something to show immediately.
                        lf.write(f'--- ansible-playbook started (pid {process.pid}) ---\n')
                        lf.write(f'Command: {" ".join(shown_cmd)}\n')
                        lf.write('---\n')
                        lf.flush()
                        # readline() returns '' only at EOF; iter() stops there
                        # instead of busy-polling the process.
                        for line in iter(process.stdout.readline, ''):
                            lf.write(line)
                            lf.flush()  # flush after every line for the live view
                except BaseException:
                    process.kill()
                    raise
            # Leaving the Popen context waits for the process, so returncode is set.

            with open(log_file_path, 'r') as lf:
                full_output = lf.read()

            with self.db.get_session() as session:
                execution = session.query(PlaybookExecution).get(execution_db_id)
                if execution:
                    execution.stdout_log = full_output
                    self._apply_execution_result(
                        execution, process.returncode, full_output, max_retries,
                        f'Playbook failed (exit {process.returncode})',
                    )

        except Exception as e:
            logging.error(f"Background playbook thread error [{execution_id}]: {e}")
            self._mark_execution_failed(execution_db_id, str(e))

    @staticmethod
    def _isoformat(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or None when value is falsy."""
        return value.isoformat() if value else None

    def get_live_execution(self, execution_id: str) -> Dict:
        """Return current status + log content for a running or finished execution.

        The log is read from the execution's log file when it exists, otherwise
        from the stored ``stdout_log``.
        """
        try:
            with self.db.get_session() as session:
                execution = session.query(PlaybookExecution).filter_by(
                    execution_id=execution_id
                ).first()
                if not execution:
                    return {'success': False, 'error': 'Execution not found'}

                log_file = execution.ansible_log_file
                if log_file and os.path.exists(log_file):
                    try:
                        with open(log_file, 'r') as f:
                            log_content = f.read()
                    except Exception:
                        log_content = execution.stdout_log or ''
                else:
                    log_content = execution.stdout_log or ''

                if not log_content and execution.status == 'running':
                    log_content = f'Waiting for ansible-playbook to produce output...\nCommand: {execution.command_line or ""}'

                return {
                    'success': True,
                    'execution_id': execution_id,
                    'status': execution.status,
                    'playbook_name': execution.playbook_name,
                    'target_hosts': json.loads(execution.target_hosts) if execution.target_hosts else [],
                    'started_at': self._isoformat(execution.started_at),
                    'completed_at': self._isoformat(execution.completed_at),
                    'successful_hosts': execution.successful_hosts,
                    'failed_hosts': execution.failed_hosts,
                    'unreachable_hosts': execution.unreachable_hosts,
                    'exit_code': execution.exit_code,
                    'summary_message': execution.summary_message,
                    'log': log_content,
                }
        except Exception as e:
            logging.error(f"Error fetching live execution {execution_id}: {e}")
            return {'success': False, 'error': str(e)}

    def _parse_ansible_results_enhanced(self, execution: PlaybookExecution, output: str):
        """Set per-host counters on ``execution`` from PLAY RECAP lines in ``output``.

        Each recap line counts one host: unreachable if unreachable>0, else
        failed if failed>0, else successful (and also "changed" if changed>0).
        ``skipped_hosts`` is always set to 0.
        """
        successful_hosts = 0
        failed_hosts = 0
        unreachable_hosts = 0
        skipped_hosts = 0
        changed_hosts = 0

        for line in output.split('\n'):
            m = self._RECAP_RE.search(line)
            if not m:
                continue
            changed = int(m.group(2))
            unreachable = int(m.group(3))
            failed = int(m.group(4))

            if unreachable > 0:
                unreachable_hosts += 1
            elif failed > 0:
                failed_hosts += 1
            else:
                successful_hosts += 1
            if changed > 0:
                changed_hosts += 1

        execution.successful_hosts = successful_hosts
        execution.failed_hosts = failed_hosts
        execution.unreachable_hosts = unreachable_hosts
        execution.skipped_hosts = skipped_hosts
        execution.changed_hosts = changed_hosts

    def _get_playbook_description(self, playbook_name: str) -> str:
        """Return a user-friendly description for a known playbook name."""
        descriptions = {
            'update_devices': 'Update all packages and monitoring software on devices',
            'restart_service': 'Restart monitoring services on selected devices',
            'system_health': 'Check system health and monitoring status',
            'maintenance_mode': 'Put devices in maintenance mode',
            'distribute_ssh_keys': 'Push server public key to all devices using password auth',
        }
        return descriptions.get(playbook_name, f'Execute {playbook_name} playbook')

    def create_distribute_ssh_keys_playbook(self) -> str:
        """Return the path of distribute_ssh_keys.yml (ships with the repo); warn if missing."""
        playbook_path = self.playbook_dir / 'distribute_ssh_keys.yml'
        if not playbook_path.exists():
            logging.warning('distribute_ssh_keys.yml not found — playbook file is missing')
        return str(playbook_path)

    def create_system_health_playbook(self) -> str:
        """Write system_health.yml (disk/memory/uptime/services/network) and return its path."""
        playbook_content = {
            'name': 'System Health Check',
            'hosts': 'all',
            'become': True,
            'gather_facts': True,
            'tasks': [
                {
                    'name': 'Check disk usage',
                    'shell': 'df -h',
                    'register': 'disk_usage'
                },
                {
                    'name': 'Check memory usage',
                    'shell': 'free -m',
                    'register': 'memory_usage'
                },
                {
                    'name': 'Check system uptime',
                    'shell': 'uptime',
                    'register': 'system_uptime'
                },
                {
                    'name': 'Check running services',
                    'shell': 'systemctl list-units --type=service --state=running | grep -E "(ssh|monitoring|python)"',
                    'register': 'running_services',
                    'ignore_errors': True
                },
                {
                    'name': 'Check network connectivity',
                    'shell': 'ping -c 3 8.8.8.8',
                    'register': 'network_test',
                    'ignore_errors': True
                },
                {
                    'name': 'Display health summary',
                    'debug': {
                        'msg': [
                            '=== SYSTEM HEALTH REPORT ===',
                            'Disk Usage: {{ disk_usage.stdout_lines[0] if disk_usage.stdout_lines else "N/A" }}',
                            'Memory: {{ memory_usage.stdout_lines[1] if memory_usage.stdout_lines|length > 1 else "N/A" }}',
                            'Uptime: {{ system_uptime.stdout if system_uptime.stdout else "N/A" }}',
                            'Network: {{ "OK" if network_test.rc == 0 else "FAILED" }}',
                            'Services: {{ running_services.stdout_lines|length if running_services.stdout_lines else 0 }} monitoring services running'
                        ]
                    }
                }
            ]
        }
        return self._write_playbook("system_health.yml", playbook_content)

    def _parse_ansible_results(self, execution: AnsibleExecution, output: str):
        """Legacy: add per-host recap counts from ``output`` onto ``execution``'s counters.

        Each PLAY RECAP line counts exactly one host, classified like
        :meth:`_parse_ansible_results_enhanced` (unreachable > failed > successful).
        Counters are incremented (None is treated as 0), not overwritten.
        """
        for line in output.split('\n'):
            m = self._RECAP_RE.search(line)
            if not m:
                continue
            unreachable = int(m.group(3))
            failed = int(m.group(4))
            if unreachable > 0:
                execution.unreachable_hosts = (execution.unreachable_hosts or 0) + 1
            elif failed > 0:
                execution.failed_hosts = (execution.failed_hosts or 0) + 1
            else:
                execution.successful_hosts = (execution.successful_hosts or 0) + 1

    def test_password_auth(self, device_ip: str, password: str, username: str = 'pi',
                           port: int = 22) -> Dict:
        """Test SSH login with password-only auth (no key fallback) via sshpass.

        First checks TCP reachability of ``device_ip:port`` (5 s timeout), then
        runs ``echo OK`` over ssh. The password is handed to sshpass through the
        ``SSHPASS`` environment variable (``sshpass -e``) so it does not appear
        in the process argument list.
        """
        try:
            # Quick TCP reachability check first.
            with socket.create_connection((device_ip, port), timeout=5):
                pass
        except OSError as e:  # includes ConnectionRefusedError and timeouts
            return {'success': False, 'reachable': False,
                    'error': f'Host unreachable on port {port}: {e}'}

        try:
            env = os.environ.copy()
            env['SSHPASS'] = password
            result = subprocess.run(
                [
                    'sshpass', '-e',
                    'ssh',
                    '-o', 'PubkeyAuthentication=no',
                    '-o', 'PreferredAuthentications=password',
                    '-o', 'StrictHostKeyChecking=no',
                    '-o', 'UserKnownHostsFile=/dev/null',
                    '-o', 'ConnectTimeout=8',
                    '-p', str(port),
                    f'{username}@{device_ip}',
                    'echo OK',
                ],
                capture_output=True, text=True, timeout=15, env=env,
            )
            if result.returncode == 0 and 'OK' in result.stdout:
                return {'success': True, 'reachable': True,
                        'message': f'Password authentication succeeded for {username}@{device_ip}'}
            stderr = (result.stderr or '').strip()
            return {'success': False, 'reachable': True,
                    'error': f'Authentication failed — {stderr or "wrong password"}'}
        except subprocess.TimeoutExpired:
            return {'success': False, 'reachable': True, 'error': 'SSH command timed out'}
        except FileNotFoundError:
            return {'success': False, 'reachable': True,
                    'error': 'sshpass not installed — run: sudo apt-get install sshpass'}
        except Exception as e:
            return {'success': False, 'reachable': True, 'error': str(e)}

    def test_ssh_connectivity(self, device_ip: str, username: str = 'pi') -> Dict:
        """Test SSH to a device with the app key, falling back to the configured password.

        Runs ``uptime`` on success. The client is always closed.
        """
        client = None
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Try with SSH key first, then password.
            try:
                client.connect(
                    device_ip,
                    username=username,
                    key_filename=str(self.ssh_key_path),
                    timeout=10
                )
            except paramiko.AuthenticationException:
                fallback_pw = self.load_settings().get('ssh_fallback_password', 'raspberry')
                client.connect(
                    device_ip,
                    username=username,
                    password=fallback_pw,
                    timeout=10
                )

            _stdin, stdout, _stderr = client.exec_command('uptime')
            uptime_output = stdout.read().decode()

            return {
                'success': True,
                'message': 'SSH connection successful',
                'uptime': uptime_output.strip()
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
        finally:
            if client is not None:
                client.close()

    def bulk_ssh_test(self, device_ips: List[str]) -> Dict:
        """Run :meth:`test_ssh_connectivity` for each IP on up to 10 threads; map IP -> result."""
        results = {}
        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_ip = {
                executor.submit(self.test_ssh_connectivity, ip): ip
                for ip in device_ips
            }
            for future in as_completed(future_to_ip):
                ip = future_to_ip[future]
                try:
                    results[ip] = future.result()
                except Exception as e:
                    results[ip] = {
                        'success': False,
                        'error': str(e)
                    }
        return results

    def setup_ssh_keys(self) -> Dict:
        """Generate the app's 4096-bit RSA key pair with ssh-keygen unless it already exists."""
        try:
            key_path = Path(self.ssh_key_path)
            key_path.parent.mkdir(exist_ok=True)

            if key_path.exists():
                message = 'SSH key already exists'
            else:
                subprocess.run([
                    'ssh-keygen', '-t', 'rsa', '-b', '4096',
                    '-f', str(key_path),
                    '-N', '',  # No passphrase
                    '-C', 'ansible@monitoring-server'
                ], check=True)
                key_path.chmod(0o600)
                message = 'SSH key pair generated'

            return {
                'success': True,
                'message': message,
                'public_key_path': f"{key_path}.pub",
                'private_key_path': str(key_path)
            }

        except Exception as e:
            logging.error(f"Error setting up SSH keys: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_execution_history(self, limit: int = 50) -> List[Dict]:
        """Return the ``limit`` most recently queued PlaybookExecution rows as dicts.

        Returns an empty list on error (the error is logged).
        """
        try:
            with self.db.get_session() as session:
                executions = session.query(PlaybookExecution).order_by(
                    PlaybookExecution.queued_at.desc()
                ).limit(limit).all()

                # Built inside the session so attribute access can still load.
                return [{
                    'id': execution.id,
                    'execution_id': execution.execution_id,
                    'playbook_name': execution.playbook_name,
                    'playbook_description': execution.playbook_description,
                    'queued_at': self._isoformat(execution.queued_at),
                    'started_at': self._isoformat(execution.started_at),
                    'completed_at': self._isoformat(execution.completed_at),
                    'status': execution.status,
                    'priority': execution.priority,
                    'retry_count': execution.retry_count,
                    'max_retries': execution.max_retries,
                    'exit_code': execution.exit_code,
                    'total_hosts': execution.total_hosts,
                    'successful_hosts': execution.successful_hosts,
                    'failed_hosts': execution.failed_hosts,
                    'unreachable_hosts': execution.unreachable_hosts,
                    'skipped_hosts': execution.skipped_hosts,
                    'changed_hosts': execution.changed_hosts,
                    'summary_message': execution.summary_message,
                    'duration': execution.duration,
                    'duration_formatted': execution.duration_formatted
                } for execution in executions]
        except Exception as e:
            logging.error(f"Error getting execution history: {e}")
            return []