"""
File upload and processing service
"""
import hashlib
import logging
import mimetypes
import os
from datetime import datetime
from pathlib import Path

from werkzeug.utils import secure_filename

from app.models import Device, FileUpload
from config.database_config import get_db


class FileUploadService:
    """Service for handling file uploads and processing.

    Validates incoming files (extension whitelist, 50MB size cap),
    deduplicates them by SHA-256 content hash, stores them under
    ``data/uploads`` and records each upload in the database. Files that
    look like logs are additionally split into individual log entries.
    """

    def __init__(self):
        self.db = get_db()
        self.upload_folder = Path("data/uploads")
        # parents=True: "data/" may not exist yet on a fresh deployment
        self.upload_folder.mkdir(parents=True, exist_ok=True)

        # Allowed file extensions (lower-case, without the leading dot)
        self.allowed_extensions = {
            'txt', 'log', 'conf', 'cfg', 'json', 'yml', 'yaml',
            'py', 'sh', 'service', 'env', 'ini'
        }

        # Max file size (50MB)
        self.max_file_size = 50 * 1024 * 1024

    def process_uploaded_file(self, file, device_info):
        """Process an uploaded file from a device.

        Args:
            file: upload object (Werkzeug ``FileStorage`` style) exposing
                ``filename``, ``read()`` and ``seek()``.
            device_info: dict with ``hostname``, ``device_ip`` and
                ``nume_masa`` keys identifying the sending device.

        Returns:
            dict: ``{'success': True, ...file details...}`` on success
            (including duplicate detection), or
            ``{'success': False, 'error': ...}`` on any failure.
        """
        try:
            # Validate file
            if not file or file.filename == '':
                return {'success': False, 'error': 'No file provided'}

            # Sanitize the client-supplied name before any further use
            filename = secure_filename(file.filename)
            if not self._allowed_file(filename):
                return {
                    'success': False,
                    'error': f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}'
                }

            # Read the payload once: it yields the size, the hash and the
            # bytes to save, replacing the separate seek/tell size pass.
            file_content = file.read()
            file.seek(0)  # leave the stream rewound for any later consumer
            file_size = len(file_content)

            if file_size > self.max_file_size:
                return {
                    'success': False,
                    'error': f'File too large. Max size: {self.max_file_size // (1024*1024)}MB'
                }

            file_hash = hashlib.sha256(file_content).hexdigest()

            with self.db.get_session() as session:
                # Get or create device
                device = self._get_or_create_device(session, device_info)

                # Duplicate detection by content hash: identical bytes are
                # never stored twice.
                existing_file = session.query(FileUpload).filter_by(file_hash=file_hash).first()
                if existing_file:
                    return {
                        'success': True,
                        'message': 'File already exists (duplicate detected)',
                        'file_id': existing_file.id,
                        'duplicate': True
                    }

                # Unique on-disk name: host + timestamp + sanitized original
                # name (keeping the original name prevents same-second
                # uploads from one host from colliding).
                timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
                new_filename = f"{device.hostname}_{timestamp}_{filename}"
                file_path = self.upload_folder / new_filename

                # Save file
                with open(file_path, 'wb') as f:
                    f.write(file_content)

                # Get MIME type
                mime_type, _ = mimetypes.guess_type(filename)

                # Create file record
                file_upload = FileUpload(
                    device_id=device.id,
                    filename=new_filename,
                    original_filename=filename,
                    file_path=str(file_path),
                    file_size=file_size,
                    file_hash=file_hash,
                    mime_type=mime_type,
                    upload_date=datetime.utcnow(),
                    upload_ip=device_info.get('device_ip'),
                    processed=False,
                    processing_status='pending'
                )

                session.add(file_upload)
                session.flush()  # assigns file_upload.id before it is returned

                # Extract log entries immediately for log-like files
                if self._is_log_file(filename, mime_type):
                    self._process_log_file(file_upload, file_content)

                return {
                    'success': True,
                    'message': 'File uploaded successfully',
                    'file_id': file_upload.id,
                    'filename': new_filename,
                    'size': file_size,
                    'hash': file_hash,
                    'processed': file_upload.processed
                }

        except Exception as e:
            logging.error(f"Error processing uploaded file: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _allowed_file(self, filename):
        """Return True if *filename* has a whitelisted extension."""
        return '.' in filename and \
               filename.rsplit('.', 1)[1].lower() in self.allowed_extensions

    def _get_or_create_device(self, session, device_info):
        """Look up the device by hostname + IP, creating it if missing.

        Existing devices get ``last_seen`` refreshed and ``nume_masa``
        synchronized with the reported value.
        """
        device = session.query(Device).filter_by(
            hostname=device_info['hostname'],
            device_ip=device_info['device_ip']
        ).first()

        if not device:
            device = Device(
                hostname=device_info['hostname'],
                device_ip=device_info['device_ip'],
                nume_masa=device_info['nume_masa'],
                last_seen=datetime.utcnow(),
                status='active'
            )
            session.add(device)
            session.flush()  # assigns device.id for the FileUpload FK
        else:
            device.last_seen = datetime.utcnow()
            if device.nume_masa != device_info['nume_masa']:
                device.nume_masa = device_info['nume_masa']

        return device

    def _is_log_file(self, filename, mime_type):
        """Heuristically decide whether the file should be parsed as a log.

        A file qualifies when its extension is log-like, its name contains
        a log-related keyword, or its MIME type is textual.
        """
        log_extensions = {'log', 'txt'}
        log_keywords = ['log', 'error', 'debug', 'trace', 'audit']

        # Check extension
        if '.' in filename:
            ext = filename.rsplit('.', 1)[1].lower()
            if ext in log_extensions:
                return True

        # Check filename for log keywords
        filename_lower = filename.lower()
        if any(keyword in filename_lower for keyword in log_keywords):
            return True

        # Check MIME type
        if mime_type and 'text' in mime_type:
            return True

        return False

    def _process_log_file(self, file_upload, content):
        """Split *content* into lines and feed each through the log service.

        Updates the ``file_upload`` record in place with the number of
        extracted entries and the final processing status; on failure the
        status is set to 'error' instead of raising.
        """
        try:
            # Mark as log file
            file_upload.is_log_file = True

            # errors='ignore': tolerate arbitrary device encodings
            lines = content.decode('utf-8', errors='ignore').split('\n')
            entries_extracted = 0

            # NOTE(review): imported locally, presumably to avoid a circular
            # import at module load time — confirm before hoisting.
            from app.services.log_service import LogCompressionService
            log_service = LogCompressionService()

            device_info = {
                'hostname': file_upload.device.hostname,
                'device_ip': file_upload.device.device_ip,
                'nume_masa': file_upload.device.nume_masa
            }

            for line_num, line in enumerate(lines, 1):
                line = line.strip()
                if not line:
                    continue

                # Prefix each entry with its origin so individual lines stay
                # traceable back to the uploaded file.
                message = f"[File: {file_upload.original_filename}:{line_num}] {line}"

                # Process through log compression service
                result = log_service.process_log_message(
                    device_info=device_info,
                    message=message,
                    severity='info'
                )

                if result['success']:
                    entries_extracted += 1

            # Update file record
            file_upload.log_entries_extracted = entries_extracted
            file_upload.processed = True
            file_upload.processing_status = 'completed'

            logging.info(f"Processed log file {file_upload.filename}: {entries_extracted} entries extracted")

        except Exception as e:
            logging.error(f"Error processing log file content: {e}")
            file_upload.processing_status = 'error'
            file_upload.processing_error = str(e)

    def get_upload_stats(self):
        """Return aggregate upload statistics.

        Returns:
            dict with total/log file counts, cumulative byte size and
            processed/pending counts, or ``{'error': ...}`` on failure.
        """
        try:
            # Session objects have no `.func` attribute — the SQL aggregate
            # lives in sqlalchemy.func. Imported locally since only this
            # method needs it.
            from sqlalchemy import func

            with self.db.get_session() as session:
                total_files = session.query(FileUpload).count()
                log_files = session.query(FileUpload).filter_by(is_log_file=True).count()

                # Calculate total size (SUM over zero rows is NULL -> 0)
                total_size = session.query(
                    func.sum(FileUpload.file_size)
                ).scalar() or 0

                # Count by processing status
                processed = session.query(FileUpload).filter_by(processed=True).count()
                pending = session.query(FileUpload).filter(
                    FileUpload.processing_status == 'pending'
                ).count()

                return {
                    'total_files': total_files,
                    'log_files': log_files,
                    'total_size': total_size,
                    'processed': processed,
                    'pending': pending
                }
        except Exception as e:
            logging.error(f"Error getting upload stats: {e}")
            return {'error': str(e)}