Initial commit: enterprise digital platform with portal SSO, DigiServer, IT Assets, NetworkView, Server Monitor

This commit is contained in:
ske087
2026-05-10 21:07:50 +03:00
commit 8d9df56b0b
364 changed files with 73655 additions and 0 deletions
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,324 @@
"""
Device management service with CRUD operations and device monitoring
"""
import logging
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import desc, func, and_
from app.models import Device, LogEntry, FileUpload, InventoryGroup
from config.database_config import get_db
class DeviceService:
"""Service for managing devices and device-related operations"""
def __init__(self):
self.db = get_db()
self.logger = logging.getLogger(__name__)
# Basic CRUD Operations
def create_device(self, hostname: str, device_ip: str, nume_masa: str, **kwargs) -> Device:
"""Create a new device"""
try:
with self.db.get_session() as session:
# Check if device already exists
existing = session.query(Device).filter(
(Device.hostname == hostname) | (Device.device_ip == device_ip)
).first()
if existing:
raise ValueError(f"Device with hostname '{hostname}' or IP '{device_ip}' already exists")
device = Device(
hostname=hostname,
device_ip=device_ip,
nume_masa=nume_masa,
device_type=kwargs.get('device_type', 'unknown'),
os_version=kwargs.get('os_version'),
status=kwargs.get('status', 'active'),
location=kwargs.get('location'),
description=kwargs.get('description'),
last_seen=datetime.utcnow()
)
session.add(device)
session.commit()
session.refresh(device)
self.logger.info(f"Created device: {hostname} ({device_ip})")
return device
except Exception as e:
self.logger.error(f"Error creating device: {e}")
raise
def get_device_by_id(self, device_id: int) -> Optional[Device]:
"""Get device by ID with relationships loaded"""
try:
with self.db.get_session() as session:
return session.query(Device).options(
joinedload(Device.logs),
joinedload(Device.files),
joinedload(Device.inventory_groups)
).filter(Device.id == device_id).first()
except Exception as e:
self.logger.error(f"Error getting device {device_id}: {e}")
return None
def get_device_by_hostname(self, hostname: str) -> Optional[Device]:
"""Get device by hostname"""
try:
with self.db.get_session() as session:
return session.query(Device).filter(Device.hostname == hostname).first()
except Exception as e:
self.logger.error(f"Error getting device by hostname {hostname}: {e}")
return None
def get_device_by_ip(self, device_ip: str) -> Optional[Device]:
"""Get device by IP address"""
try:
with self.db.get_session() as session:
return session.query(Device).filter(Device.device_ip == device_ip).first()
except Exception as e:
self.logger.error(f"Error getting device by IP {device_ip}: {e}")
return None
def get_all_devices(self, status: Optional[str] = None, limit: Optional[int] = None) -> List[Device]:
"""Get all devices with optional filtering"""
try:
with self.db.get_session() as session:
query = session.query(Device).order_by(desc(Device.last_seen))
if status:
query = query.filter(Device.status == status)
if limit:
query = query.limit(limit)
return query.all()
except Exception as e:
self.logger.error(f"Error getting devices: {e}")
return []
def update_device(self, device_id: int, **kwargs) -> Optional[Device]:
"""Update device information"""
try:
with self.db.get_session() as session:
device = session.query(Device).filter(Device.id == device_id).first()
if not device:
return None
# Update allowed fields
allowed_fields = [
'hostname', 'device_ip', 'nume_masa', 'device_type',
'os_version', 'status', 'location', 'description'
]
for field, value in kwargs.items():
if field in allowed_fields and hasattr(device, field):
setattr(device, field, value)
session.commit()
session.refresh(device)
self.logger.info(f"Updated device {device_id}")
return device
except Exception as e:
self.logger.error(f"Error updating device {device_id}: {e}")
raise
def delete_device(self, device_id: int) -> bool:
"""Delete device (soft delete by setting status to inactive)"""
try:
with self.db.get_session() as session:
device = session.query(Device).filter(Device.id == device_id).first()
if not device:
return False
# Soft delete - set status to inactive
device.status = 'inactive'
session.commit()
self.logger.info(f"Soft deleted device {device_id}")
return True
except Exception as e:
self.logger.error(f"Error deleting device {device_id}: {e}")
return False
# Device Monitoring Functions
def update_device_last_seen(self, hostname: str = None, device_ip: str = None) -> Optional[Device]:
"""Update device last seen timestamp"""
try:
with self.db.get_session() as session:
device = None
if hostname:
device = session.query(Device).filter(Device.hostname == hostname).first()
elif device_ip:
device = session.query(Device).filter(Device.device_ip == device_ip).first()
if device:
device.last_seen = datetime.utcnow()
session.commit()
return device
except Exception as e:
self.logger.error(f"Error updating last seen: {e}")
return None
def get_device_statistics(self, device_id: int) -> Dict[str, Any]:
"""Get comprehensive statistics for a device"""
try:
with self.db.get_session() as session:
device = session.query(Device).filter(Device.id == device_id).first()
if not device:
return {}
# Log statistics
total_logs = session.query(LogEntry).filter(LogEntry.device_id == device_id).count()
# Logs by severity
severity_counts = session.query(
LogEntry.severity,
func.count(LogEntry.id)
).filter(
LogEntry.device_id == device_id
).group_by(LogEntry.severity).all()
# Recent activity (last 24 hours)
last_24h = datetime.utcnow() - timedelta(hours=24)
recent_logs = session.query(LogEntry).filter(
and_(LogEntry.device_id == device_id, LogEntry.timestamp >= last_24h)
).count()
# File uploads
total_files = session.query(FileUpload).filter(
FileUpload.device_id == device_id
).count()
# Last log
last_log = session.query(LogEntry).filter(
LogEntry.device_id == device_id
).order_by(desc(LogEntry.timestamp)).first()
return {
'device': device,
'total_logs': total_logs,
'severity_counts': dict(severity_counts),
'recent_logs_24h': recent_logs,
'total_files': total_files,
'last_log': last_log,
'uptime_days': (datetime.utcnow() - device.last_seen).days if device.last_seen else 0
}
except Exception as e:
self.logger.error(f"Error getting device statistics: {e}")
return {}
def get_inactive_devices(self, hours: int = 24) -> List[Device]:
"""Get devices that haven't been seen recently"""
try:
with self.db.get_session() as session:
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
return session.query(Device).filter(
and_(
Device.last_seen < cutoff_time,
Device.status.in_(['active', 'maintenance'])
)
).order_by(desc(Device.last_seen)).all()
except Exception as e:
self.logger.error(f"Error getting inactive devices: {e}")
return []
def get_device_logs(self, device_id: int, limit: int = 100, severity: str = None) -> List[LogEntry]:
"""Get logs for a specific device"""
try:
with self.db.get_session() as session:
query = session.query(LogEntry).filter(
LogEntry.device_id == device_id
).order_by(desc(LogEntry.timestamp))
if severity:
query = query.filter(LogEntry.severity == severity)
return query.limit(limit).all()
except Exception as e:
self.logger.error(f"Error getting device logs: {e}")
return []
def search_devices(self, search_term: str) -> List[Device]:
"""Search devices by hostname, IP, or description"""
try:
with self.db.get_session() as session:
search_pattern = f"%{search_term}%"
return session.query(Device).filter(
(Device.hostname.like(search_pattern)) |
(Device.device_ip.like(search_pattern)) |
(Device.nume_masa.like(search_pattern)) |
(Device.location.like(search_pattern)) |
(Device.description.like(search_pattern))
).order_by(desc(Device.last_seen)).all()
except Exception as e:
self.logger.error(f"Error searching devices: {e}")
return []
# Bulk Operations
def bulk_update_status(self, device_ids: List[int], status: str) -> int:
"""Update status for multiple devices"""
try:
with self.db.get_session() as session:
updated = session.query(Device).filter(
Device.id.in_(device_ids)
).update({Device.status: status}, synchronize_session=False)
session.commit()
self.logger.info(f"Updated status for {updated} devices")
return updated
except Exception as e:
self.logger.error(f"Error bulk updating status: {e}")
return 0
def get_device_summary(self) -> Dict[str, Any]:
"""Get summary statistics for all devices"""
try:
with self.db.get_session() as session:
# Device status counts
status_counts = session.query(
Device.status,
func.count(Device.id)
).group_by(Device.status).all()
# Device type counts
type_counts = session.query(
Device.device_type,
func.count(Device.id)
).group_by(Device.device_type).all()
# Recent activity
last_24h = datetime.utcnow() - timedelta(hours=24)
devices_seen_24h = session.query(Device).filter(
Device.last_seen >= last_24h
).count()
return {
'total_devices': session.query(Device).count(),
'status_counts': dict(status_counts),
'type_counts': dict(type_counts),
'devices_seen_24h': devices_seen_24h
}
except Exception as e:
self.logger.error(f"Error getting device summary: {e}")
return {}
@@ -0,0 +1,256 @@
"""
File upload and processing service
"""
import os
import hashlib
import mimetypes
from datetime import datetime
from pathlib import Path
from werkzeug.utils import secure_filename
from app.models import Device, FileUpload
from config.database_config import get_db
import logging
class FileUploadService:
"""Service for handling file uploads and processing"""
def __init__(self):
self.db = get_db()
self.upload_folder = Path("data/uploads")
self.upload_folder.mkdir(exist_ok=True)
# Allowed file extensions
self.allowed_extensions = {
'txt', 'log', 'conf', 'cfg', 'json', 'yml', 'yaml',
'py', 'sh', 'service', 'env', 'ini'
}
# Max file size (50MB)
self.max_file_size = 50 * 1024 * 1024
def process_uploaded_file(self, file, device_info):
"""Process uploaded file from device"""
try:
# Validate file
if not file or file.filename == '':
return {'success': False, 'error': 'No file provided'}
# Check file extension
filename = secure_filename(file.filename)
if not self._allowed_file(filename):
return {
'success': False,
'error': f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}'
}
# Check file size
file.seek(0, 2) # Seek to end
file_size = file.tell()
file.seek(0) # Reset position
if file_size > self.max_file_size:
return {
'success': False,
'error': f'File too large. Max size: {self.max_file_size // (1024*1024)}MB'
}
# Calculate file hash
file_content = file.read()
file.seek(0) # Reset for saving
file_hash = hashlib.sha256(file_content).hexdigest()
with self.db.get_session() as session:
# Get or create device
device = self._get_or_create_device(session, device_info)
# Check for duplicate file
existing_file = session.query(FileUpload).filter_by(file_hash=file_hash).first()
if existing_file:
return {
'success': True,
'message': 'File already exists (duplicate detected)',
'file_id': existing_file.id,
'duplicate': True
}
# Generate unique filename
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
new_filename = f"{device.hostname}_{timestamp}_{filename}"
file_path = self.upload_folder / new_filename
# Save file
with open(file_path, 'wb') as f:
f.write(file_content)
# Get MIME type
mime_type, _ = mimetypes.guess_type(filename)
# Create file record
file_upload = FileUpload(
device_id=device.id,
filename=new_filename,
original_filename=filename,
file_path=str(file_path),
file_size=file_size,
file_hash=file_hash,
mime_type=mime_type,
upload_date=datetime.utcnow(),
upload_ip=device_info.get('device_ip'),
processed=False,
processing_status='pending'
)
session.add(file_upload)
session.flush()
# Process file content if it's a log file
if self._is_log_file(filename, mime_type):
self._process_log_file(file_upload, file_content)
return {
'success': True,
'message': 'File uploaded successfully',
'file_id': file_upload.id,
'filename': new_filename,
'size': file_size,
'hash': file_hash,
'processed': file_upload.processed
}
except Exception as e:
logging.error(f"Error processing uploaded file: {e}")
return {
'success': False,
'error': str(e)
}
def _allowed_file(self, filename):
"""Check if file extension is allowed"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in self.allowed_extensions
def _get_or_create_device(self, session, device_info):
"""Get existing device or create new one"""
device = session.query(Device).filter_by(
hostname=device_info['hostname'],
device_ip=device_info['device_ip']
).first()
if not device:
device = Device(
hostname=device_info['hostname'],
device_ip=device_info['device_ip'],
nume_masa=device_info['nume_masa'],
last_seen=datetime.utcnow(),
status='active'
)
session.add(device)
session.flush()
else:
device.last_seen = datetime.utcnow()
if device.nume_masa != device_info['nume_masa']:
device.nume_masa = device_info['nume_masa']
return device
def _is_log_file(self, filename, mime_type):
"""Check if file is a log file that should be processed"""
log_extensions = {'log', 'txt'}
log_keywords = ['log', 'error', 'debug', 'trace', 'audit']
# Check extension
if '.' in filename:
ext = filename.rsplit('.', 1)[1].lower()
if ext in log_extensions:
return True
# Check filename for log keywords
filename_lower = filename.lower()
for keyword in log_keywords:
if keyword in filename_lower:
return True
# Check MIME type
if mime_type and 'text' in mime_type:
return True
return False
def _process_log_file(self, file_upload, content):
"""Process log file content to extract log entries"""
try:
# Mark as log file
file_upload.is_log_file = True
# Simple log processing - split by lines
lines = content.decode('utf-8', errors='ignore').split('\n')
entries_extracted = 0
from app.services.log_service import LogCompressionService
log_service = LogCompressionService()
device_info = {
'hostname': file_upload.device.hostname,
'device_ip': file_upload.device.device_ip,
'nume_masa': file_upload.device.nume_masa
}
for line_num, line in enumerate(lines, 1):
line = line.strip()
if not line:
continue
# Try to extract timestamp and message
# This is a simple implementation - enhance as needed
message = f"[File: {file_upload.original_filename}:{line_num}] {line}"
# Process through log compression service
result = log_service.process_log_message(
device_info=device_info,
message=message,
severity='info'
)
if result['success']:
entries_extracted += 1
# Update file record
file_upload.log_entries_extracted = entries_extracted
file_upload.processed = True
file_upload.processing_status = 'completed'
logging.info(f"Processed log file {file_upload.filename}: {entries_extracted} entries extracted")
except Exception as e:
logging.error(f"Error processing log file content: {e}")
file_upload.processing_status = 'error'
file_upload.processing_error = str(e)
def get_upload_stats(self):
"""Get file upload statistics"""
try:
with self.db.get_session() as session:
total_files = session.query(FileUpload).count()
log_files = session.query(FileUpload).filter_by(is_log_file=True).count()
# Calculate total size
total_size = session.query(
session.func.sum(FileUpload.file_size)
).scalar() or 0
# Count by processing status
processed = session.query(FileUpload).filter_by(processed=True).count()
pending = session.query(FileUpload).filter(
FileUpload.processing_status == 'pending'
).count()
return {
'total_files': total_files,
'log_files': log_files,
'total_size': total_size,
'processed': processed,
'pending': pending
}
except Exception as e:
logging.error(f"Error getting upload stats: {e}")
return {'error': str(e)}
@@ -0,0 +1,378 @@
"""
Log processing service with message compression and aliasing
"""
import json
import re
import hashlib
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from app.models import Device, LogEntry, MessageTemplate
from config.database_config import get_db
import logging
class LogCompressionService:
"""Service for compressing log messages using templates and aliases"""
def __init__(self):
self.db = get_db()
self.template_patterns = self._load_common_patterns()
def _load_common_patterns(self) -> List[Dict]:
"""Load common log message patterns for template matching"""
return [
{
'pattern': r'Card detected: ([A-F0-9]+)',
'template': 'Card detected: {card_id}',
'category': 'card_detection',
'alias_prefix': 'CD'
},
{
'pattern': r'Connection failed: (.+)',
'template': 'Connection failed: {error}',
'category': 'connection_error',
'alias_prefix': 'CE'
},
{
'pattern': r'System startup completed in ([0-9.]+)s',
'template': 'System startup completed in {time}s',
'category': 'system_startup',
'alias_prefix': 'SS'
},
{
'pattern': r'Auto-update: (.+)',
'template': 'Auto-update: {message}',
'category': 'auto_update',
'alias_prefix': 'AU'
},
{
'pattern': r'Command \'([^\']+)\' (SUCCESS|FAILED)',
'template': 'Command \'{command}\' {status}',
'category': 'command_execution',
'alias_prefix': 'EX'
},
{
'pattern': r'Temperature: ([0-9.]+)°C',
'template': 'Temperature: {temp}°C',
'category': 'temperature',
'alias_prefix': 'TM'
}
]
def process_log_message(self, device_info: Dict, message: str, severity: str = 'info') -> Dict:
"""
Process incoming log message with compression
Args:
device_info: Dict with hostname, device_ip, nume_masa
message: Log message text
severity: Message severity level
Returns:
Dict with processing results and storage info
"""
try:
with self.db.get_session() as session:
# Get or create device
device = self._get_or_create_device(session, device_info)
# Try to match message to existing template
template, variables = self._match_message_template(session, message)
if template:
# Use existing template
log_entry = LogEntry(
device_id=device.id,
template_id=template.id,
template_variables=json.dumps(variables) if variables else None,
severity=severity,
timestamp=datetime.utcnow()
)
# Update template usage count
template.usage_count += 1
# Calculate size savings
original_size = len(message.encode('utf-8'))
compressed_size = len(template.alias.encode('utf-8')) + \
len(json.dumps(variables or {}).encode('utf-8'))
compression_info = {
'used_template': True,
'template_alias': template.alias,
'original_size': original_size,
'compressed_size': compressed_size,
'savings_percent': ((original_size - compressed_size) / original_size) * 100
}
else:
# Create new template if message matches a pattern
template = self._create_new_template(session, message)
if template:
# New template created
variables = self._extract_variables(message, template.template_text)
log_entry = LogEntry(
device_id=device.id,
template_id=template.id,
template_variables=json.dumps(variables) if variables else None,
severity=severity,
timestamp=datetime.utcnow()
)
template.usage_count = 1
compression_info = {
'used_template': True,
'template_alias': template.alias,
'new_template': True,
'original_size': len(message.encode('utf-8')),
'compressed_size': len(template.alias.encode('utf-8'))
}
else:
# Store as full message
log_entry = LogEntry(
device_id=device.id,
full_message=message,
severity=severity,
timestamp=datetime.utcnow()
)
compression_info = {
'used_template': False,
'stored_full': True,
'original_size': len(message.encode('utf-8'))
}
session.add(log_entry)
session.flush() # Get the log entry ID
return {
'success': True,
'log_id': log_entry.id,
'device_id': device.id,
'compression': compression_info,
'message': 'Log processed successfully'
}
except Exception as e:
logging.error(f"Error processing log message: {e}")
return {
'success': False,
'error': str(e),
'message': 'Log processing failed'
}
@staticmethod
def _infer_device_type(hostname: str) -> str:
"""Guess device type from hostname pattern."""
h = hostname.upper()
if any(k in h for k in ('RPI', 'PI', 'RASP')):
return 'Raspberry Pi'
if any(k in h for k in ('SRV', 'SERVER')):
return 'Server'
if any(k in h for k in ('PC', 'DESK', 'WRK')):
return 'PC'
if any(k in h for k in ('LAPTOP', 'NB')):
return 'Laptop'
return 'unknown'
def _get_or_create_device(self, session: Session, device_info: Dict) -> Device:
"""Get existing device or create new one.
Lookup priority:
1. MAC address (most reliable survives IP/hostname changes)
2. hostname + device_ip (legacy fallback)
"""
mac = device_info.get('mac_address')
device = None
# 1. Try MAC lookup first
if mac:
device = session.query(Device).filter_by(mac_address=mac).first()
# 2. Fall back to hostname+IP
if not device:
device = session.query(Device).filter_by(
hostname=device_info['hostname'],
device_ip=device_info['device_ip']
).first()
if not device:
device = Device(
hostname=device_info['hostname'],
device_ip=device_info['device_ip'],
nume_masa=device_info['nume_masa'],
device_type=device_info.get('device_type') or self._infer_device_type(device_info['hostname']),
os_version=device_info.get('os_version'),
location=device_info.get('location'),
mac_address=mac or None,
last_seen=datetime.utcnow(),
status='active'
)
session.add(device)
session.flush()
else:
# Always update last_seen and nome_masa
device.last_seen = datetime.utcnow()
if device.nume_masa != device_info['nume_masa']:
device.nume_masa = device_info['nume_masa']
# Sync MAC address if we now know it and device doesn't have one
if mac and not device.mac_address:
device.mac_address = mac
# Update type from hostname if still unknown
if not device.device_type or device.device_type == 'unknown':
device.device_type = device_info.get('device_type') or self._infer_device_type(device_info['hostname'])
# Update OS / location only when client sends them (don't overwrite manual edits with None)
if device_info.get('os_version'):
device.os_version = device_info['os_version']
if device_info.get('location'):
device.location = device_info['location']
return device
def _match_message_template(self, session: Session, message: str) -> Tuple[Optional[MessageTemplate], Optional[Dict]]:
"""Try to match message to existing template"""
# First, try exact template match
message_hash = MessageTemplate.create_hash(message)
template = session.query(MessageTemplate).filter_by(template_hash=message_hash).first()
if template:
return template, None
# Try pattern matching with variable extraction
for pattern_info in self.template_patterns:
match = re.match(pattern_info['pattern'], message)
if match:
# Look for template with this pattern
template_text = pattern_info['template']
template = session.query(MessageTemplate).filter_by(
template_text=template_text,
category=pattern_info['category']
).first()
if template:
# Extract variables
variables = {}
for i, group in enumerate(match.groups(), 1):
# Map to variable names based on template
if '{card_id}' in template_text and pattern_info['category'] == 'card_detection':
variables['card_id'] = group
elif '{error}' in template_text and pattern_info['category'] == 'connection_error':
variables['error'] = group
elif '{time}' in template_text and pattern_info['category'] == 'system_startup':
variables['time'] = group
elif '{message}' in template_text:
variables['message'] = group
elif '{command}' in template_text and i == 1:
variables['command'] = group
elif '{status}' in template_text and i == 2:
variables['status'] = group
elif '{temp}' in template_text:
variables['temp'] = group
return template, variables
return None, None
def _create_new_template(self, session: Session, message: str) -> Optional[MessageTemplate]:
"""Create new template if message matches a known pattern"""
for pattern_info in self.template_patterns:
match = re.match(pattern_info['pattern'], message)
if match:
# Check if template already exists
existing = session.query(MessageTemplate).filter_by(
template_text=pattern_info['template'],
category=pattern_info['category']
).first()
if existing:
return existing
# Create new template
alias = self._generate_alias(session, pattern_info['alias_prefix'])
template_hash = MessageTemplate.create_hash(pattern_info['template'])
template = MessageTemplate(
template_hash=template_hash,
template_text=pattern_info['template'],
category=pattern_info['category'],
alias=alias,
created_at=datetime.utcnow()
)
session.add(template)
session.flush()
return template
return None
def _generate_alias(self, session: Session, prefix: str) -> str:
"""Generate unique alias for template"""
# Find highest existing alias number for this prefix
existing_aliases = session.query(MessageTemplate.alias).filter(
MessageTemplate.alias.like(f"{prefix}%")
).all()
max_num = 0
for (alias,) in existing_aliases:
try:
num = int(alias[len(prefix):])
max_num = max(max_num, num)
except ValueError:
continue
return f"{prefix}{max_num + 1:03d}"
def _extract_variables(self, message: str, template: str) -> Dict:
"""Extract variables from message using template"""
# Simple variable extraction - could be enhanced
variables = {}
# This is a simplified implementation
# In production, you'd want more sophisticated template matching
return variables
def get_compression_stats(self) -> Dict:
"""Get compression statistics"""
try:
with self.db.get_session() as session:
# Count total logs
total_logs = session.query(LogEntry).count()
# Count templated logs
templated_logs = session.query(LogEntry).filter(
LogEntry.template_id.isnot(None)
).count()
# Count templates
total_templates = session.query(MessageTemplate).count()
# Calculate average savings (simplified)
compression_ratio = (templated_logs / total_logs * 100) if total_logs > 0 else 0
return {
'total_logs': total_logs,
'templated_logs': templated_logs,
'total_templates': total_templates,
'compression_ratio': round(compression_ratio, 2),
'estimated_savings': round(compression_ratio * 0.6, 2) # Estimated 60% savings per template
}
except Exception as e:
logging.error(f"Error getting compression stats: {e}")
return {'error': str(e)}
def get_message_by_alias(self, alias: str, variables: Dict = None) -> Optional[str]:
"""Retrieve full message using alias and variables"""
try:
with self.db.get_session() as session:
template = session.query(MessageTemplate).filter_by(alias=alias).first()
if template:
if variables:
return template.template_text.format(**variables)
return template.template_text
return None
except Exception as e:
logging.error(f"Error retrieving message by alias: {e}")
return None