Files
Server_Monitorizare/utils.py
Developer 376240fb06 Add configuration, utilities, and update server with enhanced monitoring features
- Add config.py for environment configuration management
- Add utils.py with utility functions
- Add .env.example for environment variable reference
- Add routes_example.py as route reference
- Add login.html template for authentication
- Update server.py with enhancements
- Update all dashboard and log templates
- Move documentation to 'explanations and old code' directory
- Update database schema
2025-12-18 09:11:11 +02:00

163 lines
5.0 KiB
Python

# Utility functions for the application
import logging
from functools import wraps
from flask import jsonify, request, session
from datetime import datetime
# Module-level logger named after this module; used by the decorators below.
logger = logging.getLogger(__name__)
class APIError(Exception):
    """Application error that carries an HTTP status code and extra details.

    Attributes:
        message: Human-readable description of the failure.
        status_code: HTTP status code to report (defaults to 400).
        details: Extra context; an empty dict when none (or a falsy value)
            was supplied.
    """

    def __init__(self, message, status_code=400, details=None):
        # Initialise the base Exception first so str(err) yields the message.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.details = details if details else {}
def error_response(error_message, status_code=400, details=None):
    """Build the standard JSON error payload.

    Args:
        error_message: Text placed under the ``error`` key.
        status_code: HTTP status code returned alongside the payload.
        details: Optional extra data; included under ``details`` only when
            truthy.

    Returns:
        A ``(jsonify(payload), status_code)`` tuple.
    """
    issued_at = datetime.now().isoformat()
    payload = {'success': False, 'error': error_message, 'timestamp': issued_at}
    if details:
        payload['details'] = details
    return jsonify(payload), status_code
def success_response(data=None, message="Success", status_code=200):
    """Build the standard JSON success payload.

    Args:
        data: Optional result; included under ``data`` whenever it is not
            ``None`` (so empty lists/dicts and zero are still included).
        message: Text placed under the ``message`` key.
        status_code: HTTP status code returned alongside the payload.

    Returns:
        A ``(jsonify(payload), status_code)`` tuple.
    """
    issued_at = datetime.now().isoformat()
    payload = {'success': True, 'message': message, 'timestamp': issued_at}
    if data is None:
        return jsonify(payload), status_code
    payload['data'] = data
    return jsonify(payload), status_code
def require_auth(f):
    """Decorator that requires a valid API key in the ``X-API-Key`` header.

    The supplied key is compared against ``get_config().API_KEY`` using a
    constant-time comparison so response timing does not leak how much of
    the key matched.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. When the header is missing/empty, the configured
        key is not a string, or the keys differ, the wrapper logs a warning
        and returns ``error_response("Unauthorized", 401)`` instead of
        calling ``f``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Imported inside the wrapper, as the original code did for config.
        import hmac
        from config import get_config

        supplied_key = request.headers.get('X-API-Key')
        expected_key = get_config().API_KEY

        # A non-string configured key could never equal the header value,
        # so it is rejected outright. Encoding to bytes lets compare_digest
        # handle non-ASCII input, which it refuses for str arguments.
        authorized = (
            bool(supplied_key)
            and isinstance(expected_key, str)
            and hmac.compare_digest(
                supplied_key.encode('utf-8'),
                expected_key.encode('utf-8'),
            )
        )
        if not authorized:
            logger.warning(f"Unauthorized access attempt from {request.remote_addr}")
            return error_response("Unauthorized", 401)
        return f(*args, **kwargs)
    return decorated_function
def require_session_auth(f):
    """Decorator that only lets requests with a logged-in session through.

    A request counts as authenticated when the session contains a
    ``user_id`` key; otherwise the wrapper returns
    ``error_response("Authentication required", 401)`` without calling ``f``.
    """
    @wraps(f)
    def session_guard(*args, **kwargs):
        has_user = 'user_id' in session
        if has_user:
            return f(*args, **kwargs)
        return error_response("Authentication required", 401)
    return session_guard
def log_request(f):
    """Decorator that logs each request and converts exceptions to responses.

    Logs method, path and remote address before calling ``f``. An
    ``APIError`` raised by ``f`` is logged and turned into an error response
    using the exception's message, status code and details. Any other
    ``Exception`` is logged with its traceback and turned into a generic
    500 response.
    """
    @wraps(f)
    def logged_view(*args, **kwargs):
        logger.info(f"{request.method} {request.path} from {request.remote_addr}")
        try:
            outcome = f(*args, **kwargs)
        except APIError as api_err:
            logger.error(f"API Error in {f.__name__}: {api_err.message}")
            return error_response(api_err.message, api_err.status_code, api_err.details)
        except Exception:
            # logger.exception records the active traceback automatically.
            logger.exception(f"Unexpected error in {f.__name__}")
            return error_response("Internal server error", 500)
        else:
            return outcome
    return logged_view
def validate_required_fields(required_fields):
    """Decorator factory that validates required fields in a JSON request.

    Args:
        required_fields: Iterable of keys that must be present in the JSON
            body with a truthy value. Falsy values (``None``, ``""``, ``0``,
            ``False``, empty containers) count as missing.

    Returns:
        A decorator. The wrapped view returns a 400 ``error_response`` when
        the request is not JSON, when the body cannot be parsed or is not a
        JSON object, or when any required field is missing; otherwise it
        calls the original view.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not request.is_json:
                return error_response("Content-Type must be application/json", 400)

            # silent=True yields None for malformed JSON instead of raising,
            # so parse failures get the same standardized 400 response.
            data = request.get_json(silent=True)

            # A body of null, a list or a scalar cannot be checked for keys;
            # reject it instead of failing with TypeError further down.
            if not isinstance(data, dict):
                return error_response("Request body must be a JSON object", 400)

            missing_fields = [field for field in required_fields if not data.get(field)]
            if missing_fields:
                return error_response(
                    "Missing required fields",
                    400,
                    {'missing_fields': missing_fields}
                )
            return f(*args, **kwargs)
        return decorated_function
    return decorator
def validate_ip_address(ip):
    """Return True if *ip* is a dotted-quad IPv4 address, else False.

    The value must be a string made of exactly four groups of one to three
    ASCII digits separated by dots, each group in the range 0-255. Leading
    zeros (e.g. ``"001.002.003.004"``) are accepted.

    Args:
        ip: Candidate address. Non-string values are treated as invalid.

    Returns:
        bool: Whether the address is well-formed.
    """
    import re

    if not isinstance(ip, str):
        return False

    # fullmatch (rather than match + "$") rejects a trailing newline, and
    # [0-9] (rather than \d) rejects non-ASCII digit characters.
    if re.fullmatch(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', ip) is None:
        return False

    return all(int(octet) <= 255 for octet in ip.split('.'))
def sanitize_hostname(hostname):
    """Validate a hostname and return it unchanged.

    Only ASCII letters, digits, dash and underscore are allowed, and the
    whole value must consist of those characters.

    Args:
        hostname: Candidate hostname.

    Returns:
        The same ``hostname`` value when it is valid.

    Raises:
        APIError: With status 400 when the value is not a string, is empty,
            contains any other character (including a trailing newline), or
            is longer than 255 characters.
    """
    import re

    # fullmatch (rather than match + "$") also rejects "name\n", which the
    # "$" anchor would let through.
    if not isinstance(hostname, str) or re.fullmatch(r'[a-zA-Z0-9_-]+', hostname) is None:
        raise APIError("Invalid hostname format", 400)
    if len(hostname) > 255:
        raise APIError("Hostname too long", 400)
    return hostname
def setup_logging(config):
    """Configure the root logger with a rotating file handler and a console handler.

    Safe to call more than once: handlers installed by a previous call are
    removed and closed before the new ones are attached, so log lines are
    not duplicated. Handlers added by other code are left alone.

    Args:
        config: Object providing ``LOG_FILE`` (path), ``LOG_LEVEL`` (a level
            name in any letter case, or a numeric level), ``LOG_MAX_BYTES``
            and ``LOG_BACKUP_COUNT``.

    Returns:
        The configured root logger.

    Raises:
        AttributeError: If ``LOG_LEVEL`` is a string that does not name an
            attribute of the ``logging`` module.
    """
    import os
    from logging.handlers import RotatingFileHandler

    # Create the log directory if needed; exist_ok avoids the race between
    # checking for the directory and creating it.
    log_dir = os.path.dirname(config.LOG_FILE)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # Accept "info" as well as "INFO"; numeric levels are passed through.
    level = config.LOG_LEVEL
    if isinstance(level, str):
        level = getattr(logging, level.upper())

    root_logger = logging.getLogger()
    root_logger.setLevel(level)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Build the new handlers first so a failure here (e.g. an unwritable log
    # file) leaves any previously installed handlers in place.
    file_handler = RotatingFileHandler(
        config.LOG_FILE,
        maxBytes=config.LOG_MAX_BYTES,
        backupCount=config.LOG_BACKUP_COUNT
    )
    console_handler = logging.StreamHandler()

    # Handlers created here are tagged so a later call can find and replace
    # exactly those.
    marker = '_installed_by_setup_logging'
    stale_handlers = [h for h in root_logger.handlers if getattr(h, marker, False)]
    for stale in stale_handlers:
        root_logger.removeHandler(stale)
        stale.close()

    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        setattr(handler, marker, True)
        root_logger.addHandler(handler)

    return root_logger