Files
Server_Monitorizare_v2/app/services/log_service.py
2026-04-23 15:55:46 +03:00

378 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Log processing service with message compression and aliasing
"""
import json
import re
import hashlib
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from app.models import Device, LogEntry, MessageTemplate
from config.database_config import get_db
import logging
class LogCompressionService:
    """Service for compressing log messages using templates and aliases.

    A message that matches one of the known regex patterns is stored as a
    reference to a ``MessageTemplate`` row plus a JSON blob holding the
    captured variables, instead of the full message text. Messages that match
    no pattern are stored verbatim.
    """

    # Matches ``{name}`` placeholders inside a template text.
    _PLACEHOLDER_RE = re.compile(r'\{(\w+)\}')

    def __init__(self):
        self.db = get_db()
        self.template_patterns = self._load_common_patterns()

    def _load_common_patterns(self) -> List[Dict]:
        """Return the built-in log message patterns used for template matching.

        Each entry holds:
            pattern: regex applied with ``re.match`` (anchored at the start).
            template: template text; its ``{placeholders}`` appear in the same
                order as the capture groups of ``pattern``.
            category: category stored on the template row.
            alias_prefix: prefix used when generating the template alias.
        """
        return [
            {
                'pattern': r'Card detected: ([A-F0-9]+)',
                'template': 'Card detected: {card_id}',
                'category': 'card_detection',
                'alias_prefix': 'CD'
            },
            {
                'pattern': r'Connection failed: (.+)',
                'template': 'Connection failed: {error}',
                'category': 'connection_error',
                'alias_prefix': 'CE'
            },
            {
                'pattern': r'System startup completed in ([0-9.]+)s',
                'template': 'System startup completed in {time}s',
                'category': 'system_startup',
                'alias_prefix': 'SS'
            },
            {
                'pattern': r'Auto-update: (.+)',
                'template': 'Auto-update: {message}',
                'category': 'auto_update',
                'alias_prefix': 'AU'
            },
            {
                'pattern': r'Command \'([^\']+)\' (SUCCESS|FAILED)',
                'template': 'Command \'{command}\' {status}',
                'category': 'command_execution',
                'alias_prefix': 'EX'
            },
            {
                'pattern': r'Temperature: ([0-9.]+)°C',
                'template': 'Temperature: {temp}°C',
                'category': 'temperature',
                'alias_prefix': 'TM'
            }
        ]

    def process_log_message(self, device_info: Dict, message: str, severity: str = 'info') -> Dict:
        """Process an incoming log message, compressing it when possible.

        Args:
            device_info: Dict with hostname, device_ip, nume_masa (and
                optionally mac_address, device_type, os_version, location).
            message: Log message text.
            severity: Message severity level.

        Returns:
            On success, a dict with ``success=True``, ``log_id``,
            ``device_id``, ``compression`` and ``message``. On any exception,
            a dict with ``success=False``, ``error`` and ``message``; the
            exception is logged and not re-raised.
        """
        try:
            with self.db.get_session() as session:
                device = self._get_or_create_device(session, device_info)

                # Prefer an existing template; otherwise try to create one.
                template, variables = self._match_message_template(session, message)
                is_new_template = False
                if template is None:
                    template = self._create_new_template(session, message)
                    if template is not None:
                        is_new_template = True
                        variables = self._extract_variables(message, template.template_text)

                if template is not None:
                    log_entry = LogEntry(
                        device_id=device.id,
                        template_id=template.id,
                        template_variables=json.dumps(variables) if variables else None,
                        severity=severity,
                        timestamp=datetime.utcnow()
                    )
                    # Increment rather than assign: _create_new_template may
                    # hand back an already-existing row, whose counter must
                    # not be reset.
                    template.usage_count = (template.usage_count or 0) + 1
                    compression_info = self._build_compression_info(
                        message, template.alias, variables, is_new_template
                    )
                else:
                    # No pattern matched: store the full message text.
                    log_entry = LogEntry(
                        device_id=device.id,
                        full_message=message,
                        severity=severity,
                        timestamp=datetime.utcnow()
                    )
                    compression_info = {
                        'used_template': False,
                        'stored_full': True,
                        'original_size': len(message.encode('utf-8'))
                    }

                session.add(log_entry)
                session.flush()  # Get the log entry ID
                return {
                    'success': True,
                    'log_id': log_entry.id,
                    'device_id': device.id,
                    'compression': compression_info,
                    'message': 'Log processed successfully'
                }
        except Exception as e:
            logging.exception("Error processing log message: %s", e)
            return {
                'success': False,
                'error': str(e),
                'message': 'Log processing failed'
            }

    @staticmethod
    def _build_compression_info(message: str, alias: str, variables: Optional[Dict],
                                new_template: bool = False) -> Dict:
        """Build the ``compression`` report for a templated log entry.

        The compressed size is the UTF-8 size of the alias plus the UTF-8 size
        of the JSON-encoded variables (``{}`` when there are none).
        ``savings_percent`` is 0.0 for an empty message to avoid dividing by
        zero. ``new_template`` is only included in the result when true.
        """
        original_size = len(message.encode('utf-8'))
        compressed_size = (
            len(alias.encode('utf-8'))
            + len(json.dumps(variables or {}).encode('utf-8'))
        )
        if original_size:
            savings_percent = ((original_size - compressed_size) / original_size) * 100
        else:
            savings_percent = 0.0
        info = {
            'used_template': True,
            'template_alias': alias,
            'original_size': original_size,
            'compressed_size': compressed_size,
            'savings_percent': savings_percent
        }
        if new_template:
            info['new_template'] = True
        return info

    @staticmethod
    def _infer_device_type(hostname: str) -> str:
        """Guess device type from substrings of the hostname.

        The check is a case-insensitive substring match, evaluated in order:
        Raspberry Pi, Server, PC, Laptop. Returns ``'unknown'`` when nothing
        matches or when the hostname is empty or ``None``.
        """
        if not hostname:
            return 'unknown'
        h = hostname.upper()
        if any(k in h for k in ('RPI', 'PI', 'RASP')):
            return 'Raspberry Pi'
        if any(k in h for k in ('SRV', 'SERVER')):
            return 'Server'
        if any(k in h for k in ('PC', 'DESK', 'WRK')):
            return 'PC'
        if any(k in h for k in ('LAPTOP', 'NB')):
            return 'Laptop'
        return 'unknown'

    def _get_or_create_device(self, session: "Session", device_info: Dict) -> "Device":
        """Get existing device or create a new one.

        Lookup priority:
            1. MAC address (most reliable; survives IP/hostname changes)
            2. hostname + device_ip (legacy fallback)

        Raises:
            KeyError: if ``device_info`` lacks ``hostname``, ``device_ip`` or
                ``nume_masa`` when they are needed.
        """
        mac = device_info.get('mac_address')
        device = None

        # 1. Try MAC lookup first
        if mac:
            device = session.query(Device).filter_by(mac_address=mac).first()

        # 2. Fall back to hostname+IP
        if not device:
            device = session.query(Device).filter_by(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip']
            ).first()

        if not device:
            device = Device(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip'],
                nume_masa=device_info['nume_masa'],
                device_type=device_info.get('device_type') or self._infer_device_type(device_info['hostname']),
                os_version=device_info.get('os_version'),
                location=device_info.get('location'),
                mac_address=mac or None,
                last_seen=datetime.utcnow(),
                status='active'
            )
            session.add(device)
            session.flush()
        else:
            # Always update last_seen and nume_masa
            # NOTE(review): a device found by MAC keeps its stored hostname and
            # device_ip even if the client now reports different ones - confirm
            # whether those should be synced as well.
            device.last_seen = datetime.utcnow()
            if device.nume_masa != device_info['nume_masa']:
                device.nume_masa = device_info['nume_masa']
            # Sync MAC address if we now know it and device doesn't have one
            if mac and not device.mac_address:
                device.mac_address = mac
            # Update type from hostname if still unknown
            if not device.device_type or device.device_type == 'unknown':
                device.device_type = device_info.get('device_type') or self._infer_device_type(device_info['hostname'])
            # Update OS / location only when client sends them
            # (don't overwrite manual edits with None)
            if device_info.get('os_version'):
                device.os_version = device_info['os_version']
            if device_info.get('location'):
                device.location = device_info['location']
        return device

    @classmethod
    def _variables_from_match(cls, template_text: str, match) -> Dict:
        """Pair the template's placeholder names with the regex capture groups.

        Placeholders and groups are matched positionally, so the first
        ``{name}`` in ``template_text`` receives the first captured group.
        """
        names = cls._PLACEHOLDER_RE.findall(template_text)
        return dict(zip(names, match.groups()))

    def _match_message_template(self, session: "Session", message: str) -> Tuple[Optional["MessageTemplate"], Optional[Dict]]:
        """Try to match message to an existing template.

        Returns:
            ``(template, None)`` when the message hash equals a stored
            template hash, ``(template, variables)`` when a known pattern
            matches and its template row exists, otherwise ``(None, None)``.
        """
        # First, try exact template match
        message_hash = MessageTemplate.create_hash(message)
        template = session.query(MessageTemplate).filter_by(template_hash=message_hash).first()
        if template:
            return template, None

        # Try pattern matching with variable extraction
        for pattern_info in self.template_patterns:
            match = re.match(pattern_info['pattern'], message)
            if not match:
                continue
            template_text = pattern_info['template']
            template = session.query(MessageTemplate).filter_by(
                template_text=template_text,
                category=pattern_info['category']
            ).first()
            if template:
                return template, self._variables_from_match(template_text, match)
        return None, None

    def _create_new_template(self, session: "Session", message: str) -> Optional["MessageTemplate"]:
        """Create a template for the first known pattern the message matches.

        Returns the existing row if a template with the same text and category
        is already stored, the newly added (and flushed) row otherwise, or
        ``None`` when no pattern matches.
        """
        for pattern_info in self.template_patterns:
            if not re.match(pattern_info['pattern'], message):
                continue

            # Check if template already exists
            existing = session.query(MessageTemplate).filter_by(
                template_text=pattern_info['template'],
                category=pattern_info['category']
            ).first()
            if existing:
                return existing

            # Create new template
            alias = self._generate_alias(session, pattern_info['alias_prefix'])
            template_hash = MessageTemplate.create_hash(pattern_info['template'])
            template = MessageTemplate(
                template_hash=template_hash,
                template_text=pattern_info['template'],
                category=pattern_info['category'],
                alias=alias,
                created_at=datetime.utcnow()
            )
            session.add(template)
            session.flush()
            return template
        return None

    def _generate_alias(self, session: "Session", prefix: str) -> str:
        """Generate the next alias for ``prefix``, e.g. ``CD001``.

        Looks at every stored alias starting with ``prefix``, takes the
        highest numeric suffix (non-numeric suffixes are ignored) and returns
        ``prefix`` followed by that number plus one, zero-padded to 3 digits.
        """
        existing_aliases = session.query(MessageTemplate.alias).filter(
            MessageTemplate.alias.like(f"{prefix}%")
        ).all()
        max_num = 0
        for (alias,) in existing_aliases:
            try:
                num = int(alias[len(prefix):])
            except ValueError:
                continue
            max_num = max(max_num, num)
        return f"{prefix}{max_num + 1:03d}"

    def _extract_variables(self, message: str, template: str) -> Dict:
        """Extract the placeholder values of ``template`` from ``message``.

        If ``template`` belongs to one of the known patterns, that pattern's
        regex is used so the result agrees with ``_match_message_template``.
        Otherwise a regex is derived from the template itself: literal text
        is escaped and each placeholder becomes a non-greedy capture group
        that must match the whole message.

        Returns:
            Dict mapping placeholder name to captured text; empty when the
            template has no placeholders or the message does not match.
        """
        names = self._PLACEHOLDER_RE.findall(template)
        if not names:
            return {}

        for pattern_info in self.template_patterns:
            if pattern_info['template'] != template:
                continue
            match = re.match(pattern_info['pattern'], message)
            if match:
                return dict(zip(names, match.groups()))

        # Generic fallback for templates that are not in the known patterns.
        literal_parts = self._PLACEHOLDER_RE.split(template)
        # split() with a capturing group interleaves the names; keep literals.
        literals = literal_parts[0::2]
        generic_regex = '(.+?)'.join(re.escape(part) for part in literals)
        match = re.fullmatch(generic_regex, message, flags=re.DOTALL)
        if match:
            return dict(zip(names, match.groups()))
        return {}

    def get_compression_stats(self) -> Dict:
        """Get compression statistics.

        Returns:
            Dict with ``total_logs``, ``templated_logs``, ``total_templates``,
            ``compression_ratio`` (percentage of logs that reference a
            template) and ``estimated_savings`` (the ratio scaled by a fixed
            0.6 factor). On failure returns ``{'error': <message>}``.
        """
        try:
            with self.db.get_session() as session:
                total_logs = session.query(LogEntry).count()
                templated_logs = session.query(LogEntry).filter(
                    LogEntry.template_id.isnot(None)
                ).count()
                total_templates = session.query(MessageTemplate).count()

                # Calculate average savings (simplified)
                compression_ratio = (templated_logs / total_logs * 100) if total_logs > 0 else 0
                return {
                    'total_logs': total_logs,
                    'templated_logs': templated_logs,
                    'total_templates': total_templates,
                    'compression_ratio': round(compression_ratio, 2),
                    'estimated_savings': round(compression_ratio * 0.6, 2)  # Estimated 60% savings per template
                }
        except Exception as e:
            logging.exception("Error getting compression stats: %s", e)
            return {'error': str(e)}

    def get_message_by_alias(self, alias: str, variables: Optional[Dict] = None) -> Optional[str]:
        """Retrieve the full message for ``alias``, filling in ``variables``.

        Returns:
            The template text formatted with ``variables`` (or the raw
            template text when no variables are given); ``None`` when the
            alias is unknown or when any error occurs, e.g. a placeholder
            missing from ``variables``.
        """
        try:
            with self.db.get_session() as session:
                template = session.query(MessageTemplate).filter_by(alias=alias).first()
                if template:
                    if variables:
                        return template.template_text.format(**variables)
                    return template.template_text
                return None
        except Exception as e:
            logging.exception("Error retrieving message by alias: %s", e)
            return None