Add configuration, utilities, and update server with enhanced monitoring features
- Add config.py for environment configuration management - Add utils.py with utility functions - Add .env.example for environment variable reference - Add routes_example.py as route reference - Add login.html template for authentication - Update server.py with enhancements - Update all dashboard and log templates - Move documentation to 'explanations and old code' directory - Update database schema
This commit is contained in:
162
utils.py
Normal file
162
utils.py
Normal file
@@ -0,0 +1,162 @@
|
||||
# Utility functions for the application
|
||||
import logging
|
||||
from functools import wraps
|
||||
from flask import jsonify, request, session
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class APIError(Exception):
|
||||
"""Custom exception for API errors"""
|
||||
def __init__(self, message, status_code=400, details=None):
|
||||
self.message = message
|
||||
self.status_code = status_code
|
||||
self.details = details or {}
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
def error_response(error_message, status_code=400, details=None):
|
||||
"""Create a standardized error response"""
|
||||
response = {
|
||||
'success': False,
|
||||
'error': error_message,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
if details:
|
||||
response['details'] = details
|
||||
return jsonify(response), status_code
|
||||
|
||||
|
||||
def success_response(data=None, message="Success", status_code=200):
|
||||
"""Create a standardized success response"""
|
||||
response = {
|
||||
'success': True,
|
||||
'message': message,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
if data is not None:
|
||||
response['data'] = data
|
||||
return jsonify(response), status_code
|
||||
|
||||
|
||||
def require_auth(f):
|
||||
"""Decorator to require authentication"""
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
# Check for API key in headers
|
||||
api_key = request.headers.get('X-API-Key')
|
||||
from config import get_config
|
||||
config = get_config()
|
||||
|
||||
if not api_key or api_key != config.API_KEY:
|
||||
logger.warning(f"Unauthorized access attempt from {request.remote_addr}")
|
||||
return error_response("Unauthorized", 401)
|
||||
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
|
||||
def require_session_auth(f):
|
||||
"""Decorator to require session-based authentication"""
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if 'user_id' not in session:
|
||||
return error_response("Authentication required", 401)
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
|
||||
def log_request(f):
|
||||
"""Decorator to log request details"""
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
logger.info(f"{request.method} {request.path} from {request.remote_addr}")
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except APIError as e:
|
||||
logger.error(f"API Error in {f.__name__}: {e.message}")
|
||||
return error_response(e.message, e.status_code, e.details)
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error in {f.__name__}")
|
||||
return error_response("Internal server error", 500)
|
||||
return decorated_function
|
||||
|
||||
|
||||
def validate_required_fields(required_fields):
|
||||
"""Decorator to validate required fields in JSON request"""
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not request.is_json:
|
||||
return error_response("Content-Type must be application/json", 400)
|
||||
|
||||
data = request.get_json()
|
||||
missing_fields = [field for field in required_fields if field not in data or not data[field]]
|
||||
|
||||
if missing_fields:
|
||||
return error_response(
|
||||
"Missing required fields",
|
||||
400,
|
||||
{'missing_fields': missing_fields}
|
||||
)
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
return decorator
|
||||
|
||||
|
||||
def validate_ip_address(ip):
|
||||
"""Validate IP address format"""
|
||||
import re
|
||||
pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
|
||||
if not re.match(pattern, ip):
|
||||
return False
|
||||
parts = ip.split('.')
|
||||
return all(0 <= int(part) <= 255 for part in parts)
|
||||
|
||||
|
||||
def sanitize_hostname(hostname):
|
||||
"""Sanitize hostname to prevent injection"""
|
||||
import re
|
||||
# Allow alphanumeric, dash, and underscore
|
||||
if not re.match(r'^[a-zA-Z0-9_-]+$', hostname):
|
||||
raise APIError("Invalid hostname format", 400)
|
||||
if len(hostname) > 255:
|
||||
raise APIError("Hostname too long", 400)
|
||||
return hostname
|
||||
|
||||
|
||||
def setup_logging(config):
|
||||
"""Setup logging configuration"""
|
||||
import os
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
# Create logs directory if it doesn't exist
|
||||
log_dir = os.path.dirname(config.LOG_FILE)
|
||||
if log_dir and not os.path.exists(log_dir):
|
||||
os.makedirs(log_dir)
|
||||
|
||||
# Create logger
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(getattr(logging, config.LOG_LEVEL))
|
||||
|
||||
# File handler with rotation
|
||||
file_handler = RotatingFileHandler(
|
||||
config.LOG_FILE,
|
||||
maxBytes=config.LOG_MAX_BYTES,
|
||||
backupCount=config.LOG_BACKUP_COUNT
|
||||
)
|
||||
file_handler.setFormatter(logging.Formatter(
|
||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
))
|
||||
|
||||
# Console handler
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(logging.Formatter(
|
||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
))
|
||||
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
return logger
|
||||
Reference in New Issue
Block a user