Initial commit — Server_Monitorizare_v2

This commit is contained in:
ske087
2026-04-23 15:55:46 +03:00
commit d2485e4c66
61 changed files with 13861 additions and 0 deletions

0
app/services/__init__.py Normal file
View File

View File

@@ -0,0 +1,925 @@
"""
SSH and Ansible management service for remote device operations
"""
import os
import json
import subprocess
import tempfile
import threading
import paramiko
import yaml
import uuid
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from app.models import Device, AnsibleExecution, PlaybookExecution
from config.database_config import get_db
class AnsibleService:
"""Service for managing remote devices via SSH and Ansible"""
SETTINGS_FILE = Path("data/ansible_settings.json")
DEFAULT_SETTINGS = {
"ssh_fallback_password": "raspberry",
}
def __init__(self):
self.db = get_db()
self.ansible_dir = Path("ansible")
self.inventory_file = self.ansible_dir / "inventory" / "dynamic_inventory.yaml"
self.playbook_dir = self.ansible_dir / "playbooks"
self.ssh_key_path = Path.home() / ".ssh" / "ansible_key"
# Ensure directories exist
self.ansible_dir.mkdir(exist_ok=True)
(self.ansible_dir / "inventory").mkdir(exist_ok=True)
(self.ansible_dir / "playbooks").mkdir(exist_ok=True)
(self.ansible_dir / "roles").mkdir(exist_ok=True)
# ------------------------------------------------------------------ #
# Settings helpers #
# ------------------------------------------------------------------ #
def load_settings(self) -> Dict:
"""Load ansible settings from data/ansible_settings.json."""
settings = dict(self.DEFAULT_SETTINGS)
if self.SETTINGS_FILE.exists():
try:
with open(self.SETTINGS_FILE, 'r') as f:
stored = json.load(f)
settings.update(stored)
except Exception as e:
logging.error(f"Error reading ansible settings: {e}")
return settings
def save_settings(self, settings: Dict):
"""Persist ansible settings to data/ansible_settings.json."""
self.SETTINGS_FILE.parent.mkdir(parents=True, exist_ok=True)
current = self.load_settings()
current.update(settings)
with open(self.SETTINGS_FILE, 'w') as f:
json.dump(current, f, indent=2)
logging.info("Ansible settings saved")
# ------------------------------------------------------------------ #
# Inventory file helpers #
# ------------------------------------------------------------------ #
def _read_inventory(self) -> Dict:
"""Read inventory YAML file and return parsed dict (safe)."""
if self.inventory_file.exists():
try:
with open(self.inventory_file, 'r') as f:
data = yaml.safe_load(f) or {}
if 'all' not in data:
data['all'] = {'children': {}}
if 'children' not in (data['all'] or {}):
data['all']['children'] = {}
return data
except Exception as e:
logging.error(f"Error reading inventory file: {e}")
return {'all': {'children': {}}}
def _write_inventory(self, data: Dict):
"""Write inventory dict to YAML file."""
self.inventory_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.inventory_file, 'w') as f:
yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
def get_inventory_data(self) -> Dict:
"""Return structured inventory data for display (groups + hosts)."""
data = self._read_inventory()
groups = {}
children = data.get('all', {}).get('children', {}) or {}
for group_name, group_data in children.items():
hosts = []
group_data = group_data or {}
for hostname, host_vars in (group_data.get('hosts') or {}).items():
entry = {'hostname': hostname}
entry.update(host_vars or {})
hosts.append(entry)
groups[group_name] = {
'hosts': hosts,
'vars': group_data.get('vars', {}) or {}
}
raw = ''
if self.inventory_file.exists():
try:
with open(self.inventory_file, 'r') as f:
raw = f.read()
except Exception:
pass
return {'groups': groups, 'raw_yaml': raw}
# ------------------------------------------------------------------ #
# Inventory CRUD #
# ------------------------------------------------------------------ #
def sync_devices_to_inventory(self) -> Dict:
"""Sync all active DB devices into monitoring_devices group.
Preserves all other custom groups already in the inventory."""
try:
import re as _re
data = self._read_inventory()
children = data['all'].setdefault('children', {})
# Reset only monitoring_devices group
children['monitoring_devices'] = {'hosts': {}}
synced = 0
with self.db.get_session() as session:
devices = session.query(Device).filter_by(status='active').all()
for device in devices:
if device.device_ip == '127.0.0.1' or device.hostname == 'localhost':
hvars = {
'ansible_connection': 'local',
'ansible_host': '127.0.0.1'
}
else:
hvars = {
'ansible_host': device.device_ip,
'ansible_user': 'pi',
'ansible_ssh_private_key_file': str(self.ssh_key_path),
'ansible_ssh_common_args': '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
}
children['monitoring_devices']['hosts'][device.hostname] = hvars
synced += 1
self._write_inventory(data)
return {'success': True, 'synced': synced,
'message': f'Synced {synced} device(s) to monitoring_devices group'}
except Exception as e:
logging.error(f"Error syncing devices to inventory: {e}")
return {'success': False, 'error': str(e)}
def add_group_to_inventory(self, group_name: str) -> Dict:
"""Add a new empty group to the inventory."""
import re as _re
if not _re.match(r'^[a-zA-Z0-9_-]+$', group_name):
return {'success': False,
'error': 'Group name may only contain letters, numbers, underscores and hyphens'}
try:
data = self._read_inventory()
children = data['all'].setdefault('children', {})
if group_name in children:
return {'success': False, 'error': f'Group "{group_name}" already exists'}
children[group_name] = {'hosts': {}}
self._write_inventory(data)
return {'success': True, 'message': f'Group "{group_name}" created'}
except Exception as e:
return {'success': False, 'error': str(e)}
def remove_group_from_inventory(self, group_name: str) -> Dict:
"""Remove a custom group from the inventory."""
if group_name == 'monitoring_devices':
return {'success': False,
'error': 'Cannot remove the default monitoring_devices group'}
try:
data = self._read_inventory()
children = data['all'].get('children', {}) or {}
if group_name not in children:
return {'success': False, 'error': f'Group "{group_name}" not found'}
del children[group_name]
self._write_inventory(data)
return {'success': True, 'message': f'Group "{group_name}" removed'}
except Exception as e:
return {'success': False, 'error': str(e)}
def add_host_to_inventory(self, group: str, hostname: str, ip: str,
ssh_user: str = 'pi', ssh_port: int = 22,
use_key: bool = True, password: str = None) -> Dict:
"""Manually add a host to a specific inventory group."""
import re as _re
if not _re.match(r'^[a-zA-Z0-9_.-]+$', hostname):
return {'success': False, 'error': 'Invalid hostname (letters, digits, dot, hyphen, underscore only)'}
try:
data = self._read_inventory()
children = data['all'].setdefault('children', {})
if group not in children:
children[group] = {'hosts': {}}
if children[group] is None:
children[group] = {'hosts': {}}
hosts = children[group].setdefault('hosts', {})
if hosts is None:
children[group]['hosts'] = {}
hosts = children[group]['hosts']
hvars = {
'ansible_host': ip,
'ansible_user': ssh_user,
'ansible_port': ssh_port,
'ansible_ssh_common_args': '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
}
if use_key:
hvars['ansible_ssh_private_key_file'] = str(self.ssh_key_path)
elif password:
hvars['ansible_password'] = password
hosts[hostname] = hvars
self._write_inventory(data)
return {'success': True, 'message': f'Host "{hostname}" added to group "{group}"'}
except Exception as e:
return {'success': False, 'error': str(e)}
def remove_host_from_inventory(self, group: str, hostname: str) -> Dict:
"""Remove a host from an inventory group."""
try:
data = self._read_inventory()
children = data['all'].get('children', {}) or {}
group_data = children.get(group) or {}
hosts = group_data.get('hosts') or {}
if hostname not in hosts:
return {'success': False,
'error': f'Host "{hostname}" not found in group "{group}"'}
del hosts[hostname]
self._write_inventory(data)
return {'success': True, 'message': f'Host "{hostname}" removed from "{group}"'}
except Exception as e:
return {'success': False, 'error': str(e)}
# ------------------------------------------------------------------ #
# Legacy / compatibility #
# ------------------------------------------------------------------ #
def generate_dynamic_inventory(self) -> Dict:
"""Sync DB devices into inventory and return the full inventory dict."""
self.sync_devices_to_inventory()
return self._read_inventory()
def create_update_playbook(self) -> str:
"""Create Ansible playbook for device updates"""
playbook_content = {
'name': 'Update monitoring devices',
'hosts': 'all',
'become': True,
'gather_facts': True,
'tasks': [
{
'name': 'Update apt cache',
'apt': {
'update_cache': True,
'cache_valid_time': 3600
}
},
{
'name': 'Upgrade all packages',
'apt': {
'upgrade': 'dist',
'autoremove': True,
'autoclean': True
},
'register': 'upgrade_result'
},
{
'name': 'Restart device if required',
'reboot': {
'reboot_timeout': 600
},
'when': 'upgrade_result.changed'
},
{
'name': 'Check service status',
'systemd': {
'name': 'prezenta.service',
'state': 'started'
}
},
{
'name': 'Report update completion',
'uri': {
'url': 'http://{{ ansible_controller_ip }}/api/update_complete',
'method': 'POST',
'body_format': 'json',
'body': {
'hostname': '{{ inventory_hostname }}',
'device_ip': '{{ ansible_host }}',
'status': 'completed',
'packages_updated': '{{ upgrade_result.stdout_lines | length }}'
}
}
}
]
}
playbook_path = self.playbook_dir / "update_devices.yml"
with open(playbook_path, 'w') as f:
yaml.dump([playbook_content], f, default_flow_style=False)
return str(playbook_path)
def create_restart_service_playbook(self) -> str:
"""Create playbook for restarting device services"""
playbook_content = {
'name': 'Restart monitoring service',
'hosts': 'all',
'become': True,
'tasks': [
{
'name': 'Stop prezenta service',
'systemd': {
'name': 'prezenta.service',
'state': 'stopped'
}
},
{
'name': 'Wait for service to stop',
'wait_for': {
'timeout': 10
}
},
{
'name': 'Start prezenta service',
'systemd': {
'name': 'prezenta.service',
'state': 'started',
'enabled': True
}
},
{
'name': 'Verify service is running',
'systemd': {
'name': 'prezenta.service'
},
'register': 'service_status'
},
{
'name': 'Report service restart',
'uri': {
'url': 'http://{{ ansible_controller_ip }}/api/service_restarted',
'method': 'POST',
'body_format': 'json',
'body': {
'hostname': '{{ inventory_hostname }}',
'device_ip': '{{ ansible_host }}',
'service_status': '{{ service_status.status.ActiveState }}'
}
}
}
]
}
playbook_path = self.playbook_dir / "restart_service.yml"
with open(playbook_path, 'w') as f:
yaml.dump([playbook_content], f, default_flow_style=False)
return str(playbook_path)
def execute_playbook(self, playbook_name: str, limit_hosts: List[str] = None,
extra_vars: Dict = None, priority: int = 5, max_retries: int = 0) -> Dict:
"""Execute Ansible playbook with enhanced tracking and queue management"""
try:
# Generate fresh inventory
self.generate_dynamic_inventory()
# Build ansible-playbook command
playbook_path = self.playbook_dir / f"{playbook_name}.yml"
if not playbook_path.exists():
return {
'success': False,
'error': f'Playbook {playbook_name} not found'
}
cmd = [
'ansible-playbook',
str(playbook_path.resolve()),
'-i', str(self.inventory_file.resolve()),
'-v' # Verbose output
]
# Limit to specific hosts if provided
if limit_hosts:
cmd.extend(['--limit', ','.join(limit_hosts)])
# Add extra variables
if extra_vars:
cmd.extend(['--extra-vars', json.dumps(extra_vars)])
# Create enhanced execution record using new model
execution_id = str(uuid.uuid4())
with self.db.get_session() as session:
execution = PlaybookExecution(
execution_id=execution_id,
playbook_name=playbook_name,
playbook_description=self._get_playbook_description(playbook_name),
target_hosts=json.dumps(limit_hosts or []),
command_line=' '.join(cmd),
extra_vars=json.dumps(extra_vars or {}),
queued_at=datetime.utcnow(),
started_at=datetime.utcnow(),
status='running',
priority=priority,
max_retries=max_retries,
total_hosts=len(limit_hosts) if limit_hosts else 0
)
session.add(execution)
session.flush()
execution_db_id = execution.id
# Execute playbook
with tempfile.NamedTemporaryFile(mode='w+', suffix='.log', delete=False) as log_file:
log_file_path = log_file.name
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=str(self.ansible_dir)
)
stdout, stderr = process.communicate()
# Update execution record with results
with self.db.get_session() as session:
execution = session.query(PlaybookExecution).get(execution_db_id)
execution.completed_at = datetime.utcnow()
execution.exit_code = process.returncode
execution.stdout_log = stdout
execution.stderr_log = stderr
execution.ansible_log_file = log_file_path
if process.returncode == 0:
execution.status = 'completed'
execution.summary_message = 'Playbook executed successfully'
# Parse stdout for success/failure counts
self._parse_ansible_results_enhanced(execution, stdout)
else:
execution.status = 'failed'
execution.summary_message = f'Playbook failed with exit code {process.returncode}'
# Check if retry is needed
if execution.retry_count < max_retries:
execution.status = 'retry_pending'
# Write logs to file
with open(log_file_path, 'w') as f:
f.write(f"STDOUT:\n{stdout}\n\nSTDERR:\n{stderr}\n")
return {
'success': process.returncode == 0,
'execution_id': execution_id,
'stdout': stdout,
'stderr': stderr,
'exit_code': process.returncode,
'log_file': log_file_path,
'error': stderr if process.returncode != 0 else None
}
except Exception as e:
logging.error(f"Error executing playbook {playbook_name}: {e}")
return {
'success': False,
'error': str(e)
}
# ------------------------------------------------------------------ #
# Async execution (background thread + live log streaming) #
# ------------------------------------------------------------------ #
def execute_playbook_async(self, playbook_name: str, limit_hosts: List[str] = None,
extra_vars: Dict = None, priority: int = 5,
max_retries: int = 0) -> Dict:
"""
Start a playbook in a background thread.
Returns immediately with the execution_id so the caller can poll /live.
"""
try:
self.generate_dynamic_inventory()
playbook_path = self.playbook_dir / f"{playbook_name}.yml"
if not playbook_path.exists():
return {'success': False, 'error': f'Playbook {playbook_name} not found'}
cmd = [
'ansible-playbook',
str(playbook_path.resolve()),
'-i', str(self.inventory_file.resolve()),
'-v',
]
if limit_hosts:
cmd.extend(['--limit', ','.join(limit_hosts)])
if extra_vars:
# Pass all extra vars as a single JSON string to avoid value-quoting issues
cmd.extend(['--extra-vars', json.dumps(extra_vars)])
# Create a persistent log file (NOT deleted on close)
log_fd, log_file_path = tempfile.mkstemp(suffix='.log', prefix='ansible_')
os.close(log_fd)
execution_id = str(uuid.uuid4())
with self.db.get_session() as session:
execution = PlaybookExecution(
execution_id=execution_id,
playbook_name=playbook_name,
playbook_description=self._get_playbook_description(playbook_name),
target_hosts=json.dumps(limit_hosts or []),
command_line=' '.join(cmd),
extra_vars=json.dumps(extra_vars or {}),
queued_at=datetime.utcnow(),
started_at=datetime.utcnow(),
status='running',
priority=priority,
max_retries=max_retries,
total_hosts=len(limit_hosts) if limit_hosts else 0,
ansible_log_file=log_file_path,
)
session.add(execution)
session.flush()
execution_db_id = execution.id
thread = threading.Thread(
target=self._run_playbook_thread,
args=(execution_db_id, execution_id, cmd, log_file_path, max_retries),
daemon=True,
)
thread.start()
return {'success': True, 'execution_id': execution_id}
except Exception as e:
logging.error(f"Error starting async playbook {playbook_name}: {e}")
return {'success': False, 'error': str(e)}
def _run_playbook_thread(self, execution_db_id: int, execution_id: str,
cmd: List[str], log_file_path: str, max_retries: int):
"""Background worker: streams stdout/stderr to log file, updates DB on completion."""
try:
# Build subprocess env: PYTHONUNBUFFERED forces ansible (Python-based) to
# flush each line immediately instead of block-buffering through the pipe.
env = os.environ.copy()
env['PYTHONUNBUFFERED'] = '1'
env['ANSIBLE_FORCE_COLOR'] = '0'
env['ANSIBLE_NOCOLOR'] = '1'
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # merge stderr into stdout
text=True,
bufsize=1, # line-buffered on the read side
cwd=str(self.ansible_dir),
env=env,
)
with open(log_file_path, 'w') as lf:
# Write a startup marker immediately so the UI has something to show
lf.write(f'--- ansible-playbook started (pid {process.pid}) ---\n')
lf.write(f'Command: {" ".join(cmd)}\n')
lf.write('---\n')
lf.flush()
# Explicit readline loop — avoids Python's read-ahead buffer
# that the `for line in process.stdout` iterator uses.
while True:
line = process.stdout.readline()
if line:
lf.write(line)
lf.flush() # flush after every line for live view
elif process.poll() is not None:
break
process.wait()
# Read full output for DB storage
with open(log_file_path, 'r') as lf:
full_output = lf.read()
with self.db.get_session() as session:
execution = session.query(PlaybookExecution).get(execution_db_id)
if execution:
execution.completed_at = datetime.utcnow()
execution.exit_code = process.returncode
execution.stdout_log = full_output
if process.returncode == 0:
execution.status = 'completed'
execution.summary_message = 'Playbook executed successfully'
self._parse_ansible_results_enhanced(execution, full_output)
else:
execution.status = 'failed'
execution.summary_message = f'Playbook failed (exit {process.returncode})'
if execution.retry_count < max_retries:
execution.status = 'retry_pending'
except Exception as e:
logging.error(f"Background playbook thread error [{execution_id}]: {e}")
try:
with self.db.get_session() as session:
execution = session.query(PlaybookExecution).get(execution_db_id)
if execution:
execution.status = 'failed'
execution.summary_message = str(e)
execution.completed_at = datetime.utcnow()
except Exception:
pass
def get_live_execution(self, execution_id: str) -> Dict:
"""Return current status + log content for a running or finished execution."""
try:
with self.db.get_session() as session:
execution = session.query(PlaybookExecution).filter_by(
execution_id=execution_id
).first()
if not execution:
return {'success': False, 'error': 'Execution not found'}
log_content = ''
log_file = execution.ansible_log_file
if log_file and os.path.exists(log_file):
try:
with open(log_file, 'r') as f:
log_content = f.read()
except Exception:
log_content = execution.stdout_log or ''
else:
log_content = execution.stdout_log or ''
if not log_content and execution.status == 'running':
log_content = f'Waiting for ansible-playbook to produce output...\nCommand: {execution.command_line or ""}'
return {
'success': True,
'execution_id': execution_id,
'status': execution.status,
'playbook_name': execution.playbook_name,
'target_hosts': json.loads(execution.target_hosts) if execution.target_hosts else [],
'started_at': execution.started_at.isoformat() if execution.started_at else None,
'completed_at': execution.completed_at.isoformat() if execution.completed_at else None,
'successful_hosts': execution.successful_hosts,
'failed_hosts': execution.failed_hosts,
'unreachable_hosts': execution.unreachable_hosts,
'exit_code': execution.exit_code,
'summary_message': execution.summary_message,
'log': log_content,
}
except Exception as e:
logging.error(f"Error fetching live execution {execution_id}: {e}")
return {'success': False, 'error': str(e)}
def _parse_ansible_results_enhanced(self, execution: PlaybookExecution, output: str):
"""Parse Ansible output for enhanced result statistics"""
lines = output.split('\n')
successful_hosts = 0
failed_hosts = 0
unreachable_hosts = 0
skipped_hosts = 0
changed_hosts = 0
for line in lines:
if 'ok=' in line and 'changed=' in line:
# Parse line like: "host1: ok=4 changed=2 unreachable=0 failed=0"
try:
if 'failed=0' in line:
successful_hosts += 1
else:
failed_count = int(line.split('failed=')[1].split()[0])
if failed_count > 0:
failed_hosts += 1
else:
successful_hosts += 1
if 'unreachable=' in line:
unreachable = int(line.split('unreachable=')[1].split()[0])
if unreachable > 0:
unreachable_hosts += 1
if 'skipped=' in line:
skipped = int(line.split('skipped=')[1].split()[0])
if skipped > 0:
skipped_hosts += 1
if 'changed=' in line:
changed = int(line.split('changed=')[1].split()[0])
if changed > 0:
changed_hosts += 1
except (ValueError, IndexError):
# Skip malformed lines
continue
# Update execution record
execution.successful_hosts = successful_hosts
execution.failed_hosts = failed_hosts
execution.unreachable_hosts = unreachable_hosts
execution.skipped_hosts = skipped_hosts
execution.changed_hosts = changed_hosts
def _get_playbook_description(self, playbook_name: str) -> str:
"""Get user-friendly description for playbook"""
descriptions = {
'update_devices': 'Update all packages and monitoring software on devices',
'restart_service': 'Restart monitoring services on selected devices',
'system_health': 'Check system health and monitoring status',
'maintenance_mode': 'Put devices in maintenance mode'
}
return descriptions.get(playbook_name, f'Execute {playbook_name} playbook')
def create_system_health_playbook(self) -> str:
"""Create system health check playbook"""
playbook_content = {
'name': 'System Health Check',
'hosts': 'all',
'become': True,
'gather_facts': True,
'tasks': [
{
'name': 'Check disk usage',
'shell': 'df -h',
'register': 'disk_usage'
},
{
'name': 'Check memory usage',
'shell': 'free -m',
'register': 'memory_usage'
},
{
'name': 'Check system uptime',
'shell': 'uptime',
'register': 'system_uptime'
},
{
'name': 'Check running services',
'shell': 'systemctl list-units --type=service --state=running | grep -E "(ssh|monitoring|python)"',
'register': 'running_services',
'ignore_errors': True
},
{
'name': 'Check network connectivity',
'shell': 'ping -c 3 8.8.8.8',
'register': 'network_test',
'ignore_errors': True
},
{
'name': 'Display health summary',
'debug': {
'msg': [
'=== SYSTEM HEALTH REPORT ===',
'Disk Usage: {{ disk_usage.stdout_lines[0] if disk_usage.stdout_lines else "N/A" }}',
'Memory: {{ memory_usage.stdout_lines[1] if memory_usage.stdout_lines|length > 1 else "N/A" }}',
'Uptime: {{ system_uptime.stdout if system_uptime.stdout else "N/A" }}',
'Network: {{ "OK" if network_test.rc == 0 else "FAILED" }}',
'Services: {{ running_services.stdout_lines|length if running_services.stdout_lines else 0 }} monitoring services running'
]
}
}
]
}
self.playbook_dir.mkdir(exist_ok=True)
playbook_path = self.playbook_dir / "system_health.yml"
with open(playbook_path, 'w') as f:
yaml.dump([playbook_content], f, default_flow_style=False)
return str(playbook_path)
def _parse_ansible_results(self, execution: AnsibleExecution, output: str):
"""Parse Ansible output for result statistics"""
lines = output.split('\n')
for line in lines:
if 'ok=' in line and 'changed=' in line:
# Parse line like: "host1: ok=4 changed=2 unreachable=0 failed=0"
if 'failed=0' in line or 'failed=0 ' in line:
execution.successful_hosts += 1
else:
execution.failed_hosts += 1
if 'unreachable=' in line:
unreachable = int(line.split('unreachable=')[1].split()[0])
execution.unreachable_hosts += unreachable
def test_ssh_connectivity(self, device_ip: str, username: str = 'pi') -> Dict:
"""Test SSH connectivity to a device"""
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Try with SSH key first, then password
try:
client.connect(
device_ip,
username=username,
key_filename=str(self.ssh_key_path),
timeout=10
)
except paramiko.AuthenticationException:
# Fallback to configurable password
fallback_pw = self.load_settings().get('ssh_fallback_password', 'raspberry')
client.connect(
device_ip,
username=username,
password=fallback_pw,
timeout=10
)
# Test command execution
stdin, stdout, stderr = client.exec_command('uptime')
uptime_output = stdout.read().decode()
client.close()
return {
'success': True,
'message': 'SSH connection successful',
'uptime': uptime_output.strip()
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def bulk_ssh_test(self, device_ips: List[str]) -> Dict:
"""Test SSH connectivity to multiple devices in parallel"""
results = {}
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_ip = {
executor.submit(self.test_ssh_connectivity, ip): ip
for ip in device_ips
}
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
try:
result = future.result()
results[ip] = result
except Exception as e:
results[ip] = {
'success': False,
'error': str(e)
}
return results
def setup_ssh_keys(self) -> Dict:
"""Setup SSH keys for Ansible authentication"""
try:
key_path = Path(self.ssh_key_path)
key_path.parent.mkdir(exist_ok=True)
if not key_path.exists():
# Generate new SSH key pair
subprocess.run([
'ssh-keygen',
'-t', 'rsa',
'-b', '4096',
'-f', str(key_path),
'-N', '', # No passphrase
'-C', 'ansible@monitoring-server'
], check=True)
# Set proper permissions
key_path.chmod(0o600)
return {
'success': True,
'message': 'SSH key pair generated',
'public_key_path': f"{key_path}.pub",
'private_key_path': str(key_path)
}
else:
return {
'success': True,
'message': 'SSH key already exists',
'public_key_path': f"{key_path}.pub",
'private_key_path': str(key_path)
}
except Exception as e:
logging.error(f"Error setting up SSH keys: {e}")
return {
'success': False,
'error': str(e)
}
def get_execution_history(self, limit: int = 50) -> List[Dict]:
"""Get Ansible execution history using enhanced PlaybookExecution model"""
try:
with self.db.get_session() as session:
executions = session.query(PlaybookExecution).order_by(
PlaybookExecution.queued_at.desc()
).limit(limit).all()
return [{
'id': exec.id,
'execution_id': exec.execution_id,
'playbook_name': exec.playbook_name,
'playbook_description': exec.playbook_description,
'queued_at': exec.queued_at.isoformat() if exec.queued_at else None,
'started_at': exec.started_at.isoformat() if exec.started_at else None,
'completed_at': exec.completed_at.isoformat() if exec.completed_at else None,
'status': exec.status,
'priority': exec.priority,
'retry_count': exec.retry_count,
'max_retries': exec.max_retries,
'exit_code': exec.exit_code,
'total_hosts': exec.total_hosts,
'successful_hosts': exec.successful_hosts,
'failed_hosts': exec.failed_hosts,
'unreachable_hosts': exec.unreachable_hosts,
'skipped_hosts': exec.skipped_hosts,
'changed_hosts': exec.changed_hosts,
'summary_message': exec.summary_message,
'duration': exec.duration,
'duration_formatted': exec.duration_formatted
} for exec in executions]
except Exception as e:
logging.error(f"Error getting execution history: {e}")
return []

View File

@@ -0,0 +1,324 @@
"""
Device management service with CRUD operations and device monitoring
"""
import logging
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import desc, func, and_
from app.models import Device, LogEntry, FileUpload, InventoryGroup
from config.database_config import get_db
class DeviceService:
"""Service for managing devices and device-related operations"""
def __init__(self):
self.db = get_db()
self.logger = logging.getLogger(__name__)
# Basic CRUD Operations
def create_device(self, hostname: str, device_ip: str, nume_masa: str, **kwargs) -> Device:
"""Create a new device"""
try:
with self.db.get_session() as session:
# Check if device already exists
existing = session.query(Device).filter(
(Device.hostname == hostname) | (Device.device_ip == device_ip)
).first()
if existing:
raise ValueError(f"Device with hostname '{hostname}' or IP '{device_ip}' already exists")
device = Device(
hostname=hostname,
device_ip=device_ip,
nume_masa=nume_masa,
device_type=kwargs.get('device_type', 'unknown'),
os_version=kwargs.get('os_version'),
status=kwargs.get('status', 'active'),
location=kwargs.get('location'),
description=kwargs.get('description'),
last_seen=datetime.utcnow()
)
session.add(device)
session.commit()
session.refresh(device)
self.logger.info(f"Created device: {hostname} ({device_ip})")
return device
except Exception as e:
self.logger.error(f"Error creating device: {e}")
raise
def get_device_by_id(self, device_id: int) -> Optional[Device]:
"""Get device by ID with relationships loaded"""
try:
with self.db.get_session() as session:
return session.query(Device).options(
joinedload(Device.logs),
joinedload(Device.files),
joinedload(Device.inventory_groups)
).filter(Device.id == device_id).first()
except Exception as e:
self.logger.error(f"Error getting device {device_id}: {e}")
return None
def get_device_by_hostname(self, hostname: str) -> Optional[Device]:
"""Get device by hostname"""
try:
with self.db.get_session() as session:
return session.query(Device).filter(Device.hostname == hostname).first()
except Exception as e:
self.logger.error(f"Error getting device by hostname {hostname}: {e}")
return None
def get_device_by_ip(self, device_ip: str) -> Optional[Device]:
"""Get device by IP address"""
try:
with self.db.get_session() as session:
return session.query(Device).filter(Device.device_ip == device_ip).first()
except Exception as e:
self.logger.error(f"Error getting device by IP {device_ip}: {e}")
return None
def get_all_devices(self, status: Optional[str] = None, limit: Optional[int] = None) -> List[Device]:
"""Get all devices with optional filtering"""
try:
with self.db.get_session() as session:
query = session.query(Device).order_by(desc(Device.last_seen))
if status:
query = query.filter(Device.status == status)
if limit:
query = query.limit(limit)
return query.all()
except Exception as e:
self.logger.error(f"Error getting devices: {e}")
return []
def update_device(self, device_id: int, **kwargs) -> Optional[Device]:
"""Update device information"""
try:
with self.db.get_session() as session:
device = session.query(Device).filter(Device.id == device_id).first()
if not device:
return None
# Update allowed fields
allowed_fields = [
'hostname', 'device_ip', 'nume_masa', 'device_type',
'os_version', 'status', 'location', 'description'
]
for field, value in kwargs.items():
if field in allowed_fields and hasattr(device, field):
setattr(device, field, value)
session.commit()
session.refresh(device)
self.logger.info(f"Updated device {device_id}")
return device
except Exception as e:
self.logger.error(f"Error updating device {device_id}: {e}")
raise
def delete_device(self, device_id: int) -> bool:
"""Delete device (soft delete by setting status to inactive)"""
try:
with self.db.get_session() as session:
device = session.query(Device).filter(Device.id == device_id).first()
if not device:
return False
# Soft delete - set status to inactive
device.status = 'inactive'
session.commit()
self.logger.info(f"Soft deleted device {device_id}")
return True
except Exception as e:
self.logger.error(f"Error deleting device {device_id}: {e}")
return False
# Device Monitoring Functions
def update_device_last_seen(self, hostname: str = None, device_ip: str = None) -> Optional[Device]:
"""Update device last seen timestamp"""
try:
with self.db.get_session() as session:
device = None
if hostname:
device = session.query(Device).filter(Device.hostname == hostname).first()
elif device_ip:
device = session.query(Device).filter(Device.device_ip == device_ip).first()
if device:
device.last_seen = datetime.utcnow()
session.commit()
return device
except Exception as e:
self.logger.error(f"Error updating last seen: {e}")
return None
def get_device_statistics(self, device_id: int) -> Dict[str, Any]:
"""Get comprehensive statistics for a device"""
try:
with self.db.get_session() as session:
device = session.query(Device).filter(Device.id == device_id).first()
if not device:
return {}
# Log statistics
total_logs = session.query(LogEntry).filter(LogEntry.device_id == device_id).count()
# Logs by severity
severity_counts = session.query(
LogEntry.severity,
func.count(LogEntry.id)
).filter(
LogEntry.device_id == device_id
).group_by(LogEntry.severity).all()
# Recent activity (last 24 hours)
last_24h = datetime.utcnow() - timedelta(hours=24)
recent_logs = session.query(LogEntry).filter(
and_(LogEntry.device_id == device_id, LogEntry.timestamp >= last_24h)
).count()
# File uploads
total_files = session.query(FileUpload).filter(
FileUpload.device_id == device_id
).count()
# Last log
last_log = session.query(LogEntry).filter(
LogEntry.device_id == device_id
).order_by(desc(LogEntry.timestamp)).first()
return {
'device': device,
'total_logs': total_logs,
'severity_counts': dict(severity_counts),
'recent_logs_24h': recent_logs,
'total_files': total_files,
'last_log': last_log,
'uptime_days': (datetime.utcnow() - device.last_seen).days if device.last_seen else 0
}
except Exception as e:
self.logger.error(f"Error getting device statistics: {e}")
return {}
def get_inactive_devices(self, hours: int = 24) -> List[Device]:
"""Get devices that haven't been seen recently"""
try:
with self.db.get_session() as session:
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
return session.query(Device).filter(
and_(
Device.last_seen < cutoff_time,
Device.status.in_(['active', 'maintenance'])
)
).order_by(desc(Device.last_seen)).all()
except Exception as e:
self.logger.error(f"Error getting inactive devices: {e}")
return []
def get_device_logs(self, device_id: int, limit: int = 100, severity: str = None) -> List[LogEntry]:
"""Get logs for a specific device"""
try:
with self.db.get_session() as session:
query = session.query(LogEntry).filter(
LogEntry.device_id == device_id
).order_by(desc(LogEntry.timestamp))
if severity:
query = query.filter(LogEntry.severity == severity)
return query.limit(limit).all()
except Exception as e:
self.logger.error(f"Error getting device logs: {e}")
return []
def search_devices(self, search_term: str) -> List[Device]:
"""Search devices by hostname, IP, or description"""
try:
with self.db.get_session() as session:
search_pattern = f"%{search_term}%"
return session.query(Device).filter(
(Device.hostname.like(search_pattern)) |
(Device.device_ip.like(search_pattern)) |
(Device.nume_masa.like(search_pattern)) |
(Device.location.like(search_pattern)) |
(Device.description.like(search_pattern))
).order_by(desc(Device.last_seen)).all()
except Exception as e:
self.logger.error(f"Error searching devices: {e}")
return []
# Bulk Operations
def bulk_update_status(self, device_ids: List[int], status: str) -> int:
"""Update status for multiple devices"""
try:
with self.db.get_session() as session:
updated = session.query(Device).filter(
Device.id.in_(device_ids)
).update({Device.status: status}, synchronize_session=False)
session.commit()
self.logger.info(f"Updated status for {updated} devices")
return updated
except Exception as e:
self.logger.error(f"Error bulk updating status: {e}")
return 0
def get_device_summary(self) -> Dict[str, Any]:
"""Get summary statistics for all devices"""
try:
with self.db.get_session() as session:
# Device status counts
status_counts = session.query(
Device.status,
func.count(Device.id)
).group_by(Device.status).all()
# Device type counts
type_counts = session.query(
Device.device_type,
func.count(Device.id)
).group_by(Device.device_type).all()
# Recent activity
last_24h = datetime.utcnow() - timedelta(hours=24)
devices_seen_24h = session.query(Device).filter(
Device.last_seen >= last_24h
).count()
return {
'total_devices': session.query(Device).count(),
'status_counts': dict(status_counts),
'type_counts': dict(type_counts),
'devices_seen_24h': devices_seen_24h
}
except Exception as e:
self.logger.error(f"Error getting device summary: {e}")
return {}

View File

@@ -0,0 +1,256 @@
"""
File upload and processing service
"""
import os
import hashlib
import mimetypes
from datetime import datetime
from pathlib import Path
from werkzeug.utils import secure_filename
from app.models import Device, FileUpload
from config.database_config import get_db
import logging
class FileUploadService:
"""Service for handling file uploads and processing"""
def __init__(self):
self.db = get_db()
self.upload_folder = Path("data/uploads")
self.upload_folder.mkdir(exist_ok=True)
# Allowed file extensions
self.allowed_extensions = {
'txt', 'log', 'conf', 'cfg', 'json', 'yml', 'yaml',
'py', 'sh', 'service', 'env', 'ini'
}
# Max file size (50MB)
self.max_file_size = 50 * 1024 * 1024
def process_uploaded_file(self, file, device_info):
"""Process uploaded file from device"""
try:
# Validate file
if not file or file.filename == '':
return {'success': False, 'error': 'No file provided'}
# Check file extension
filename = secure_filename(file.filename)
if not self._allowed_file(filename):
return {
'success': False,
'error': f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}'
}
# Check file size
file.seek(0, 2) # Seek to end
file_size = file.tell()
file.seek(0) # Reset position
if file_size > self.max_file_size:
return {
'success': False,
'error': f'File too large. Max size: {self.max_file_size // (1024*1024)}MB'
}
# Calculate file hash
file_content = file.read()
file.seek(0) # Reset for saving
file_hash = hashlib.sha256(file_content).hexdigest()
with self.db.get_session() as session:
# Get or create device
device = self._get_or_create_device(session, device_info)
# Check for duplicate file
existing_file = session.query(FileUpload).filter_by(file_hash=file_hash).first()
if existing_file:
return {
'success': True,
'message': 'File already exists (duplicate detected)',
'file_id': existing_file.id,
'duplicate': True
}
# Generate unique filename
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
new_filename = f"{device.hostname}_{timestamp}_{filename}"
file_path = self.upload_folder / new_filename
# Save file
with open(file_path, 'wb') as f:
f.write(file_content)
# Get MIME type
mime_type, _ = mimetypes.guess_type(filename)
# Create file record
file_upload = FileUpload(
device_id=device.id,
filename=new_filename,
original_filename=filename,
file_path=str(file_path),
file_size=file_size,
file_hash=file_hash,
mime_type=mime_type,
upload_date=datetime.utcnow(),
upload_ip=device_info.get('device_ip'),
processed=False,
processing_status='pending'
)
session.add(file_upload)
session.flush()
# Process file content if it's a log file
if self._is_log_file(filename, mime_type):
self._process_log_file(file_upload, file_content)
return {
'success': True,
'message': 'File uploaded successfully',
'file_id': file_upload.id,
'filename': new_filename,
'size': file_size,
'hash': file_hash,
'processed': file_upload.processed
}
except Exception as e:
logging.error(f"Error processing uploaded file: {e}")
return {
'success': False,
'error': str(e)
}
def _allowed_file(self, filename):
"""Check if file extension is allowed"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in self.allowed_extensions
def _get_or_create_device(self, session, device_info):
"""Get existing device or create new one"""
device = session.query(Device).filter_by(
hostname=device_info['hostname'],
device_ip=device_info['device_ip']
).first()
if not device:
device = Device(
hostname=device_info['hostname'],
device_ip=device_info['device_ip'],
nume_masa=device_info['nume_masa'],
last_seen=datetime.utcnow(),
status='active'
)
session.add(device)
session.flush()
else:
device.last_seen = datetime.utcnow()
if device.nume_masa != device_info['nume_masa']:
device.nume_masa = device_info['nume_masa']
return device
def _is_log_file(self, filename, mime_type):
"""Check if file is a log file that should be processed"""
log_extensions = {'log', 'txt'}
log_keywords = ['log', 'error', 'debug', 'trace', 'audit']
# Check extension
if '.' in filename:
ext = filename.rsplit('.', 1)[1].lower()
if ext in log_extensions:
return True
# Check filename for log keywords
filename_lower = filename.lower()
for keyword in log_keywords:
if keyword in filename_lower:
return True
# Check MIME type
if mime_type and 'text' in mime_type:
return True
return False
def _process_log_file(self, file_upload, content):
"""Process log file content to extract log entries"""
try:
# Mark as log file
file_upload.is_log_file = True
# Simple log processing - split by lines
lines = content.decode('utf-8', errors='ignore').split('\n')
entries_extracted = 0
from app.services.log_service import LogCompressionService
log_service = LogCompressionService()
device_info = {
'hostname': file_upload.device.hostname,
'device_ip': file_upload.device.device_ip,
'nume_masa': file_upload.device.nume_masa
}
for line_num, line in enumerate(lines, 1):
line = line.strip()
if not line:
continue
# Try to extract timestamp and message
# This is a simple implementation - enhance as needed
message = f"[File: {file_upload.original_filename}:{line_num}] {line}"
# Process through log compression service
result = log_service.process_log_message(
device_info=device_info,
message=message,
severity='info'
)
if result['success']:
entries_extracted += 1
# Update file record
file_upload.log_entries_extracted = entries_extracted
file_upload.processed = True
file_upload.processing_status = 'completed'
logging.info(f"Processed log file {file_upload.filename}: {entries_extracted} entries extracted")
except Exception as e:
logging.error(f"Error processing log file content: {e}")
file_upload.processing_status = 'error'
file_upload.processing_error = str(e)
def get_upload_stats(self):
"""Get file upload statistics"""
try:
with self.db.get_session() as session:
total_files = session.query(FileUpload).count()
log_files = session.query(FileUpload).filter_by(is_log_file=True).count()
# Calculate total size
total_size = session.query(
session.func.sum(FileUpload.file_size)
).scalar() or 0
# Count by processing status
processed = session.query(FileUpload).filter_by(processed=True).count()
pending = session.query(FileUpload).filter(
FileUpload.processing_status == 'pending'
).count()
return {
'total_files': total_files,
'log_files': log_files,
'total_size': total_size,
'processed': processed,
'pending': pending
}
except Exception as e:
logging.error(f"Error getting upload stats: {e}")
return {'error': str(e)}

378
app/services/log_service.py Normal file
View File

@@ -0,0 +1,378 @@
"""
Log processing service with message compression and aliasing
"""
import json
import re
import hashlib
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from app.models import Device, LogEntry, MessageTemplate
from config.database_config import get_db
import logging
class LogCompressionService:
"""Service for compressing log messages using templates and aliases"""
def __init__(self):
self.db = get_db()
self.template_patterns = self._load_common_patterns()
def _load_common_patterns(self) -> List[Dict]:
"""Load common log message patterns for template matching"""
return [
{
'pattern': r'Card detected: ([A-F0-9]+)',
'template': 'Card detected: {card_id}',
'category': 'card_detection',
'alias_prefix': 'CD'
},
{
'pattern': r'Connection failed: (.+)',
'template': 'Connection failed: {error}',
'category': 'connection_error',
'alias_prefix': 'CE'
},
{
'pattern': r'System startup completed in ([0-9.]+)s',
'template': 'System startup completed in {time}s',
'category': 'system_startup',
'alias_prefix': 'SS'
},
{
'pattern': r'Auto-update: (.+)',
'template': 'Auto-update: {message}',
'category': 'auto_update',
'alias_prefix': 'AU'
},
{
'pattern': r'Command \'([^\']+)\' (SUCCESS|FAILED)',
'template': 'Command \'{command}\' {status}',
'category': 'command_execution',
'alias_prefix': 'EX'
},
{
'pattern': r'Temperature: ([0-9.]+)°C',
'template': 'Temperature: {temp}°C',
'category': 'temperature',
'alias_prefix': 'TM'
}
]
def process_log_message(self, device_info: Dict, message: str, severity: str = 'info') -> Dict:
"""
Process incoming log message with compression
Args:
device_info: Dict with hostname, device_ip, nume_masa
message: Log message text
severity: Message severity level
Returns:
Dict with processing results and storage info
"""
try:
with self.db.get_session() as session:
# Get or create device
device = self._get_or_create_device(session, device_info)
# Try to match message to existing template
template, variables = self._match_message_template(session, message)
if template:
# Use existing template
log_entry = LogEntry(
device_id=device.id,
template_id=template.id,
template_variables=json.dumps(variables) if variables else None,
severity=severity,
timestamp=datetime.utcnow()
)
# Update template usage count
template.usage_count += 1
# Calculate size savings
original_size = len(message.encode('utf-8'))
compressed_size = len(template.alias.encode('utf-8')) + \
len(json.dumps(variables or {}).encode('utf-8'))
compression_info = {
'used_template': True,
'template_alias': template.alias,
'original_size': original_size,
'compressed_size': compressed_size,
'savings_percent': ((original_size - compressed_size) / original_size) * 100
}
else:
# Create new template if message matches a pattern
template = self._create_new_template(session, message)
if template:
# New template created
variables = self._extract_variables(message, template.template_text)
log_entry = LogEntry(
device_id=device.id,
template_id=template.id,
template_variables=json.dumps(variables) if variables else None,
severity=severity,
timestamp=datetime.utcnow()
)
template.usage_count = 1
compression_info = {
'used_template': True,
'template_alias': template.alias,
'new_template': True,
'original_size': len(message.encode('utf-8')),
'compressed_size': len(template.alias.encode('utf-8'))
}
else:
# Store as full message
log_entry = LogEntry(
device_id=device.id,
full_message=message,
severity=severity,
timestamp=datetime.utcnow()
)
compression_info = {
'used_template': False,
'stored_full': True,
'original_size': len(message.encode('utf-8'))
}
session.add(log_entry)
session.flush() # Get the log entry ID
return {
'success': True,
'log_id': log_entry.id,
'device_id': device.id,
'compression': compression_info,
'message': 'Log processed successfully'
}
except Exception as e:
logging.error(f"Error processing log message: {e}")
return {
'success': False,
'error': str(e),
'message': 'Log processing failed'
}
@staticmethod
def _infer_device_type(hostname: str) -> str:
"""Guess device type from hostname pattern."""
h = hostname.upper()
if any(k in h for k in ('RPI', 'PI', 'RASP')):
return 'Raspberry Pi'
if any(k in h for k in ('SRV', 'SERVER')):
return 'Server'
if any(k in h for k in ('PC', 'DESK', 'WRK')):
return 'PC'
if any(k in h for k in ('LAPTOP', 'NB')):
return 'Laptop'
return 'unknown'
def _get_or_create_device(self, session: Session, device_info: Dict) -> Device:
"""Get existing device or create new one.
Lookup priority:
1. MAC address (most reliable survives IP/hostname changes)
2. hostname + device_ip (legacy fallback)
"""
mac = device_info.get('mac_address')
device = None
# 1. Try MAC lookup first
if mac:
device = session.query(Device).filter_by(mac_address=mac).first()
# 2. Fall back to hostname+IP
if not device:
device = session.query(Device).filter_by(
hostname=device_info['hostname'],
device_ip=device_info['device_ip']
).first()
if not device:
device = Device(
hostname=device_info['hostname'],
device_ip=device_info['device_ip'],
nume_masa=device_info['nume_masa'],
device_type=device_info.get('device_type') or self._infer_device_type(device_info['hostname']),
os_version=device_info.get('os_version'),
location=device_info.get('location'),
mac_address=mac or None,
last_seen=datetime.utcnow(),
status='active'
)
session.add(device)
session.flush()
else:
# Always update last_seen and nome_masa
device.last_seen = datetime.utcnow()
if device.nume_masa != device_info['nume_masa']:
device.nume_masa = device_info['nume_masa']
# Sync MAC address if we now know it and device doesn't have one
if mac and not device.mac_address:
device.mac_address = mac
# Update type from hostname if still unknown
if not device.device_type or device.device_type == 'unknown':
device.device_type = device_info.get('device_type') or self._infer_device_type(device_info['hostname'])
# Update OS / location only when client sends them (don't overwrite manual edits with None)
if device_info.get('os_version'):
device.os_version = device_info['os_version']
if device_info.get('location'):
device.location = device_info['location']
return device
def _match_message_template(self, session: Session, message: str) -> Tuple[Optional[MessageTemplate], Optional[Dict]]:
"""Try to match message to existing template"""
# First, try exact template match
message_hash = MessageTemplate.create_hash(message)
template = session.query(MessageTemplate).filter_by(template_hash=message_hash).first()
if template:
return template, None
# Try pattern matching with variable extraction
for pattern_info in self.template_patterns:
match = re.match(pattern_info['pattern'], message)
if match:
# Look for template with this pattern
template_text = pattern_info['template']
template = session.query(MessageTemplate).filter_by(
template_text=template_text,
category=pattern_info['category']
).first()
if template:
# Extract variables
variables = {}
for i, group in enumerate(match.groups(), 1):
# Map to variable names based on template
if '{card_id}' in template_text and pattern_info['category'] == 'card_detection':
variables['card_id'] = group
elif '{error}' in template_text and pattern_info['category'] == 'connection_error':
variables['error'] = group
elif '{time}' in template_text and pattern_info['category'] == 'system_startup':
variables['time'] = group
elif '{message}' in template_text:
variables['message'] = group
elif '{command}' in template_text and i == 1:
variables['command'] = group
elif '{status}' in template_text and i == 2:
variables['status'] = group
elif '{temp}' in template_text:
variables['temp'] = group
return template, variables
return None, None
def _create_new_template(self, session: Session, message: str) -> Optional[MessageTemplate]:
"""Create new template if message matches a known pattern"""
for pattern_info in self.template_patterns:
match = re.match(pattern_info['pattern'], message)
if match:
# Check if template already exists
existing = session.query(MessageTemplate).filter_by(
template_text=pattern_info['template'],
category=pattern_info['category']
).first()
if existing:
return existing
# Create new template
alias = self._generate_alias(session, pattern_info['alias_prefix'])
template_hash = MessageTemplate.create_hash(pattern_info['template'])
template = MessageTemplate(
template_hash=template_hash,
template_text=pattern_info['template'],
category=pattern_info['category'],
alias=alias,
created_at=datetime.utcnow()
)
session.add(template)
session.flush()
return template
return None
def _generate_alias(self, session: Session, prefix: str) -> str:
"""Generate unique alias for template"""
# Find highest existing alias number for this prefix
existing_aliases = session.query(MessageTemplate.alias).filter(
MessageTemplate.alias.like(f"{prefix}%")
).all()
max_num = 0
for (alias,) in existing_aliases:
try:
num = int(alias[len(prefix):])
max_num = max(max_num, num)
except ValueError:
continue
return f"{prefix}{max_num + 1:03d}"
def _extract_variables(self, message: str, template: str) -> Dict:
"""Extract variables from message using template"""
# Simple variable extraction - could be enhanced
variables = {}
# This is a simplified implementation
# In production, you'd want more sophisticated template matching
return variables
def get_compression_stats(self) -> Dict:
"""Get compression statistics"""
try:
with self.db.get_session() as session:
# Count total logs
total_logs = session.query(LogEntry).count()
# Count templated logs
templated_logs = session.query(LogEntry).filter(
LogEntry.template_id.isnot(None)
).count()
# Count templates
total_templates = session.query(MessageTemplate).count()
# Calculate average savings (simplified)
compression_ratio = (templated_logs / total_logs * 100) if total_logs > 0 else 0
return {
'total_logs': total_logs,
'templated_logs': templated_logs,
'total_templates': total_templates,
'compression_ratio': round(compression_ratio, 2),
'estimated_savings': round(compression_ratio * 0.6, 2) # Estimated 60% savings per template
}
except Exception as e:
logging.error(f"Error getting compression stats: {e}")
return {'error': str(e)}
def get_message_by_alias(self, alias: str, variables: Dict = None) -> Optional[str]:
"""Retrieve full message using alias and variables"""
try:
with self.db.get_session() as session:
template = session.query(MessageTemplate).filter_by(alias=alias).first()
if template:
if variables:
return template.template_text.format(**variables)
return template.template_text
return None
except Exception as e:
logging.error(f"Error retrieving message by alias: {e}")
return None