Initial commit: add compliance_checks table, per-check metadata on assets, and compliance audit trail

This commit is contained in:
2026-04-24 07:14:27 +03:00
commit e63b486ec2
58 changed files with 6468 additions and 0 deletions

13
app/routes/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
from app.routes.auth import bp as auth_bp
from app.routes.dashboard import bp as dashboard_bp
from app.routes.users import bp as users_bp
from app.routes.assets import bp as assets_bp
from app.routes.assignments import bp as assignments_bp
from app.routes.paperwork import bp as paperwork_bp
from app.routes.audit import bp as audit_bp
from app.routes.settings import bp as settings_bp
__all__ = [
'auth_bp', 'dashboard_bp', 'users_bp', 'assets_bp',
'assignments_bp', 'paperwork_bp', 'audit_bp', 'settings_bp',
]

389
app/routes/assets.py Normal file
View File

@@ -0,0 +1,389 @@
import json
from datetime import date, datetime
from flask import (Blueprint, render_template, redirect, url_for,
flash, request, current_app, jsonify)
from flask_login import login_required, current_user
from app.extensions import db
from app.models.asset import Asset, ASSET_TYPES, ASSET_STATUSES
from app.models.audit_log import AuditLog
from app.models.compliance_check import ComplianceCheck
from app.services.dell_service import lookup_service_tag
bp = Blueprint('assets', __name__, url_prefix='/assets')
def _parse_date(value):
"""Convert a 'YYYY-MM-DD' string to a date object; return None if blank."""
if not value:
return None
if isinstance(value, date):
return value
try:
return date.fromisoformat(value.strip())
except (ValueError, AttributeError):
return None
def _log(action, record_id, description, old=None, new=None):
entry = AuditLog(
table_name='assets',
record_id=record_id,
action=action,
old_values=json.dumps(old) if old else None,
new_values=json.dumps(new) if new else None,
performed_by_id=current_user.id,
ip_address=request.remote_addr,
description=description,
)
db.session.add(entry)
# ------------------------------------------------------------------
# List
# ------------------------------------------------------------------
@bp.route('/')
@login_required
def index():
page = request.args.get('page', 1, type=int)
q = request.args.get('q', '').strip()
status_filter = request.args.get('status', '')
type_filter = request.args.get('asset_type', '')
query = Asset.query
if q:
like = f'%{q}%'
query = query.filter(
db.or_(
Asset.serial_number.like(like),
Asset.service_tag.like(like),
Asset.asset_tag.like(like),
Asset.brand.like(like),
Asset.model.like(like),
)
)
if status_filter:
query = query.filter_by(status=status_filter)
if type_filter:
query = query.filter_by(asset_type=type_filter)
pagination = query.order_by(Asset.created_at.desc()).paginate(
page=page, per_page=current_app.config['ITEMS_PER_PAGE'], error_out=False
)
return render_template(
'assets/index.html',
pagination=pagination, q=q,
status_filter=status_filter, type_filter=type_filter,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES,
)
# ------------------------------------------------------------------
# Create
# ------------------------------------------------------------------
@bp.route('/new', methods=['GET', 'POST'])
@login_required
def create():
if request.method == 'POST':
sn = request.form.get('serial_number', '').strip()
if not sn:
flash('Serial Number is required.', 'danger')
return render_template('assets/form.html', asset=None,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES)
if Asset.query.filter_by(serial_number=sn).first():
flash(f'An asset with serial number {sn} already exists.', 'danger')
return render_template('assets/form.html', asset=None,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES)
service_tag = request.form.get('service_tag', '').strip() or None
if service_tag and Asset.query.filter_by(service_tag=service_tag).first():
flash(f'An asset with service tag {service_tag} already exists.', 'danger')
return render_template('assets/form.html', asset=None,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES)
asset = Asset(
serial_number=sn,
service_tag=service_tag,
asset_tag=request.form.get('asset_tag', '').strip() or None,
asset_type=request.form.get('asset_type', 'Laptop'),
brand=request.form.get('brand', '').strip() or None,
model=request.form.get('model', '').strip() or None,
processor=request.form.get('processor', '').strip() or None,
ram_gb=request.form.get('ram_gb', type=int),
storage_gb=request.form.get('storage_gb', type=int),
operating_system=request.form.get('operating_system', '').strip() or None,
mac_address=request.form.get('mac_address', '').strip() or None,
purchase_date=_parse_date(request.form.get('purchase_date')),
warranty_expiry=_parse_date(request.form.get('warranty_expiry')),
purchase_price=request.form.get('purchase_price', type=float),
supplier=request.form.get('supplier', '').strip() or None,
po_number=request.form.get('po_number', '').strip() or None,
status=request.form.get('status', 'available'),
location=request.form.get('location', '').strip() or None,
notes=request.form.get('notes', '').strip() or None,
created_by_id=current_user.id,
)
db.session.add(asset)
db.session.flush()
_log('create', asset.id, f'Created asset SN={sn}',
new={'serial_number': sn, 'asset_type': asset.asset_type})
db.session.commit()
flash(f'Asset {sn} created.', 'success')
return redirect(url_for('assets.detail', asset_id=asset.id))
# Pre-fill values from Dell lookup (passed as query string params)
prefill = {
'service_tag': request.args.get('service_tag', ''),
'serial_number': request.args.get('serial_number', ''),
'brand': request.args.get('brand', ''),
'model': request.args.get('model', ''),
'asset_type': request.args.get('asset_type', ''),
'operating_system': request.args.get('operating_system', ''),
'warranty_expiry': request.args.get('warranty_expiry', ''),
'purchase_date': request.args.get('purchase_date', ''),
}
return render_template('assets/form.html', asset=None,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES,
prefill=prefill)
# ------------------------------------------------------------------
# Dell service-tag lookup (AJAX)
# ------------------------------------------------------------------
@bp.route('/dell-lookup')
@login_required
def dell_lookup():
tag = request.args.get('tag', '').strip()
if not tag:
return jsonify({'error': 'Service tag is required.'}), 400
# Check for duplicates first
existing = Asset.query.filter_by(service_tag=tag.upper()).first()
if existing:
return jsonify({
'error': f'An asset with service tag {tag.upper()} already exists.',
'existing_id': existing.id,
}), 409
try:
data = lookup_service_tag(tag)
return jsonify(data)
except RuntimeError as exc:
return jsonify({'error': str(exc)}), 502
# ------------------------------------------------------------------
# Detail
# ------------------------------------------------------------------
@bp.route('/<int:asset_id>')
@login_required
def detail(asset_id):
asset = Asset.query.get_or_404(asset_id)
history = asset.assignments.order_by(db.text('assigned_date DESC')).all()
docs = asset.paperwork_docs.order_by(db.text('created_at DESC')).all()
compliance_log = (
AuditLog.query
.filter_by(table_name='assets', record_id=asset_id, action='compliance_update')
.order_by(AuditLog.performed_at.desc())
.all()
)
check_history = (
ComplianceCheck.query
.filter_by(asset_id=asset_id)
.order_by(ComplianceCheck.performed_at.desc())
.all()
)
return render_template('assets/detail.html', asset=asset, history=history,
docs=docs, compliance_log=compliance_log,
check_history=check_history)
# ------------------------------------------------------------------
# Edit
# ------------------------------------------------------------------
@bp.route('/<int:asset_id>/edit', methods=['GET', 'POST'])
@login_required
def edit(asset_id):
asset = Asset.query.get_or_404(asset_id)
if request.method == 'POST':
old = {'serial_number': asset.serial_number, 'status': asset.status}
new_sn = request.form.get('serial_number', '').strip()
if not new_sn:
flash('Serial Number is required.', 'danger')
return render_template('assets/form.html', asset=asset,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES)
# Check uniqueness only if SN changed
if new_sn != asset.serial_number:
if Asset.query.filter(Asset.serial_number == new_sn, Asset.id != asset_id).first():
flash(f'Serial number {new_sn} is already used by another asset.', 'danger')
return render_template('assets/form.html', asset=asset,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES)
asset.serial_number = new_sn
asset.service_tag = request.form.get('service_tag', '').strip() or None
asset.asset_tag = request.form.get('asset_tag', '').strip() or None
asset.asset_type = request.form.get('asset_type', asset.asset_type)
asset.brand = request.form.get('brand', '').strip() or None
asset.model = request.form.get('model', '').strip() or None
asset.processor = request.form.get('processor', '').strip() or None
asset.ram_gb = request.form.get('ram_gb', type=int)
asset.storage_gb = request.form.get('storage_gb', type=int)
asset.operating_system = request.form.get('operating_system', '').strip() or None
asset.mac_address = request.form.get('mac_address', '').strip() or None
asset.purchase_date = _parse_date(request.form.get('purchase_date'))
asset.warranty_expiry = _parse_date(request.form.get('warranty_expiry'))
asset.purchase_price = request.form.get('purchase_price', type=float)
asset.supplier = request.form.get('supplier', '').strip() or None
asset.po_number = request.form.get('po_number', '').strip() or None
asset.status = request.form.get('status', asset.status)
asset.location = request.form.get('location', '').strip() or None
asset.notes = request.form.get('notes', '').strip() or None
_log('update', asset.id, f'Updated asset SN={asset.serial_number}',
old=old, new={'serial_number': asset.serial_number, 'status': asset.status})
db.session.commit()
flash('Asset updated.', 'success')
return redirect(url_for('assets.detail', asset_id=asset_id))
return render_template('assets/form.html', asset=asset,
asset_types=ASSET_TYPES, asset_statuses=ASSET_STATUSES)
# ------------------------------------------------------------------
# Compliance update (Desktop / Laptop specific fields)
# ------------------------------------------------------------------
_COMPLIANCE_FIELDS = {
'inventory_number': 'Inventory Number',
'ad_device_name': 'AD Device Name',
'location_note': 'Location Note',
'encryption_checked': 'Encryption Checked',
'backup_checked': 'Backup Checked',
'hr_notified': 'HR Notified',
}
@bp.route('/<int:asset_id>/compliance', methods=['POST'])
@login_required
def update_compliance(asset_id):
asset = Asset.query.get_or_404(asset_id)
old = {
'inventory_number': asset.inventory_number,
'ad_device_name': asset.ad_device_name,
'location_note': asset.location_note,
'encryption_checked': asset.encryption_checked,
'backup_checked': asset.backup_checked,
'hr_notified': asset.hr_notified,
}
asset.inventory_number = request.form.get('inventory_number', '').strip() or None
asset.ad_device_name = request.form.get('ad_device_name', '').strip() or None
asset.location_note = request.form.get('location_note', '').strip() or None
new_encryption = bool(request.form.get('encryption_checked'))
new_backup = bool(request.form.get('backup_checked'))
new_hr = bool(request.form.get('hr_notified'))
notes = request.form.get('compliance_notes', '').strip() or None
now = datetime.utcnow()
# Record a ComplianceCheck event for each boolean that changed
_check_map = [
('encryption', 'encryption_checked', new_encryption,
'encryption_checked_by_id', 'encryption_checked_at'),
('backup', 'backup_checked', new_backup,
'backup_checked_by_id', 'backup_checked_at'),
('hr', 'hr_notified', new_hr,
'hr_notified_by_id', 'hr_notified_at'),
]
for check_type, field, new_val, by_field, at_field in _check_map:
if old[field] != new_val:
setattr(asset, field, new_val)
setattr(asset, by_field, current_user.id)
setattr(asset, at_field, now)
db.session.add(ComplianceCheck(
asset_id=asset_id,
check_type=check_type,
checked=new_val,
performed_by_id=current_user.id,
performed_at=now,
notes=notes,
))
new = {
'inventory_number': asset.inventory_number,
'ad_device_name': asset.ad_device_name,
'location_note': asset.location_note,
'encryption_checked': asset.encryption_checked,
'backup_checked': asset.backup_checked,
'hr_notified': asset.hr_notified,
}
# Build human-readable description of what changed
changed = [
f'{_COMPLIANCE_FIELDS[k]}: {repr(old[k])}{repr(new[k])}'
for k in _COMPLIANCE_FIELDS if old[k] != new[k]
]
if changed:
_log('compliance_update', asset.id,
f'Compliance updated: {"; ".join(changed)}',
old=old, new=new)
db.session.commit()
flash('Compliance fields updated.', 'success')
else:
flash('No changes detected.', 'info')
return redirect(url_for('assets.detail', asset_id=asset_id))
# ------------------------------------------------------------------
# Quick lookup by SN or service tag (AJAX)
# ------------------------------------------------------------------
@bp.route('/lookup')
@login_required
def lookup():
q = request.args.get('q', '').strip()
if not q:
return jsonify(None)
asset = Asset.query.filter(
db.or_(Asset.serial_number == q, Asset.service_tag == q)
).first()
if not asset:
return jsonify(None)
return jsonify({
'id': asset.id,
'serial_number': asset.serial_number,
'service_tag': asset.service_tag,
'brand': asset.brand,
'model': asset.model,
'asset_type': asset.asset_type,
'status': asset.status,
})
# ------------------------------------------------------------------
# Search (AJAX dropdown)
# ------------------------------------------------------------------
@bp.route('/search')
@login_required
def search():
q = request.args.get('q', '').strip()
if len(q) < 2:
return jsonify([])
like = f'%{q}%'
assets = Asset.query.filter(
db.or_(
Asset.serial_number.like(like),
Asset.service_tag.like(like),
Asset.asset_tag.like(like),
)
).limit(15).all()
return jsonify([{
'id': a.id,
'text': f'{a.brand or ""} {a.model or ""} — SN: {a.serial_number}'.strip(''),
'serial_number': a.serial_number,
'service_tag': a.service_tag,
'status': a.status,
} for a in assets])

145
app/routes/assignments.py Normal file
View File

@@ -0,0 +1,145 @@
import json
from datetime import date
from flask import (Blueprint, render_template, redirect, url_for,
flash, request, current_app)
from flask_login import login_required, current_user
from app.extensions import db
from app.models.assignment import Assignment
from app.models.asset import Asset
from app.models.user import User
from app.models.audit_log import AuditLog
bp = Blueprint('assignments', __name__, url_prefix='/assignments')
def _log(action, record_id, description, old=None, new=None):
entry = AuditLog(
table_name='assignments',
record_id=record_id,
action=action,
old_values=json.dumps(old) if old else None,
new_values=json.dumps(new) if new else None,
performed_by_id=current_user.id,
ip_address=request.remote_addr,
description=description,
)
db.session.add(entry)
@bp.route('/')
@login_required
def index():
page = request.args.get('page', 1, type=int)
active_only = request.args.get('active', '1') == '1'
q = request.args.get('q', '').strip()
query = Assignment.query
if active_only:
query = query.filter_by(is_active=True)
pagination = query.order_by(Assignment.assigned_date.desc()).paginate(
page=page, per_page=current_app.config['ITEMS_PER_PAGE'], error_out=False
)
return render_template('assignments/index.html',
pagination=pagination, active_only=active_only, q=q)
@bp.route('/new', methods=['GET', 'POST'])
@login_required
def create():
# Pre-fill from query params (used from asset / user detail pages)
preselect_asset_id = request.args.get('asset_id', type=int)
preselect_user_id = request.args.get('user_id', type=int)
if request.method == 'POST':
user_id = request.form.get('user_id', type=int)
asset_id = request.form.get('asset_id', type=int)
assigned_date_str = request.form.get('assigned_date', '')
notes = request.form.get('notes', '').strip() or None
user = User.query.get(user_id) if user_id else None
asset = Asset.query.get(asset_id) if asset_id else None
errors = []
if not user:
errors.append('User is required.')
if not asset:
errors.append('Asset is required.')
if user and user.is_masked:
errors.append('Cannot assign assets to a masked user.')
if asset and asset.status == 'assigned':
errors.append(f'Asset {asset.serial_number} is already assigned.')
if asset and asset.status in ('retired', 'lost'):
errors.append(f'Asset {asset.serial_number} is {asset.status} and cannot be assigned.')
if errors:
for e in errors:
flash(e, 'danger')
return render_template('assignments/form.html',
preselect_asset_id=asset_id,
preselect_user_id=user_id)
try:
assigned_date = date.fromisoformat(assigned_date_str) if assigned_date_str else date.today()
except ValueError:
assigned_date = date.today()
assignment = Assignment(
asset_id=asset.id,
user_id=user.id,
assigned_date=assigned_date,
assigned_by_id=current_user.id,
notes=notes,
is_active=True,
)
asset.status = 'assigned'
db.session.add(assignment)
db.session.flush()
_log('assign', assignment.id,
f'Assigned asset SN={asset.serial_number} to WID={user.windows_id}',
new={'asset_sn': asset.serial_number, 'user_wid': user.windows_id,
'date': str(assigned_date)})
db.session.commit()
flash(f'Asset {asset.serial_number} assigned to {user.display_name}.', 'success')
return redirect(url_for('assignments.index'))
return render_template('assignments/form.html',
preselect_asset_id=preselect_asset_id,
preselect_user_id=preselect_user_id)
@bp.route('/<int:assignment_id>/return', methods=['POST'])
@login_required
def return_asset(assignment_id):
assignment = Assignment.query.get_or_404(assignment_id)
if not assignment.is_active:
flash('This assignment is already closed.', 'info')
return redirect(url_for('assignments.index'))
returned_date_str = request.form.get('returned_date', '')
try:
returned_date = date.fromisoformat(returned_date_str) if returned_date_str else date.today()
except ValueError:
returned_date = date.today()
assignment.returned_date = returned_date
assignment.returned_by_id = current_user.id
assignment.is_active = False
assignment.notes = (assignment.notes or '') + ('\n' + request.form.get('return_notes', '').strip() if request.form.get('return_notes') else '')
# Only set asset back to available if no other active assignment (safety check)
other_active = Assignment.query.filter(
Assignment.asset_id == assignment.asset_id,
Assignment.is_active == True, # noqa: E712
Assignment.id != assignment.id,
).first()
if not other_active:
assignment.asset.status = 'available'
_log('return', assignment.id,
f'Returned asset SN={assignment.asset.serial_number} from WID={assignment.user.windows_id}',
new={'returned_date': str(returned_date)})
db.session.commit()
flash(f'Asset {assignment.asset.serial_number} returned.', 'success')
return redirect(url_for('assignments.index'))

30
app/routes/audit.py Normal file
View File

@@ -0,0 +1,30 @@
from flask import Blueprint, render_template, request, current_app
from flask_login import login_required
from app.models.audit_log import AuditLog
bp = Blueprint('audit', __name__, url_prefix='/audit')
@bp.route('/')
@login_required
def index():
page = request.args.get('page', 1, type=int)
table_filter = request.args.get('table', '')
action_filter = request.args.get('action', '')
query = AuditLog.query
if table_filter:
query = query.filter_by(table_name=table_filter)
if action_filter:
query = query.filter_by(action=action_filter)
pagination = query.order_by(AuditLog.performed_at.desc()).paginate(
page=page, per_page=current_app.config['ITEMS_PER_PAGE'], error_out=False
)
tables = ['users', 'assets', 'assignments', 'paperwork']
actions = ['create', 'update', 'delete', 'mask', 'assign', 'return', 'import']
return render_template('audit/index.html',
pagination=pagination,
table_filter=table_filter,
action_filter=action_filter,
tables=tables, actions=actions)

41
app/routes/auth.py Normal file
View File

@@ -0,0 +1,41 @@
from datetime import datetime
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, login_required, current_user
from app.extensions import db
from app.models.admin_user import AdminUser
bp = Blueprint('auth', __name__)
@bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('dashboard.index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
if not username or not password:
flash('Please enter username and password.', 'danger')
return render_template('auth/login.html')
user = AdminUser.query.filter_by(username=username).first()
if user and user.is_active and user.check_password(password):
user.last_login = datetime.utcnow()
db.session.commit()
login_user(user, remember=False)
next_page = request.args.get('next')
return redirect(next_page or url_for('dashboard.index'))
flash('Invalid username or password.', 'danger')
return render_template('auth/login.html')
@bp.route('/logout')
@login_required
def logout():
logout_user()
flash('You have been logged out.', 'info')
return redirect(url_for('auth.login'))

35
app/routes/dashboard.py Normal file
View File

@@ -0,0 +1,35 @@
from flask import Blueprint, render_template
from flask_login import login_required
from app.models.user import User
from app.models.asset import Asset
from app.models.assignment import Assignment
from app.models.paperwork import Paperwork
bp = Blueprint('dashboard', __name__)
@bp.route('/')
@login_required
def index():
stats = {
'total_users': User.query.count(),
'active_users': User.query.filter_by(is_active=True, is_masked=False).count(),
'masked_users': User.query.filter_by(is_masked=True).count(),
'total_assets': Asset.query.count(),
'available_assets': Asset.query.filter_by(status='available').count(),
'assigned_assets': Asset.query.filter_by(status='assigned').count(),
'maintenance_assets': Asset.query.filter_by(status='maintenance').count(),
'active_assignments': Assignment.query.filter_by(is_active=True).count(),
'total_paperwork': Paperwork.query.count(),
}
# Recent assignments
recent_assignments = (
Assignment.query
.filter_by(is_active=True)
.order_by(Assignment.created_at.desc())
.limit(10)
.all()
)
return render_template('dashboard/index.html', stats=stats, recent_assignments=recent_assignments)

202
app/routes/doc_templates.py Normal file
View File

@@ -0,0 +1,202 @@
import json
import os
from flask import (Blueprint, render_template, redirect, url_for,
flash, request, current_app, send_from_directory, jsonify)
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app.extensions import db
from app.models.document_template import DocumentTemplate
from app.models.paperwork import DOC_TYPES
from app.models.audit_log import AuditLog
from app.services.template_service import extract_variables
bp = Blueprint('doc_templates', __name__, url_prefix='/doc-templates')
ALLOWED_EXT = {'docx'}
def _log(action, record_id, description, new=None):
entry = AuditLog(
table_name='document_templates',
record_id=record_id,
action=action,
new_values=json.dumps(new) if new else None,
performed_by_id=current_user.id,
ip_address=request.remote_addr,
description=description,
)
db.session.add(entry)
def _template_folder(app):
folder = os.path.join(app.root_path, '..', app.config.get('TEMPLATE_FOLDER', 'doc_templates'))
os.makedirs(folder, exist_ok=True)
return folder
def _allowed(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXT
# ------------------------------------------------------------------
# List
# ------------------------------------------------------------------
@bp.route('/')
@login_required
def index():
templates = DocumentTemplate.query.order_by(DocumentTemplate.name).all()
return render_template('doc_templates/index.html',
templates=templates, doc_types=DOC_TYPES)
# ------------------------------------------------------------------
# Upload
# ------------------------------------------------------------------
@bp.route('/upload', methods=['GET', 'POST'])
@login_required
def upload():
if request.method == 'POST':
name = request.form.get('name', '').strip()
description = request.form.get('description', '').strip() or None
category = request.form.get('category', '') or None
f = request.files.get('docx_file')
if not name:
flash('Template name is required.', 'danger')
return render_template('doc_templates/upload.html', doc_types=DOC_TYPES)
if not f or not f.filename:
flash('Please select a .docx file.', 'danger')
return render_template('doc_templates/upload.html', doc_types=DOC_TYPES)
if not _allowed(f.filename):
flash('Only .docx files are accepted.', 'danger')
return render_template('doc_templates/upload.html', doc_types=DOC_TYPES)
folder = _template_folder(current_app)
safe_name = secure_filename(f.filename)
# Prefix with timestamp to avoid collisions
from datetime import datetime as _dt
prefix = _dt.utcnow().strftime('%Y%m%d_%H%M%S_')
filename = prefix + safe_name
save_path = os.path.join(folder, filename)
f.save(save_path)
# Extract variables from the uploaded template
try:
variables = extract_variables(save_path)
except Exception as exc:
current_app.logger.warning('Variable extraction failed: %s', exc)
variables = []
tpl = DocumentTemplate(
name=name,
description=description,
category=category,
filename=filename,
created_by_id=current_user.id,
)
tpl.variables = variables
db.session.add(tpl)
db.session.flush()
_log('create', tpl.id, f'Uploaded template "{name}"', new={'name': name, 'filename': filename})
db.session.commit()
flash(f'Template "{name}" uploaded. Found {len(variables)} variable(s).', 'success')
return redirect(url_for('doc_templates.detail', tpl_id=tpl.id))
return render_template('doc_templates/upload.html', doc_types=DOC_TYPES)
# ------------------------------------------------------------------
# Detail / variable list
# ------------------------------------------------------------------
@bp.route('/<int:tpl_id>')
@login_required
def detail(tpl_id):
tpl = DocumentTemplate.query.get_or_404(tpl_id)
return render_template('doc_templates/detail.html', tpl=tpl, doc_types=DOC_TYPES)
# ------------------------------------------------------------------
# Download original template file
# ------------------------------------------------------------------
@bp.route('/<int:tpl_id>/download')
@login_required
def download(tpl_id):
tpl = DocumentTemplate.query.get_or_404(tpl_id)
folder = _template_folder(current_app)
return send_from_directory(folder, tpl.filename, as_attachment=True,
download_name=tpl.name + '.docx')
# ------------------------------------------------------------------
# Re-scan variables (after template is replaced / edited externally)
# ------------------------------------------------------------------
@bp.route('/<int:tpl_id>/rescan', methods=['POST'])
@login_required
def rescan(tpl_id):
tpl = DocumentTemplate.query.get_or_404(tpl_id)
folder = _template_folder(current_app)
path = os.path.join(folder, tpl.filename)
try:
variables = extract_variables(path)
tpl.variables = variables
db.session.commit()
flash(f'Rescanned: found {len(variables)} variable(s).', 'success')
except Exception as exc:
flash(f'Rescan failed: {exc}', 'danger')
return redirect(url_for('doc_templates.detail', tpl_id=tpl_id))
# ------------------------------------------------------------------
# Edit metadata
# ------------------------------------------------------------------
@bp.route('/<int:tpl_id>/edit', methods=['GET', 'POST'])
@login_required
def edit(tpl_id):
tpl = DocumentTemplate.query.get_or_404(tpl_id)
if request.method == 'POST':
tpl.name = request.form.get('name', tpl.name).strip()
tpl.description = request.form.get('description', '').strip() or None
tpl.category = request.form.get('category', '') or None
db.session.commit()
flash('Template updated.', 'success')
return redirect(url_for('doc_templates.detail', tpl_id=tpl_id))
return render_template('doc_templates/edit.html', tpl=tpl, doc_types=DOC_TYPES)
# ------------------------------------------------------------------
# Delete
# ------------------------------------------------------------------
@bp.route('/<int:tpl_id>/delete', methods=['POST'])
@login_required
def delete(tpl_id):
tpl = DocumentTemplate.query.get_or_404(tpl_id)
# Check if any documents were generated from this template
if tpl.paperwork_docs.count() > 0:
flash(f'Cannot delete — {tpl.paperwork_docs.count()} document(s) were generated from this template.', 'danger')
return redirect(url_for('doc_templates.detail', tpl_id=tpl_id))
folder = _template_folder(current_app)
file_path = os.path.join(folder, tpl.filename)
try:
if os.path.exists(file_path):
os.remove(file_path)
except OSError:
pass
_log('delete', tpl.id, f'Deleted template "{tpl.name}"')
db.session.delete(tpl)
db.session.commit()
flash(f'Template "{tpl.name}" deleted.', 'success')
return redirect(url_for('doc_templates.index'))
# ------------------------------------------------------------------
# AJAX: return variables for a template (used by paperwork form)
# ------------------------------------------------------------------
@bp.route('/<int:tpl_id>/variables.json')
@login_required
def variables_json(tpl_id):
tpl = DocumentTemplate.query.get_or_404(tpl_id)
return jsonify({'variables': tpl.variables, 'name': tpl.name})

245
app/routes/paperwork.py Normal file
View File

@@ -0,0 +1,245 @@
import json
import os
from datetime import datetime
from flask import (Blueprint, render_template, redirect, url_for,
flash, request, current_app, send_from_directory, abort, jsonify)
from flask_login import login_required, current_user
from app.extensions import db
from app.models.paperwork import Paperwork, DOC_TYPES
from app.models.document_template import DocumentTemplate
from app.models.user import User
from app.models.asset import Asset
from app.models.assignment import Assignment
from app.models.audit_log import AuditLog
from app.services.pdf_service import generate_paperwork_pdf
from app.services.template_service import (
build_context, render_template_to_docx, _template_path
)
bp = Blueprint('paperwork', __name__, url_prefix='/paperwork')
def _log(action, record_id, description, new=None):
entry = AuditLog(
table_name='paperwork',
record_id=record_id,
action=action,
new_values=json.dumps(new) if new else None,
performed_by_id=current_user.id,
ip_address=request.remote_addr,
description=description,
)
db.session.add(entry)
@bp.route('/')
@login_required
def index():
page = request.args.get('page', 1, type=int)
q = request.args.get('q', '').strip()
doc_type_filter = request.args.get('doc_type', '')
query = Paperwork.query
if doc_type_filter:
query = query.filter_by(document_type=doc_type_filter)
pagination = query.order_by(Paperwork.created_at.desc()).paginate(
page=page, per_page=current_app.config['ITEMS_PER_PAGE'], error_out=False
)
return render_template('paperwork/index.html',
pagination=pagination, q=q,
doc_type_filter=doc_type_filter,
doc_types=DOC_TYPES)
@bp.route('/new', methods=['GET', 'POST'])
@login_required
def create():
preselect_user_id = request.args.get('user_id', type=int)
preselect_asset_id = request.args.get('asset_id', type=int)
preselect_assignment_id = request.args.get('assignment_id', type=int)
all_templates = DocumentTemplate.query.order_by(DocumentTemplate.name).all()
if request.method == 'POST':
user_id = request.form.get('user_id', type=int)
asset_id = request.form.get('asset_id', type=int) or None
assignment_id = request.form.get('assignment_id', type=int) or None
doc_type = request.form.get('document_type', 'handover')
title = request.form.get('title', '').strip()
notes = request.form.get('notes', '').strip() or None
template_id = request.form.get('template_id', type=int) or None
user = User.query.get(user_id) if user_id else None
if not user:
flash('User is required.', 'danger')
return render_template('paperwork/form.html',
doc_types=DOC_TYPES, all_templates=all_templates,
preselect_user_id=preselect_user_id,
preselect_asset_id=preselect_asset_id)
asset = Asset.query.get(asset_id) if asset_id else None
assignment = Assignment.query.get(assignment_id) if assignment_id else None
if not title:
title = f'{dict(DOC_TYPES).get(doc_type, doc_type)}{user.display_name}'
doc = Paperwork(
document_type=doc_type,
title=title,
user_id=user.id,
asset_id=asset_id,
assignment_id=assignment_id,
template_id=template_id,
notes=notes,
created_by_id=current_user.id,
)
db.session.add(doc)
db.session.flush()
# Build and store merge variables
ctx = build_context(user, asset=asset, assignment=assignment,
paperwork=doc, app=current_app._get_current_object())
# Allow form overrides for any variable (textarea fields named var_*)
for k, v in request.form.items():
if k.startswith('var_'):
ctx[k[4:]] = v
ctx['admin_name'] = current_user.username
doc.merge_vars = json.dumps(ctx)
# Generate .docx from template if one is selected
if template_id:
tpl_obj = DocumentTemplate.query.get(template_id)
if tpl_obj:
tpl_file = _template_path(current_app._get_current_object(), tpl_obj.filename)
if os.path.exists(tpl_file):
out_name = f'doc_{doc.id}_{tpl_obj.id}.docx'
try:
render_template_to_docx(tpl_file, ctx, out_name)
doc.docx_filename = out_name
except Exception as exc:
current_app.logger.error('docx render failed: %s', exc)
flash(f'Word document generation failed: {exc}', 'warning')
# Always generate PDF
try:
pdf_filename = generate_paperwork_pdf(doc, current_app._get_current_object())
doc.pdf_filename = pdf_filename
except Exception as exc:
current_app.logger.error(f'PDF generation failed: {exc}')
flash(f'Document saved but PDF generation failed: {exc}', 'warning')
_log('create', doc.id, f'Created paperwork "{title}" type={doc_type}',
new={'type': doc_type, 'user_id': user_id, 'asset_id': asset_id,
'template_id': template_id})
db.session.commit()
flash(f'Document "{title}" created.', 'success')
return redirect(url_for('paperwork.detail', doc_id=doc.id))
return render_template('paperwork/form.html',
doc_types=DOC_TYPES,
all_templates=all_templates,
preselect_user_id=preselect_user_id,
preselect_asset_id=preselect_asset_id,
preselect_assignment_id=preselect_assignment_id)
@bp.route('/<int:doc_id>')
@login_required
def detail(doc_id):
doc = Paperwork.query.get_or_404(doc_id)
return render_template('paperwork/detail.html', doc=doc,
merge_vars=doc.get_merge_vars())
@bp.route('/<int:doc_id>/download')
@login_required
def download(doc_id):
doc = Paperwork.query.get_or_404(doc_id)
if not doc.pdf_filename:
flash('No PDF available for this document.', 'warning')
return redirect(url_for('paperwork.detail', doc_id=doc_id))
pdf_dir = os.path.join(current_app.root_path, '..', current_app.config['PDF_FOLDER'])
return send_from_directory(pdf_dir, doc.pdf_filename, as_attachment=True)
@bp.route('/<int:doc_id>/download-docx')
@login_required
def download_docx(doc_id):
doc = Paperwork.query.get_or_404(doc_id)
if not doc.docx_filename:
flash('No Word document available for this record.', 'warning')
return redirect(url_for('paperwork.detail', doc_id=doc_id))
docx_dir = os.path.join(current_app.root_path, '..', current_app.config.get('DOCX_FOLDER', 'docx_output'))
safe_name = (doc.title or f'document_{doc.id}') + '.docx'
return send_from_directory(docx_dir, doc.docx_filename, as_attachment=True,
download_name=safe_name)
@bp.route('/<int:doc_id>/regenerate', methods=['POST'])
@login_required
def regenerate(doc_id):
doc = Paperwork.query.get_or_404(doc_id)
app = current_app._get_current_object()
# Regenerate .docx if template-based
if doc.template_id and doc.template:
tpl_file = _template_path(app, doc.template.filename)
ctx = doc.get_merge_vars()
out_name = doc.docx_filename or f'doc_{doc.id}_{doc.template_id}.docx'
try:
render_template_to_docx(tpl_file, ctx, out_name)
doc.docx_filename = out_name
except Exception as exc:
flash(f'Word regeneration failed: {exc}', 'warning')
try:
pdf_filename = generate_paperwork_pdf(doc, app)
doc.pdf_filename = pdf_filename
db.session.commit()
flash('Document regenerated.', 'success')
except Exception as exc:
flash(f'PDF generation failed: {exc}', 'danger')
return redirect(url_for('paperwork.detail', doc_id=doc_id))
@bp.route('/<int:doc_id>/sign', methods=['POST'])
@login_required
def sign(doc_id):
"""Record a signature on a document."""
doc = Paperwork.query.get_or_404(doc_id)
signed_by_name = request.form.get('signed_by_name', '').strip()
signature_data = request.form.get('signature_data', '').strip() # base64 PNG from canvas
if not signed_by_name:
flash('Signer name is required.', 'danger')
return redirect(url_for('paperwork.detail', doc_id=doc_id))
doc.signed_at = datetime.utcnow()
doc.signed_by_name = signed_by_name
if signature_data:
doc.signature_data = signature_data
# Regenerate PDF to embed signature
try:
pdf_filename = generate_paperwork_pdf(doc, current_app._get_current_object())
doc.pdf_filename = pdf_filename
except Exception as exc:
current_app.logger.error('PDF re-gen after sign failed: %s', exc)
_log('sign', doc.id, f'Document signed by {signed_by_name}')
db.session.commit()
flash(f'Document signed by {signed_by_name}.', 'success')
return redirect(url_for('paperwork.detail', doc_id=doc_id))
@bp.route('/<int:doc_id>/unsign', methods=['POST'])
@login_required
def unsign(doc_id):
doc = Paperwork.query.get_or_404(doc_id)
doc.signed_at = None
doc.signed_by_name = None
doc.signature_data = None
db.session.commit()
flash('Signature removed.', 'info')
return redirect(url_for('paperwork.detail', doc_id=doc_id))

52
app/routes/settings.py Normal file
View File

@@ -0,0 +1,52 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request, current_app
from flask_login import login_required, current_user
from app.extensions import db
from app.models.admin_user import AdminUser
bp = Blueprint('settings', __name__, url_prefix='/settings')
@bp.route('/')
@login_required
def index():
admins = AdminUser.query.order_by(AdminUser.username).all()
return render_template('settings/index.html', admins=admins, config=current_app.config)
@bp.route('/admin/new', methods=['POST'])
@login_required
def create_admin():
username = request.form.get('username', '').strip()
email = request.form.get('email', '').strip()
full_name = request.form.get('full_name', '').strip()
password = request.form.get('password', '')
role = request.form.get('role', 'admin')
if not username or not email or not password:
flash('Username, email and password are required.', 'danger')
return redirect(url_for('settings.index'))
if AdminUser.query.filter_by(username=username).first():
flash(f'Username "{username}" is already taken.', 'danger')
return redirect(url_for('settings.index'))
admin = AdminUser(username=username, email=email, full_name=full_name, role=role)
admin.set_password(password)
db.session.add(admin)
db.session.commit()
flash(f'Admin user "{username}" created.', 'success')
return redirect(url_for('settings.index'))
@bp.route('/admin/<int:admin_id>/toggle', methods=['POST'])
@login_required
def toggle_admin(admin_id):
admin = AdminUser.query.get_or_404(admin_id)
if admin.id == current_user.id:
flash('You cannot deactivate your own account.', 'danger')
return redirect(url_for('settings.index'))
admin.is_active = not admin.is_active
db.session.commit()
status = 'activated' if admin.is_active else 'deactivated'
flash(f'Admin "{admin.username}" {status}.', 'success')
return redirect(url_for('settings.index'))

332
app/routes/users.py Normal file
View File

@@ -0,0 +1,332 @@
import json
from datetime import datetime
from flask import (Blueprint, render_template, redirect, url_for,
flash, request, current_app, jsonify)
from flask_login import login_required, current_user
from app.extensions import db
from app.models.user import User
from app.models.audit_log import AuditLog
from app.services.csv_service import parse_users_csv
from app.services.ldap_service import LDAPService
from app.services.template_service import mask_variables, regenerate_for_paperwork
bp = Blueprint('users', __name__, url_prefix='/users')
def _log(action, record_id, description, old=None, new=None):
entry = AuditLog(
table_name='users',
record_id=record_id,
action=action,
old_values=json.dumps(old) if old else None,
new_values=json.dumps(new) if new else None,
performed_by_id=current_user.id,
ip_address=request.remote_addr,
description=description,
)
db.session.add(entry)
# ------------------------------------------------------------------
# List
# ------------------------------------------------------------------
@bp.route('/')
@login_required
def index():
page = request.args.get('page', 1, type=int)
q = request.args.get('q', '').strip()
show_masked = request.args.get('masked', '0') == '1'
query = User.query
if not show_masked:
query = query.filter_by(is_masked=False)
if q:
like = f'%{q}%'
query = query.filter(
db.or_(
User.windows_id.like(like),
User.first_name.like(like),
User.last_name.like(like),
User.email.like(like),
User.department.like(like),
)
)
pagination = query.order_by(User.last_name, User.first_name).paginate(
page=page, per_page=current_app.config['ITEMS_PER_PAGE'], error_out=False
)
return render_template('users/index.html', pagination=pagination, q=q, show_masked=show_masked)
# ------------------------------------------------------------------
# Create
# ------------------------------------------------------------------
@bp.route('/new', methods=['GET', 'POST'])
@login_required
def create():
if request.method == 'POST':
windows_id = request.form.get('windows_id', '').strip()
if not windows_id:
flash('Windows ID is required.', 'danger')
return render_template('users/form.html', user=None)
if User.query.filter_by(windows_id=windows_id).first():
flash(f'A user with Windows ID {windows_id} already exists.', 'danger')
return render_template('users/form.html', user=None)
user = User(
windows_id=windows_id,
first_name=request.form.get('first_name', '').strip() or None,
last_name=request.form.get('last_name', '').strip() or None,
email=request.form.get('email', '').strip() or None,
phone=request.form.get('phone', '').strip() or None,
department=request.form.get('department', '').strip() or None,
job_title=request.form.get('job_title', '').strip() or None,
location=request.form.get('location', '').strip() or None,
import_source='manual',
)
db.session.add(user)
db.session.flush()
_log('create', user.id, f'Created user WID={windows_id}',
new={'windows_id': windows_id, 'email': user.email})
db.session.commit()
flash(f'User {user.display_name} created.', 'success')
return redirect(url_for('users.detail', user_id=user.id))
return render_template('users/form.html', user=None)
# ------------------------------------------------------------------
# Detail
# ------------------------------------------------------------------
@bp.route('/<int:user_id>')
@login_required
def detail(user_id):
user = User.query.get_or_404(user_id)
assignments = user.assignments.order_by(db.text('assigned_date DESC')).all()
docs = user.paperwork_docs.order_by(db.text('created_at DESC')).all()
return render_template('users/detail.html', user=user, assignments=assignments, docs=docs)
# ------------------------------------------------------------------
# Edit
# ------------------------------------------------------------------
@bp.route('/<int:user_id>/edit', methods=['GET', 'POST'])
@login_required
def edit(user_id):
user = User.query.get_or_404(user_id)
if user.is_masked:
flash('Cannot edit a masked user record.', 'warning')
return redirect(url_for('users.detail', user_id=user_id))
if request.method == 'POST':
old = {
'first_name': user.first_name, 'last_name': user.last_name,
'email': user.email, 'department': user.department,
}
user.first_name = request.form.get('first_name', '').strip() or None
user.last_name = request.form.get('last_name', '').strip() or None
user.email = request.form.get('email', '').strip() or None
user.phone = request.form.get('phone', '').strip() or None
user.department = request.form.get('department', '').strip() or None
user.job_title = request.form.get('job_title', '').strip() or None
user.location = request.form.get('location', '').strip() or None
user.is_active = request.form.get('is_active') == 'on'
_log('update', user.id, f'Updated user WID={user.windows_id}',
old=old, new={'first_name': user.first_name, 'last_name': user.last_name})
db.session.commit()
flash('User updated.', 'success')
return redirect(url_for('users.detail', user_id=user_id))
return render_template('users/form.html', user=user)
# ------------------------------------------------------------------
# Mask (GDPR / off-boarding)
# ------------------------------------------------------------------
@bp.route('/<int:user_id>/mask', methods=['POST'])
@login_required
def mask(user_id):
user = User.query.get_or_404(user_id)
if user.is_masked:
flash('User is already masked.', 'info')
return redirect(url_for('users.detail', user_id=user_id))
old = {'first_name': user.first_name, 'last_name': user.last_name, 'email': user.email}
user.mask(current_user.id)
_log('mask', user.id,
f'Masked PII for WID={user.windows_id}',
old=old, new={'is_masked': True})
# Re-render all template-based documents with masked PII
app = current_app._get_current_object()
for pw_doc in user.paperwork_docs:
if pw_doc.merge_vars:
try:
masked_ctx = mask_variables(pw_doc.get_merge_vars())
pw_doc.merge_vars = json.dumps(masked_ctx)
regenerate_for_paperwork(pw_doc, app)
except Exception as exc:
app.logger.error('Failed to re-render doc %s on mask: %s', pw_doc.id, exc)
db.session.commit()
flash(f'User WID={user.windows_id} has been masked. Asset history is preserved.', 'success')
return redirect(url_for('users.detail', user_id=user_id))
# ------------------------------------------------------------------
# Import page
# ------------------------------------------------------------------
@bp.route('/import')
@login_required
def import_page():
return render_template('users/import.html')
# ------------------------------------------------------------------
# CSV import
# ------------------------------------------------------------------
@bp.route('/import/csv', methods=['POST'])
@login_required
def import_csv():
file = request.files.get('csv_file')
if not file or not file.filename.endswith('.csv'):
flash('Please upload a valid CSV file.', 'danger')
return redirect(url_for('users.import_page'))
users_data, errors = parse_users_csv(file.stream)
created = updated = skipped = 0
for row in users_data:
wid = row.get('windows_id', '').strip()
if not wid:
skipped += 1
continue
existing = User.query.filter_by(windows_id=wid).first()
if existing:
if not existing.is_masked:
existing.first_name = row.get('first_name') or existing.first_name
existing.last_name = row.get('last_name') or existing.last_name
existing.email = row.get('email') or existing.email
existing.department = row.get('department') or existing.department
existing.job_title = row.get('job_title') or existing.job_title
existing.location = row.get('location') or existing.location
existing.import_source = 'csv'
updated += 1
else:
skipped += 1
else:
u = User(
windows_id=wid,
first_name=row.get('first_name') or None,
last_name=row.get('last_name') or None,
email=row.get('email') or None,
department=row.get('department') or None,
job_title=row.get('job_title') or None,
location=row.get('location') or None,
import_source='csv',
)
db.session.add(u)
created += 1
_log('import', None,
f'CSV import: {created} created, {updated} updated, {skipped} skipped',
new={'created': created, 'updated': updated, 'skipped': skipped, 'errors': errors})
db.session.commit()
if errors:
flash(f'Import completed with warnings: {"; ".join(errors[:5])}', 'warning')
flash(f'CSV import done — {created} created, {updated} updated, {skipped} skipped.', 'success')
return redirect(url_for('users.index'))
# ------------------------------------------------------------------
# LDAP / AD sync
# ------------------------------------------------------------------
@bp.route('/import/ldap', methods=['POST'])
@login_required
def import_ldap():
if not current_app.config.get('LDAP_SERVER'):
flash('LDAP server is not configured. Update Settings first.', 'danger')
return redirect(url_for('users.import_page'))
try:
service = LDAPService()
users_data = service.sync_users()
except Exception as exc:
flash(f'LDAP connection failed: {exc}', 'danger')
return redirect(url_for('users.import_page'))
created = updated = skipped = 0
for row in users_data:
wid = row.get('windows_id', '').strip()
if not wid:
skipped += 1
continue
existing = User.query.filter_by(windows_id=wid).first()
if existing:
if not existing.is_masked:
existing.first_name = row.get('first_name') or existing.first_name
existing.last_name = row.get('last_name') or existing.last_name
existing.email = row.get('email') or existing.email
existing.department = row.get('department') or existing.department
existing.job_title = row.get('job_title') or existing.job_title
existing.location = row.get('location') or existing.location
existing.ldap_dn = row.get('ldap_dn') or existing.ldap_dn
existing.is_active = row.get('is_active', True)
existing.import_source = 'ldap'
updated += 1
else:
skipped += 1
else:
u = User(
windows_id=wid,
first_name=row.get('first_name') or None,
last_name=row.get('last_name') or None,
email=row.get('email') or None,
department=row.get('department') or None,
job_title=row.get('job_title') or None,
location=row.get('location') or None,
ldap_dn=row.get('ldap_dn') or None,
is_active=row.get('is_active', True),
import_source='ldap',
)
db.session.add(u)
created += 1
_log('import', None,
f'LDAP sync: {created} created, {updated} updated, {skipped} skipped',
new={'created': created, 'updated': updated, 'skipped': skipped})
db.session.commit()
flash(f'LDAP sync done — {created} created, {updated} updated, {skipped} skipped.', 'success')
return redirect(url_for('users.index'))
# ------------------------------------------------------------------
# Quick search (AJAX)
# ------------------------------------------------------------------
@bp.route('/search')
@login_required
def search():
q = request.args.get('q', '').strip()
if len(q) < 2:
return jsonify([])
like = f'%{q}%'
users = User.query.filter(
User.is_masked == False, # noqa: E712
db.or_(
User.windows_id.like(like),
User.first_name.like(like),
User.last_name.like(like),
)
).limit(15).all()
return jsonify([{
'id': u.id,
'text': f'{u.display_name} (WID: {u.windows_id})',
'windows_id': u.windows_id,
} for u in users])