Files
Kiwy-Signage/src/main.py
2025-11-22 10:04:30 +02:00

986 lines
40 KiB
Python

"""
Kivy Signage Player - Main Application
Displays content from DigiServer playlists using Kivy framework
"""
import os
import json
import platform
import threading
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Set environment variables for better video performance
os.environ['KIVY_VIDEO'] = 'ffpyplayer' # Use ffpyplayer as video provider
os.environ['FFPYPLAYER_CODECS'] = 'h264,h265,vp9,vp8' # Support common codecs
os.environ['SDL_VIDEO_ALLOW_SCREENSAVER'] = '0' # Prevent screen saver
from kivy.app import App
from kivy.config import Config
from kivy.uix.widget import Widget
from kivy.uix.label import Label
from kivy.uix.image import AsyncImage, Image
from kivy.uix.video import Video
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.button import Button
from kivy.uix.popup import Popup
from kivy.uix.textinput import TextInput
from kivy.clock import Clock
from kivy.core.window import Window
from kivy.properties import BooleanProperty
from kivy.logger import Logger
from kivy.animation import Animation
from kivy.lang import Builder
from kivy.graphics import Rectangle
from kivy.graphics.texture import Texture
from get_playlists import (
update_playlist_if_needed,
send_playing_status_feedback,
send_playlist_restart_feedback,
send_player_error_feedback
)
# Load the KV file
Builder.load_file('signage_player.kv')
# Removed VLCVideoWidget - using Kivy's built-in Video widget instead
class ExitPasswordPopup(Popup):
def __init__(self, player_instance, was_paused=False, **kwargs):
super(ExitPasswordPopup, self).__init__(**kwargs)
self.player = player_instance
self.was_paused = was_paused
# Cancel all scheduled cursor/control hide events
try:
if self.player.controls_timer:
self.player.controls_timer.cancel()
self.player.controls_timer = None
Clock.unschedule(self.player.hide_controls)
Clock.unschedule(self.player.schedule_cursor_hide)
except Exception as e:
Logger.debug(f"ExitPasswordPopup: Error canceling timers: {e}")
# Show cursor when password popup opens
try:
Window.show_cursor = True
except:
pass
# Bind to dismiss event to manage cursor visibility and resume playback
self.bind(on_dismiss=self.on_popup_dismiss)
def on_popup_dismiss(self, *args):
"""Handle popup dismissal - resume playback and restart cursor hide timer"""
# Resume playback if it wasn't paused before
if not self.was_paused:
self.player.is_paused = False
# Resume video if it was playing
if self.player.current_widget and isinstance(self.player.current_widget, Video):
self.player.current_widget.state = 'play'
# Restart the control hide timer
self.player.schedule_hide_controls()
def check_password(self):
"""Check if entered password matches quickconnect key"""
entered_password = self.ids.password_input.text
correct_password = self.player.config.get('quickconnect_key', '1234567')
if entered_password == correct_password:
# Password correct, exit app
Logger.info("ExitPasswordPopup: Correct password, exiting app")
self.dismiss()
App.get_running_app().stop()
else:
# Password incorrect, show error and close popup
Logger.warning("ExitPasswordPopup: Incorrect password")
self.ids.error_label.text = 'Incorrect password!'
Clock.schedule_once(lambda dt: self.dismiss(), 1)
class SettingsPopup(Popup):
def __init__(self, player_instance, was_paused=False, **kwargs):
super(SettingsPopup, self).__init__(**kwargs)
self.player = player_instance
self.was_paused = was_paused
# Cancel all scheduled cursor/control hide events
try:
if self.player.controls_timer:
self.player.controls_timer.cancel()
self.player.controls_timer = None
Clock.unschedule(self.player.hide_controls)
Clock.unschedule(self.player.schedule_cursor_hide)
except Exception as e:
Logger.debug(f"SettingsPopup: Error canceling timers: {e}")
# Show cursor when settings open
try:
Window.show_cursor = True
except:
pass
# Populate current values
self.ids.server_input.text = self.player.config.get('server_ip', 'localhost')
self.ids.screen_input.text = self.player.config.get('screen_name', 'kivy-player')
self.ids.quickconnect_input.text = self.player.config.get('quickconnect_key', '1234567')
self.ids.orientation_input.text = self.player.config.get('orientation', 'Landscape')
self.ids.touch_input.text = self.player.config.get('touch', 'True')
self.ids.resolution_input.text = self.player.config.get('max_resolution', 'auto')
# Update status info
self.ids.playlist_info.text = f'Playlist Version: {self.player.playlist_version}'
self.ids.media_count_info.text = f'Media Count: {len(self.player.playlist)}'
self.ids.status_info.text = f'Status: {"Playing" if self.player.is_playing else "Paused" if self.player.is_paused else "Idle"}'
# Bind to dismiss event to manage cursor visibility and resume playback
self.bind(on_dismiss=self.on_popup_dismiss)
def on_popup_dismiss(self, *args):
"""Handle popup dismissal - resume playback and restart cursor hide timer"""
# Resume playback if it wasn't paused before
if not self.was_paused:
self.player.is_paused = False
# Resume video if it was playing
if self.player.current_widget and isinstance(self.player.current_widget, Video):
self.player.current_widget.state = 'play'
# Restart the control hide timer
self.player.schedule_hide_controls()
def test_connection(self):
"""Test connection to server with current credentials"""
# Update status label to show testing
self.ids.connection_status.text = 'Testing connection...'
self.ids.connection_status.color = (1, 0.7, 0, 1) # Orange
# Run test in background thread to avoid blocking UI
def run_test():
try:
from player_auth import PlayerAuth
import re
# Get current values from inputs
server_ip = self.ids.server_input.text.strip()
screen_name = self.ids.screen_input.text.strip()
quickconnect = self.ids.quickconnect_input.text.strip()
port = self.player.config.get('port', '443')
if not all([server_ip, screen_name, quickconnect]):
Clock.schedule_once(lambda dt: self.update_connection_status(
'Error: Fill all fields', False
))
return
# Build server URL
if server_ip.startswith('http://') or server_ip.startswith('https://'):
server_url = server_ip
if not ':' in server_ip.replace('https://', '').replace('http://', ''):
if port and port != '443' and port != '80':
server_url = f"{server_ip}:{port}"
else:
protocol = "https" if port == "443" else "http"
server_url = f"{protocol}://{server_ip}:{port}"
Logger.info(f"SettingsPopup: Testing connection to {server_url}")
# Create temporary auth instance (don't save)
auth = PlayerAuth('/tmp/temp_auth_test.json')
# Try to authenticate
success, error = auth.authenticate(
server_url=server_url,
hostname=screen_name,
quickconnect_code=quickconnect
)
# Clean up temp file
try:
import os
if os.path.exists('/tmp/temp_auth_test.json'):
os.remove('/tmp/temp_auth_test.json')
except:
pass
# Update UI on main thread
if success:
player_name = auth.get_player_name()
Clock.schedule_once(lambda dt: self.update_connection_status(
f'✓ Connected: {player_name}', True
))
else:
Clock.schedule_once(lambda dt: self.update_connection_status(
f'✗ Failed: {error}', False
))
except Exception as e:
Logger.error(f"SettingsPopup: Connection test error: {e}")
Clock.schedule_once(lambda dt: self.update_connection_status(
f'✗ Error: {str(e)}', False
))
# Run in thread
threading.Thread(target=run_test, daemon=True).start()
def update_connection_status(self, message, success):
"""Update connection status label"""
self.ids.connection_status.text = message
if success:
self.ids.connection_status.color = (0, 1, 0, 1) # Green
else:
self.ids.connection_status.color = (1, 0, 0, 1) # Red
def save_and_close(self):
"""Save configuration and close popup"""
# Update config
self.player.config['server_ip'] = self.ids.server_input.text
self.player.config['screen_name'] = self.ids.screen_input.text
self.player.config['quickconnect_key'] = self.ids.quickconnect_input.text
self.player.config['orientation'] = self.ids.orientation_input.text
self.player.config['touch'] = self.ids.touch_input.text
self.player.config['max_resolution'] = self.ids.resolution_input.text
# Save to file
self.player.save_config()
# Notify user that resolution change requires restart
if self.ids.resolution_input.text != self.player.config.get('max_resolution', 'auto'):
Logger.info("SettingsPopup: Resolution changed - restart required")
# Close popup
self.dismiss()
class SignagePlayer(Widget):
from kivy.properties import StringProperty
resources_path = StringProperty()
from kivy.properties import NumericProperty
screen_width = NumericProperty(0)
screen_height = NumericProperty(0)
def set_screen_size(self):
"""Get the current screen size and set as properties."""
self.screen_width, self.screen_height = Window.size
Logger.info(f"Screen size detected: {self.screen_width}x{self.screen_height}")
is_paused = BooleanProperty(False)
def __init__(self, **kwargs):
super(SignagePlayer, self).__init__(**kwargs)
# Initialize variables
self.playlist = []
self.current_index = 0
self.current_widget = None
self.is_playing = False
self.is_paused = False
self.config = {}
self.playlist_version = None
self.consecutive_errors = 0 # Track consecutive playback errors
self.max_consecutive_errors = 10 # Maximum errors before stopping
self.intro_played = False # Track if intro has been played
# Paths
self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.config_dir = os.path.join(self.base_dir, 'config')
self.media_dir = os.path.join(self.base_dir, 'media')
self.playlists_dir = os.path.join(self.base_dir, 'playlists')
self.config_file = os.path.join(self.config_dir, 'app_config.json')
self.resources_path = os.path.join(self.config_dir, 'resources')
# Create directories if they don't exist
for directory in [self.config_dir, self.media_dir, self.playlists_dir]:
os.makedirs(directory, exist_ok=True)
# Get and set screen size
self.set_screen_size()
# Bind to window size for fullscreen
Window.bind(size=self._update_size)
self._update_size(Window, Window.size)
# Initialize player
Clock.schedule_once(self.initialize_player, 0.1)
# Hide controls timer
self.controls_timer = None
# Auto-hide controls
self.schedule_hide_controls()
def _update_size(self, instance, value):
self.size = value
if hasattr(self, 'ids') and 'content_area' in self.ids:
self.ids.content_area.size = value
def initialize_player(self, dt):
"""Initialize the player - load config and start playlist checking"""
Logger.info("SignagePlayer: Initializing player...")
# Load configuration
self.load_config()
# Play intro video first
self.play_intro_video()
# Start async server tasks (non-blocking)
asyncio.ensure_future(self.async_playlist_update_loop())
Logger.info("SignagePlayer: Async server tasks started")
# Start media playback
Clock.schedule_interval(self.check_playlist_and_play, 30) # Check every 30 seconds
def load_config(self):
"""Load configuration from file"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r') as f:
self.config = json.load(f)
Logger.info(f"SignagePlayer: Configuration loaded from {self.config_file}")
else:
# Create default configuration
self.config = {
"server_ip": "localhost",
"port": "5000",
"screen_name": "kivy-player",
"quickconnect_key": "1234567",
"max_resolution": "auto"
}
self.save_config()
Logger.info("SignagePlayer: Created default configuration")
except Exception as e:
Logger.error(f"SignagePlayer: Error loading config: {e}")
self.show_error(f"Failed to load configuration: {e}")
def save_config(self):
"""Save configuration to file"""
try:
with open(self.config_file, 'w') as f:
json.dump(self.config, f, indent=2)
Logger.info("SignagePlayer: Configuration saved")
except Exception as e:
Logger.error(f"SignagePlayer: Error saving config: {e}")
async def async_playlist_update_loop(self):
"""Async coroutine to check for playlist updates without blocking UI"""
Logger.info("SignagePlayer: Starting async playlist update loop")
while True:
try:
if self.config:
# Run blocking network I/O in thread pool
loop = asyncio.get_event_loop()
# Find latest playlist file
latest_playlist = await loop.run_in_executor(
None,
self.get_latest_playlist_file
)
# Check for updates (network I/O in thread pool)
updated = await loop.run_in_executor(
None,
update_playlist_if_needed,
latest_playlist,
self.config,
self.media_dir,
self.playlists_dir
)
if updated:
Logger.info("SignagePlayer: Playlist updated, reloading...")
# Schedule UI update on main thread
Clock.schedule_once(self.load_playlist, 0)
# Wait 60 seconds before next check
await asyncio.sleep(60)
except Exception as e:
Logger.error(f"SignagePlayer: Error in async playlist update loop: {e}")
await asyncio.sleep(30) # Wait 30 seconds on error
async def async_send_feedback(self, feedback_func, *args):
"""Send feedback to server asynchronously without blocking UI"""
try:
loop = asyncio.get_event_loop()
# Run feedback in thread pool to avoid blocking
await loop.run_in_executor(None, feedback_func, *args)
except Exception as e:
Logger.debug(f"SignagePlayer: Error sending async feedback: {e}")
def get_latest_playlist_file(self):
"""Get the path to the latest playlist file"""
try:
if os.path.exists(self.playlists_dir):
playlist_files = [f for f in os.listdir(self.playlists_dir)
if f.startswith('server_playlist_v') and f.endswith('.json')]
if playlist_files:
# Sort by version number and get the latest
versions = [(int(f.split('_v')[-1].split('.json')[0]), f) for f in playlist_files]
versions.sort(reverse=True)
latest_file = versions[0][1]
return os.path.join(self.playlists_dir, latest_file)
return os.path.join(self.playlists_dir, 'server_playlist_v0.json')
except Exception as e:
Logger.error(f"SignagePlayer: Error getting latest playlist: {e}")
return os.path.join(self.playlists_dir, 'server_playlist_v0.json')
def load_playlist(self, dt=None):
"""Load playlist from file"""
try:
playlist_file = self.get_latest_playlist_file()
if os.path.exists(playlist_file):
with open(playlist_file, 'r') as f:
data = json.load(f)
self.playlist = data.get('playlist', [])
self.playlist_version = data.get('version', 0)
Logger.info(f"SignagePlayer: Loaded playlist v{self.playlist_version} with {len(self.playlist)} items")
if self.playlist:
self.ids.status_label.text = f"Playlist loaded: {len(self.playlist)} items"
if not self.is_playing:
Clock.schedule_once(self.start_playback, 1)
else:
self.ids.status_label.text = "No media in playlist"
else:
Logger.warning(f"SignagePlayer: Playlist file not found: {playlist_file}")
self.ids.status_label.text = "No playlist found"
except Exception as e:
Logger.error(f"SignagePlayer: Error loading playlist: {e}")
self.show_error(f"Failed to load playlist: {e}")
def play_intro_video(self):
"""Play intro video on startup"""
intro_path = os.path.join(self.resources_path, 'intro1.mp4')
if not os.path.exists(intro_path):
Logger.warning(f"SignagePlayer: Intro video not found at {intro_path}")
# Skip intro and load playlist
self.intro_played = True
self.check_playlist_and_play(None)
return
try:
Logger.info("SignagePlayer: Playing intro video...")
self.ids.status_label.opacity = 0 # Hide status label
# Create video widget for intro
intro_video = Video(
source=intro_path,
state='play',
options={'eos': 'stop'},
allow_stretch=True,
keep_ratio=True,
size=self.size,
pos=(0, 0)
)
# Bind to video end event
def on_intro_end(instance, value):
if value == 'stop':
Logger.info("SignagePlayer: Intro video finished")
# Remove intro video
if intro_video in self.ids.content_area.children:
self.ids.content_area.remove_widget(intro_video)
# Mark intro as played
self.intro_played = True
# Start normal playlist
self.check_playlist_and_play(None)
intro_video.bind(state=on_intro_end)
# Add intro video to content area
self.ids.content_area.add_widget(intro_video)
except Exception as e:
Logger.error(f"SignagePlayer: Error playing intro video: {e}")
# Skip intro and load playlist
self.intro_played = True
self.check_playlist_and_play(None)
def check_playlist_and_play(self, dt):
"""Check for playlist updates and ensure playback is running"""
# Don't start playlist until intro is done
if not self.intro_played:
return
if not self.playlist:
self.load_playlist()
if self.playlist and not self.is_playing and not self.is_paused:
self.start_playback()
def start_playback(self, dt=None):
"""Start media playback"""
if not self.playlist:
Logger.warning("SignagePlayer: No playlist to play")
return
Logger.info("SignagePlayer: Starting playback")
self.is_playing = True
self.current_index = 0
self.play_current_media()
def play_current_media(self):
"""Play the current media item"""
if not self.playlist or self.current_index >= len(self.playlist):
# End of playlist, restart
self.restart_playlist()
return
try:
media_item = self.playlist[self.current_index]
file_name = media_item.get('file_name', '')
duration = media_item.get('duration', 10)
Logger.info(f"SignagePlayer: ===== Playing item {self.current_index + 1}/{len(self.playlist)} =====")
Logger.info(f"SignagePlayer: File: {file_name}")
Logger.info(f"SignagePlayer: Duration: {duration}s")
# Construct full path to media file
media_path = os.path.join(self.media_dir, file_name)
Logger.info(f"SignagePlayer: Full path: {media_path}")
if not os.path.exists(media_path):
Logger.error(f"SignagePlayer: ❌ Media file not found: {media_path}")
Logger.error(f"SignagePlayer: Skipping to next media...")
self.consecutive_errors += 1
self.next_media()
return
Logger.info(f"SignagePlayer: ✓ File exists (size: {os.path.getsize(media_path):,} bytes)")
# Remove status label if showing
self.ids.status_label.opacity = 0
# Remove previous media widget
if self.current_widget:
# Properly stop video if it's playing to prevent resource leaks
if isinstance(self.current_widget, Video):
try:
Logger.info(f"SignagePlayer: Stopping previous video widget...")
self.current_widget.state = 'stop'
self.current_widget.unload()
except Exception as e:
Logger.warning(f"SignagePlayer: Error stopping video: {e}")
self.ids.content_area.remove_widget(self.current_widget)
self.current_widget = None
Logger.info(f"SignagePlayer: Previous widget removed")
# Determine media type and create appropriate widget
file_extension = os.path.splitext(file_name)[1].lower()
Logger.info(f"SignagePlayer: Extension: {file_extension}")
if file_extension in ['.mp4', '.avi', '.mkv', '.mov', '.webm']:
# Video file
Logger.info(f"SignagePlayer: Media type: VIDEO")
self.play_video(media_path, duration)
elif file_extension in ['.jpg', '.jpeg', '.png', '.bmp', '.gif']:
# Image file
Logger.info(f"SignagePlayer: Media type: IMAGE")
self.play_image(media_path, duration)
else:
Logger.warning(f"SignagePlayer: ❌ Unsupported media type: {file_extension}")
Logger.warning(f"SignagePlayer: Supported: .mp4/.avi/.mkv/.mov/.webm/.jpg/.jpeg/.png/.bmp/.gif")
Logger.warning(f"SignagePlayer: Skipping to next media...")
self.consecutive_errors += 1
self.next_media()
return
# Send feedback to server asynchronously (non-blocking)
if self.config:
asyncio.ensure_future(
self.async_send_feedback(
send_playing_status_feedback,
self.config,
self.playlist_version,
file_name
)
)
# Reset error counter on successful playback
self.consecutive_errors = 0
Logger.info(f"SignagePlayer: ✓ Media started successfully (consecutive_errors reset to 0)")
except Exception as e:
Logger.error(f"SignagePlayer: Error playing media: {e}")
self.consecutive_errors += 1
# Check if we've exceeded max errors
if self.consecutive_errors >= self.max_consecutive_errors:
error_msg = f"Too many consecutive errors ({self.consecutive_errors}), stopping playback"
Logger.error(f"SignagePlayer: {error_msg}")
self.show_error(error_msg)
self.is_playing = False
return
self.show_error(f"Error playing media: {e}")
self.next_media()
def play_video(self, video_path, duration):
"""Play a video file using Kivy's Video widget"""
try:
# Verify file exists
if not os.path.exists(video_path):
Logger.error(f"SignagePlayer: ❌ Video file not found: {video_path}")
self.consecutive_errors += 1
self.next_media()
return
Logger.info(f"SignagePlayer: Loading video {os.path.basename(video_path)} for {duration}s")
Logger.info(f"SignagePlayer: Video provider: {os.environ.get('KIVY_VIDEO', 'default')}")
# Create Video widget with optimized settings
Logger.info(f"SignagePlayer: Creating Video widget...")
self.current_widget = Video(
source=video_path,
state='play', # Start playing immediately
options={
'eos': 'stop', # Stop at end of stream
},
allow_stretch=True,
keep_ratio=True, # Maintain aspect ratio
size_hint=(1, 1),
pos_hint={'center_x': 0.5, 'center_y': 0.5}
)
# Bind to loaded and error events
self.current_widget.bind(loaded=self._on_video_loaded)
self.current_widget.bind(on_eos=self._on_video_eos)
# Add to content area
Logger.info(f"SignagePlayer: Adding video widget to content area...")
self.ids.content_area.add_widget(self.current_widget)
# Schedule next media after duration (unschedule first to prevent overlaps)
Logger.info(f"SignagePlayer: Scheduled next media in {duration}s")
Clock.unschedule(self.next_media)
Clock.schedule_once(self.next_media, duration)
except Exception as e:
Logger.error(f"SignagePlayer: Error playing video {video_path}: {e}")
self.consecutive_errors += 1
if self.consecutive_errors < self.max_consecutive_errors:
self.next_media()
def _on_video_eos(self, instance):
"""Callback when video reaches end of stream"""
Logger.info("SignagePlayer: Video finished playing (EOS)")
def _on_video_loaded(self, instance, value):
"""Callback when video is loaded - log video information"""
if value:
try:
Logger.info(f"SignagePlayer: Video loaded successfully")
Logger.info(f"SignagePlayer: Video texture: {instance.texture.size if instance.texture else 'No texture'}")
Logger.info(f"SignagePlayer: Video duration: {instance.duration}s")
Logger.info(f"SignagePlayer: Video state: {instance.state}")
except Exception as e:
Logger.warning(f"SignagePlayer: Could not log video info: {e}")
def play_image(self, image_path, duration):
"""Play an image file"""
try:
Logger.info(f"SignagePlayer: Creating AsyncImage widget...")
self.current_widget = AsyncImage(
source=image_path,
allow_stretch=True,
keep_ratio=True, # Maintain aspect ratio
size_hint=(1, 1),
pos_hint={'center_x': 0.5, 'center_y': 0.5}
)
Logger.info(f"SignagePlayer: Adding image widget to content area...")
self.ids.content_area.add_widget(self.current_widget)
# Schedule next media after duration (unschedule first to prevent overlaps)
Logger.info(f"SignagePlayer: Scheduled next media in {duration}s")
Clock.unschedule(self.next_media)
Clock.schedule_once(self.next_media, duration)
Logger.info(f"SignagePlayer: ✓ Image displayed successfully")
except Exception as e:
Logger.error(f"SignagePlayer: Error playing image {image_path}: {e}")
self.consecutive_errors += 1
if self.consecutive_errors < self.max_consecutive_errors:
self.next_media()
def next_media(self, dt=None):
"""Move to next media item"""
if self.is_paused:
Logger.debug(f"SignagePlayer: Skipping next_media - player is paused")
return
Logger.info(f"SignagePlayer: Transitioning to next media (was index {self.current_index})")
self.current_index += 1
# Unschedule any pending media transitions
Clock.unschedule(self.next_media)
# Play next media or restart playlist
self.play_current_media()
def previous_media(self, instance=None):
"""Move to previous media item"""
if self.playlist:
self.current_index = max(0, self.current_index - 1)
Clock.unschedule(self.next_media)
self.play_current_media()
def toggle_pause(self, instance=None):
"""Toggle pause/play"""
self.is_paused = not self.is_paused
if self.is_paused:
self.ids.play_pause_btn.text = ''
Clock.unschedule(self.next_media)
else:
self.ids.play_pause_btn.text = ''
# Resume by playing current media
self.play_current_media()
def restart_playlist(self):
"""Restart playlist from beginning"""
Logger.info("SignagePlayer: Restarting playlist")
# Send restart feedback asynchronously (non-blocking)
if self.config:
asyncio.ensure_future(
self.async_send_feedback(
send_playlist_restart_feedback,
self.config,
self.playlist_version
)
)
self.current_index = 0
self.play_current_media()
def show_error(self, message):
"""Show error message"""
Logger.error(f"SignagePlayer: {message}")
# Send error feedback to server asynchronously (non-blocking)
if self.config:
asyncio.ensure_future(
self.async_send_feedback(
send_player_error_feedback,
self.config,
message,
self.playlist_version
)
)
# Show error on screen
self.ids.status_label.text = f"Error: {message}"
self.ids.status_label.opacity = 1
def on_touch_down(self, touch):
"""Handle touch - show controls"""
self.show_controls()
return super(SignagePlayer, self).on_touch_down(touch)
def on_touch_move(self, touch):
"""Handle touch move - show controls"""
self.show_controls()
return super(SignagePlayer, self).on_touch_move(touch)
def show_controls(self):
"""Show control buttons and cursor"""
if self.controls_timer:
self.controls_timer.cancel()
# Show cursor
try:
Window.show_cursor = True
except:
pass
# Fade in controls
Animation(opacity=1, duration=0.3).start(self.ids.controls_layout)
# Schedule hide after 3 seconds
self.schedule_hide_controls()
def schedule_hide_controls(self):
"""Schedule hiding of controls"""
if self.controls_timer:
self.controls_timer.cancel()
self.controls_timer = Clock.schedule_once(self.hide_controls, 3)
def hide_controls(self, dt=None):
"""Hide control buttons and cursor"""
Animation(opacity=0, duration=0.5).start(self.ids.controls_layout)
# Hide cursor after controls are hidden
try:
Window.show_cursor = False
except:
pass
def schedule_cursor_hide(self):
"""Schedule cursor to hide after 3 seconds"""
try:
Clock.schedule_once(lambda dt: Window.__setattr__('show_cursor', False), 3)
except:
pass
def show_settings(self, instance=None):
"""Show settings popup"""
# Pause playback when settings opens
was_paused = self.is_paused
if not was_paused:
self.is_paused = True
Clock.unschedule(self.next_media)
# Stop video if playing
if self.current_widget and isinstance(self.current_widget, Video):
self.current_widget.state = 'pause'
popup = SettingsPopup(player_instance=self, was_paused=was_paused)
popup.open()
def show_exit_popup(self, instance=None):
"""Show exit password popup"""
# Pause playback when exit popup opens
was_paused = self.is_paused
if not was_paused:
self.is_paused = True
Clock.unschedule(self.next_media)
# Stop video if playing
if self.current_widget and isinstance(self.current_widget, Video):
self.current_widget.state = 'pause'
popup = ExitPasswordPopup(player_instance=self, was_paused=was_paused)
popup.open()
def exit_app(self, instance=None):
"""Exit the application"""
Logger.info("SignagePlayer: Exiting application")
App.get_running_app().stop()
class SignagePlayerApp(App):
def build_config(self, config):
"""Configure Kivy settings for optimal video playback"""
# Set graphics settings for smooth 30fps video
config.setdefaults('graphics', {
'multisamples': '2', # Anti-aliasing
'maxfps': '30', # Limit to 30fps (optimized for RPi and video content)
'vsync': '1', # Enable vertical sync
'resizable': '0', # Disable window resizing
'borderless': '1', # Borderless window
'fullscreen': 'auto' # Auto fullscreen
})
# Disable unnecessary modules to save resources
config.setdefaults('modules', {
'inspector': '', # Disable inspector
'monitor': '', # Disable monitor
'keybinding': '', # Disable keybinding
'showborder': '' # Disable border highlighting
})
# Input settings
config.setdefaults('input', {
'mouse': 'mouse', # Enable mouse input
})
Logger.info("SignagePlayerApp: Kivy config optimized for 30fps video playback")
def build(self):
# Load config to get resolution setting
config_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config', 'app_config.json')
max_resolution = 'auto'
try:
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
max_resolution = config.get('max_resolution', 'auto')
except Exception as e:
Logger.warning(f"SignagePlayerApp: Could not load resolution setting: {e}")
# Apply resolution constraint
if max_resolution != 'auto' and 'x' in max_resolution:
try:
max_width, max_height = map(int, max_resolution.split('x'))
current_width, current_height = Window.size
# Check if current resolution exceeds maximum
if current_width > max_width or current_height > max_height:
# Calculate scaling to fit within max resolution
scale = min(max_width / current_width, max_height / current_height)
new_width = int(current_width * scale)
new_height = int(current_height * scale)
Window.size = (new_width, new_height)
Logger.info(f"SignagePlayerApp: Resolution constrained from {current_width}x{current_height} to {new_width}x{new_height}")
else:
Logger.info(f"SignagePlayerApp: Current resolution {current_width}x{current_height} within max {max_resolution}")
except Exception as e:
Logger.error(f"SignagePlayerApp: Error parsing resolution setting '{max_resolution}': {e}")
else:
Logger.info(f"SignagePlayerApp: Using auto resolution (no constraint)")
# Force fullscreen and borderless
Window.fullscreen = True
Window.borderless = True
Logger.info(f"SignagePlayerApp: Screen size: {Window.size}")
Logger.info(f"SignagePlayerApp: Available screen size: {Window.system_size if hasattr(Window, 'system_size') else 'N/A'}")
# Hide cursor after 3 seconds of inactivity
Clock.schedule_once(self.hide_cursor, 3)
return SignagePlayer()
def hide_cursor(self, dt):
"""Hide the mouse cursor"""
try:
Window.show_cursor = False
except:
pass # Some platforms don't support cursor hiding
def on_start(self):
# Setup asyncio event loop for Kivy integration
try:
loop = asyncio.get_event_loop()
Logger.info("SignagePlayerApp: Asyncio event loop integrated with Kivy")
except RuntimeError:
# Create new event loop if none exists
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Logger.info("SignagePlayerApp: New asyncio event loop created")
# Schedule periodic async task processing
Clock.schedule_interval(self._process_async_tasks, 0.1) # Process every 100ms
# Log final window info
Logger.info(f"SignagePlayerApp: Final window size: {Window.size}")
Logger.info(f"SignagePlayerApp: Fullscreen: {Window.fullscreen}")
Logger.info("SignagePlayerApp: Application started")
Logger.info("SignagePlayerApp: Server communications running asynchronously")
def _process_async_tasks(self, dt):
"""Process pending asyncio tasks without blocking Kivy"""
try:
loop = asyncio.get_event_loop()
# Process pending callbacks without blocking
loop.call_soon(loop.stop)
loop.run_forever()
except Exception as e:
pass # Silently handle - loop may not have tasks
def on_stop(self):
Logger.info("SignagePlayerApp: Application stopped")
# Cancel all async tasks
try:
pending = asyncio.all_tasks()
for task in pending:
task.cancel()
Logger.info(f"SignagePlayerApp: Cancelled {len(pending)} async tasks")
except Exception as e:
Logger.debug(f"SignagePlayerApp: Error cancelling tasks: {e}")
if __name__ == '__main__':
SignagePlayerApp().run()