308 lines
11 KiB
Python
308 lines
11 KiB
Python
import os
|
|
import mariadb
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify
|
|
from .models import User
|
|
from . import db
|
|
|
|
# Blueprint that every route in this module is registered on; endpoints are
# addressed elsewhere in the file as 'main.<function name>'.
bp = Blueprint(name='main', import_name=__name__)
|
|
|
|
def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The file is looked up in the Flask instance folder and is parsed as one
    ``key=value`` pair per line. The keys consumed here are ``username``,
    ``password``, ``server_domain``, ``port`` and ``database_name``.

    Returns:
        The connection object returned by ``mariadb.connect``.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        KeyError: If one of the required keys is absent from the file.
        ValueError: If the ``port`` value cannot be converted to ``int``.

    Any exception raised by ``mariadb.connect`` itself propagates unchanged.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            # A blank line or a line without '=' (easy to introduce with a
            # hand edit) would break a two-value unpack, so skip it instead
            # of failing the whole request.
            key, separator, value = line.strip().partition('=')
            if not separator:
                continue
            settings[key] = value

    # Create a database connection
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
|
|
|
|
@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, sign the user in.

    A successful match stores the username and role in the session and
    redirects to the dashboard; a failed match flashes an error and shows
    the form again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    form = request.form
    submitted_name = form['username']
    submitted_password = form['password']

    # NOTE(review): the submitted password is compared directly against the
    # stored column value, which implies passwords are kept unhashed --
    # confirm against the User model.
    account = User.query.filter_by(
        username=submitted_name,
        password=submitted_password,
    ).first()

    if not account:
        flash('Invalid credentials. Please try again.')
        return render_template('login.html')

    session['user'] = account.username
    session['role'] = account.role
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/dashboard')
def dashboard():
    """Render the dashboard for a signed-in user, else redirect to login."""
    signed_in = 'user' in session
    if signed_in:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
|
|
|
|
@bp.route('/settings')
def settings():
    """Render the superadmin settings page.

    Non-superadmin sessions are flashed an error and redirected to the
    dashboard. Otherwise the template receives every ``User`` row and the
    ``key=value`` pairs parsed from ``external_server.conf`` (an empty dict
    when the file does not exist).
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    # Fetch all users from the database
    users = User.query.all()

    # Load external database settings from the instance folder
    external_settings = {}
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if os.path.exists(settings_file):
        with open(settings_file, 'r') as f:
            for line in f:
                # Skip blank or malformed lines rather than letting a
                # two-value unpack raise and turn the page into a 500.
                key, separator, value = line.strip().partition('=')
                if not separator:
                    continue
                external_settings[key] = value

    # NOTE(review): the whole parsed mapping, including any ``password``
    # entry in the file, is handed to the template -- confirm the template
    # does not echo it back in clear text.
    return render_template('settings.html', users=users, external_settings=external_settings)
|
|
|
|
@bp.route('/quality')
def quality():
    """Render the quality page for 'superadmin' and 'quality' roles only."""
    permitted_roles = ('superadmin', 'quality')
    if session.get('role') in permitted_roles:
        return render_template('quality.html')

    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/warehouse')
def warehouse():
    """Render the warehouse page for 'superadmin' and 'warehouse' roles only."""
    permitted_roles = ('superadmin', 'warehouse')
    if session.get('role') in permitted_roles:
        return render_template('warehouse.html')

    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan (POST) and show the most recent scans.

    Only the 'superadmin' and 'scan' roles may use this page; anyone else is
    flashed an error and redirected to the dashboard.

    On POST the submitted form fields are inserted as one row into
    ``scan1_orders``. On every request the 15 rows with the highest ``Id``
    are then fetched and passed to ``scan.html`` as ``scan_data``. Database
    errors (``mariadb.Error``) are printed and flashed; the page still
    renders, with an empty ``scan_data`` if the fetch failed.
    """
    if session.get('role') not in ('superadmin', 'scan'):
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        date = request.form.get('date')
        time = request.form.get('time')

        try:
            # Connect to the database
            conn = get_db_connection()
            try:
                cursor = conn.cursor()

                # Insert query
                insert_query = """
                    INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """
                print(f"Executing query: {insert_query}")
                print(f"With values: ('{operator_code}', '{cp_code}', '{oc1_code}', '{oc2_code}', '{defect_code}', '{date}', '{time}')")

                # Execute the query
                cursor.execute(insert_query, (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time))
                conn.commit()
            finally:
                # Close on failure too; previously a failed execute/commit
                # skipped close() and leaked the connection. Kept inside the
                # outer try so an error from close() is still reported below.
                conn.close()

            flash('Scan data saved successfully.')
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")

    # Fetch the latest scan data for display
    scan_data = []
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
                FROM scan1_orders
                ORDER BY Id DESC
                LIMIT 15
            """)
            scan_data = cursor.fetchall()
        finally:
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")

    return render_template('scan.html', scan_data=scan_data)
|
|
|
|
@bp.route('/logout')
def logout():
    """Drop the login keys from the session and go back to the login page."""
    for session_key in ('user', 'role'):
        # pop with a default so logging out twice is harmless
        session.pop(session_key, None)
    return redirect(url_for('main.login'))
|
|
|
|
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Create a user from the posted form (superadmin only).

    Reads ``username``, ``password`` and ``role`` from the form. If the
    username is already taken nothing is created. Every outcome flashes a
    message and redirects to the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    form = request.form
    new_name = form['username']
    new_password = form['password']
    new_role = form['role']

    # Refuse duplicates before touching the session.
    already_there = User.query.filter_by(username=new_name).first()
    if already_there:
        flash('User already exists.')
        return redirect(url_for('main.settings'))

    # NOTE(review): the password is handed to the model exactly as posted;
    # no hashing happens in this function -- confirm whether the model does it.
    db.session.add(User(username=new_name, password=new_password, role=new_role))
    db.session.commit()

    flash('User created successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Update an existing user's role and, optionally, password (superadmin only).

    Reads ``user_id``, ``password`` and ``role`` from the form. The role is
    always overwritten; the password only when a non-empty value was posted.
    Every outcome flashes a message and redirects to the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    form = request.form
    target_id = form['user_id']
    replacement_password = form['password']
    replacement_role = form['role']

    target = User.query.get(target_id)
    if not target:
        flash('User not found.')
        return redirect(url_for('main.settings'))

    # An empty password field means "leave the current password alone".
    if replacement_password:
        target.password = replacement_password
    target.role = replacement_role
    db.session.commit()

    flash('User updated successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the user identified by the posted ``user_id`` (superadmin only).

    Every outcome flashes a message and redirects to the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    target_id = request.form['user_id']
    target = User.query.get(target_id)

    if not target:
        flash('User not found.')
        return redirect(url_for('main.settings'))

    db.session.delete(target)
    db.session.commit()

    flash('User deleted successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Write the external database settings to ``external_server.conf``.

    Superadmin only. Reads ``server_domain``, ``port``, ``database_name``,
    ``username`` and ``password`` from the form and writes them, one
    ``key=value`` pair per line, to the file in the Flask instance folder,
    replacing any previous content. Input that would produce an unusable
    file is rejected with a flashed message and nothing is written. Every
    outcome redirects to the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    # Get form data (dict order is the order the lines are written in)
    fields = {
        'server_domain': request.form['server_domain'],
        'port': request.form['port'],
        'database_name': request.form['database_name'],
        'username': request.form['username'],
        'password': request.form['password'],
    }

    # The file format is line based, so a line break inside a value would
    # inject extra (or override existing) settings when read back.
    if any('\n' in value or '\r' in value for value in fields.values()):
        flash('External database settings must not contain line breaks.')
        return redirect(url_for('main.settings'))

    # The file's consumer converts the port with int(); reject values that
    # would make that conversion fail or that are outside the TCP port range.
    try:
        port_number = int(fields['port'])
    except ValueError:
        port_number = 0
    if not 1 <= port_number <= 65535:
        flash('Port must be a whole number between 1 and 65535.')
        return redirect(url_for('main.settings'))

    # Save data to a file in the instance folder
    # NOTE(review): the password is stored in clear text in this file.
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    os.makedirs(os.path.dirname(settings_file), exist_ok=True)
    with open(settings_file, 'w') as f:
        f.writelines(f"{key}={value}\n" for key, value in fields.items())

    flash('External database settings saved/updated successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return scan rows for a named report as JSON.

    The ``report`` query parameter selects the look-back window: ``"1"``
    covers the last day and ``"2"`` the last five days. Any other value
    yields empty ``headers`` and ``rows``.

    Returns:
        A JSON object with ``headers`` (column titles) and ``rows`` (one
        list per record, newest first). If a ``mariadb.Error`` occurs an
        ``error`` key with a generic message is added.
    """
    # NOTE(review): unlike the page routes in this module, this endpoint
    # performs no session/role check -- confirm that is intended.

    # report id -> (look-back window in days, label used in the debug print)
    report_windows = {
        "1": (1, "last 1 day"),
        "2": (5, "last 5 days"),
    }

    report = request.args.get('report')
    data = {"headers": [], "rows": []}

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            if report in report_windows:
                days, label = report_windows[report]
                since = datetime.now() - timedelta(days=days)
                cursor.execute("""
                    SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
                    FROM scan1_orders
                    WHERE date >= ?
                    ORDER BY date DESC, time DESC
                """, (since.strftime('%Y-%m-%d'),))
                rows = cursor.fetchall()

                # Debugging: Print fetched rows
                print(f"Fetched rows for report {report} ({label}):", rows)

                # Define headers based on database column names
                data["headers"] = ["Id", "Operator Code", "CP Base Code", "OC1 Code", "OC2 Code", "Quality Code", "Date", "Time", "Approved Quantity", "Rejected Quantity"]

                # Convert rows to JSON serializable format.
                # NOTE(review): plain ``datetime.date`` values do not match
                # this isinstance check and are left for jsonify to encode.
                data["rows"] = [
                    [str(cell) if isinstance(cell, (datetime, timedelta)) else cell for cell in row]
                    for row in rows
                ]
        finally:
            # Close on failure too; previously an error in execute/fetchall
            # skipped close() and leaked the connection.
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching report data: {e}")
        data["error"] = "Error fetching report data."

    # Debugging: Print the final data being returned
    print("Data being returned:", data)
    return jsonify(data)