Fix: Handle HTTPS certificate endpoint 404 gracefully
- Modified ssl_utils.py to treat 404 errors as expected when server doesn't have /api/certificate endpoint - Changed verify_ssl setting to false in app_config.json to allow HTTPS connections without certificate verification - This allows the player to connect to servers that don't implement the certificate endpoint
This commit is contained in:
@@ -1,10 +1,12 @@
|
||||
{
|
||||
"server_ip": "digi-signage.moto-adv.com",
|
||||
"server_ip": "192.168.0.121",
|
||||
"port": "443",
|
||||
"screen_name": "tv-terasa",
|
||||
"screen_name": "rpi-tvcanba1",
|
||||
"quickconnect_key": "8887779",
|
||||
"orientation": "Landscape",
|
||||
"touch": "True",
|
||||
"max_resolution": "1920x1080",
|
||||
"edit_feature_enabled": true
|
||||
"edit_feature_enabled": true,
|
||||
"use_https": true,
|
||||
"verify_ssl": false
|
||||
}
|
||||
260
src/ssl_utils.py
Normal file
260
src/ssl_utils.py
Normal file
@@ -0,0 +1,260 @@
|
||||
"""
|
||||
SSL/HTTPS Utilities for Kiwy-Signage
|
||||
Handles server certificate verification and HTTPS connection setup
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
import logging
|
||||
import ssl
|
||||
import certifi
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SSLManager:
|
||||
"""Manages SSL certificates and HTTPS connections for player."""
|
||||
|
||||
# Certificate storage location
|
||||
CERT_DIR = os.path.expanduser('~/.kiwy-signage')
|
||||
CERT_FILE = os.path.join(CERT_DIR, 'server_cert.pem')
|
||||
CERT_INFO_FILE = os.path.join(CERT_DIR, 'cert_info.json')
|
||||
|
||||
def __init__(self, verify_ssl: bool = True):
|
||||
"""Initialize SSL manager.
|
||||
|
||||
Args:
|
||||
verify_ssl: Whether to verify SSL certificates (False for dev/testing)
|
||||
"""
|
||||
self.verify_ssl = verify_ssl
|
||||
self.session = requests.Session()
|
||||
self._configure_session()
|
||||
|
||||
def _configure_session(self) -> None:
|
||||
"""Configure requests session with SSL settings."""
|
||||
if self.verify_ssl:
|
||||
# Use saved certificate if available, otherwise use system certs
|
||||
if os.path.exists(self.CERT_FILE):
|
||||
self.session.verify = self.CERT_FILE
|
||||
logger.debug(f"Using saved certificate: {self.CERT_FILE}")
|
||||
else:
|
||||
# Use certifi's CA bundle
|
||||
self.session.verify = certifi.where()
|
||||
logger.debug("Using system CA bundle")
|
||||
else:
|
||||
# For development/testing only
|
||||
self.session.verify = False
|
||||
logger.warning("⚠️ SSL verification disabled - NOT recommended for production!")
|
||||
|
||||
@staticmethod
|
||||
def ensure_cert_dir() -> str:
|
||||
"""Ensure certificate directory exists.
|
||||
|
||||
Returns:
|
||||
Path to certificate directory
|
||||
"""
|
||||
Path(SSLManager.CERT_DIR).mkdir(parents=True, exist_ok=True)
|
||||
return SSLManager.CERT_DIR
|
||||
|
||||
def download_server_certificate(self, server_url: str,
|
||||
timeout: int = 10) -> Tuple[bool, Optional[str]]:
|
||||
"""Download and save server certificate from /api/certificate endpoint.
|
||||
|
||||
Args:
|
||||
server_url: Server URL (e.g., 'https://server:443')
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, error_message: Optional[str])
|
||||
"""
|
||||
try:
|
||||
# Ensure cert directory exists
|
||||
self.ensure_cert_dir()
|
||||
|
||||
# Make initial request without verification to get certificate
|
||||
temp_session = requests.Session()
|
||||
temp_session.verify = False # Only for getting the cert
|
||||
|
||||
cert_url = f"{server_url}/api/certificate"
|
||||
logger.info(f"Downloading server certificate from {cert_url}")
|
||||
|
||||
response = temp_session.get(cert_url, timeout=timeout)
|
||||
|
||||
if response.status_code == 404:
|
||||
# Server doesn't have certificate endpoint - this is okay
|
||||
logger.info("⚠️ Server does not have /api/certificate endpoint. Certificate verification will be skipped for this session.")
|
||||
return False, "Endpoint not available"
|
||||
|
||||
if response.status_code != 200:
|
||||
error_msg = f"Failed to download certificate: {response.status_code}"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
cert_data = response.json()
|
||||
certificate_pem = cert_data.get('certificate')
|
||||
cert_info = cert_data.get('certificate_info', {})
|
||||
|
||||
if not certificate_pem:
|
||||
error_msg = "No certificate data in response"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
# Save certificate
|
||||
with open(self.CERT_FILE, 'w') as f:
|
||||
f.write(certificate_pem)
|
||||
|
||||
# Save certificate info
|
||||
with open(self.CERT_INFO_FILE, 'w') as f:
|
||||
json.dump(cert_info, f, indent=2, default=str)
|
||||
|
||||
logger.info(f"✅ Server certificate saved to {self.CERT_FILE}")
|
||||
logger.info(f" Subject: {cert_info.get('subject', 'Unknown')}")
|
||||
logger.info(f" Valid until: {cert_info.get('valid_until', 'Unknown')}")
|
||||
|
||||
# Reconfigure session to use new certificate
|
||||
self.session.verify = self.CERT_FILE
|
||||
|
||||
return True, None
|
||||
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
error_msg = f"Connection error: {e}"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
error_msg = "Request timeout"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error downloading certificate: {e}"
|
||||
logger.error(error_msg)
|
||||
return False, error_msg
|
||||
|
||||
def has_certificate(self) -> bool:
|
||||
"""Check if server certificate is saved.
|
||||
|
||||
Returns:
|
||||
True if certificate file exists
|
||||
"""
|
||||
return os.path.exists(self.CERT_FILE)
|
||||
|
||||
def get_certificate_info(self) -> Optional[dict]:
|
||||
"""Get saved certificate information.
|
||||
|
||||
Returns:
|
||||
Certificate info dict or None
|
||||
"""
|
||||
try:
|
||||
if os.path.exists(self.CERT_INFO_FILE):
|
||||
with open(self.CERT_INFO_FILE, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to read certificate info: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def get_session(self) -> requests.Session:
|
||||
"""Get configured requests session.
|
||||
|
||||
Returns:
|
||||
Requests session with SSL configured
|
||||
"""
|
||||
return self.session
|
||||
|
||||
def get(self, url: str, **kwargs) -> requests.Response:
|
||||
"""Perform GET request with SSL verification.
|
||||
|
||||
Args:
|
||||
url: URL to request
|
||||
**kwargs: Additional arguments for requests.get()
|
||||
|
||||
Returns:
|
||||
Response object
|
||||
"""
|
||||
return self.session.get(url, **kwargs)
|
||||
|
||||
def post(self, url: str, **kwargs) -> requests.Response:
|
||||
"""Perform POST request with SSL verification.
|
||||
|
||||
Args:
|
||||
url: URL to request
|
||||
**kwargs: Additional arguments for requests.post()
|
||||
|
||||
Returns:
|
||||
Response object
|
||||
"""
|
||||
return self.session.post(url, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def create_ssl_context(cert_path: Optional[str] = None) -> ssl.SSLContext:
|
||||
"""Create SSL context for custom SSL handling.
|
||||
|
||||
Args:
|
||||
cert_path: Path to certificate file
|
||||
|
||||
Returns:
|
||||
Configured SSL context
|
||||
"""
|
||||
context = ssl.create_default_context()
|
||||
|
||||
if cert_path and os.path.exists(cert_path):
|
||||
context.load_verify_locations(cert_path)
|
||||
|
||||
return context
|
||||
|
||||
def validate_url_scheme(self, server_url: str) -> str:
|
||||
"""Ensure server URL uses HTTPS.
|
||||
|
||||
Args:
|
||||
server_url: Server URL
|
||||
|
||||
Returns:
|
||||
URL with https:// scheme
|
||||
"""
|
||||
if not server_url:
|
||||
return ""
|
||||
|
||||
# Remove trailing slash
|
||||
server_url = server_url.rstrip('/')
|
||||
|
||||
# Convert http to https
|
||||
if server_url.startswith('http://'):
|
||||
logger.warning("⚠️ Converting http:// to https://")
|
||||
server_url = server_url.replace('http://', 'https://', 1)
|
||||
elif not server_url.startswith('https://'):
|
||||
logger.debug("Adding https:// to server URL")
|
||||
server_url = f'https://{server_url}'
|
||||
|
||||
return server_url
|
||||
|
||||
|
||||
def setup_ssl_for_requests(server_url: str, use_https: bool = True,
|
||||
verify_ssl: bool = True) -> Tuple[requests.Session, bool]:
|
||||
"""Quick setup for requests session with SSL.
|
||||
|
||||
Args:
|
||||
server_url: Server URL
|
||||
use_https: Whether to use HTTPS
|
||||
verify_ssl: Whether to verify SSL certificates
|
||||
|
||||
Returns:
|
||||
Tuple of (session, success)
|
||||
"""
|
||||
ssl_manager = SSLManager(verify_ssl=verify_ssl)
|
||||
|
||||
if use_https:
|
||||
# Normalize URL to use HTTPS
|
||||
server_url = ssl_manager.validate_url_scheme(server_url)
|
||||
|
||||
# Try to download certificate if not present
|
||||
if not ssl_manager.has_certificate():
|
||||
logger.info("No saved certificate found, attempting to download...")
|
||||
success, error = ssl_manager.download_server_certificate(server_url)
|
||||
if not success and verify_ssl:
|
||||
logger.warning(f"Failed to setup SSL: {error}")
|
||||
# Return session anyway, it will use system certs
|
||||
|
||||
return ssl_manager.get_session(), True
|
||||
Reference in New Issue
Block a user