""" WMT management web routes – global settings, device registry, update requests. """ import csv import io import json import re import zipfile from pathlib import Path from flask import Blueprint, render_template, request, redirect, url_for, flash, Response, jsonify from datetime import datetime, timezone from app.models import WMTGlobalConfig, Device, WMTUpdateRequest from config.database_config import get_db import logging logger = logging.getLogger(__name__) wmt_web_bp = Blueprint('wmt_web', __name__, url_prefix='/wmt') # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _get_or_create_global_config(session): cfg = session.query(WMTGlobalConfig).first() if cfg is None: cfg = WMTGlobalConfig() session.add(cfg) session.flush() return cfg # --------------------------------------------------------------------------- # Dashboard # --------------------------------------------------------------------------- @wmt_web_bp.route('/') def index(): """WMT management dashboard.""" try: with get_db().get_session() as session: global_cfg = _get_or_create_global_config(session) devices = session.query(Device).filter(Device.mac_address.isnot(None)).order_by(Device.nume_masa).all() pending_count = session.query(WMTUpdateRequest).filter_by(status='pending').count() recent_requests = ( session.query(WMTUpdateRequest) .order_by(WMTUpdateRequest.submitted_at.desc()) .limit(5) .all() ) return render_template( 'wmt/index.html', global_cfg=global_cfg, devices=devices, pending_count=pending_count, recent_requests=recent_requests, breadcrumbs=[{'url': url_for('wmt_web.index'), 'title': 'WMT Management'}], ) except Exception as e: logger.error(f'WMT dashboard error: {e}') flash(f'Error loading dashboard: {e}', 'error') return render_template('wmt/index.html', global_cfg=None, devices=[], pending_count=0, recent_requests=[]) # --------------------------------------------------------------------------- # Global settings # --------------------------------------------------------------------------- @wmt_web_bp.route('/settings', methods=['GET', 'POST']) def settings(): """View and edit global WMT configuration.""" try: with get_db().get_session() as session: cfg = _get_or_create_global_config(session) if request.method == 'POST': cfg.chrome_url = request.form.get('chrome_url', '').strip() cfg.chrome_local_url = request.form.get('chrome_local_url', '').strip() or None cfg.chrome_insecure_origin = request.form.get('chrome_insecure_origin', '').strip() cfg.card_api_base_url = request.form.get('card_api_base_url', '').strip() cfg.server_log_url = request.form.get('server_log_url', '').strip() cfg.internet_check_host = request.form.get('internet_check_host', '').strip() cfg.update_host = request.form.get('update_host', '').strip() cfg.update_user = request.form.get('update_user', '').strip() cfg.notes = request.form.get('notes', '').strip() or None cfg.updated_at = datetime.utcnow() cfg.updated_by = 'admin' flash('Global settings saved.', 'success') return redirect(url_for('wmt_web.settings')) return render_template( 'wmt/settings.html', cfg=cfg, breadcrumbs=[ {'url': url_for('wmt_web.index'), 'title': 'WMT Management'}, {'url': url_for('wmt_web.settings'), 'title': 'Global Settings'}, ], ) except Exception as e: logger.error(f'WMT settings error: {e}') flash(f'Error: {e}', 'error') return redirect(url_for('wmt_web.index')) # --------------------------------------------------------------------------- # Update requests # --------------------------------------------------------------------------- @wmt_web_bp.route('/requests') def update_requests(): """List all device update requests.""" status_filter = request.args.get('status', 'pending') try: with get_db().get_session() as session: query = session.query(WMTUpdateRequest) if status_filter != 'all': query = query.filter_by(status=status_filter) req_list = query.order_by(WMTUpdateRequest.submitted_at.desc()).all() pending_count = session.query(WMTUpdateRequest).filter_by(status='pending').count() return render_template( 'wmt/requests.html', requests=req_list, status_filter=status_filter, pending_count=pending_count, breadcrumbs=[ {'url': url_for('wmt_web.index'), 'title': 'WMT Management'}, {'url': url_for('wmt_web.update_requests'), 'title': 'Update Requests'}, ], ) except Exception as e: logger.error(f'WMT requests list error: {e}') flash(f'Error: {e}', 'error') return redirect(url_for('wmt_web.index')) @wmt_web_bp.route('/requests//accept', methods=['POST']) def accept_request(req_id): """Accept an update request: apply proposed values to WMTDevice.""" try: with get_db().get_session() as session: req = session.query(WMTUpdateRequest).filter_by(id=req_id).first() if not req: flash('Request not found.', 'error') return redirect(url_for('wmt_web.update_requests')) # Find or create the Device device = session.query(Device).filter_by(mac_address=req.mac_address).first() if device is None: device = Device( mac_address=req.mac_address, hostname=req.proposed_hostname or '', device_ip=req.proposed_device_ip or '', nume_masa=req.proposed_device_name or '', ) session.add(device) session.flush() req.device_id = device.id # Apply proposed values if req.proposed_device_name is not None: device.nume_masa = req.proposed_device_name if req.proposed_hostname is not None: device.hostname = req.proposed_hostname if req.proposed_device_ip is not None: device.device_ip = req.proposed_device_ip device.config_updated_at = datetime.utcnow() device.info_reviewed_at = datetime.utcnow() # admin reviewed → push timestamp to devices # Mark request as accepted req.status = 'accepted' req.admin_reviewed_at = datetime.utcnow() req.admin_notes = request.form.get('admin_notes', '').strip() or None flash('Request accepted and device record updated.', 'success') except Exception as e: logger.error(f'WMT accept request error: {e}') flash(f'Error accepting request: {e}', 'error') return redirect(url_for('wmt_web.update_requests')) @wmt_web_bp.route('/requests//reject', methods=['POST']) def reject_request(req_id): """Reject an update request (updates reviewed_at so WMT client won't re-submit).""" try: with get_db().get_session() as session: req = session.query(WMTUpdateRequest).filter_by(id=req_id).first() if not req: flash('Request not found.', 'error') return redirect(url_for('wmt_web.update_requests')) req.status = 'rejected' req.admin_reviewed_at = datetime.utcnow() req.admin_notes = request.form.get('admin_notes', '').strip() or None # Update device info_reviewed_at even though data didn't change – # this signals to the WMT client that the server has reviewed the state # so it won't keep re-submitting the same request. if req.device_id: device = session.query(Device).filter_by(id=req.device_id).first() if device: device.info_reviewed_at = datetime.utcnow() flash('Request rejected.', 'warning') except Exception as e: logger.error(f'WMT reject request error: {e}') flash(f'Error rejecting request: {e}', 'error') return redirect(url_for('wmt_web.update_requests')) # --------------------------------------------------------------------------- # Device registry # --------------------------------------------------------------------------- @wmt_web_bp.route('/devices') def devices(): """List all WMT-registered devices.""" try: with get_db().get_session() as session: device_list = ( session.query(Device) .filter(Device.mac_address.isnot(None)) .order_by(Device.nume_masa) .all() ) return render_template( 'wmt/devices.html', devices=device_list, breadcrumbs=[ {'url': url_for('wmt_web.index'), 'title': 'WMT Management'}, {'url': url_for('wmt_web.devices'), 'title': 'Devices'}, ], ) except Exception as e: logger.error(f'WMT devices list error: {e}') flash(f'Error: {e}', 'error') return redirect(url_for('wmt_web.index')) @wmt_web_bp.route('/devices/new', methods=['GET', 'POST']) def device_new(): """Register a new WMT device manually.""" if request.method == 'POST': mac = (request.form.get('mac_address') or '').strip().lower() if not mac: flash('MAC address is required.', 'error') return render_template('wmt/device_form.html', device=None) try: with get_db().get_session() as session: existing = session.query(Device).filter_by(mac_address=mac).first() if existing: flash(f'Device with MAC {mac} already exists.', 'error') return render_template('wmt/device_form.html', device=None) device = Device( mac_address=mac, nume_masa=request.form.get('device_name', '').strip(), hostname=request.form.get('hostname', '').strip(), device_ip=request.form.get('device_ip', '').strip() or '127.0.0.1', location=request.form.get('location', '').strip() or None, card_presence=request.form.get('card_presence', 'enable'), description=request.form.get('notes', '').strip() or None, info_reviewed_at=datetime.utcnow(), ) session.add(device) flash('Device registered.', 'success') return redirect(url_for('wmt_web.devices')) except Exception as e: logger.error(f'WMT device_new error: {e}') flash(f'Error: {e}', 'error') return render_template('wmt/device_form.html', device=None) return render_template( 'wmt/device_form.html', device=None, breadcrumbs=[ {'url': url_for('wmt_web.index'), 'title': 'WMT Management'}, {'url': url_for('wmt_web.devices'), 'title': 'Devices'}, {'url': url_for('wmt_web.device_new'), 'title': 'New Device'}, ], ) @wmt_web_bp.route('/devices//edit', methods=['GET', 'POST']) def device_edit(device_id): """Edit an existing WMT device.""" try: with get_db().get_session() as session: device = session.query(Device).filter_by(id=device_id).first() if not device: flash('Device not found.', 'error') return redirect(url_for('wmt_web.devices')) if request.method == 'POST': device.nume_masa = request.form.get('device_name', '').strip() device.hostname = request.form.get('hostname', '').strip() device.device_ip = request.form.get('device_ip', '').strip() or device.device_ip device.location = request.form.get('location', '').strip() or None device.card_presence = request.form.get('card_presence', 'enable') device.description = request.form.get('notes', '').strip() or None custom_url = request.form.get('custom_chrome_url', '').strip() device.custom_chrome_url = custom_url if custom_url else None device.config_updated_at = datetime.utcnow() device.info_reviewed_at = datetime.utcnow() flash('Device updated.', 'success') return redirect(url_for('wmt_web.devices')) return render_template( 'wmt/device_form.html', device=device, breadcrumbs=[ {'url': url_for('wmt_web.index'), 'title': 'WMT Management'}, {'url': url_for('wmt_web.devices'), 'title': 'Devices'}, {'url': url_for('wmt_web.device_edit', device_id=device_id), 'title': 'Edit'}, ], ) except Exception as e: logger.error(f'WMT device_edit error: {e}') flash(f'Error: {e}', 'error') return redirect(url_for('wmt_web.devices')) @wmt_web_bp.route('/devices/export.csv') def devices_export_csv(): """Export all WMT devices as a CSV file.""" try: with get_db().get_session() as session: device_list = ( session.query(Device) .filter(Device.mac_address.isnot(None)) .order_by(Device.nume_masa) .all() ) output = io.StringIO() writer = csv.writer(output) writer.writerow(['hostname', 'mac_address', 'work_place', 'rfid_card_status']) for d in device_list: writer.writerow([ d.hostname or '', d.mac_address or '', d.nume_masa or '', d.card_presence or 'enable', ]) csv_data = output.getvalue() return Response( csv_data, mimetype='text/csv', headers={'Content-Disposition': 'attachment; filename=wmt_devices.csv'}, ) except Exception as e: logger.error(f'WMT devices_export_csv error: {e}') flash(f'Export error: {e}', 'error') return redirect(url_for('wmt_web.devices')) @wmt_web_bp.route('/devices//delete', methods=['POST']) def device_delete(device_id): """Delete a WMT device.""" try: with get_db().get_session() as session: device = session.query(Device).filter_by(id=device_id).first() if device: session.delete(device) flash('Device deleted.', 'success') else: flash('Device not found.', 'error') except Exception as e: logger.error(f'WMT device_delete error: {e}') flash(f'Error: {e}', 'error') return redirect(url_for('wmt_web.devices')) # --------------------------------------------------------------------------- # WMT Client Release Management # --------------------------------------------------------------------------- WMT_RELEASES_DIR = Path('data/wmt_releases') def _read_release_meta(): meta_path = WMT_RELEASES_DIR / 'latest.json' if meta_path.exists(): try: return json.loads(meta_path.read_text()) except Exception: pass return None @wmt_web_bp.route('/releases') def releases(): """WMT client release management page.""" meta = _read_release_meta() zip_size = None if meta: zip_path = WMT_RELEASES_DIR / meta.get('filename', '') if zip_path.exists(): zip_size = zip_path.stat().st_size return render_template('wmt/releases.html', meta=meta, zip_size=zip_size) @wmt_web_bp.route('/releases/upload', methods=['POST']) def releases_upload(): """Upload a new WMT client release zip and set it as latest.""" f = request.files.get('release_zip') version_str = request.form.get('version', '').strip() notes = request.form.get('notes', '').strip() if not f or not f.filename.endswith('.zip'): flash('Please upload a .zip file.', 'error') return redirect(url_for('wmt_web.releases')) if not version_str: flash('Version is required.', 'error') return redirect(url_for('wmt_web.releases')) # Validate version format (digits and dots) if not re.match(r'^\d+(\.\d+)*$', version_str): flash('Version must be in numeric format (e.g. 3.0 or 3.1.2).', 'error') return redirect(url_for('wmt_web.releases')) try: WMT_RELEASES_DIR.mkdir(parents=True, exist_ok=True) filename = f'wmt_v{version_str}.zip' zip_path = WMT_RELEASES_DIR / filename # Validate the zip contains app.py data = f.read() try: with zipfile.ZipFile(io.BytesIO(data)) as zf: names = zf.namelist() if 'app.py' not in names: flash('Invalid release: zip must contain app.py at the root.', 'error') return redirect(url_for('wmt_web.releases')) except zipfile.BadZipFile: flash('Uploaded file is not a valid zip archive.', 'error') return redirect(url_for('wmt_web.releases')) zip_path.write_bytes(data) meta = { 'version': version_str, 'notes': notes, 'uploaded_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'), 'filename': filename, } (WMT_RELEASES_DIR / 'latest.json').write_text(json.dumps(meta, indent=2)) logger.info(f'New WMT release uploaded: v{version_str} ({filename})') flash(f'Release v{version_str} uploaded and set as latest.', 'success') except Exception as e: logger.error(f'WMT release upload error: {e}') flash(f'Upload error: {e}', 'error') return redirect(url_for('wmt_web.releases')) @wmt_web_bp.route('/releases/delete', methods=['POST']) def releases_delete(): """Remove the current release zip and metadata.""" try: meta = _read_release_meta() if meta: zip_path = WMT_RELEASES_DIR / meta.get('filename', '') if zip_path.exists(): zip_path.unlink() meta_path = WMT_RELEASES_DIR / 'latest.json' if meta_path.exists(): meta_path.unlink() flash('Release deleted.', 'success') except Exception as e: logger.error(f'WMT release delete error: {e}') flash(f'Error: {e}', 'error') return redirect(url_for('wmt_web.releases')) # Patterns excluded when building a release from a local folder _BUILD_EXCLUDE_DIRS = {'.git', '__pycache__', 'data', 'venv', '.venv', 'node_modules'} _BUILD_EXCLUDE_EXTS = {'.pyc', '.pyo', '.bak', '.log', '.swp'} def _should_exclude(rel_parts): """Return True if this path should be left out of the release zip.""" # Any hidden segment (starts with '.') for part in rel_parts: if part.startswith('.'): return True # Top-level or nested dir exclusion if rel_parts[0] in _BUILD_EXCLUDE_DIRS: return True # Extension exclusion if Path(rel_parts[-1]).suffix.lower() in _BUILD_EXCLUDE_EXTS: return True return False @wmt_web_bp.route('/releases/build', methods=['POST']) def releases_build(): """ Build a release zip directly from a folder on this server, excluding hidden files, __pycache__, data/, venv/, etc. """ folder_path = request.form.get('folder_path', '/home/pi/Desktop/WMT').strip() version_str = request.form.get('version', '').strip() notes = request.form.get('notes', '').strip() if not version_str: flash('Version is required.', 'error') return redirect(url_for('wmt_web.releases')) if not re.match(r'^\d+(\.\d+)*$', version_str): flash('Version must be in numeric format (e.g. 3.0).', 'error') return redirect(url_for('wmt_web.releases')) src = Path(folder_path) if not src.is_dir(): flash(f'Folder not found: {folder_path}', 'error') return redirect(url_for('wmt_web.releases')) try: buf = io.BytesIO() file_count = 0 has_app_py = False with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf: for abs_path in sorted(src.rglob('*')): if not abs_path.is_file(): continue rel = abs_path.relative_to(src) parts = rel.parts if _should_exclude(parts): continue arcname = str(rel) zf.write(abs_path, arcname) file_count += 1 if arcname == 'app.py': has_app_py = True if not has_app_py: flash(f'No app.py found in {folder_path} — cannot create release.', 'error') return redirect(url_for('wmt_web.releases')) WMT_RELEASES_DIR.mkdir(parents=True, exist_ok=True) filename = f'wmt_v{version_str}.zip' zip_path = WMT_RELEASES_DIR / filename zip_path.write_bytes(buf.getvalue()) meta = { 'version': version_str, 'notes': notes, 'uploaded_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'), 'filename': filename, } (WMT_RELEASES_DIR / 'latest.json').write_text(json.dumps(meta, indent=2)) size_kb = len(buf.getvalue()) // 1024 logger.info(f'WMT release built from {folder_path}: v{version_str}, {file_count} files, {size_kb} KB') flash(f'Release v{version_str} built from {folder_path} ({file_count} files, {size_kb} KB).', 'success') except Exception as e: logger.error(f'WMT release build error: {e}') flash(f'Build error: {e}', 'error') return redirect(url_for('wmt_web.releases'))