Files
Server_Monitorizare_v2/app/web/wmt.py
T
ske087 f1449285ba feat: WMT client versioning, release management and force-update playbook
- api/wmt.py: add GET /api/wmt/client/version and GET /api/wmt/client/download endpoints; rewrite submit_update_request with dedup logic
- web/wmt.py: add releases, releases_upload, releases_delete, releases_build routes; build-from-folder excludes hidden/data/venv/pyc files
- web/main.py: admin per-device delete route; clear-device-logs route; pass devices list to admin template
- templates/wmt/releases.html: new release management page (current release info, upload form, build-from-folder card)
- templates/admin.html: replace nuclear clear-devices with clear-logs + per-device delete table
- templates/base.html: add Client Releases nav link in WMT sidebar section
- templates/ansible/execute.html: add Update WMT Code playbook card
- ansible/playbooks/update_wmt_code.yml: rsync WMT_project to clients excluding data/; backs up app.py; restarts wmt service
- ansible_service.py: register update_wmt_code description
- .gitignore: whitelist update_wmt_code.yml
2026-05-13 16:36:17 +03:00

563 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
WMT management web routes global settings, device registry, update requests.
"""
import csv
import io
import json
import re
import zipfile
from pathlib import Path
from flask import Blueprint, render_template, request, redirect, url_for, flash, Response, jsonify
from datetime import datetime, timezone
from app.models import WMTGlobalConfig, Device, WMTUpdateRequest
from config.database_config import get_db
import logging
logger = logging.getLogger(__name__)
wmt_web_bp = Blueprint('wmt_web', __name__, url_prefix='/wmt')
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_or_create_global_config(session):
cfg = session.query(WMTGlobalConfig).first()
if cfg is None:
cfg = WMTGlobalConfig()
session.add(cfg)
session.flush()
return cfg
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@wmt_web_bp.route('/')
def index():
"""WMT management dashboard."""
try:
with get_db().get_session() as session:
global_cfg = _get_or_create_global_config(session)
devices = session.query(Device).filter(Device.mac_address.isnot(None)).order_by(Device.nume_masa).all()
pending_count = session.query(WMTUpdateRequest).filter_by(status='pending').count()
recent_requests = (
session.query(WMTUpdateRequest)
.order_by(WMTUpdateRequest.submitted_at.desc())
.limit(5)
.all()
)
return render_template(
'wmt/index.html',
global_cfg=global_cfg,
devices=devices,
pending_count=pending_count,
recent_requests=recent_requests,
breadcrumbs=[{'url': url_for('wmt_web.index'), 'title': 'WMT Management'}],
)
except Exception as e:
logger.error(f'WMT dashboard error: {e}')
flash(f'Error loading dashboard: {e}', 'error')
return render_template('wmt/index.html', global_cfg=None, devices=[],
pending_count=0, recent_requests=[])
# ---------------------------------------------------------------------------
# Global settings
# ---------------------------------------------------------------------------
@wmt_web_bp.route('/settings', methods=['GET', 'POST'])
def settings():
"""View and edit global WMT configuration."""
try:
with get_db().get_session() as session:
cfg = _get_or_create_global_config(session)
if request.method == 'POST':
cfg.chrome_url = request.form.get('chrome_url', '').strip()
cfg.chrome_local_url = request.form.get('chrome_local_url', '').strip() or None
cfg.chrome_insecure_origin = request.form.get('chrome_insecure_origin', '').strip()
cfg.card_api_base_url = request.form.get('card_api_base_url', '').strip()
cfg.server_log_url = request.form.get('server_log_url', '').strip()
cfg.internet_check_host = request.form.get('internet_check_host', '').strip()
cfg.update_host = request.form.get('update_host', '').strip()
cfg.update_user = request.form.get('update_user', '').strip()
cfg.notes = request.form.get('notes', '').strip() or None
cfg.updated_at = datetime.utcnow()
cfg.updated_by = 'admin'
flash('Global settings saved.', 'success')
return redirect(url_for('wmt_web.settings'))
return render_template(
'wmt/settings.html',
cfg=cfg,
breadcrumbs=[
{'url': url_for('wmt_web.index'), 'title': 'WMT Management'},
{'url': url_for('wmt_web.settings'), 'title': 'Global Settings'},
],
)
except Exception as e:
logger.error(f'WMT settings error: {e}')
flash(f'Error: {e}', 'error')
return redirect(url_for('wmt_web.index'))
# ---------------------------------------------------------------------------
# Update requests
# ---------------------------------------------------------------------------
@wmt_web_bp.route('/requests')
def update_requests():
"""List all device update requests."""
status_filter = request.args.get('status', 'pending')
try:
with get_db().get_session() as session:
query = session.query(WMTUpdateRequest)
if status_filter != 'all':
query = query.filter_by(status=status_filter)
req_list = query.order_by(WMTUpdateRequest.submitted_at.desc()).all()
pending_count = session.query(WMTUpdateRequest).filter_by(status='pending').count()
return render_template(
'wmt/requests.html',
requests=req_list,
status_filter=status_filter,
pending_count=pending_count,
breadcrumbs=[
{'url': url_for('wmt_web.index'), 'title': 'WMT Management'},
{'url': url_for('wmt_web.update_requests'), 'title': 'Update Requests'},
],
)
except Exception as e:
logger.error(f'WMT requests list error: {e}')
flash(f'Error: {e}', 'error')
return redirect(url_for('wmt_web.index'))
@wmt_web_bp.route('/requests/<int:req_id>/accept', methods=['POST'])
def accept_request(req_id):
"""Accept an update request: apply proposed values to WMTDevice."""
try:
with get_db().get_session() as session:
req = session.query(WMTUpdateRequest).filter_by(id=req_id).first()
if not req:
flash('Request not found.', 'error')
return redirect(url_for('wmt_web.update_requests'))
# Find or create the Device
device = session.query(Device).filter_by(mac_address=req.mac_address).first()
if device is None:
device = Device(
mac_address=req.mac_address,
hostname=req.proposed_hostname or '',
device_ip=req.proposed_device_ip or '',
nume_masa=req.proposed_device_name or '',
)
session.add(device)
session.flush()
req.device_id = device.id
# Apply proposed values
if req.proposed_device_name is not None:
device.nume_masa = req.proposed_device_name
if req.proposed_hostname is not None:
device.hostname = req.proposed_hostname
if req.proposed_device_ip is not None:
device.device_ip = req.proposed_device_ip
device.config_updated_at = datetime.utcnow()
device.info_reviewed_at = datetime.utcnow() # admin reviewed → push timestamp to devices
# Mark request as accepted
req.status = 'accepted'
req.admin_reviewed_at = datetime.utcnow()
req.admin_notes = request.form.get('admin_notes', '').strip() or None
flash('Request accepted and device record updated.', 'success')
except Exception as e:
logger.error(f'WMT accept request error: {e}')
flash(f'Error accepting request: {e}', 'error')
return redirect(url_for('wmt_web.update_requests'))
@wmt_web_bp.route('/requests/<int:req_id>/reject', methods=['POST'])
def reject_request(req_id):
"""Reject an update request (updates reviewed_at so WMT client won't re-submit)."""
try:
with get_db().get_session() as session:
req = session.query(WMTUpdateRequest).filter_by(id=req_id).first()
if not req:
flash('Request not found.', 'error')
return redirect(url_for('wmt_web.update_requests'))
req.status = 'rejected'
req.admin_reviewed_at = datetime.utcnow()
req.admin_notes = request.form.get('admin_notes', '').strip() or None
# Update device info_reviewed_at even though data didn't change
# this signals to the WMT client that the server has reviewed the state
# so it won't keep re-submitting the same request.
if req.device_id:
device = session.query(Device).filter_by(id=req.device_id).first()
if device:
device.info_reviewed_at = datetime.utcnow()
flash('Request rejected.', 'warning')
except Exception as e:
logger.error(f'WMT reject request error: {e}')
flash(f'Error rejecting request: {e}', 'error')
return redirect(url_for('wmt_web.update_requests'))
# ---------------------------------------------------------------------------
# Device registry
# ---------------------------------------------------------------------------
@wmt_web_bp.route('/devices')
def devices():
"""List all WMT-registered devices."""
try:
with get_db().get_session() as session:
device_list = (
session.query(Device)
.filter(Device.mac_address.isnot(None))
.order_by(Device.nume_masa)
.all()
)
return render_template(
'wmt/devices.html',
devices=device_list,
breadcrumbs=[
{'url': url_for('wmt_web.index'), 'title': 'WMT Management'},
{'url': url_for('wmt_web.devices'), 'title': 'Devices'},
],
)
except Exception as e:
logger.error(f'WMT devices list error: {e}')
flash(f'Error: {e}', 'error')
return redirect(url_for('wmt_web.index'))
@wmt_web_bp.route('/devices/new', methods=['GET', 'POST'])
def device_new():
"""Register a new WMT device manually."""
if request.method == 'POST':
mac = (request.form.get('mac_address') or '').strip().lower()
if not mac:
flash('MAC address is required.', 'error')
return render_template('wmt/device_form.html', device=None)
try:
with get_db().get_session() as session:
existing = session.query(Device).filter_by(mac_address=mac).first()
if existing:
flash(f'Device with MAC {mac} already exists.', 'error')
return render_template('wmt/device_form.html', device=None)
device = Device(
mac_address=mac,
nume_masa=request.form.get('device_name', '').strip(),
hostname=request.form.get('hostname', '').strip(),
device_ip=request.form.get('device_ip', '').strip() or '127.0.0.1',
location=request.form.get('location', '').strip() or None,
card_presence=request.form.get('card_presence', 'enable'),
description=request.form.get('notes', '').strip() or None,
info_reviewed_at=datetime.utcnow(),
)
session.add(device)
flash('Device registered.', 'success')
return redirect(url_for('wmt_web.devices'))
except Exception as e:
logger.error(f'WMT device_new error: {e}')
flash(f'Error: {e}', 'error')
return render_template('wmt/device_form.html', device=None)
return render_template(
'wmt/device_form.html',
device=None,
breadcrumbs=[
{'url': url_for('wmt_web.index'), 'title': 'WMT Management'},
{'url': url_for('wmt_web.devices'), 'title': 'Devices'},
{'url': url_for('wmt_web.device_new'), 'title': 'New Device'},
],
)
@wmt_web_bp.route('/devices/<int:device_id>/edit', methods=['GET', 'POST'])
def device_edit(device_id):
"""Edit an existing WMT device."""
try:
with get_db().get_session() as session:
device = session.query(Device).filter_by(id=device_id).first()
if not device:
flash('Device not found.', 'error')
return redirect(url_for('wmt_web.devices'))
if request.method == 'POST':
device.nume_masa = request.form.get('device_name', '').strip()
device.hostname = request.form.get('hostname', '').strip()
device.device_ip = request.form.get('device_ip', '').strip() or device.device_ip
device.location = request.form.get('location', '').strip() or None
device.card_presence = request.form.get('card_presence', 'enable')
device.description = request.form.get('notes', '').strip() or None
device.config_updated_at = datetime.utcnow()
device.info_reviewed_at = datetime.utcnow()
flash('Device updated.', 'success')
return redirect(url_for('wmt_web.devices'))
return render_template(
'wmt/device_form.html',
device=device,
breadcrumbs=[
{'url': url_for('wmt_web.index'), 'title': 'WMT Management'},
{'url': url_for('wmt_web.devices'), 'title': 'Devices'},
{'url': url_for('wmt_web.device_edit', device_id=device_id), 'title': 'Edit'},
],
)
except Exception as e:
logger.error(f'WMT device_edit error: {e}')
flash(f'Error: {e}', 'error')
return redirect(url_for('wmt_web.devices'))
@wmt_web_bp.route('/devices/export.csv')
def devices_export_csv():
"""Export all WMT devices as a CSV file."""
try:
with get_db().get_session() as session:
device_list = (
session.query(Device)
.filter(Device.mac_address.isnot(None))
.order_by(Device.nume_masa)
.all()
)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(['hostname', 'mac_address', 'work_place', 'rfid_card_status'])
for d in device_list:
writer.writerow([
d.hostname or '',
d.mac_address or '',
d.nume_masa or '',
d.card_presence or 'enable',
])
csv_data = output.getvalue()
return Response(
csv_data,
mimetype='text/csv',
headers={'Content-Disposition': 'attachment; filename=wmt_devices.csv'},
)
except Exception as e:
logger.error(f'WMT devices_export_csv error: {e}')
flash(f'Export error: {e}', 'error')
return redirect(url_for('wmt_web.devices'))
@wmt_web_bp.route('/devices/<int:device_id>/delete', methods=['POST'])
def device_delete(device_id):
"""Delete a WMT device."""
try:
with get_db().get_session() as session:
device = session.query(Device).filter_by(id=device_id).first()
if device:
session.delete(device)
flash('Device deleted.', 'success')
else:
flash('Device not found.', 'error')
except Exception as e:
logger.error(f'WMT device_delete error: {e}')
flash(f'Error: {e}', 'error')
return redirect(url_for('wmt_web.devices'))
# ---------------------------------------------------------------------------
# WMT Client Release Management
# ---------------------------------------------------------------------------
WMT_RELEASES_DIR = Path('data/wmt_releases')
def _read_release_meta():
meta_path = WMT_RELEASES_DIR / 'latest.json'
if meta_path.exists():
try:
return json.loads(meta_path.read_text())
except Exception:
pass
return None
@wmt_web_bp.route('/releases')
def releases():
"""WMT client release management page."""
meta = _read_release_meta()
zip_size = None
if meta:
zip_path = WMT_RELEASES_DIR / meta.get('filename', '')
if zip_path.exists():
zip_size = zip_path.stat().st_size
return render_template('wmt/releases.html', meta=meta, zip_size=zip_size)
@wmt_web_bp.route('/releases/upload', methods=['POST'])
def releases_upload():
"""Upload a new WMT client release zip and set it as latest."""
f = request.files.get('release_zip')
version_str = request.form.get('version', '').strip()
notes = request.form.get('notes', '').strip()
if not f or not f.filename.endswith('.zip'):
flash('Please upload a .zip file.', 'error')
return redirect(url_for('wmt_web.releases'))
if not version_str:
flash('Version is required.', 'error')
return redirect(url_for('wmt_web.releases'))
# Validate version format (digits and dots)
if not re.match(r'^\d+(\.\d+)*$', version_str):
flash('Version must be in numeric format (e.g. 3.0 or 3.1.2).', 'error')
return redirect(url_for('wmt_web.releases'))
try:
WMT_RELEASES_DIR.mkdir(parents=True, exist_ok=True)
filename = f'wmt_v{version_str}.zip'
zip_path = WMT_RELEASES_DIR / filename
# Validate the zip contains app.py
data = f.read()
try:
with zipfile.ZipFile(io.BytesIO(data)) as zf:
names = zf.namelist()
if 'app.py' not in names:
flash('Invalid release: zip must contain app.py at the root.', 'error')
return redirect(url_for('wmt_web.releases'))
except zipfile.BadZipFile:
flash('Uploaded file is not a valid zip archive.', 'error')
return redirect(url_for('wmt_web.releases'))
zip_path.write_bytes(data)
meta = {
'version': version_str,
'notes': notes,
'uploaded_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'),
'filename': filename,
}
(WMT_RELEASES_DIR / 'latest.json').write_text(json.dumps(meta, indent=2))
logger.info(f'New WMT release uploaded: v{version_str} ({filename})')
flash(f'Release v{version_str} uploaded and set as latest.', 'success')
except Exception as e:
logger.error(f'WMT release upload error: {e}')
flash(f'Upload error: {e}', 'error')
return redirect(url_for('wmt_web.releases'))
@wmt_web_bp.route('/releases/delete', methods=['POST'])
def releases_delete():
"""Remove the current release zip and metadata."""
try:
meta = _read_release_meta()
if meta:
zip_path = WMT_RELEASES_DIR / meta.get('filename', '')
if zip_path.exists():
zip_path.unlink()
meta_path = WMT_RELEASES_DIR / 'latest.json'
if meta_path.exists():
meta_path.unlink()
flash('Release deleted.', 'success')
except Exception as e:
logger.error(f'WMT release delete error: {e}')
flash(f'Error: {e}', 'error')
return redirect(url_for('wmt_web.releases'))
# Patterns excluded when building a release from a local folder
_BUILD_EXCLUDE_DIRS = {'.git', '__pycache__', 'data', 'venv', '.venv', 'node_modules'}
_BUILD_EXCLUDE_EXTS = {'.pyc', '.pyo', '.bak', '.log', '.swp'}
def _should_exclude(rel_parts):
"""Return True if this path should be left out of the release zip."""
# Any hidden segment (starts with '.')
for part in rel_parts:
if part.startswith('.'):
return True
# Top-level or nested dir exclusion
if rel_parts[0] in _BUILD_EXCLUDE_DIRS:
return True
# Extension exclusion
if Path(rel_parts[-1]).suffix.lower() in _BUILD_EXCLUDE_EXTS:
return True
return False
@wmt_web_bp.route('/releases/build', methods=['POST'])
def releases_build():
"""
Build a release zip directly from a folder on this server,
excluding hidden files, __pycache__, data/, venv/, etc.
"""
folder_path = request.form.get('folder_path', '/home/pi/Desktop/WMT').strip()
version_str = request.form.get('version', '').strip()
notes = request.form.get('notes', '').strip()
if not version_str:
flash('Version is required.', 'error')
return redirect(url_for('wmt_web.releases'))
if not re.match(r'^\d+(\.\d+)*$', version_str):
flash('Version must be in numeric format (e.g. 3.0).', 'error')
return redirect(url_for('wmt_web.releases'))
src = Path(folder_path)
if not src.is_dir():
flash(f'Folder not found: {folder_path}', 'error')
return redirect(url_for('wmt_web.releases'))
try:
buf = io.BytesIO()
file_count = 0
has_app_py = False
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
for abs_path in sorted(src.rglob('*')):
if not abs_path.is_file():
continue
rel = abs_path.relative_to(src)
parts = rel.parts
if _should_exclude(parts):
continue
arcname = str(rel)
zf.write(abs_path, arcname)
file_count += 1
if arcname == 'app.py':
has_app_py = True
if not has_app_py:
flash(f'No app.py found in {folder_path} — cannot create release.', 'error')
return redirect(url_for('wmt_web.releases'))
WMT_RELEASES_DIR.mkdir(parents=True, exist_ok=True)
filename = f'wmt_v{version_str}.zip'
zip_path = WMT_RELEASES_DIR / filename
zip_path.write_bytes(buf.getvalue())
meta = {
'version': version_str,
'notes': notes,
'uploaded_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'),
'filename': filename,
}
(WMT_RELEASES_DIR / 'latest.json').write_text(json.dumps(meta, indent=2))
size_kb = len(buf.getvalue()) // 1024
logger.info(f'WMT release built from {folder_path}: v{version_str}, {file_count} files, {size_kb} KB')
flash(f'Release v{version_str} built from {folder_path} ({file_count} files, {size_kb} KB).', 'success')
except Exception as e:
logger.error(f'WMT release build error: {e}')
flash(f'Build error: {e}', 'error')
return redirect(url_for('wmt_web.releases'))