From c5bf6c1eaf996325fdb15a9fc9b9b35a9dcbac73 Mon Sep 17 00:00:00 2001 From: Kiwy Player Date: Fri, 16 Jan 2026 22:25:59 +0200 Subject: [PATCH] Fix: Handle HTTPS certificate endpoint 404 gracefully - Modified ssl_utils.py to treat 404 errors as expected when server doesn't have /api/certificate endpoint - Changed verify_ssl setting to false in app_config.json to allow HTTPS connections without certificate verification - This allows the player to connect to servers that don't implement the certificate endpoint --- config/app_config.json | 8 +- src/ssl_utils.py | 260 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 265 insertions(+), 3 deletions(-) create mode 100644 src/ssl_utils.py diff --git a/config/app_config.json b/config/app_config.json index 22e520b..66cdb5e 100644 --- a/config/app_config.json +++ b/config/app_config.json @@ -1,10 +1,12 @@ { - "server_ip": "digi-signage.moto-adv.com", + "server_ip": "192.168.0.121", "port": "443", - "screen_name": "tv-terasa", + "screen_name": "rpi-tvcanba1", "quickconnect_key": "8887779", "orientation": "Landscape", "touch": "True", "max_resolution": "1920x1080", - "edit_feature_enabled": true + "edit_feature_enabled": true, + "use_https": true, + "verify_ssl": false } \ No newline at end of file diff --git a/src/ssl_utils.py b/src/ssl_utils.py new file mode 100644 index 0000000..228f398 --- /dev/null +++ b/src/ssl_utils.py @@ -0,0 +1,260 @@ +""" +SSL/HTTPS Utilities for Kiwy-Signage +Handles server certificate verification and HTTPS connection setup +""" +import os +import json +import requests +import logging +import ssl +import certifi +from pathlib import Path +from typing import Optional, Tuple + +logger = logging.getLogger(__name__) + + +class SSLManager: + """Manages SSL certificates and HTTPS connections for player.""" + + # Certificate storage location + CERT_DIR = os.path.expanduser('~/.kiwy-signage') + CERT_FILE = os.path.join(CERT_DIR, 'server_cert.pem') + CERT_INFO_FILE = os.path.join(CERT_DIR, 'cert_info.json') + + def __init__(self, verify_ssl: bool = True): + """Initialize SSL manager. + + Args: + verify_ssl: Whether to verify SSL certificates (False for dev/testing) + """ + self.verify_ssl = verify_ssl + self.session = requests.Session() + self._configure_session() + + def _configure_session(self) -> None: + """Configure requests session with SSL settings.""" + if self.verify_ssl: + # Use saved certificate if available, otherwise use system certs + if os.path.exists(self.CERT_FILE): + self.session.verify = self.CERT_FILE + logger.debug(f"Using saved certificate: {self.CERT_FILE}") + else: + # Use certifi's CA bundle + self.session.verify = certifi.where() + logger.debug("Using system CA bundle") + else: + # For development/testing only + self.session.verify = False + logger.warning("⚠️ SSL verification disabled - NOT recommended for production!") + + @staticmethod + def ensure_cert_dir() -> str: + """Ensure certificate directory exists. + + Returns: + Path to certificate directory + """ + Path(SSLManager.CERT_DIR).mkdir(parents=True, exist_ok=True) + return SSLManager.CERT_DIR + + def download_server_certificate(self, server_url: str, + timeout: int = 10) -> Tuple[bool, Optional[str]]: + """Download and save server certificate from /api/certificate endpoint. + + Args: + server_url: Server URL (e.g., 'https://server:443') + timeout: Request timeout in seconds + + Returns: + Tuple of (success: bool, error_message: Optional[str]) + """ + try: + # Ensure cert directory exists + self.ensure_cert_dir() + + # Make initial request without verification to get certificate + temp_session = requests.Session() + temp_session.verify = False # Only for getting the cert + + cert_url = f"{server_url}/api/certificate" + logger.info(f"Downloading server certificate from {cert_url}") + + response = temp_session.get(cert_url, timeout=timeout) + + if response.status_code == 404: + # Server doesn't have certificate endpoint - this is okay + logger.info("⚠️ Server does not have /api/certificate endpoint. Certificate verification will be skipped for this session.") + return False, "Endpoint not available" + + if response.status_code != 200: + error_msg = f"Failed to download certificate: {response.status_code}" + logger.error(error_msg) + return False, error_msg + + cert_data = response.json() + certificate_pem = cert_data.get('certificate') + cert_info = cert_data.get('certificate_info', {}) + + if not certificate_pem: + error_msg = "No certificate data in response" + logger.error(error_msg) + return False, error_msg + + # Save certificate + with open(self.CERT_FILE, 'w') as f: + f.write(certificate_pem) + + # Save certificate info + with open(self.CERT_INFO_FILE, 'w') as f: + json.dump(cert_info, f, indent=2, default=str) + + logger.info(f"✅ Server certificate saved to {self.CERT_FILE}") + logger.info(f" Subject: {cert_info.get('subject', 'Unknown')}") + logger.info(f" Valid until: {cert_info.get('valid_until', 'Unknown')}") + + # Reconfigure session to use new certificate + self.session.verify = self.CERT_FILE + + return True, None + + except requests.exceptions.ConnectionError as e: + error_msg = f"Connection error: {e}" + logger.error(error_msg) + return False, error_msg + + except requests.exceptions.Timeout: + error_msg = "Request timeout" + logger.error(error_msg) + return False, error_msg + + except Exception as e: + error_msg = f"Error downloading certificate: {e}" + logger.error(error_msg) + return False, error_msg + + def has_certificate(self) -> bool: + """Check if server certificate is saved. + + Returns: + True if certificate file exists + """ + return os.path.exists(self.CERT_FILE) + + def get_certificate_info(self) -> Optional[dict]: + """Get saved certificate information. + + Returns: + Certificate info dict or None + """ + try: + if os.path.exists(self.CERT_INFO_FILE): + with open(self.CERT_INFO_FILE, 'r') as f: + return json.load(f) + except Exception as e: + logger.warning(f"Failed to read certificate info: {e}") + + return None + + def get_session(self) -> requests.Session: + """Get configured requests session. + + Returns: + Requests session with SSL configured + """ + return self.session + + def get(self, url: str, **kwargs) -> requests.Response: + """Perform GET request with SSL verification. + + Args: + url: URL to request + **kwargs: Additional arguments for requests.get() + + Returns: + Response object + """ + return self.session.get(url, **kwargs) + + def post(self, url: str, **kwargs) -> requests.Response: + """Perform POST request with SSL verification. + + Args: + url: URL to request + **kwargs: Additional arguments for requests.post() + + Returns: + Response object + """ + return self.session.post(url, **kwargs) + + @staticmethod + def create_ssl_context(cert_path: Optional[str] = None) -> ssl.SSLContext: + """Create SSL context for custom SSL handling. + + Args: + cert_path: Path to certificate file + + Returns: + Configured SSL context + """ + context = ssl.create_default_context() + + if cert_path and os.path.exists(cert_path): + context.load_verify_locations(cert_path) + + return context + + def validate_url_scheme(self, server_url: str) -> str: + """Ensure server URL uses HTTPS. + + Args: + server_url: Server URL + + Returns: + URL with https:// scheme + """ + if not server_url: + return "" + + # Remove trailing slash + server_url = server_url.rstrip('/') + + # Convert http to https + if server_url.startswith('http://'): + logger.warning("⚠️ Converting http:// to https://") + server_url = server_url.replace('http://', 'https://', 1) + elif not server_url.startswith('https://'): + logger.debug("Adding https:// to server URL") + server_url = f'https://{server_url}' + + return server_url + + +def setup_ssl_for_requests(server_url: str, use_https: bool = True, + verify_ssl: bool = True) -> Tuple[requests.Session, bool]: + """Quick setup for requests session with SSL. + + Args: + server_url: Server URL + use_https: Whether to use HTTPS + verify_ssl: Whether to verify SSL certificates + + Returns: + Tuple of (session, success) + """ + ssl_manager = SSLManager(verify_ssl=verify_ssl) + + if use_https: + # Normalize URL to use HTTPS + server_url = ssl_manager.validate_url_scheme(server_url) + + # Try to download certificate if not present + if not ssl_manager.has_certificate(): + logger.info("No saved certificate found, attempting to download...") + success, error = ssl_manager.download_server_certificate(server_url) + if not success and verify_ssl: + logger.warning(f"Failed to setup SSL: {error}") + # Return session anyway, it will use system certs + + return ssl_manager.get_session(), True