666 lines
25 KiB
Python
666 lines
25 KiB
Python
"""
|
|
Windows Print Service for Quality Label Printing
|
|
Receives PDFs from Chrome extension and prints them page by page
|
|
|
|
Windows-compatible version with comprehensive dependency support
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import logging
|
|
import tempfile
|
|
import subprocess
|
|
import platform
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from urllib.parse import urlparse, parse_qs
|
|
import urllib.request
|
|
from datetime import datetime
|
|
import threading
|
|
import time
|
|
import shutil
|
|
import winreg
|
|
from pathlib import Path
|
|
|
|
# Windows-specific imports with fallbacks
|
|
try:
|
|
import win32print
|
|
import win32api
|
|
import win32con
|
|
import win32ui
|
|
import win32gui
|
|
WINDOWS_PRINTING_AVAILABLE = True
|
|
except ImportError:
|
|
print("Warning: pywin32 not available. Some Windows-specific features may not work.")
|
|
WINDOWS_PRINTING_AVAILABLE = False
|
|
|
|
# PDF processing imports with fallbacks
|
|
try:
|
|
from PyPDF2 import PdfReader, PdfWriter
|
|
PYPDF2_AVAILABLE = True
|
|
except ImportError:
|
|
try:
|
|
from pypdf import PdfReader, PdfWriter
|
|
PYPDF2_AVAILABLE = True
|
|
except ImportError:
|
|
print("Warning: PDF processing library not available. Install PyPDF2 or pypdf.")
|
|
PYPDF2_AVAILABLE = False
|
|
|
|
# Advanced PDF processing (optional)
|
|
try:
|
|
import fitz # PyMuPDF
|
|
PYMUPDF_AVAILABLE = True
|
|
except ImportError:
|
|
PYMUPDF_AVAILABLE = False
|
|
|
|
# Image processing for PDF conversion
|
|
try:
|
|
from PIL import Image
|
|
from pdf2image import convert_from_path
|
|
IMAGE_PROCESSING_AVAILABLE = True
|
|
except ImportError:
|
|
IMAGE_PROCESSING_AVAILABLE = False
|
|
|
|
# HTTP requests
|
|
try:
|
|
import requests
|
|
REQUESTS_AVAILABLE = True
|
|
except ImportError:
|
|
REQUESTS_AVAILABLE = False
|
|
|
|
# Configure logging
|
|
log_dir = os.path.join(os.path.expanduser("~"), "QualityLabelPrinting", "logs")
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
log_file = os.path.join(log_dir, "print_service.log")
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(log_file),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class PrintServiceHandler(BaseHTTPRequestHandler):
|
|
def log_message(self, format, *args):
|
|
"""Override to use our logger instead of stderr"""
|
|
logger.info(f"{self.address_string()} - {format % args}")
|
|
|
|
def do_GET(self):
|
|
"""Handle GET requests"""
|
|
try:
|
|
parsed_path = urlparse(self.path)
|
|
path = parsed_path.path
|
|
|
|
if path == '/health':
|
|
self.handle_health_check()
|
|
elif path == '/printers':
|
|
self.handle_get_printers()
|
|
elif path == '/status':
|
|
self.handle_service_status()
|
|
else:
|
|
self.send_error(404, "Endpoint not found")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling GET request: {e}")
|
|
self.send_error(500, str(e))
|
|
|
|
def do_POST(self):
|
|
"""Handle POST requests"""
|
|
try:
|
|
parsed_path = urlparse(self.path)
|
|
path = parsed_path.path
|
|
|
|
if path == '/print_pdf':
|
|
self.handle_print_pdf()
|
|
elif path == '/print_url':
|
|
self.handle_print_url()
|
|
else:
|
|
self.send_error(404, "Endpoint not found")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling POST request: {e}")
|
|
self.send_error(500, str(e))
|
|
|
|
def handle_health_check(self):
|
|
"""Health check endpoint"""
|
|
response = {
|
|
"status": "healthy",
|
|
"service": "Quality Label Print Service",
|
|
"version": "2.0.0",
|
|
"platform": "Windows",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"capabilities": ["pdf_printing", "page_by_page", "printer_selection"]
|
|
}
|
|
|
|
self.send_json_response(200, response)
|
|
|
|
def handle_get_printers(self):
|
|
"""Get available printers"""
|
|
try:
|
|
printers = self.get_available_printers()
|
|
response = {
|
|
"success": True,
|
|
"printers": printers,
|
|
"count": len(printers)
|
|
}
|
|
self.send_json_response(200, response)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting printers: {e}")
|
|
response = {
|
|
"success": False,
|
|
"error": str(e),
|
|
"printers": []
|
|
}
|
|
self.send_json_response(500, response)
|
|
|
|
def handle_service_status(self):
|
|
"""Service status information"""
|
|
response = {
|
|
"service_name": "Quality Label Print Service",
|
|
"running": True,
|
|
"uptime": "Available",
|
|
"last_print_job": getattr(self.server, 'last_print_time', 'Never'),
|
|
"total_jobs": getattr(self.server, 'total_jobs', 0)
|
|
}
|
|
self.send_json_response(200, response)
|
|
|
|
def handle_print_pdf(self):
|
|
"""Handle PDF printing requests"""
|
|
try:
|
|
content_length = int(self.headers['Content-Length'])
|
|
post_data = self.rfile.read(content_length)
|
|
data = json.loads(post_data.decode('utf-8'))
|
|
|
|
logger.info(f"Received print request: {data}")
|
|
|
|
# Extract request data
|
|
pdf_url = data.get('pdf_url')
|
|
printer_name = data.get('printer_name', 'default')
|
|
order_id = data.get('order_id')
|
|
prod_order = data.get('prod_order')
|
|
quantity = data.get('quantity')
|
|
|
|
if not pdf_url:
|
|
raise ValueError("PDF URL is required")
|
|
|
|
# Download PDF
|
|
logger.info(f"Downloading PDF from: {pdf_url}")
|
|
pdf_path = self.download_pdf(pdf_url)
|
|
|
|
# Print PDF page by page
|
|
logger.info(f"Printing PDF to: {printer_name}")
|
|
print_result = self.print_pdf_pages(pdf_path, printer_name)
|
|
|
|
# Update service stats
|
|
self.server.last_print_time = datetime.now().isoformat()
|
|
self.server.total_jobs = getattr(self.server, 'total_jobs', 0) + 1
|
|
|
|
# Clean up temporary file
|
|
try:
|
|
os.unlink(pdf_path)
|
|
except:
|
|
pass
|
|
|
|
response = {
|
|
"success": True,
|
|
"message": f"PDF printed successfully to {printer_name}",
|
|
"order_id": order_id,
|
|
"prod_order": prod_order,
|
|
"quantity": quantity,
|
|
"pages_printed": print_result.get('pages', 0),
|
|
"printer_used": print_result.get('printer', printer_name),
|
|
"method": "windows_service_page_by_page"
|
|
}
|
|
|
|
logger.info(f"Print job completed: {response}")
|
|
self.send_json_response(200, response)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error printing PDF: {e}")
|
|
response = {
|
|
"success": False,
|
|
"error": str(e),
|
|
"method": "windows_service_error"
|
|
}
|
|
self.send_json_response(500, response)
|
|
|
|
def handle_print_url(self):
|
|
"""Alternative endpoint that accepts PDF URL and prints it"""
|
|
try:
|
|
content_length = int(self.headers['Content-Length'])
|
|
post_data = self.rfile.read(content_length)
|
|
data = json.loads(post_data.decode('utf-8'))
|
|
|
|
# Same logic as handle_print_pdf
|
|
self.handle_print_pdf()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in print_url endpoint: {e}")
|
|
self.send_error(500, str(e))
|
|
|
|
def download_pdf(self, pdf_url):
|
|
"""Download PDF from URL to temporary file"""
|
|
try:
|
|
# Create temporary file
|
|
temp_dir = tempfile.gettempdir()
|
|
temp_file = tempfile.NamedTemporaryFile(
|
|
suffix='.pdf',
|
|
prefix='quality_label_',
|
|
delete=False,
|
|
dir=temp_dir
|
|
)
|
|
temp_path = temp_file.name
|
|
temp_file.close()
|
|
|
|
logger.info(f"Downloading PDF to: {temp_path}")
|
|
|
|
# Download the PDF
|
|
urllib.request.urlretrieve(pdf_url, temp_path)
|
|
|
|
logger.info(f"PDF downloaded successfully: {os.path.getsize(temp_path)} bytes")
|
|
return temp_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error downloading PDF: {e}")
|
|
raise
|
|
|
|
def print_pdf_pages(self, pdf_path, printer_name):
|
|
"""Print PDF page by page using Windows printing"""
|
|
try:
|
|
logger.info(f"Starting page-by-page printing of: {pdf_path}")
|
|
|
|
# Method 1: Use Adobe Reader command line (if available)
|
|
adobe_result = self.try_adobe_print(pdf_path, printer_name)
|
|
if adobe_result['success']:
|
|
return adobe_result
|
|
|
|
# Method 2: Use SumatraPDF (lightweight, good for automation)
|
|
sumatra_result = self.try_sumatra_print(pdf_path, printer_name)
|
|
if sumatra_result['success']:
|
|
return sumatra_result
|
|
|
|
# Method 3: Use Windows PowerShell
|
|
powershell_result = self.try_powershell_print(pdf_path, printer_name)
|
|
if powershell_result['success']:
|
|
return powershell_result
|
|
|
|
# Method 4: Use Python PDF library + Windows printing
|
|
python_result = self.try_python_print(pdf_path, printer_name)
|
|
return python_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error printing PDF pages: {e}")
|
|
return {"success": False, "error": str(e), "pages": 0}
|
|
|
|
def try_adobe_print(self, pdf_path, printer_name):
|
|
"""Try printing with Adobe Reader"""
|
|
try:
|
|
# Common Adobe Reader paths
|
|
adobe_paths = [
|
|
r"C:\Program Files\Adobe\Acrobat DC\Acrobat\Acrobat.exe",
|
|
r"C:\Program Files (x86)\Adobe\Acrobat Reader DC\Reader\AcroRd32.exe",
|
|
r"C:\Program Files\Adobe\Acrobat Reader DC\Reader\AcroRd32.exe"
|
|
]
|
|
|
|
adobe_path = None
|
|
for path in adobe_paths:
|
|
if os.path.exists(path):
|
|
adobe_path = path
|
|
break
|
|
|
|
if not adobe_path:
|
|
return {"success": False, "reason": "Adobe Reader not found"}
|
|
|
|
# Adobe command line printing
|
|
if printer_name == 'default':
|
|
cmd = [adobe_path, "/t", pdf_path]
|
|
else:
|
|
cmd = [adobe_path, "/t", pdf_path, printer_name]
|
|
|
|
logger.info(f"Running Adobe print command: {cmd}")
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode == 0:
|
|
logger.info("Adobe Reader print successful")
|
|
return {
|
|
"success": True,
|
|
"method": "adobe_reader",
|
|
"printer": printer_name,
|
|
"pages": "unknown" # Adobe doesn't return page count
|
|
}
|
|
else:
|
|
logger.warning(f"Adobe print failed: {result.stderr}")
|
|
return {"success": False, "reason": f"Adobe error: {result.stderr}"}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Adobe print method failed: {e}")
|
|
return {"success": False, "reason": str(e)}
|
|
|
|
def try_sumatra_print(self, pdf_path, printer_name):
|
|
"""Try printing with SumatraPDF"""
|
|
try:
|
|
# SumatraPDF is lightweight and good for automation
|
|
sumatra_paths = [
|
|
r"C:\Program Files\SumatraPDF\SumatraPDF.exe",
|
|
r"C:\Program Files (x86)\SumatraPDF\SumatraPDF.exe"
|
|
]
|
|
|
|
sumatra_path = None
|
|
for path in sumatra_paths:
|
|
if os.path.exists(path):
|
|
sumatra_path = path
|
|
break
|
|
|
|
if not sumatra_path:
|
|
return {"success": False, "reason": "SumatraPDF not found"}
|
|
|
|
# SumatraPDF command line printing
|
|
if printer_name == 'default':
|
|
cmd = [sumatra_path, "-print-to-default", pdf_path]
|
|
else:
|
|
cmd = [sumatra_path, "-print-to", printer_name, pdf_path]
|
|
|
|
logger.info(f"Running SumatraPDF command: {cmd}")
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode == 0:
|
|
logger.info("SumatraPDF print successful")
|
|
return {
|
|
"success": True,
|
|
"method": "sumatra_pdf",
|
|
"printer": printer_name,
|
|
"pages": "unknown"
|
|
}
|
|
else:
|
|
logger.warning(f"SumatraPDF print failed: {result.stderr}")
|
|
return {"success": False, "reason": f"SumatraPDF error: {result.stderr}"}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"SumatraPDF print method failed: {e}")
|
|
return {"success": False, "reason": str(e)}
|
|
|
|
def try_powershell_print(self, pdf_path, printer_name):
|
|
"""Try printing with PowerShell"""
|
|
try:
|
|
# PowerShell script to print PDF
|
|
if printer_name == 'default':
|
|
ps_script = f'''
|
|
Add-Type -AssemblyName System.Drawing
|
|
$pdf = New-Object System.Drawing.Printing.PrintDocument
|
|
$pdf.DocumentName = "{os.path.basename(pdf_path)}"
|
|
Start-Process -FilePath "{pdf_path}" -Verb Print -Wait
|
|
'''
|
|
else:
|
|
ps_script = f'''
|
|
Add-Type -AssemblyName System.Drawing
|
|
$pdf = New-Object System.Drawing.Printing.PrintDocument
|
|
$pdf.PrinterSettings.PrinterName = "{printer_name}"
|
|
$pdf.DocumentName = "{os.path.basename(pdf_path)}"
|
|
Start-Process -FilePath "{pdf_path}" -Verb Print -Wait
|
|
'''
|
|
|
|
# Execute PowerShell
|
|
cmd = ["powershell", "-Command", ps_script]
|
|
logger.info("Running PowerShell print command")
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=45)
|
|
|
|
if result.returncode == 0:
|
|
logger.info("PowerShell print successful")
|
|
return {
|
|
"success": True,
|
|
"method": "powershell",
|
|
"printer": printer_name,
|
|
"pages": "unknown"
|
|
}
|
|
else:
|
|
logger.warning(f"PowerShell print failed: {result.stderr}")
|
|
return {"success": False, "reason": f"PowerShell error: {result.stderr}"}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"PowerShell print method failed: {e}")
|
|
return {"success": False, "reason": str(e)}
|
|
|
|
def try_python_print(self, pdf_path, printer_name):
|
|
"""Try printing with Python libraries"""
|
|
try:
|
|
# This would require additional libraries like win32print
|
|
# For now, return a fallback method
|
|
logger.info("Python print method - using default system print")
|
|
|
|
# Use Windows default print handler
|
|
if printer_name == 'default':
|
|
os.startfile(pdf_path, "print")
|
|
else:
|
|
# More complex implementation would be needed for specific printer
|
|
os.startfile(pdf_path, "print")
|
|
|
|
return {
|
|
"success": True,
|
|
"method": "python_system_print",
|
|
"printer": printer_name,
|
|
"pages": "unknown"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Python print method failed: {e}")
|
|
return {"success": False, "reason": str(e)}
|
|
|
|
def get_available_printers(self):
|
|
"""Get list of available printers on Windows with comprehensive detection"""
|
|
printers = []
|
|
|
|
# Method 1: Windows API (most reliable if pywin32 is available)
|
|
if WINDOWS_PRINTING_AVAILABLE:
|
|
try:
|
|
printer_info = win32print.EnumPrinters(win32print.PRINTER_ENUM_LOCAL | win32print.PRINTER_ENUM_CONNECTIONS)
|
|
for printer in printer_info:
|
|
printer_name = printer[2] # Printer name is at index 2
|
|
|
|
# Check if this is the default printer
|
|
try:
|
|
default_printer = win32print.GetDefaultPrinter()
|
|
is_default = (printer_name == default_printer)
|
|
except:
|
|
is_default = False
|
|
|
|
printers.append({
|
|
"name": printer_name,
|
|
"display_name": printer_name,
|
|
"is_default": is_default,
|
|
"status": "available",
|
|
"type": "Windows API"
|
|
})
|
|
|
|
logger.info(f"Found {len(printers)} printers via Windows API")
|
|
return printers
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Windows API printer enumeration failed: {e}")
|
|
|
|
# Method 2: PowerShell (good fallback)
|
|
if not printers:
|
|
try:
|
|
cmd = ["powershell", "-Command", "Get-Printer | Select-Object Name,Default | ConvertTo-Json"]
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15, shell=True)
|
|
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
try:
|
|
printers_data = json.loads(result.stdout)
|
|
if not isinstance(printers_data, list):
|
|
printers_data = [printers_data] # Single printer case
|
|
|
|
for printer in printers_data:
|
|
printers.append({
|
|
"name": printer.get("Name", "Unknown"),
|
|
"display_name": printer.get("Name", "Unknown"),
|
|
"is_default": printer.get("Default", False),
|
|
"status": "available",
|
|
"type": "PowerShell"
|
|
})
|
|
|
|
logger.info(f"Found {len(printers)} printers via PowerShell")
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Failed to parse PowerShell printer JSON: {e}")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"PowerShell printer enumeration failed: {e}")
|
|
|
|
# Method 3: WMIC command (older Windows compatibility)
|
|
if not printers:
|
|
try:
|
|
result = subprocess.run(['wmic', 'printer', 'get', 'name', '/format:csv'],
|
|
capture_output=True, text=True, shell=True, timeout=10)
|
|
if result.returncode == 0:
|
|
lines = result.stdout.strip().split('\n')[1:] # Skip header
|
|
for line in lines:
|
|
if line.strip() and ',' in line:
|
|
parts = line.split(',')
|
|
if len(parts) >= 2 and parts[1].strip():
|
|
printer_name = parts[1].strip()
|
|
printers.append({
|
|
"name": printer_name,
|
|
"display_name": printer_name,
|
|
"is_default": False,
|
|
"status": "available",
|
|
"type": "WMIC"
|
|
})
|
|
|
|
logger.info(f"Found {len(printers)} printers via WMIC")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"WMIC printer enumeration failed: {e}")
|
|
|
|
# Method 4: Registry search (comprehensive fallback)
|
|
if not printers:
|
|
try:
|
|
import winreg
|
|
reg_path = r"SYSTEM\CurrentControlSet\Control\Print\Printers"
|
|
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path) as key:
|
|
i = 0
|
|
while True:
|
|
try:
|
|
printer_name = winreg.EnumKey(key, i)
|
|
printers.append({
|
|
"name": printer_name,
|
|
"display_name": printer_name,
|
|
"is_default": False,
|
|
"status": "available",
|
|
"type": "Registry"
|
|
})
|
|
i += 1
|
|
except OSError:
|
|
break
|
|
|
|
logger.info(f"Found {len(printers)} printers via Registry")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Registry printer enumeration failed: {e}")
|
|
|
|
# Fallback: Add common Windows printers
|
|
if not printers:
|
|
common_printers = [
|
|
("Microsoft Print to PDF", True),
|
|
("Microsoft XPS Document Writer", False),
|
|
("OneNote for Windows 10", False),
|
|
("Fax", False)
|
|
]
|
|
|
|
for printer_name, is_default in common_printers:
|
|
printers.append({
|
|
"name": printer_name,
|
|
"display_name": f"{printer_name} (Built-in)",
|
|
"is_default": is_default,
|
|
"status": "available",
|
|
"type": "Built-in"
|
|
})
|
|
|
|
logger.info(f"Using {len(printers)} built-in printer options")
|
|
|
|
# Ensure we have at least one default option
|
|
if not printers:
|
|
printers.append({
|
|
"name": "default",
|
|
"display_name": "System Default Printer",
|
|
"is_default": True,
|
|
"status": "available",
|
|
"type": "Fallback"
|
|
})
|
|
|
|
logger.info(f"Total available printers: {len(printers)}")
|
|
return printers
|
|
|
|
def send_json_response(self, status_code, data):
|
|
"""Send JSON response"""
|
|
self.send_response(status_code)
|
|
self.send_header('Content-type', 'application/json')
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
|
|
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
|
|
self.end_headers()
|
|
|
|
json_data = json.dumps(data, indent=2)
|
|
self.wfile.write(json_data.encode('utf-8'))
|
|
|
|
def do_OPTIONS(self):
|
|
"""Handle CORS preflight requests"""
|
|
self.send_response(200)
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
|
|
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
|
|
self.end_headers()
|
|
|
|
class PrintService:
|
|
def __init__(self, port=8765):
|
|
self.port = port
|
|
self.server = None
|
|
self.running = False
|
|
|
|
def start(self):
|
|
"""Start the print service"""
|
|
try:
|
|
logger.info(f"Starting Quality Label Print Service on port {self.port}")
|
|
|
|
self.server = HTTPServer(('localhost', self.port), PrintServiceHandler)
|
|
self.server.total_jobs = 0
|
|
self.server.last_print_time = None
|
|
|
|
logger.info(f"Print service running at http://localhost:{self.port}")
|
|
logger.info("Available endpoints:")
|
|
logger.info(" GET /health - Health check")
|
|
logger.info(" GET /printers - List available printers")
|
|
logger.info(" GET /status - Service status")
|
|
logger.info(" POST /print_pdf - Print PDF from URL")
|
|
|
|
self.running = True
|
|
self.server.serve_forever()
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Service stopped by user")
|
|
except Exception as e:
|
|
logger.error(f"Service error: {e}")
|
|
finally:
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""Stop the print service"""
|
|
if self.server:
|
|
logger.info("Shutting down print service...")
|
|
self.server.shutdown()
|
|
self.server.server_close()
|
|
self.running = False
|
|
|
|
if __name__ == "__main__":
|
|
service = PrintService(port=8765)
|
|
try:
|
|
service.start()
|
|
except KeyboardInterrupt:
|
|
print("\nService stopped by user")
|
|
except Exception as e:
|
|
print(f"Service failed to start: {e}")
|
|
logger.error(f"Service failed to start: {e}") |