- Add config.py for environment configuration management - Add utils.py with utility functions - Add .env.example for environment variable reference - Add routes_example.py as route reference - Add login.html template for authentication - Update server.py with enhancements - Update all dashboard and log templates - Move documentation to 'explanations and old code' directory - Update database schema
163 lines
5.0 KiB
Python
163 lines
5.0 KiB
Python
# Utility functions for the application
|
|
import logging
|
|
from functools import wraps
|
|
from flask import jsonify, request, session
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class APIError(Exception):
|
|
"""Custom exception for API errors"""
|
|
def __init__(self, message, status_code=400, details=None):
|
|
self.message = message
|
|
self.status_code = status_code
|
|
self.details = details or {}
|
|
super().__init__(self.message)
|
|
|
|
|
|
def error_response(error_message, status_code=400, details=None):
|
|
"""Create a standardized error response"""
|
|
response = {
|
|
'success': False,
|
|
'error': error_message,
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
if details:
|
|
response['details'] = details
|
|
return jsonify(response), status_code
|
|
|
|
|
|
def success_response(data=None, message="Success", status_code=200):
|
|
"""Create a standardized success response"""
|
|
response = {
|
|
'success': True,
|
|
'message': message,
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
if data is not None:
|
|
response['data'] = data
|
|
return jsonify(response), status_code
|
|
|
|
|
|
def require_auth(f):
|
|
"""Decorator to require authentication"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
# Check for API key in headers
|
|
api_key = request.headers.get('X-API-Key')
|
|
from config import get_config
|
|
config = get_config()
|
|
|
|
if not api_key or api_key != config.API_KEY:
|
|
logger.warning(f"Unauthorized access attempt from {request.remote_addr}")
|
|
return error_response("Unauthorized", 401)
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
def require_session_auth(f):
|
|
"""Decorator to require session-based authentication"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
return error_response("Authentication required", 401)
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
def log_request(f):
|
|
"""Decorator to log request details"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
logger.info(f"{request.method} {request.path} from {request.remote_addr}")
|
|
try:
|
|
return f(*args, **kwargs)
|
|
except APIError as e:
|
|
logger.error(f"API Error in {f.__name__}: {e.message}")
|
|
return error_response(e.message, e.status_code, e.details)
|
|
except Exception as e:
|
|
logger.exception(f"Unexpected error in {f.__name__}")
|
|
return error_response("Internal server error", 500)
|
|
return decorated_function
|
|
|
|
|
|
def validate_required_fields(required_fields):
|
|
"""Decorator to validate required fields in JSON request"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not request.is_json:
|
|
return error_response("Content-Type must be application/json", 400)
|
|
|
|
data = request.get_json()
|
|
missing_fields = [field for field in required_fields if field not in data or not data[field]]
|
|
|
|
if missing_fields:
|
|
return error_response(
|
|
"Missing required fields",
|
|
400,
|
|
{'missing_fields': missing_fields}
|
|
)
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
|
|
def validate_ip_address(ip):
|
|
"""Validate IP address format"""
|
|
import re
|
|
pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
|
|
if not re.match(pattern, ip):
|
|
return False
|
|
parts = ip.split('.')
|
|
return all(0 <= int(part) <= 255 for part in parts)
|
|
|
|
|
|
def sanitize_hostname(hostname):
|
|
"""Sanitize hostname to prevent injection"""
|
|
import re
|
|
# Allow alphanumeric, dash, and underscore
|
|
if not re.match(r'^[a-zA-Z0-9_-]+$', hostname):
|
|
raise APIError("Invalid hostname format", 400)
|
|
if len(hostname) > 255:
|
|
raise APIError("Hostname too long", 400)
|
|
return hostname
|
|
|
|
|
|
def setup_logging(config):
|
|
"""Setup logging configuration"""
|
|
import os
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
# Create logs directory if it doesn't exist
|
|
log_dir = os.path.dirname(config.LOG_FILE)
|
|
if log_dir and not os.path.exists(log_dir):
|
|
os.makedirs(log_dir)
|
|
|
|
# Create logger
|
|
logger = logging.getLogger()
|
|
logger.setLevel(getattr(logging, config.LOG_LEVEL))
|
|
|
|
# File handler with rotation
|
|
file_handler = RotatingFileHandler(
|
|
config.LOG_FILE,
|
|
maxBytes=config.LOG_MAX_BYTES,
|
|
backupCount=config.LOG_BACKUP_COUNT
|
|
)
|
|
file_handler.setFormatter(logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
))
|
|
|
|
# Console handler
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
))
|
|
|
|
logger.addHandler(file_handler)
|
|
logger.addHandler(console_handler)
|
|
|
|
return logger
|