514 lines
19 KiB
Python
514 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Windows Print Service - Complete Self-Contained Version
|
|
Includes all dependencies and libraries for Windows systems
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import tempfile
|
|
import shutil
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import threading
|
|
import webbrowser
|
|
from urllib.parse import urlparse, unquote
|
|
import urllib.request
|
|
import zipfile
|
|
|
|
# Built-in HTTP server modules
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from socketserver import ThreadingMixIn
|
|
|
|
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that dispatches every incoming request to its own thread."""

    # Permit rebinding the listening address on restart.
    allow_reuse_address = True
    # Worker threads are daemonic, so they never keep the process alive.
    daemon_threads = True
|
|
|
|
class PrintServiceHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the print service.

    Endpoints:
        GET     /health     - health information
        GET     /printers   - list of available printers
        GET     /status     - uptime and request counter
        POST    /print_pdf  - JSON body ``{"pdf_url": ..., "printer_name": ...}``
        OPTIONS (any path)  - CORS preflight

    One handler instance is created per request, so anything that must
    outlive a request (the spool directory, the request counter) is kept on
    the class or on the server object rather than on the instance.
    """

    # URL schemes accepted for "pdf_url". The URL comes from an untrusted
    # client, so schemes such as file:// are refused.
    ALLOWED_URL_SCHEMES = ("http", "https")
    # Seconds allowed for the PDF download.
    DOWNLOAD_TIMEOUT = 30
    # Seconds allowed for each external print command.
    PRINT_TIMEOUT = 30
    # Age (seconds) after which a leftover spooled PDF is purged.
    JOB_RETENTION_SECONDS = 600

    # Spool directory shared by all requests; created lazily.
    _spool_dir = None
    # Guards the spool directory creation and the request counter, because
    # requests are served from multiple threads.
    _state_lock = threading.Lock()

    def __init__(self, *args, **kwargs):
        # The base-class constructor processes the whole request, so
        # temp_dir has to be set before calling it. A shared directory is
        # used instead of one mkdtemp() per request, which was never removed.
        self.temp_dir = self._get_spool_dir()
        super().__init__(*args, **kwargs)

    @classmethod
    def _get_spool_dir(cls):
        """Return the shared spool directory, creating it when missing."""
        with cls._state_lock:
            if cls._spool_dir is None or not os.path.isdir(cls._spool_dir):
                cls._spool_dir = tempfile.mkdtemp(prefix="print_service_")
            return cls._spool_dir

    def _count_request(self):
        """Increment ``server.request_count`` (reported by /status)."""
        server = getattr(self, "server", None)
        if server is None:
            return
        with self._state_lock:
            server.request_count = getattr(server, "request_count", 0) + 1

    def log_message(self, format, *args):
        """Override default logging to use our logger."""
        logging.info(f"{self.client_address[0]} - {format % args}")

    def do_GET(self):
        """Handle GET requests; the query string is ignored for routing."""
        self._count_request()
        routes = {
            '/health': self.send_health_response,
            '/printers': self.send_printers_response,
            '/status': self.send_status_response,
        }
        try:
            responder = routes.get(urlparse(self.path).path)
            if responder is None:
                self.send_error(404, "Endpoint not found")
            else:
                responder()
        except Exception as e:
            logging.error(f"GET request error: {e}")
            self.send_error(500, str(e))

    def do_POST(self):
        """Handle POST requests."""
        self._count_request()
        try:
            if urlparse(self.path).path == '/print_pdf':
                self.handle_print_pdf()
            else:
                self.send_error(404, "Endpoint not found")
        except Exception as e:
            logging.error(f"POST request error: {e}")
            self.send_error(500, str(e))

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 204 and the CORS headers.

        The JSON responses advertise OPTIONS in Access-Control-Allow-Methods,
        so the method has to be implemented for preflights to succeed.
        """
        self.send_response(204)
        self._send_cors_headers()
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _send_cors_headers(self):
        """Emit the CORS headers shared by JSON and preflight responses."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def send_health_response(self):
        """Send health check response."""
        self.send_json_response({
            "status": "healthy",
            "service": "Windows Print Service",
            "version": "1.0.0",
            "timestamp": datetime.now().isoformat(),
            "platform": sys.platform,
            "python_version": sys.version,
            "temp_dir": self.temp_dir,
        })

    def send_printers_response(self):
        """Send available printers list."""
        printers = self.get_available_printers()
        self.send_json_response({
            "success": True,
            "printers": printers,
            "count": len(printers),
            "timestamp": datetime.now().isoformat(),
        })

    def send_status_response(self):
        """Send service status.

        ``start_time`` is a module global that may not have been set yet;
        the uptime is reported as 0.0 in that case instead of raising.
        """
        started = globals().get("start_time")
        uptime = time.time() - started if started is not None else 0.0
        self.send_json_response({
            "service_name": "Windows Print Service",
            "status": "running",
            "uptime": uptime,
            "requests_handled": getattr(getattr(self, "server", None), 'request_count', 0),
            "last_activity": datetime.now().isoformat(),
        })

    def handle_print_pdf(self):
        """Handle PDF printing request.

        Expects a JSON object with ``pdf_url`` (required) and
        ``printer_name`` (optional, ``"default"`` when absent or null).
        Malformed requests get a 400 error; print results, successful or
        not, are returned as a JSON document with a ``success`` flag.
        """
        try:
            # A missing, non-numeric or negative Content-Length is a client
            # error; a negative value would also make read() block until EOF.
            try:
                content_length = int(self.headers.get('Content-Length', ''))
            except (TypeError, ValueError):
                content_length = -1
            if content_length < 0:
                self.send_error(400, "Missing or invalid Content-Length header")
                return

            # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
            try:
                data = json.loads(self.rfile.read(content_length).decode('utf-8'))
            except ValueError:
                self.send_error(400, "Invalid JSON data")
                return
            if not isinstance(data, dict):
                self.send_error(400, "Invalid JSON data")
                return

            pdf_url = data.get('pdf_url')
            if not pdf_url or not isinstance(pdf_url, str):
                self.send_error(400, "Missing pdf_url parameter")
                return

            printer_name = data.get('printer_name')
            if printer_name is None or printer_name == '':
                printer_name = 'default'
            elif not isinstance(printer_name, str):
                self.send_error(400, "Invalid printer_name parameter")
                return

            logging.info(f"Print request - URL: {pdf_url}, Printer: {printer_name}")

            result = self.download_and_print_pdf(pdf_url, printer_name)

            if result['success']:
                response = {
                    "success": True,
                    "message": "PDF sent to printer successfully",
                    "printer": printer_name,
                    "method": result.get('method', 'unknown'),
                    "timestamp": datetime.now().isoformat(),
                }
            else:
                response = {
                    "success": False,
                    "error": result.get('error', 'Unknown error'),
                    "timestamp": datetime.now().isoformat(),
                }
            self.send_json_response(response)

        except Exception as e:
            logging.error(f"Print PDF error: {e}")
            self.send_json_response({
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            })

    def download_and_print_pdf(self, pdf_url, printer_name):
        """Download PDF and send to printer.

        Args:
            pdf_url: http(s) URL of the document.
            printer_name: Target printer name, or ``"default"``.

        Returns:
            A dict with ``success``; on success also ``method``, on failure
            ``error``. Never raises.
        """
        pdf_path = None
        try:
            scheme = urlparse(pdf_url).scheme.lower()
            if scheme not in self.ALLOWED_URL_SCHEMES:
                return {"success": False, "error": f"Unsupported URL scheme: {scheme!r}"}

            self._purge_stale_jobs()

            # mkstemp gives a unique name even for requests arriving within
            # the same second in the shared spool directory.
            fd, pdf_path = tempfile.mkstemp(prefix="print_job_", suffix=".pdf", dir=self.temp_dir)

            logging.info(f"Downloading PDF from: {pdf_url}")
            with os.fdopen(fd, "wb") as out:
                with urllib.request.urlopen(pdf_url, timeout=self.DOWNLOAD_TIMEOUT) as resp:
                    shutil.copyfileobj(resp, out)

            if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
                self._remove_file(pdf_path)
                return {"success": False, "error": "Failed to download PDF"}

            logging.info(f"PDF downloaded to: {pdf_path}")

            # Tried in order; the first method reporting success wins.
            print_methods = [
                self.print_with_adobe_reader,
                self.print_with_sumatra_pdf,
                self.print_with_powershell,
                self.print_with_edge,
                self.print_with_system_default,
            ]

            for method in print_methods:
                try:
                    result = method(pdf_path, printer_name)
                except Exception as e:
                    logging.warning(f"Print method {method.__name__} failed: {e}")
                    continue

                if result['success']:
                    # Methods that hand the file to another application may
                    # return before it has been read; such files are left
                    # for _purge_stale_jobs instead of being removed now.
                    if not result.get('deferred_cleanup'):
                        self._remove_file(pdf_path)
                    return result

                logging.warning(
                    f"Print method {method.__name__} failed: {result.get('error', 'Unknown error')}"
                )

            self._remove_file(pdf_path)
            return {"success": False, "error": "All printing methods failed"}

        except Exception as e:
            logging.error(f"Download and print error: {e}")
            if pdf_path is not None:
                self._remove_file(pdf_path)
            return {"success": False, "error": str(e)}

    def _purge_stale_jobs(self):
        """Delete spooled PDFs older than JOB_RETENTION_SECONDS (best effort)."""
        cutoff = time.time() - self.JOB_RETENTION_SECONDS
        try:
            names = os.listdir(self.temp_dir)
        except OSError:
            return
        for name in names:
            if not (name.startswith("print_job_") and name.endswith(".pdf")):
                continue
            path = os.path.join(self.temp_dir, name)
            try:
                if os.path.getmtime(path) < cutoff:
                    os.remove(path)
            except OSError:
                # Still open elsewhere or already gone; retried next time.
                continue

    @staticmethod
    def _remove_file(path):
        """Remove *path*, logging instead of raising when that fails."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logging.warning(f"Could not remove temporary file {path}: {e}")

    @staticmethod
    def _first_existing(paths):
        """Return the first existing path in *paths*, or None."""
        return next((p for p in paths if p and os.path.exists(p)), None)

    def _run(self, cmd, **kwargs):
        """Run *cmd* capturing text output, bounded by PRINT_TIMEOUT."""
        return subprocess.run(
            cmd, capture_output=True, text=True, timeout=self.PRINT_TIMEOUT, **kwargs
        )

    def print_with_adobe_reader(self, pdf_path, printer_name):
        """Print using Adobe Reader command line."""
        try:
            adobe_exe = self._first_existing([
                r"C:\Program Files (x86)\Adobe\Acrobat Reader DC\Reader\AcroRd32.exe",
                r"C:\Program Files\Adobe\Acrobat Reader DC\Reader\AcroRd32.exe",
                r"C:\Program Files (x86)\Adobe\Reader 11.0\Reader\AcroRd32.exe",
                r"C:\Program Files\Adobe\Reader 11.0\Reader\AcroRd32.exe",
            ])
            if not adobe_exe:
                return {"success": False, "error": "Adobe Reader not found"}

            cmd = [adobe_exe, "/t", pdf_path]
            if printer_name != 'default':
                cmd.append(printer_name)

            logging.info(f"Adobe Reader command: {' '.join(cmd)}")
            result = self._run(cmd)

            if result.returncode == 0:
                return {"success": True, "method": "Adobe Reader"}
            return {"success": False, "error": f"Adobe Reader failed: {result.stderr}"}

        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Adobe Reader timeout"}
        except Exception as e:
            return {"success": False, "error": f"Adobe Reader error: {e}"}

    def print_with_sumatra_pdf(self, pdf_path, printer_name):
        """Print using SumatraPDF."""
        try:
            candidates = [
                r"C:\Program Files\SumatraPDF\SumatraPDF.exe",
                r"C:\Program Files (x86)\SumatraPDF\SumatraPDF.exe",
            ]
            # Without LOCALAPPDATA the joined path would be relative to the
            # current directory, so the per-user location is only added
            # when the variable is set.
            local_app_data = os.environ.get('LOCALAPPDATA', '')
            if local_app_data:
                candidates.append(os.path.join(local_app_data, 'SumatraPDF', 'SumatraPDF.exe'))

            sumatra_exe = self._first_existing(candidates)
            if not sumatra_exe:
                return {"success": False, "error": "SumatraPDF not found"}

            # -print-dialog would open an interactive window and block an
            # unattended service; -print-to-default prints directly.
            if printer_name == 'default':
                cmd = [sumatra_exe, "-print-to-default", "-silent", pdf_path]
            else:
                cmd = [sumatra_exe, "-print-to", printer_name, "-silent", pdf_path]

            logging.info(f"SumatraPDF command: {' '.join(cmd)}")
            result = self._run(cmd)

            if result.returncode == 0:
                return {"success": True, "method": "SumatraPDF"}
            return {"success": False, "error": f"SumatraPDF failed: {result.stderr}"}

        except subprocess.TimeoutExpired:
            return {"success": False, "error": "SumatraPDF timeout"}
        except Exception as e:
            return {"success": False, "error": f"SumatraPDF error: {e}"}

    def print_with_powershell(self, pdf_path, printer_name):
        """Print using PowerShell shell verbs (Print / PrintTo).

        The path and printer name reach the script through environment
        variables and are never interpolated into the script text, so a
        client-supplied printer name cannot inject PowerShell code.
        """
        try:
            if '"' in printer_name:
                # The name is wrapped in double quotes for the print handler.
                return {"success": False, "error": "PowerShell error: invalid printer name"}

            if printer_name == 'default':
                action = "Start-Process -FilePath $env:PRINT_SERVICE_PDF -Verb Print -WindowStyle Hidden"
            else:
                # [char]34 is a double quote; quoting keeps printer names
                # containing spaces as a single argument.
                action = (
                    "Start-Process -FilePath $env:PRINT_SERVICE_PDF -Verb PrintTo "
                    "-ArgumentList ([char]34 + $env:PRINT_SERVICE_PRINTER + [char]34) "
                    "-WindowStyle Hidden"
                )
            # Make errors terminating so they surface as a non-zero exit code.
            script = "$ErrorActionPreference = 'Stop'; " + action

            env = dict(os.environ, PRINT_SERVICE_PDF=pdf_path, PRINT_SERVICE_PRINTER=printer_name)
            cmd = ["powershell", "-NoProfile", "-NonInteractive", "-Command", script]
            logging.info("PowerShell print command executed")

            result = self._run(cmd, env=env)
            if result.returncode != 0:
                return {"success": False, "error": f"PowerShell failed: {result.stderr}"}
            # Start-Process does not wait for the launched handler.
            return {"success": True, "method": "PowerShell", "deferred_cleanup": True}

        except subprocess.TimeoutExpired:
            return {"success": False, "error": "PowerShell timeout"}
        except Exception as e:
            return {"success": False, "error": f"PowerShell error: {e}"}

    def print_with_edge(self, pdf_path, printer_name):
        """Print using Microsoft Edge.

        NOTE(review): ``--print-to-pdf`` appears to be a headless render-to-
        file switch rather than a way to send a job to a printer, and
        *printer_name* is not used here - confirm this method prints at all.
        """
        try:
            # Convert to file URL
            file_url = f"file:///{pdf_path.replace(os.sep, '/')}"

            cmd = ["msedge", "--print-to-pdf", "--run-all-compositor-stages-before-draw", file_url]
            logging.info("Edge print command executed")

            result = self._run(cmd)
            if result.returncode != 0:
                return {"success": False, "error": f"Edge failed: {result.stderr}"}
            return {"success": True, "method": "Microsoft Edge", "deferred_cleanup": True}

        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Edge timeout"}
        except Exception as e:
            return {"success": False, "error": f"Edge error: {e}"}

    def print_with_system_default(self, pdf_path, printer_name):
        """Print using the application registered for the file's print verb.

        Uses ``os.startfile(path, "print")``. The previous
        ``cmd /c start /wait <path>`` only opened the document, and ``start``
        treats a quoted path as a window title. *printer_name* is not used.
        """
        try:
            startfile = getattr(os, "startfile", None)
            if startfile is None:
                return {
                    "success": False,
                    "error": "System default error: os.startfile is only available on Windows",
                }

            logging.info("System default print executed")
            startfile(pdf_path, "print")
            # startfile returns as soon as the handler has been launched.
            return {"success": True, "method": "System Default", "deferred_cleanup": True}

        except Exception as e:
            return {"success": False, "error": f"System default error: {e}"}

    @staticmethod
    def _parse_powershell_printers(stdout):
        """Convert ``Get-Printer | ConvertTo-Json`` output to printer dicts.

        The JSON is either a list of objects or a single object. Raises
        ValueError when *stdout* is not valid JSON.
        """
        if not stdout.strip():
            return []
        parsed = json.loads(stdout)
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            return []
        return [
            {
                "name": entry.get("Name", "Unknown"),
                "type": entry.get("Type", "Unknown"),
                "status": entry.get("PrinterStatus", "Unknown"),
                "is_default": False,
            }
            for entry in parsed
            if isinstance(entry, dict)
        ]

    @staticmethod
    def _parse_wmic_printers(stdout):
        """Convert ``wmic printer get name,default /format:csv`` output.

        The first non-blank line is the header; data rows are read as
        ``<node>,<default>,<name>``.
        """
        printers = []
        for line in stdout.strip().splitlines()[1:]:
            parts = line.split(',', 2)
            if len(parts) < 3:
                continue
            name = parts[2].strip()
            if not name:
                continue
            printers.append({
                "name": name,
                "type": "Windows Printer",
                "status": "Available",
                "is_default": parts[1].strip().lower() == 'true',
            })
        return printers

    def get_available_printers(self):
        """Get list of available printers using Windows commands.

        Tries PowerShell ``Get-Printer`` first, then ``wmic``; when neither
        yields a printer, a fixed placeholder list is returned.
        """
        try:
            printers = []

            # Method 1: Use PowerShell to get printers
            try:
                cmd = ["powershell", "-Command",
                       "Get-Printer | Select-Object Name, Type, PrinterStatus | ConvertTo-Json"]
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
                if result.returncode == 0:
                    printers = self._parse_powershell_printers(result.stdout)
            except (OSError, subprocess.SubprocessError, ValueError) as e:
                logging.warning(f"PowerShell printer query failed: {e}")

            # Method 2: Use wmic command as fallback
            if not printers:
                try:
                    cmd = ["wmic", "printer", "get", "name,default", "/format:csv"]
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
                    if result.returncode == 0:
                        printers = self._parse_wmic_printers(result.stdout)
                except (OSError, subprocess.SubprocessError) as e:
                    logging.warning(f"wmic printer query failed: {e}")

            # Add default fallback printers
            if not printers:
                printers = [
                    {"name": "Microsoft Print to PDF", "type": "Virtual", "status": "Available", "is_default": False},
                    {"name": "Default Printer", "type": "System", "status": "Available", "is_default": True},
                ]

            return printers

        except Exception as e:
            logging.error(f"Error getting printers: {e}")
            return [{"name": "Default Printer", "type": "System", "status": "Unknown", "is_default": True}]

    def send_json_response(self, data, status_code=200):
        """Send *data* as a JSON response with CORS headers.

        Args:
            data: JSON-serialisable object.
            status_code: HTTP status code of the response.
        """
        body = json.dumps(data, indent=2).encode('utf-8')

        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self._send_cors_headers()
        self.end_headers()

        self.wfile.write(body)
|
|
|
|
def setup_logging():
    """Configure root logging to write to a daily log file and to stdout.

    The log directory ``~/PrintService/logs`` is created when missing; the
    file is named ``print_service_<YYYYMMDD>.log`` after the current date.
    """
    home = os.path.expanduser("~")
    log_dir = os.path.join(home, "PrintService", "logs")
    os.makedirs(log_dir, exist_ok=True)

    day_stamp = datetime.now().strftime('%Y%m%d')
    log_file = os.path.join(log_dir, f"print_service_{day_stamp}.log")

    file_handler = logging.FileHandler(log_file)
    console_handler = logging.StreamHandler(sys.stdout)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
|
|
|
|
def create_windows_service():
    """Write the service installer batch script to the current directory.

    The generated ``install_service_complete.bat`` checks for administrator
    rights, registers ``print_service_complete.exe`` (expected next to the
    script) as the auto-start service "WindowsPrintService" via ``sc``,
    configures restart-on-failure and starts the service.
    """
    installer_name = 'install_service_complete.bat'
    service_script = '''
@echo off
echo Installing Windows Print Service...

REM Check for administrator privileges
net session >nul 2>&1
if %errorLevel% == 0 (
echo Administrator privileges confirmed.
) else (
echo This script requires administrator privileges.
echo Please right-click and "Run as administrator"
pause
exit /b 1
)

REM Get current directory
set CURRENT_DIR=%~dp0

REM Install service using sc command
sc create "WindowsPrintService" binPath= "%CURRENT_DIR%print_service_complete.exe" DisplayName= "Windows Print Service" start= auto

REM Configure service recovery options
sc failure "WindowsPrintService" reset= 86400 actions= restart/5000/restart/5000/restart/5000

REM Start the service
sc start "WindowsPrintService"

echo Windows Print Service installed and started successfully!
echo Service will auto-start with Windows.
echo.
echo Test the service: http://localhost:8765/health
pause
'''

    installer = open(installer_name, 'w')
    try:
        installer.write(service_script)
    finally:
        installer.close()
|
|
|
|
def main():
    """Main service function.

    Records the start time in the module global ``start_time``, configures
    logging, writes the installer batch script and then serves HTTP requests
    on localhost:8765 until interrupted.
    """
    global start_time
    start_time = time.time()

    # Setup logging
    setup_logging()
    logging.info("Starting Windows Print Service (Complete Version)")
    logging.info(f"Python version: {sys.version}")
    logging.info(f"Platform: {sys.platform}")

    # Create service installer. The script is only a convenience, so a
    # read-only working directory must not prevent the service from starting.
    try:
        create_windows_service()
    except OSError as e:
        logging.warning(f"Could not write service installer script: {e}")

    # Bound before the try block so the finally clause can tell whether the
    # server was ever created (e.g. the port may already be in use).
    httpd = None
    try:
        # Start HTTP server
        server_address = ('localhost', 8765)
        httpd = ThreadingHTTPServer(server_address, PrintServiceHandler)

        logging.info(f"Print service started on http://{server_address[0]}:{server_address[1]}")
        logging.info("Available endpoints:")
        logging.info(" GET /health - Health check")
        logging.info(" GET /printers - List available printers")
        logging.info(" GET /status - Service status")
        logging.info(" POST /print_pdf - Print PDF file")

        # Keep track of requests
        httpd.request_count = 0

        # Start server
        httpd.serve_forever()

    except KeyboardInterrupt:
        logging.info("Service stopped by user")
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception(f"Service error: {e}")
    finally:
        if httpd is not None:
            try:
                httpd.server_close()
            except OSError as e:
                logging.warning(f"Error while closing server socket: {e}")
        logging.info("Windows Print Service shutdown complete")
|
|
|
|
# Start the service only when this file is executed as a script.
if __name__ == "__main__":
    main()