378 lines
16 KiB
Python
378 lines
16 KiB
Python
"""
Log processing service with message compression and aliasing.
"""
|
||
import json
|
||
import re
|
||
import hashlib
|
||
from typing import Dict, List, Optional, Tuple
|
||
from datetime import datetime
|
||
from sqlalchemy.orm import Session
|
||
from app.models import Device, LogEntry, MessageTemplate
|
||
from config.database_config import get_db
|
||
import logging
|
||
|
||
class LogCompressionService:
    """Service for compressing log messages using templates and aliases.

    A message that matches one of the built-in patterns (see
    ``_load_common_patterns``) is stored as a reference to a
    ``MessageTemplate`` row plus a JSON dict of its variable parts. Any other
    message is stored verbatim in ``LogEntry.full_message``.
    """

    def __init__(self):
        # NOTE(review): get_db() is project code; it is assumed to return an
        # object whose get_session() is a context manager yielding an ORM
        # session (that is how it is used below) — confirm.
        self.db = get_db()
        self.template_patterns = self._load_common_patterns()

    def _load_common_patterns(self) -> List[Dict]:
        """Return the built-in log message patterns used for template matching.

        Each entry contains:
          * ``pattern``      – regex that must match the *whole* message; its
                               capture groups hold the variable parts.
          * ``template``     – ``str.format`` template whose ``{placeholders}``
                               appear in the same order as the capture groups.
          * ``category``     – category stored on the ``MessageTemplate`` row.
          * ``alias_prefix`` – prefix for generated aliases (e.g. ``CD001``).
        """
        return [
            {
                'pattern': r'Card detected: ([A-F0-9]+)',
                'template': 'Card detected: {card_id}',
                'category': 'card_detection',
                'alias_prefix': 'CD'
            },
            {
                'pattern': r'Connection failed: (.+)',
                'template': 'Connection failed: {error}',
                'category': 'connection_error',
                'alias_prefix': 'CE'
            },
            {
                'pattern': r'System startup completed in ([0-9.]+)s',
                'template': 'System startup completed in {time}s',
                'category': 'system_startup',
                'alias_prefix': 'SS'
            },
            {
                'pattern': r'Auto-update: (.+)',
                'template': 'Auto-update: {message}',
                'category': 'auto_update',
                'alias_prefix': 'AU'
            },
            {
                'pattern': r'Command \'([^\']+)\' (SUCCESS|FAILED)',
                'template': 'Command \'{command}\' {status}',
                'category': 'command_execution',
                'alias_prefix': 'EX'
            },
            {
                'pattern': r'Temperature: ([0-9.]+)°C',
                'template': 'Temperature: {temp}°C',
                'category': 'temperature',
                'alias_prefix': 'TM'
            }
        ]

    def process_log_message(self, device_info: Dict, message: str, severity: str = 'info') -> Dict:
        """
        Process incoming log message with compression.

        Args:
            device_info: Dict with hostname, device_ip, nume_masa (required) and
                optionally mac_address, device_type, os_version, location.
            message: Log message text.
            severity: Message severity level.

        Returns:
            Dict with processing results and storage info. On success the keys
            are ``success`` (True), ``log_id``, ``device_id``, ``compression``
            and ``message``; on any exception they are ``success`` (False),
            ``error`` and ``message``.
        """
        try:
            with self.db.get_session() as session:
                # Get or create device
                device = self._get_or_create_device(session, device_info)

                # Reuse an existing template if possible; otherwise try to
                # create one from the built-in patterns.
                template, variables = self._match_message_template(session, message)
                new_template = False
                if template is None:
                    template = self._create_new_template(session, message)
                    if template is not None:
                        new_template = True
                        # Capture the variable parts of the first occurrence
                        # too; without them the message cannot be rebuilt.
                        variables = self._extract_variables(message, template.template_text)

                if template is not None:
                    log_entry = LogEntry(
                        device_id=device.id,
                        template_id=template.id,
                        template_variables=json.dumps(variables) if variables else None,
                        severity=severity,
                        timestamp=datetime.utcnow()
                    )
                    # Update template usage count
                    if new_template:
                        template.usage_count = 1
                    else:
                        template.usage_count += 1

                    compression_info = self._build_compression_info(
                        message, template.alias, variables, new_template=new_template
                    )
                else:
                    # No pattern applies: store the message as-is
                    log_entry = LogEntry(
                        device_id=device.id,
                        full_message=message,
                        severity=severity,
                        timestamp=datetime.utcnow()
                    )
                    compression_info = {
                        'used_template': False,
                        'stored_full': True,
                        'original_size': len(message.encode('utf-8'))
                    }

                session.add(log_entry)
                session.flush()  # populate log_entry.id before returning it

                return {
                    'success': True,
                    'log_id': log_entry.id,
                    'device_id': device.id,
                    'compression': compression_info,
                    'message': 'Log processed successfully'
                }

        except Exception as e:
            # Top-level boundary: report the failure to the caller, keep the traceback in the log.
            logging.exception("Error processing log message: %s", e)
            return {
                'success': False,
                'error': str(e),
                'message': 'Log processing failed'
            }

    @staticmethod
    def _build_compression_info(message: str, alias: str, variables: Optional[Dict],
                                new_template: bool = False) -> Dict:
        """Describe the size effect of storing *message* as *alias* + *variables*.

        ``compressed_size`` counts the UTF-8 bytes of the alias plus the
        JSON-encoded variables (``{}`` when there are none). ``savings_percent``
        is negative when the encoded form is larger than the original, and 0.0
        for an empty message instead of dividing by zero. ``new_template`` is
        only included (as True) when the template was created for this message.
        """
        original_size = len(message.encode('utf-8'))
        compressed_size = len(alias.encode('utf-8')) + \
            len(json.dumps(variables or {}).encode('utf-8'))

        info = {
            'used_template': True,
            'template_alias': alias,
        }
        if new_template:
            info['new_template'] = True
        info['original_size'] = original_size
        info['compressed_size'] = compressed_size
        info['savings_percent'] = (
            ((original_size - compressed_size) / original_size) * 100 if original_size else 0.0
        )
        return info

    @staticmethod
    def _infer_device_type(hostname: str) -> str:
        """Guess device type from hostname pattern (case-insensitive substring test).

        Checks run in order: Raspberry Pi, Server, PC, Laptop; 'unknown' otherwise.
        NOTE(review): 'PI' is a plain substring, so e.g. 'API-SRV' is classified
        as 'Raspberry Pi' because that check runs before the Server one.
        """
        h = hostname.upper()
        if any(k in h for k in ('RPI', 'PI', 'RASP')):
            return 'Raspberry Pi'
        if any(k in h for k in ('SRV', 'SERVER')):
            return 'Server'
        if any(k in h for k in ('PC', 'DESK', 'WRK')):
            return 'PC'
        if any(k in h for k in ('LAPTOP', 'NB')):
            return 'Laptop'
        return 'unknown'

    def _get_or_create_device(self, session: 'Session', device_info: Dict) -> 'Device':
        """Get existing device or create new one.

        Lookup priority:
            1. MAC address (most reliable – survives IP/hostname changes)
            2. hostname + device_ip (legacy fallback)

        ``device_info`` must contain 'hostname', 'device_ip' and 'nume_masa'
        (indexed directly, so a missing key raises KeyError); 'mac_address',
        'device_type', 'os_version' and 'location' are optional.
        """
        mac = device_info.get('mac_address')
        device = None

        # 1. Try MAC lookup first
        if mac:
            device = session.query(Device).filter_by(mac_address=mac).first()

        # 2. Fall back to hostname+IP
        if not device:
            device = session.query(Device).filter_by(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip']
            ).first()

        if not device:
            device = Device(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip'],
                nume_masa=device_info['nume_masa'],
                device_type=device_info.get('device_type') or self._infer_device_type(device_info['hostname']),
                os_version=device_info.get('os_version'),
                location=device_info.get('location'),
                mac_address=mac or None,
                last_seen=datetime.utcnow(),
                status='active'
            )
            session.add(device)
            session.flush()  # assign device.id for the log entry
        else:
            # Always update last_seen and nume_masa
            device.last_seen = datetime.utcnow()
            if device.nume_masa != device_info['nume_masa']:
                device.nume_masa = device_info['nume_masa']

            # Sync MAC address if we now know it and device doesn't have one
            if mac and not device.mac_address:
                device.mac_address = mac

            # Update type from hostname if still unknown
            if not device.device_type or device.device_type == 'unknown':
                device.device_type = device_info.get('device_type') or self._infer_device_type(device_info['hostname'])

            # Update OS / location only when client sends them (don't overwrite manual edits with None)
            if device_info.get('os_version'):
                device.os_version = device_info['os_version']
            if device_info.get('location'):
                device.location = device_info['location']

        return device

    @staticmethod
    def _variables_for_pattern(pattern_info: Dict, message: str) -> Optional[Dict[str, str]]:
        """Match *message* against one pattern entry and name its capture groups.

        Returns None unless the pattern matches the WHOLE message; a prefix-only
        match would drop the trailing text when the message is rebuilt from the
        template. Group values are bound, in order, to the ``{placeholders}`` of
        ``pattern_info['template']``.
        """
        match = re.fullmatch(pattern_info['pattern'], message)
        if match is None:
            return None
        names = re.findall(r'\{(\w+)\}', pattern_info['template'])
        return dict(zip(names, match.groups()))

    def _match_message_template(self, session: 'Session', message: str) -> Tuple[Optional['MessageTemplate'], Optional[Dict]]:
        """Try to match message to an existing template.

        Returns ``(template, None)`` when the message hash equals a stored
        template hash, ``(template, variables)`` when the message fully matches a
        built-in pattern whose template already exists, else ``(None, None)``.
        """
        # First, try exact template match
        message_hash = MessageTemplate.create_hash(message)
        template = session.query(MessageTemplate).filter_by(template_hash=message_hash).first()
        if template:
            return template, None

        # Try pattern matching with variable extraction
        for pattern_info in self.template_patterns:
            variables = self._variables_for_pattern(pattern_info, message)
            if variables is None:
                continue

            template = session.query(MessageTemplate).filter_by(
                template_text=pattern_info['template'],
                category=pattern_info['category']
            ).first()
            if template:
                return template, variables

        return None, None

    def _create_new_template(self, session: 'Session', message: str) -> Optional['MessageTemplate']:
        """Create a template for the first built-in pattern that fully matches *message*.

        Returns the existing row if that template is already stored, the newly
        added (and flushed) row otherwise, or None when no pattern matches.
        """
        for pattern_info in self.template_patterns:
            if self._variables_for_pattern(pattern_info, message) is None:
                continue

            # Check if template already exists
            existing = session.query(MessageTemplate).filter_by(
                template_text=pattern_info['template'],
                category=pattern_info['category']
            ).first()
            if existing:
                return existing

            # Create new template
            alias = self._generate_alias(session, pattern_info['alias_prefix'])
            template_hash = MessageTemplate.create_hash(pattern_info['template'])

            template = MessageTemplate(
                template_hash=template_hash,
                template_text=pattern_info['template'],
                category=pattern_info['category'],
                alias=alias,
                created_at=datetime.utcnow()
            )
            session.add(template)
            session.flush()  # assign template.id
            return template

        return None

    def _generate_alias(self, session: 'Session', prefix: str) -> str:
        """Generate the next alias for *prefix*: ``<prefix><max existing number + 1>``.

        Numbers are zero-padded to at least three digits. Aliases whose suffix
        is not an integer are ignored.
        NOTE(review): read-max-then-increment; two concurrent sessions could
        produce the same alias unless the column has a unique constraint.
        """
        # Find highest existing alias number for this prefix
        existing_aliases = session.query(MessageTemplate.alias).filter(
            MessageTemplate.alias.like(f"{prefix}%")
        ).all()

        max_num = 0
        for (alias,) in existing_aliases:
            try:
                num = int(alias[len(prefix):])
                max_num = max(max_num, num)
            except ValueError:
                continue

        return f"{prefix}{max_num + 1:03d}"

    def _extract_variables(self, message: str, template: str) -> Dict:
        """Extract the variable parts of *message* for the given template text.

        Uses the built-in pattern(s) whose ``template`` equals *template*.
        Returns an empty dict when no such pattern fully matches the message.
        """
        for pattern_info in self.template_patterns:
            if pattern_info['template'] != template:
                continue
            variables = self._variables_for_pattern(pattern_info, message)
            if variables is not None:
                return variables
        return {}

    def get_compression_stats(self) -> Dict:
        """Get compression statistics.

        ``compression_ratio`` is the percentage of log entries stored via a
        template; ``estimated_savings`` applies a flat 60% factor to it. On
        failure returns ``{'error': <message>}``.
        """
        try:
            with self.db.get_session() as session:
                # Count total logs
                total_logs = session.query(LogEntry).count()

                # Count templated logs
                templated_logs = session.query(LogEntry).filter(
                    LogEntry.template_id.isnot(None)
                ).count()

                # Count templates
                total_templates = session.query(MessageTemplate).count()

                # Calculate average savings (simplified)
                compression_ratio = (templated_logs / total_logs * 100) if total_logs > 0 else 0

                return {
                    'total_logs': total_logs,
                    'templated_logs': templated_logs,
                    'total_templates': total_templates,
                    'compression_ratio': round(compression_ratio, 2),
                    'estimated_savings': round(compression_ratio * 0.6, 2)  # Estimated 60% savings per template
                }
        except Exception as e:
            logging.exception("Error getting compression stats: %s", e)
            return {'error': str(e)}

    def get_message_by_alias(self, alias: str, variables: Optional[Dict] = None) -> Optional[str]:
        """Retrieve full message using alias and variables.

        Returns the template text formatted with *variables* (or raw when no
        variables are given), or None if the alias is unknown or formatting /
        the lookup fails (e.g. a placeholder has no matching variable).
        """
        try:
            with self.db.get_session() as session:
                template = session.query(MessageTemplate).filter_by(alias=alias).first()

                if template:
                    if variables:
                        return template.template_text.format(**variables)
                    return template.template_text

                return None
        except Exception as e:
            logging.exception("Error retrieving message by alias: %s", e)
            return None