"""
Log processing service with message compression and aliasing.
"""
import json
import re
import hashlib
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from app.models import Device, LogEntry, MessageTemplate
from config.database_config import get_db
import logging

logger = logging.getLogger(__name__)

# Matches ``{name}`` placeholders in a template's text. Their left-to-right
# order defines which regex capture group becomes which variable name.
_PLACEHOLDER_RE = re.compile(r'\{(\w+)\}')


class LogCompressionService:
    """Service for compressing log messages using templates and aliases.

    A message that matches one of the known patterns *in full* is stored as a
    reference to a ``MessageTemplate`` plus a JSON dict of the captured
    variables. Any other message is stored verbatim in ``full_message``.
    """

    def __init__(self):
        self.db = get_db()
        self.template_patterns = self._load_common_patterns()

    def _load_common_patterns(self) -> List[Dict]:
        """Return the built-in log message patterns used for template matching.

        Each entry contains:
          * ``pattern``      - regex that must match the whole message; its
                               capture groups are the variable values, in order.
          * ``template``     - template text whose ``{name}`` placeholders name
                               those capture groups, in the same order.
          * ``category``     - category stored on the ``MessageTemplate``.
          * ``alias_prefix`` - prefix passed to ``_generate_alias``.
        """
        return [
            {
                'pattern': r'Card detected: ([A-F0-9]+)',
                'template': 'Card detected: {card_id}',
                'category': 'card_detection',
                'alias_prefix': 'CD'
            },
            {
                'pattern': r'Connection failed: (.+)',
                'template': 'Connection failed: {error}',
                'category': 'connection_error',
                'alias_prefix': 'CE'
            },
            {
                'pattern': r'System startup completed in ([0-9.]+)s',
                'template': 'System startup completed in {time}s',
                'category': 'system_startup',
                'alias_prefix': 'SS'
            },
            {
                'pattern': r'Auto-update: (.+)',
                'template': 'Auto-update: {message}',
                'category': 'auto_update',
                'alias_prefix': 'AU'
            },
            {
                'pattern': r'Command \'([^\']+)\' (SUCCESS|FAILED)',
                'template': 'Command \'{command}\' {status}',
                'category': 'command_execution',
                'alias_prefix': 'EX'
            },
            {
                'pattern': r'Temperature: ([0-9.]+)°C',
                'template': 'Temperature: {temp}°C',
                'category': 'temperature',
                'alias_prefix': 'TM'
            }
        ]

    def process_log_message(self, device_info: Dict, message: str, severity: str = 'info') -> Dict:
        """
        Process an incoming log message, storing it compressed when possible.

        Args:
            device_info: Dict with required ``hostname``, ``device_ip`` and
                ``nume_masa`` keys, plus optional ``mac_address``,
                ``device_type``, ``os_version`` and ``location``.
            message: Log message text.
            severity: Message severity level.

        Returns:
            On success: dict with ``success``, ``log_id``, ``device_id``,
            ``compression`` (size/alias details) and ``message``.
            On failure: dict with ``success`` (False), ``error`` and
            ``message``. Exceptions are logged, never raised.
        """
        try:
            with self.db.get_session() as session:
                device = self._get_or_create_device(session, device_info)
                original_size = len(message.encode('utf-8'))

                # Prefer an already-stored template; otherwise create one when
                # the message matches a known pattern.
                template, variables = self._match_message_template(session, message)
                is_new_template = False
                if template is None:
                    template = self._create_new_template(session, message)
                    if template is not None:
                        is_new_template = True
                        # Capture the variables of the message that created the
                        # template too, so that first message is not lost.
                        variables = self._extract_variables(message, template.template_text)

                if template is not None:
                    log_entry = LogEntry(
                        device_id=device.id,
                        template_id=template.id,
                        template_variables=json.dumps(variables) if variables else None,
                        severity=severity,
                        timestamp=datetime.utcnow()
                    )

                    if is_new_template:
                        template.usage_count = 1
                    else:
                        # `or 0` guards against a NULL usage_count in the DB.
                        template.usage_count = (template.usage_count or 0) + 1

                    compressed_size = self._compressed_size(template.alias, variables)
                    compression_info = {
                        'used_template': True,
                        'template_alias': template.alias,
                        'original_size': original_size,
                        'compressed_size': compressed_size,
                        'savings_percent': self._savings_percent(original_size, compressed_size)
                    }
                    if is_new_template:
                        compression_info['new_template'] = True
                else:
                    # No pattern matched: store the message verbatim.
                    log_entry = LogEntry(
                        device_id=device.id,
                        full_message=message,
                        severity=severity,
                        timestamp=datetime.utcnow()
                    )
                    compression_info = {
                        'used_template': False,
                        'stored_full': True,
                        'original_size': original_size
                    }

                session.add(log_entry)
                session.flush()  # Populate log_entry.id before returning it

                return {
                    'success': True,
                    'log_id': log_entry.id,
                    'device_id': device.id,
                    'compression': compression_info,
                    'message': 'Log processed successfully'
                }

        except Exception as e:
            logger.exception("Error processing log message: %s", e)
            return {
                'success': False,
                'error': str(e),
                'message': 'Log processing failed'
            }

    @staticmethod
    def _compressed_size(alias: str, variables: Optional[Dict]) -> int:
        """Estimate the stored size (UTF-8 bytes) of a templated entry.

        The estimate is the alias plus the JSON-encoded variables; ``None``
        variables count as ``{}``.
        """
        return len(alias.encode('utf-8')) + len(json.dumps(variables or {}).encode('utf-8'))

    @staticmethod
    def _savings_percent(original_size: int, compressed_size: int) -> float:
        """Return the percentage saved versus the original size (0.0 for empty input)."""
        if original_size <= 0:
            return 0.0
        return ((original_size - compressed_size) / original_size) * 100

    @staticmethod
    def _infer_device_type(hostname: str) -> str:
        """Guess the device type from substrings of the upper-cased hostname.

        Returns 'Raspberry Pi', 'Server', 'PC', 'Laptop' or 'unknown'. The
        checks run in that order, and the first one that matches wins.
        """
        h = hostname.upper()
        if any(k in h for k in ('RPI', 'PI', 'RASP')):
            return 'Raspberry Pi'
        if any(k in h for k in ('SRV', 'SERVER')):
            return 'Server'
        if any(k in h for k in ('PC', 'DESK', 'WRK')):
            return 'PC'
        if any(k in h for k in ('LAPTOP', 'NB')):
            return 'Laptop'
        return 'unknown'

    def _get_or_create_device(self, session: Session, device_info: Dict) -> Device:
        """Get the existing device or create a new one.

        Lookup priority:
          1. MAC address (most reliable - survives IP/hostname changes)
          2. hostname + device_ip (legacy fallback)

        For an existing device, this updates ``last_seen`` and ``nume_masa``.
        It also fills in the MAC address and device type if they are missing,
        and updates OS/location only when the client sent non-empty values.
        ``hostname`` and ``device_ip`` of an existing device are not updated.
        """
        mac = device_info.get('mac_address')
        device = None

        # 1. Try MAC lookup first
        if mac:
            device = session.query(Device).filter_by(mac_address=mac).first()

        # 2. Fall back to hostname+IP
        if not device:
            device = session.query(Device).filter_by(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip']
            ).first()

        if not device:
            device = Device(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip'],
                nume_masa=device_info['nume_masa'],
                device_type=device_info.get('device_type') or self._infer_device_type(device_info['hostname']),
                os_version=device_info.get('os_version'),
                location=device_info.get('location'),
                mac_address=mac or None,
                last_seen=datetime.utcnow(),
                status='active'
            )
            session.add(device)
            session.flush()  # Assign device.id for the caller
        else:
            # Always update last_seen and nume_masa
            device.last_seen = datetime.utcnow()
            if device.nume_masa != device_info['nume_masa']:
                device.nume_masa = device_info['nume_masa']
            # Sync the MAC address if we now know it and the device lacks one
            if mac and not device.mac_address:
                device.mac_address = mac
            # Update the type from the hostname if it is still unknown
            if not device.device_type or device.device_type == 'unknown':
                device.device_type = device_info.get('device_type') or self._infer_device_type(device_info['hostname'])
            # Update OS / location only when the client sends them
            # (don't overwrite manual edits with None)
            if device_info.get('os_version'):
                device.os_version = device_info['os_version']
            if device_info.get('location'):
                device.location = device_info['location']

        return device

    @staticmethod
    def _variables_from_match(template_text: str, match: 're.Match') -> Dict:
        """Map a pattern match's capture groups onto the template's placeholder names, in order."""
        names = _PLACEHOLDER_RE.findall(template_text)
        return dict(zip(names, match.groups()))

    def _match_pattern(self, message: str) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Return ``(pattern_info, variables)`` for the first known pattern that
        matches the *entire* message, or ``(None, None)``.

        ``re.fullmatch`` is used so that trailing text is never silently
        dropped when the message is later rebuilt from its template.
        """
        for pattern_info in self.template_patterns:
            match = re.fullmatch(pattern_info['pattern'], message)
            if match:
                return pattern_info, self._variables_from_match(pattern_info['template'], match)
        return None, None

    def _match_message_template(self, session: Session, message: str) -> Tuple[Optional[MessageTemplate], Optional[Dict]]:
        """Find an already-stored template for ``message``.

        Returns:
            ``(template, None)`` if the message text itself hashes to a stored
            ``template_hash``. ``(template, variables)`` if a known pattern
            fully matches and its template already exists. Otherwise
            ``(None, None)``.
        """
        # First, try an exact template match
        message_hash = MessageTemplate.create_hash(message)
        template = session.query(MessageTemplate).filter_by(template_hash=message_hash).first()
        if template:
            return template, None

        # Then try pattern matching with variable extraction
        pattern_info, variables = self._match_pattern(message)
        if pattern_info is None:
            return None, None

        template = session.query(MessageTemplate).filter_by(
            template_text=pattern_info['template'],
            category=pattern_info['category']
        ).first()
        if template is None:
            return None, None
        return template, variables

    def _create_new_template(self, session: Session, message: str) -> Optional[MessageTemplate]:
        """Create (or return the existing) template for the pattern that fully
        matches ``message``. Returns None when no known pattern matches."""
        pattern_info, _ = self._match_pattern(message)
        if pattern_info is None:
            return None

        # Check whether the template already exists
        existing = session.query(MessageTemplate).filter_by(
            template_text=pattern_info['template'],
            category=pattern_info['category']
        ).first()
        if existing:
            return existing

        # Create a new template
        alias = self._generate_alias(session, pattern_info['alias_prefix'])
        template_hash = MessageTemplate.create_hash(pattern_info['template'])

        template = MessageTemplate(
            template_hash=template_hash,
            template_text=pattern_info['template'],
            category=pattern_info['category'],
            alias=alias,
            created_at=datetime.utcnow()
        )
        session.add(template)
        session.flush()  # Assign template.id for the caller
        return template

    def _generate_alias(self, session: Session, prefix: str) -> str:
        """Generate the next alias for ``prefix``, e.g. ``CD001``, ``CD002``.

        The number is one more than the highest numeric suffix among aliases
        starting with ``prefix``. Aliases with a non-numeric suffix are ignored.
        """
        existing_aliases = session.query(MessageTemplate.alias).filter(
            MessageTemplate.alias.like(f"{prefix}%")
        ).all()

        max_num = 0
        for (alias,) in existing_aliases:
            try:
                num = int(alias[len(prefix):])
                max_num = max(max_num, num)
            except ValueError:
                continue

        return f"{prefix}{max_num + 1:03d}"

    def _extract_variables(self, message: str, template: str) -> Dict:
        """Extract the variables of ``message`` for the given template text.

        Looks up the known pattern whose template text equals ``template``. It
        requires that pattern to match the whole message, then maps capture
        groups onto the template's placeholder names. Returns ``{}`` when
        nothing matches.
        """
        for pattern_info in self.template_patterns:
            if pattern_info['template'] != template:
                continue
            match = re.fullmatch(pattern_info['pattern'], message)
            if match:
                return self._variables_from_match(template, match)
        return {}

    def get_compression_stats(self) -> Dict:
        """Return counts of logs/templates and a rough compression estimate.

        ``compression_ratio`` is the percentage of log entries stored via a
        template. ``estimated_savings`` assumes a flat 60% saving per
        templated entry. On error, returns ``{'error': <message>}``.
        """
        try:
            with self.db.get_session() as session:
                total_logs = session.query(LogEntry).count()

                templated_logs = session.query(LogEntry).filter(
                    LogEntry.template_id.isnot(None)
                ).count()

                total_templates = session.query(MessageTemplate).count()

                compression_ratio = (templated_logs / total_logs * 100) if total_logs > 0 else 0

                return {
                    'total_logs': total_logs,
                    'templated_logs': templated_logs,
                    'total_templates': total_templates,
                    'compression_ratio': round(compression_ratio, 2),
                    'estimated_savings': round(compression_ratio * 0.6, 2)  # Estimated 60% savings per template
                }

        except Exception as e:
            logger.exception("Error getting compression stats: %s", e)
            return {'error': str(e)}

    def get_message_by_alias(self, alias: str, variables: Optional[Dict] = None) -> Optional[str]:
        """Rebuild a full message from a template alias and its variables.

        Returns the template text formatted with ``variables``, or the raw
        template text when no variables are given. Returns None if the alias
        is unknown or an error occurs (e.g. a placeholder is missing from
        ``variables``).
        """
        try:
            with self.db.get_session() as session:
                template = session.query(MessageTemplate).filter_by(alias=alias).first()
                if template:
                    if variables:
                        return template.template_text.format(**variables)
                    return template.template_text
                return None
        except Exception as e:
            logger.exception("Error retrieving message by alias: %s", e)
            return None