Initial commit: Kivy Signage Player

- Complete Kivy-based signage player application
- Fetches playlists from DigiServer via REST API
- Supports images (JPG, PNG, GIF, BMP) and videos (MP4, AVI, MKV, MOV, WEBM)
- Modern UI with touch controls and settings popup
- Separated UI (KV file) from business logic (Python)
- Fullscreen media display with proper scaling
- Server feedback system for player status reporting
- Auto-hide controls with mouse/touch activation
- Virtual environment setup with requirements.txt
- Installation and run scripts included

Features:
 Cross-platform Kivy framework
 Server integration with DigiServer
 Media playback with duration control
 Player feedback and status reporting
 Settings management with persistent config
 Error handling and recovery
 Clean architecture with KV file separation
This commit is contained in:
Kivy Signage Player
2025-09-26 16:48:26 +03:00
commit 04d4ea5916
12 changed files with 1192 additions and 0 deletions

441
src/main.py Normal file
View File

@@ -0,0 +1,441 @@
"""
Kivy Signage Player - Main Application
Displays content from DigiServer playlists using Kivy framework
"""
import os
import json
import threading
import time
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.uix.label import Label
from kivy.uix.image import AsyncImage
from kivy.uix.video import Video
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.button import Button
from kivy.uix.popup import Popup
from kivy.uix.textinput import TextInput
from kivy.clock import Clock
from kivy.core.window import Window
from kivy.logger import Logger
from kivy.animation import Animation
from kivy.lang import Builder
from get_playlists import (
update_playlist_if_needed,
send_playing_status_feedback,
send_playlist_restart_feedback,
send_player_error_feedback
)
# Load the KV file
Builder.load_file('signage_player.kv')
class SettingsPopup(Popup):
def __init__(self, player_instance, **kwargs):
super(SettingsPopup, self).__init__(**kwargs)
self.player = player_instance
# Populate current values
self.ids.server_input.text = self.player.config.get('server_ip', 'localhost')
self.ids.screen_input.text = self.player.config.get('screen_name', 'kivy-player')
self.ids.quickconnect_input.text = self.player.config.get('quickconnect_key', '1234567')
# Update status info
self.ids.playlist_info.text = f'Playlist Version: {self.player.playlist_version}'
self.ids.media_count_info.text = f'Media Count: {len(self.player.playlist)}'
self.ids.status_info.text = f'Status: {"Playing" if self.player.is_playing else "Paused" if self.player.is_paused else "Idle"}'
def save_and_close(self):
"""Save configuration and close popup"""
# Update config
self.player.config['server_ip'] = self.ids.server_input.text
self.player.config['screen_name'] = self.ids.screen_input.text
self.player.config['quickconnect_key'] = self.ids.quickconnect_input.text
# Save to file
self.player.save_config()
# Close popup
self.dismiss()
class SignagePlayer(Widget):
def __init__(self, **kwargs):
super(SignagePlayer, self).__init__(**kwargs)
# Initialize variables
self.playlist = []
self.current_index = 0
self.current_widget = None
self.is_playing = False
self.is_paused = False
self.config = {}
self.playlist_version = None
# Paths
self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.config_dir = os.path.join(self.base_dir, 'config')
self.media_dir = os.path.join(self.base_dir, 'media')
self.playlists_dir = os.path.join(self.base_dir, 'playlists')
self.config_file = os.path.join(self.config_dir, 'app_config.json')
# Create directories if they don't exist
for directory in [self.config_dir, self.media_dir, self.playlists_dir]:
os.makedirs(directory, exist_ok=True)
# Initialize player
Clock.schedule_once(self.initialize_player, 0.1)
# Hide controls timer
self.controls_timer = None
# Auto-hide controls
self.schedule_hide_controls()
def initialize_player(self, dt):
"""Initialize the player - load config and start playlist checking"""
Logger.info("SignagePlayer: Initializing player...")
# Load configuration
self.load_config()
# Start playlist update thread
threading.Thread(target=self.playlist_update_loop, daemon=True).start()
# Start media playback
Clock.schedule_interval(self.check_playlist_and_play, 30) # Check every 30 seconds
# Initial playlist check
self.check_playlist_and_play(None)
def load_config(self):
"""Load configuration from file"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r') as f:
self.config = json.load(f)
Logger.info(f"SignagePlayer: Configuration loaded from {self.config_file}")
else:
# Create default configuration
self.config = {
"server_ip": "localhost",
"port": "5000",
"screen_name": "kivy-player",
"quickconnect_key": "1234567"
}
self.save_config()
Logger.info("SignagePlayer: Created default configuration")
except Exception as e:
Logger.error(f"SignagePlayer: Error loading config: {e}")
self.show_error(f"Failed to load configuration: {e}")
def save_config(self):
"""Save configuration to file"""
try:
with open(self.config_file, 'w') as f:
json.dump(self.config, f, indent=2)
Logger.info("SignagePlayer: Configuration saved")
except Exception as e:
Logger.error(f"SignagePlayer: Error saving config: {e}")
def playlist_update_loop(self):
"""Background thread to check for playlist updates"""
while True:
try:
if self.config:
# Find latest playlist file
latest_playlist = self.get_latest_playlist_file()
# Check for updates
updated = update_playlist_if_needed(
latest_playlist,
self.config,
self.media_dir,
self.playlists_dir
)
if updated:
Logger.info("SignagePlayer: Playlist updated, reloading...")
Clock.schedule_once(self.load_playlist, 0)
time.sleep(60) # Check every minute
except Exception as e:
Logger.error(f"SignagePlayer: Error in playlist update loop: {e}")
time.sleep(30) # Wait 30 seconds on error
def get_latest_playlist_file(self):
"""Get the path to the latest playlist file"""
try:
if os.path.exists(self.playlists_dir):
playlist_files = [f for f in os.listdir(self.playlists_dir)
if f.startswith('server_playlist_v') and f.endswith('.json')]
if playlist_files:
# Sort by version number and get the latest
versions = [(int(f.split('_v')[-1].split('.json')[0]), f) for f in playlist_files]
versions.sort(reverse=True)
latest_file = versions[0][1]
return os.path.join(self.playlists_dir, latest_file)
return os.path.join(self.playlists_dir, 'server_playlist_v0.json')
except Exception as e:
Logger.error(f"SignagePlayer: Error getting latest playlist: {e}")
return os.path.join(self.playlists_dir, 'server_playlist_v0.json')
def load_playlist(self, dt=None):
"""Load playlist from file"""
try:
playlist_file = self.get_latest_playlist_file()
if os.path.exists(playlist_file):
with open(playlist_file, 'r') as f:
data = json.load(f)
self.playlist = data.get('playlist', [])
self.playlist_version = data.get('version', 0)
Logger.info(f"SignagePlayer: Loaded playlist v{self.playlist_version} with {len(self.playlist)} items")
if self.playlist:
self.ids.status_label.text = f"Playlist loaded: {len(self.playlist)} items"
if not self.is_playing:
Clock.schedule_once(self.start_playback, 1)
else:
self.ids.status_label.text = "No media in playlist"
else:
Logger.warning(f"SignagePlayer: Playlist file not found: {playlist_file}")
self.ids.status_label.text = "No playlist found"
except Exception as e:
Logger.error(f"SignagePlayer: Error loading playlist: {e}")
self.show_error(f"Failed to load playlist: {e}")
def check_playlist_and_play(self, dt):
"""Check for playlist updates and ensure playback is running"""
if not self.playlist:
self.load_playlist()
if self.playlist and not self.is_playing and not self.is_paused:
self.start_playback()
def start_playback(self, dt=None):
"""Start media playback"""
if not self.playlist:
Logger.warning("SignagePlayer: No playlist to play")
return
Logger.info("SignagePlayer: Starting playback")
self.is_playing = True
self.current_index = 0
self.play_current_media()
def play_current_media(self):
"""Play the current media item"""
if not self.playlist or self.current_index >= len(self.playlist):
# End of playlist, restart
self.restart_playlist()
return
try:
media_item = self.playlist[self.current_index]
file_name = media_item.get('file_name', '')
duration = media_item.get('duration', 10)
# Construct full path to media file
media_path = os.path.join(self.media_dir, file_name)
if not os.path.exists(media_path):
Logger.error(f"SignagePlayer: Media file not found: {media_path}")
self.next_media()
return
Logger.info(f"SignagePlayer: Playing {file_name} for {duration}s")
# Remove status label if showing
self.ids.status_label.opacity = 0
# Remove previous media widget
if self.current_widget:
self.ids.content_area.remove_widget(self.current_widget)
self.current_widget = None
# Determine media type and create appropriate widget
file_extension = os.path.splitext(file_name)[1].lower()
if file_extension in ['.mp4', '.avi', '.mkv', '.mov', '.webm']:
# Video file
self.play_video(media_path, duration)
elif file_extension in ['.jpg', '.jpeg', '.png', '.bmp', '.gif']:
# Image file
self.play_image(media_path, duration)
else:
Logger.warning(f"SignagePlayer: Unsupported media type: {file_extension}")
self.next_media()
return
# Send feedback to server
if self.config:
send_playing_status_feedback(self.config, self.playlist_version, file_name)
except Exception as e:
Logger.error(f"SignagePlayer: Error playing media: {e}")
self.show_error(f"Error playing media: {e}")
self.next_media()
def play_video(self, video_path, duration):
"""Play a video file"""
try:
self.current_widget = Video(
source=video_path,
state='play',
options={'allow_stretch': True},
size_hint=(1, 1),
pos_hint={'x': 0, 'y': 0}
)
self.ids.content_area.add_widget(self.current_widget)
# Schedule next media after duration
Clock.schedule_once(self.next_media, duration)
except Exception as e:
Logger.error(f"SignagePlayer: Error playing video {video_path}: {e}")
self.next_media()
def play_image(self, image_path, duration):
"""Play an image file"""
try:
self.current_widget = AsyncImage(
source=image_path,
allow_stretch=True,
keep_ratio=False,
size_hint=(1, 1),
pos_hint={'x': 0, 'y': 0}
)
self.ids.content_area.add_widget(self.current_widget)
# Schedule next media after duration
Clock.schedule_once(self.next_media, duration)
except Exception as e:
Logger.error(f"SignagePlayer: Error playing image {image_path}: {e}")
self.next_media()
def next_media(self, dt=None):
"""Move to next media item"""
if self.is_paused:
return
self.current_index += 1
# Unschedule any pending media transitions
Clock.unschedule(self.next_media)
# Play next media or restart playlist
self.play_current_media()
def previous_media(self, instance=None):
"""Move to previous media item"""
if self.playlist:
self.current_index = max(0, self.current_index - 1)
Clock.unschedule(self.next_media)
self.play_current_media()
def toggle_pause(self, instance=None):
"""Toggle pause/play"""
self.is_paused = not self.is_paused
if self.is_paused:
self.ids.play_pause_btn.text = ''
Clock.unschedule(self.next_media)
else:
self.ids.play_pause_btn.text = ''
# Resume by playing current media
self.play_current_media()
def restart_playlist(self):
"""Restart playlist from beginning"""
Logger.info("SignagePlayer: Restarting playlist")
# Send restart feedback
if self.config:
send_playlist_restart_feedback(self.config, self.playlist_version)
self.current_index = 0
self.play_current_media()
def show_error(self, message):
"""Show error message"""
Logger.error(f"SignagePlayer: {message}")
# Send error feedback to server
if self.config:
send_player_error_feedback(self.config, message, self.playlist_version)
# Show error on screen
self.ids.status_label.text = f"Error: {message}"
self.ids.status_label.opacity = 1
def on_touch_down(self, touch):
"""Handle touch - show controls"""
self.show_controls()
return super(SignagePlayer, self).on_touch_down(touch)
def on_touch_move(self, touch):
"""Handle touch move - show controls"""
self.show_controls()
return super(SignagePlayer, self).on_touch_move(touch)
def show_controls(self):
"""Show control buttons"""
if self.controls_timer:
self.controls_timer.cancel()
# Fade in controls
Animation(opacity=1, duration=0.3).start(self.ids.controls_layout)
# Schedule hide after 3 seconds
self.schedule_hide_controls()
def schedule_hide_controls(self):
"""Schedule hiding of controls"""
if self.controls_timer:
self.controls_timer.cancel()
self.controls_timer = Clock.schedule_once(self.hide_controls, 3)
def hide_controls(self, dt=None):
"""Hide control buttons"""
Animation(opacity=0, duration=0.5).start(self.ids.controls_layout)
def show_settings(self, instance=None):
"""Show settings popup"""
popup = SettingsPopup(player_instance=self)
popup.open()
def exit_app(self, instance=None):
"""Exit the application"""
Logger.info("SignagePlayer: Exiting application")
App.get_running_app().stop()
class SignagePlayerApp(App):
def build(self):
# Set window to fullscreen
Window.fullscreen = 'auto'
Window.borderless = True
return SignagePlayer()
def on_start(self):
Logger.info("SignagePlayerApp: Application started")
def on_stop(self):
Logger.info("SignagePlayerApp: Application stopped")
if __name__ == '__main__':
SignagePlayerApp().run()