"""Routes for the ``main`` blueprint: login, role-gated pages, user
administration, external database settings, scan entry and reports."""

import os
from contextlib import closing
from datetime import datetime, timedelta

import mariadb
from flask import (
    Blueprint,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)

from .models import User
from . import db

bp = Blueprint('main', __name__)

# Name of the key=value settings file kept in the Flask instance folder.
_SETTINGS_FILENAME = 'external_server.conf'

# Failures that mean "the external database is unusable right now":
# driver errors, a missing settings file, a missing settings key, or a
# non-numeric port. Routes report these instead of answering with a 500.
_DB_ERRORS = (mariadb.Error, FileNotFoundError, KeyError, ValueError)

_SELECT_SCAN_BY_CP_SQL = "SELECT Id FROM scan1_orders WHERE CP_full_code = ?"

_UPDATE_SCAN_SQL = """
    UPDATE scan1_orders
    SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
    WHERE CP_full_code = ?
"""

_INSERT_SCAN_SQL = """
    INSERT INTO scan1_orders
        (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
    VALUES (?, ?, ?, ?, ?, ?, ?)
"""

_RECENT_SCANS_SQL = """
    SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code,
           date, time, approved_quantity, rejected_quantity
    FROM scan1_orders
    ORDER BY Id DESC
    LIMIT 15
"""

# report id -> (days back, code column, header for that column, defects only)
_REPORTS = {
    "1": (1, "CP_base_code", "CP Base Code", False),
    "2": (5, "CP_base_code", "CP Base Code", False),
    "3": (1, "CP_full_code", "CP Full Code", True),
    "4": (5, "CP_full_code", "CP Full Code", True),
}


def _settings_path():
    """Return the path of the external database settings file."""
    return os.path.join(current_app.instance_path, _SETTINGS_FILENAME)


def _read_external_settings(path):
    """Parse a ``key=value`` settings file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped so that a stray newline in the file cannot crash a request.
    """
    parsed = {}
    with open(path, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            # The value is kept as-is: it may be a password.
            parsed[key.strip()] = value
    return parsed


def _has_role(*roles):
    """Return True when the logged-in user's session role is one of *roles*."""
    return session.get('role') in roles


def get_db_connection():
    """Read external_server.conf and return a MariaDB database connection.

    Raises:
        FileNotFoundError: the settings file does not exist.
        KeyError: a required setting is missing from the file.
        ValueError: the configured port is not an integer.
        mariadb.Error: the connection attempt failed.
    """
    settings_file = _settings_path()
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    settings = _read_external_settings(settings_file)
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )


@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form; on POST, start a session for valid credentials."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        # NOTE(review): credentials are matched against the stored password
        # column verbatim, i.e. passwords are kept in plaintext. Moving to
        # salted hashes needs a data migration and is out of scope here.
        user = User.query.filter_by(username=username, password=password).first()
        if user:
            session['user'] = user.username
            session['role'] = user.role
            return redirect(url_for('main.dashboard'))
        flash('Invalid credentials. Please try again.')

    return render_template('login.html')


@bp.route('/dashboard')
def dashboard():
    """Render the dashboard for a logged-in user, else redirect to login."""
    if 'user' not in session:
        return redirect(url_for('main.login'))
    return render_template('dashboard.html')


@bp.route('/settings')
def settings():
    """Render the superadmin settings page (users + external DB settings)."""
    if not _has_role('superadmin'):
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    users = User.query.all()

    # The external settings are optional: show an empty form when absent.
    external_settings = {}
    settings_file = _settings_path()
    if os.path.exists(settings_file):
        external_settings = _read_external_settings(settings_file)

    return render_template('settings.html', users=users, external_settings=external_settings)


@bp.route('/quality')
def quality():
    """Render the quality page for superadmin and quality users."""
    if not _has_role('superadmin', 'quality'):
        flash('Access denied: Quality users only.')
        return redirect(url_for('main.dashboard'))
    return render_template('quality.html')


@bp.route('/warehouse')
def warehouse():
    """Render the warehouse page for superadmin and warehouse users."""
    if not _has_role('superadmin', 'warehouse'):
        flash('Access denied: Warehouse users only.')
        return redirect(url_for('main.dashboard'))
    return render_template('warehouse.html')


def _save_scan(form):
    """Insert or update the scan1_orders row identified by the form's cp_code.

    The outcome is reported to the user through ``flash``. The success
    message is flashed only after the transaction has been committed.
    """
    operator_code = form.get('operator_code')
    cp_code = form.get('cp_code')
    oc1_code = form.get('oc1_code')
    oc2_code = form.get('oc2_code')
    defect_code = form.get('defect_code')
    date = form.get('date')
    time = form.get('time')

    try:
        # closing() guarantees the connection is released even on failure.
        with closing(get_db_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute(_SELECT_SCAN_BY_CP_SQL, (cp_code,))
            if cursor.fetchone():
                cursor.execute(
                    _UPDATE_SCAN_SQL,
                    (operator_code, oc1_code, oc2_code, defect_code, date, time, cp_code),
                )
                message = 'Existing entry updated successfully.'
            else:
                cursor.execute(
                    _INSERT_SCAN_SQL,
                    (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time),
                )
                message = 'New entry inserted successfully.'
            conn.commit()
    except _DB_ERRORS as e:
        current_app.logger.exception("Error saving scan data")
        flash(f"Error saving scan data: {e}")
    else:
        flash(message)


def _fetch_recent_scans():
    """Return the 15 most recent scan1_orders rows, or [] on a database error."""
    try:
        with closing(get_db_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute(_RECENT_SCANS_SQL)
            return cursor.fetchall()
    except _DB_ERRORS as e:
        current_app.logger.exception("Error fetching scan data")
        flash(f"Error fetching scan data: {e}")
        return []


@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Scan page: store a submitted scan (POST) and list the latest entries."""
    if not _has_role('superadmin', 'scan'):
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        _save_scan(request.form)

    return render_template('scan.html', scan_data=_fetch_recent_scans())


@bp.route('/logout')
def logout():
    """Drop the login data from the session and go back to the login page."""
    session.pop('user', None)
    session.pop('role', None)
    return redirect(url_for('main.login'))


@bp.route('/create_user', methods=['POST'])
def create_user():
    """Create a user from the posted username, password and role (superadmin only)."""
    if not _has_role('superadmin'):
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    username = request.form['username']
    password = request.form['password']
    role = request.form['role']

    # An account with an empty name or password could never be managed sanely.
    if not username or not password:
        flash('Username and password are required.')
        return redirect(url_for('main.settings'))

    if User.query.filter_by(username=username).first():
        flash('User already exists.')
        return redirect(url_for('main.settings'))

    # NOTE(review): the password is stored as given (plaintext); see login().
    new_user = User(username=username, password=password, role=role)
    db.session.add(new_user)
    db.session.commit()

    flash('User created successfully.')
    return redirect(url_for('main.settings'))


@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Update a user's role, and password when one is supplied (superadmin only)."""
    if not _has_role('superadmin'):
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    user_id = request.form['user_id']
    password = request.form['password']
    role = request.form['role']

    user = User.query.get(user_id)
    if not user:
        flash('User not found.')
        return redirect(url_for('main.settings'))

    # An empty password field means "keep the current password".
    if password:
        user.password = password
    user.role = role
    db.session.commit()

    flash('User updated successfully.')
    return redirect(url_for('main.settings'))


@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the user with the posted user_id (superadmin only)."""
    if not _has_role('superadmin'):
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    user_id = request.form['user_id']

    user = User.query.get(user_id)
    if not user:
        flash('User not found.')
        return redirect(url_for('main.settings'))

    db.session.delete(user)
    db.session.commit()

    flash('User deleted successfully.')
    return redirect(url_for('main.settings'))


@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Write the posted external database settings to the instance folder.

    Values containing line breaks are rejected because the file format is
    one ``key=value`` pair per line; the port must be numeric because
    ``get_db_connection`` converts it with ``int``.
    """
    if not _has_role('superadmin'):
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    # Insertion order is the order the keys are written to the file.
    values = {
        'server_domain': request.form['server_domain'],
        'port': request.form['port'].strip(),
        'database_name': request.form['database_name'],
        'username': request.form['username'],
        'password': request.form['password'],
    }

    if any('\n' in value or '\r' in value for value in values.values()):
        flash('External database settings must not contain line breaks.')
        return redirect(url_for('main.settings'))

    if not values['port'].isdigit():
        flash('Port must be a number.')
        return redirect(url_for('main.settings'))

    settings_file = _settings_path()
    os.makedirs(os.path.dirname(settings_file), exist_ok=True)
    with open(settings_file, 'w') as f:
        f.writelines(f"{key}={value}\n" for key, value in values.items())

    flash('External database settings saved/updated successfully.')
    return redirect(url_for('main.settings'))


def _serialize_row(row):
    """Return *row* as a list with datetime/timedelta cells turned into strings.

    NOTE(review): plain ``datetime.date`` values are not matched by this
    check and are left to the JSON encoder, as before — confirm intended.
    """
    return [str(cell) if isinstance(cell, (datetime, timedelta)) else cell for cell in row]


def _fetch_report_rows(cursor, days, code_column, defects_only):
    """Run the report query for the last *days* days and return all rows.

    *code_column* comes from the fixed ``_REPORTS`` table, never from the
    request, so interpolating it into the statement is safe.
    """
    since = datetime.now() - timedelta(days=days)
    defect_filter = " AND quality_code != 0" if defects_only else ""
    cursor.execute(
        f"""
        SELECT Id, operator_code, {code_column}, OC1_code, OC2_code, quality_code,
               date, time, approved_quantity, rejected_quantity
        FROM scan1_orders
        WHERE date >= ?{defect_filter}
        ORDER BY date DESC, time DESC
        """,
        (since.strftime('%Y-%m-%d'),),
    )
    return cursor.fetchall()


@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return the report selected by the ``report`` query argument as JSON.

    The response has ``headers`` and ``rows``; an ``error`` key is added
    when the data could not be fetched. An unknown report id yields empty
    ``headers``/``rows``. Requests without a logged-in session get a 401.
    """
    if 'user' not in session:
        # Every other data route is session-gated; report rows must be too.
        return jsonify({"headers": [], "rows": [], "error": "Authentication required."}), 401

    report = request.args.get('report')
    data = {"headers": [], "rows": []}

    spec = _REPORTS.get(report)
    if spec is None:
        return jsonify(data)

    days, code_column, code_header, defects_only = spec
    try:
        with closing(get_db_connection()) as conn:
            rows = _fetch_report_rows(conn.cursor(), days, code_column, defects_only)
    except _DB_ERRORS:
        current_app.logger.exception("Error fetching report data")
        data["error"] = "Error fetching report data."
        return jsonify(data)

    current_app.logger.debug("Fetched %d rows for report %s", len(rows), report)
    data["headers"] = [
        "Id", "Operator Code", code_header, "OC1 Code", "OC2 Code", "Quality Code",
        "Date", "Time", "Approved Quantity", "Rejected Quantity",
    ]
    data["rows"] = [_serialize_row(row) for row in rows]
    return jsonify(data)