333 lines
13 KiB
Python
333 lines
13 KiB
Python
import json
|
|
from datetime import datetime
|
|
from flask import (Blueprint, render_template, redirect, url_for,
|
|
flash, request, current_app, jsonify)
|
|
from flask_login import login_required, current_user
|
|
from app.extensions import db
|
|
from app.models.user import User
|
|
from app.models.audit_log import AuditLog
|
|
from app.services.csv_service import parse_users_csv
|
|
from app.services.ldap_service import LDAPService
|
|
from app.services.template_service import mask_variables, regenerate_for_paperwork
|
|
|
|
bp = Blueprint('users', __name__, url_prefix='/users')
|
|
|
|
|
|
def _log(action, record_id, description, old=None, new=None):
|
|
entry = AuditLog(
|
|
table_name='users',
|
|
record_id=record_id,
|
|
action=action,
|
|
old_values=json.dumps(old) if old else None,
|
|
new_values=json.dumps(new) if new else None,
|
|
performed_by_id=current_user.id,
|
|
ip_address=request.remote_addr,
|
|
description=description,
|
|
)
|
|
db.session.add(entry)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# List
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/')
|
|
@login_required
|
|
def index():
|
|
page = request.args.get('page', 1, type=int)
|
|
q = request.args.get('q', '').strip()
|
|
show_masked = request.args.get('masked', '0') == '1'
|
|
|
|
query = User.query
|
|
if not show_masked:
|
|
query = query.filter_by(is_masked=False)
|
|
if q:
|
|
like = f'%{q}%'
|
|
query = query.filter(
|
|
db.or_(
|
|
User.windows_id.like(like),
|
|
User.first_name.like(like),
|
|
User.last_name.like(like),
|
|
User.email.like(like),
|
|
User.department.like(like),
|
|
)
|
|
)
|
|
|
|
pagination = query.order_by(User.last_name, User.first_name).paginate(
|
|
page=page, per_page=current_app.config['ITEMS_PER_PAGE'], error_out=False
|
|
)
|
|
return render_template('users/index.html', pagination=pagination, q=q, show_masked=show_masked)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Create
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/new', methods=['GET', 'POST'])
|
|
@login_required
|
|
def create():
|
|
if request.method == 'POST':
|
|
windows_id = request.form.get('windows_id', '').strip()
|
|
if not windows_id:
|
|
flash('Windows ID is required.', 'danger')
|
|
return render_template('users/form.html', user=None)
|
|
|
|
if User.query.filter_by(windows_id=windows_id).first():
|
|
flash(f'A user with Windows ID {windows_id} already exists.', 'danger')
|
|
return render_template('users/form.html', user=None)
|
|
|
|
user = User(
|
|
windows_id=windows_id,
|
|
first_name=request.form.get('first_name', '').strip() or None,
|
|
last_name=request.form.get('last_name', '').strip() or None,
|
|
email=request.form.get('email', '').strip() or None,
|
|
phone=request.form.get('phone', '').strip() or None,
|
|
department=request.form.get('department', '').strip() or None,
|
|
job_title=request.form.get('job_title', '').strip() or None,
|
|
location=request.form.get('location', '').strip() or None,
|
|
import_source='manual',
|
|
)
|
|
db.session.add(user)
|
|
db.session.flush()
|
|
_log('create', user.id, f'Created user WID={windows_id}',
|
|
new={'windows_id': windows_id, 'email': user.email})
|
|
db.session.commit()
|
|
flash(f'User {user.display_name} created.', 'success')
|
|
return redirect(url_for('users.detail', user_id=user.id))
|
|
|
|
return render_template('users/form.html', user=None)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Detail
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/<int:user_id>')
|
|
@login_required
|
|
def detail(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
assignments = user.assignments.order_by(db.text('assigned_date DESC')).all()
|
|
docs = user.paperwork_docs.order_by(db.text('created_at DESC')).all()
|
|
return render_template('users/detail.html', user=user, assignments=assignments, docs=docs)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Edit
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/<int:user_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
if user.is_masked:
|
|
flash('Cannot edit a masked user record.', 'warning')
|
|
return redirect(url_for('users.detail', user_id=user_id))
|
|
|
|
if request.method == 'POST':
|
|
old = {
|
|
'first_name': user.first_name, 'last_name': user.last_name,
|
|
'email': user.email, 'department': user.department,
|
|
}
|
|
user.first_name = request.form.get('first_name', '').strip() or None
|
|
user.last_name = request.form.get('last_name', '').strip() or None
|
|
user.email = request.form.get('email', '').strip() or None
|
|
user.phone = request.form.get('phone', '').strip() or None
|
|
user.department = request.form.get('department', '').strip() or None
|
|
user.job_title = request.form.get('job_title', '').strip() or None
|
|
user.location = request.form.get('location', '').strip() or None
|
|
user.is_active = request.form.get('is_active') == 'on'
|
|
|
|
_log('update', user.id, f'Updated user WID={user.windows_id}',
|
|
old=old, new={'first_name': user.first_name, 'last_name': user.last_name})
|
|
db.session.commit()
|
|
flash('User updated.', 'success')
|
|
return redirect(url_for('users.detail', user_id=user_id))
|
|
|
|
return render_template('users/form.html', user=user)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Mask (GDPR / off-boarding)
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/<int:user_id>/mask', methods=['POST'])
|
|
@login_required
|
|
def mask(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
if user.is_masked:
|
|
flash('User is already masked.', 'info')
|
|
return redirect(url_for('users.detail', user_id=user_id))
|
|
|
|
old = {'first_name': user.first_name, 'last_name': user.last_name, 'email': user.email}
|
|
user.mask(current_user.id)
|
|
_log('mask', user.id,
|
|
f'Masked PII for WID={user.windows_id}',
|
|
old=old, new={'is_masked': True})
|
|
|
|
# Re-render all template-based documents with masked PII
|
|
app = current_app._get_current_object()
|
|
for pw_doc in user.paperwork_docs:
|
|
if pw_doc.merge_vars:
|
|
try:
|
|
masked_ctx = mask_variables(pw_doc.get_merge_vars())
|
|
pw_doc.merge_vars = json.dumps(masked_ctx)
|
|
regenerate_for_paperwork(pw_doc, app)
|
|
except Exception as exc:
|
|
app.logger.error('Failed to re-render doc %s on mask: %s', pw_doc.id, exc)
|
|
|
|
db.session.commit()
|
|
flash(f'User WID={user.windows_id} has been masked. Asset history is preserved.', 'success')
|
|
return redirect(url_for('users.detail', user_id=user_id))
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Import page
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/import')
|
|
@login_required
|
|
def import_page():
|
|
return render_template('users/import.html')
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# CSV import
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/import/csv', methods=['POST'])
|
|
@login_required
|
|
def import_csv():
|
|
file = request.files.get('csv_file')
|
|
if not file or not file.filename.endswith('.csv'):
|
|
flash('Please upload a valid CSV file.', 'danger')
|
|
return redirect(url_for('users.import_page'))
|
|
|
|
users_data, errors = parse_users_csv(file.stream)
|
|
|
|
created = updated = skipped = 0
|
|
for row in users_data:
|
|
wid = row.get('windows_id', '').strip()
|
|
if not wid:
|
|
skipped += 1
|
|
continue
|
|
|
|
existing = User.query.filter_by(windows_id=wid).first()
|
|
if existing:
|
|
if not existing.is_masked:
|
|
existing.first_name = row.get('first_name') or existing.first_name
|
|
existing.last_name = row.get('last_name') or existing.last_name
|
|
existing.email = row.get('email') or existing.email
|
|
existing.department = row.get('department') or existing.department
|
|
existing.job_title = row.get('job_title') or existing.job_title
|
|
existing.location = row.get('location') or existing.location
|
|
existing.import_source = 'csv'
|
|
updated += 1
|
|
else:
|
|
skipped += 1
|
|
else:
|
|
u = User(
|
|
windows_id=wid,
|
|
first_name=row.get('first_name') or None,
|
|
last_name=row.get('last_name') or None,
|
|
email=row.get('email') or None,
|
|
department=row.get('department') or None,
|
|
job_title=row.get('job_title') or None,
|
|
location=row.get('location') or None,
|
|
import_source='csv',
|
|
)
|
|
db.session.add(u)
|
|
created += 1
|
|
|
|
_log('import', None,
|
|
f'CSV import: {created} created, {updated} updated, {skipped} skipped',
|
|
new={'created': created, 'updated': updated, 'skipped': skipped, 'errors': errors})
|
|
db.session.commit()
|
|
|
|
if errors:
|
|
flash(f'Import completed with warnings: {"; ".join(errors[:5])}', 'warning')
|
|
flash(f'CSV import done — {created} created, {updated} updated, {skipped} skipped.', 'success')
|
|
return redirect(url_for('users.index'))
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# LDAP / AD sync
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/import/ldap', methods=['POST'])
|
|
@login_required
|
|
def import_ldap():
|
|
if not current_app.config.get('LDAP_SERVER'):
|
|
flash('LDAP server is not configured. Update Settings first.', 'danger')
|
|
return redirect(url_for('users.import_page'))
|
|
|
|
try:
|
|
service = LDAPService()
|
|
users_data = service.sync_users()
|
|
except Exception as exc:
|
|
flash(f'LDAP connection failed: {exc}', 'danger')
|
|
return redirect(url_for('users.import_page'))
|
|
|
|
created = updated = skipped = 0
|
|
for row in users_data:
|
|
wid = row.get('windows_id', '').strip()
|
|
if not wid:
|
|
skipped += 1
|
|
continue
|
|
|
|
existing = User.query.filter_by(windows_id=wid).first()
|
|
if existing:
|
|
if not existing.is_masked:
|
|
existing.first_name = row.get('first_name') or existing.first_name
|
|
existing.last_name = row.get('last_name') or existing.last_name
|
|
existing.email = row.get('email') or existing.email
|
|
existing.department = row.get('department') or existing.department
|
|
existing.job_title = row.get('job_title') or existing.job_title
|
|
existing.location = row.get('location') or existing.location
|
|
existing.ldap_dn = row.get('ldap_dn') or existing.ldap_dn
|
|
existing.is_active = row.get('is_active', True)
|
|
existing.import_source = 'ldap'
|
|
updated += 1
|
|
else:
|
|
skipped += 1
|
|
else:
|
|
u = User(
|
|
windows_id=wid,
|
|
first_name=row.get('first_name') or None,
|
|
last_name=row.get('last_name') or None,
|
|
email=row.get('email') or None,
|
|
department=row.get('department') or None,
|
|
job_title=row.get('job_title') or None,
|
|
location=row.get('location') or None,
|
|
ldap_dn=row.get('ldap_dn') or None,
|
|
is_active=row.get('is_active', True),
|
|
import_source='ldap',
|
|
)
|
|
db.session.add(u)
|
|
created += 1
|
|
|
|
_log('import', None,
|
|
f'LDAP sync: {created} created, {updated} updated, {skipped} skipped',
|
|
new={'created': created, 'updated': updated, 'skipped': skipped})
|
|
db.session.commit()
|
|
flash(f'LDAP sync done — {created} created, {updated} updated, {skipped} skipped.', 'success')
|
|
return redirect(url_for('users.index'))
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Quick search (AJAX)
|
|
# ------------------------------------------------------------------
|
|
@bp.route('/search')
|
|
@login_required
|
|
def search():
|
|
q = request.args.get('q', '').strip()
|
|
if len(q) < 2:
|
|
return jsonify([])
|
|
like = f'%{q}%'
|
|
users = User.query.filter(
|
|
User.is_masked == False, # noqa: E712
|
|
db.or_(
|
|
User.windows_id.like(like),
|
|
User.first_name.like(like),
|
|
User.last_name.like(like),
|
|
)
|
|
).limit(15).all()
|
|
return jsonify([{
|
|
'id': u.id,
|
|
'text': f'{u.display_name} (WID: {u.windows_id})',
|
|
'windows_id': u.windows_id,
|
|
} for u in users])
|