Files
quality_recticel/py_app/app/routes.py
2025-04-30 11:57:48 +03:00

342 lines
14 KiB
Python

import os
import mariadb
from datetime import datetime, timedelta
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify
from .models import User
from . import db
# Blueprint named 'main'; the route handlers below register on it via
# @bp.route and are addressed as 'main.<endpoint>' in url_for calls.
bp = Blueprint('main', __name__)
def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The file lives in the Flask instance folder and holds one ``key=value``
    pair per line. The keys read here are ``username``, ``password``,
    ``server_domain``, ``port`` and ``database_name``. Blank lines and lines
    without an ``=`` are skipped, so a hand-edited file (empty trailing line,
    stray note) does not abort the parse.

    Returns:
        The connection object returned by ``mariadb.connect``.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        KeyError: If one of the required keys is absent from the file.
        ValueError: If the ``port`` value is not an integer.

    Any exception raised by ``mariadb.connect`` itself is propagated.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file.
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            # Split on the first '=' only, so values may themselves contain '='.
            key, separator, value = line.strip().partition('=')
            if not separator:
                # Blank or malformed line: nothing to record.
                continue
            settings[key.strip()] = value

    # Create a database connection.
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and, on POST, try to sign the user in.

    A POST looks up a ``User`` whose username and password both match the
    submitted form fields. On a match the username and role are stored in
    the session and the browser is redirected to the dashboard; otherwise an
    error is flashed and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    submitted_name = request.form['username']
    submitted_password = request.form['password']

    # NOTE(review): the submitted password is matched directly against the
    # stored column, which suggests passwords are kept unhashed - confirm
    # against the User model.
    account = User.query.filter_by(
        username=submitted_name,
        password=submitted_password,
    ).first()

    if not account:
        flash('Invalid credentials. Please try again.')
        return render_template('login.html')

    session['user'] = account.username
    session['role'] = account.role
    return redirect(url_for('main.dashboard'))
@bp.route('/dashboard')
def dashboard():
    """Render the dashboard for a logged-in user, else redirect to login."""
    if 'user' in session:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
@bp.route('/settings')
def settings():
    """Render the settings page (superadmin only).

    The template receives every ``User`` row plus the ``key=value`` pairs
    read from ``external_server.conf`` in the instance folder (an empty dict
    when the file does not exist). All pairs found in the file, including
    the stored password, are handed to the template.

    Blank lines and lines without an ``=`` are skipped, so a damaged
    configuration file can still be viewed and re-saved from this page
    instead of producing a server error.
    """
    if 'role' not in session or session['role'] != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.dashboard'))

    # Fetch all users from the database.
    users = User.query.all()

    # Load external database settings from the instance folder.
    external_settings = {}
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if os.path.exists(settings_file):
        with open(settings_file, 'r') as f:
            for line in f:
                # Split on the first '=' only, so values may contain '='.
                key, separator, value = line.strip().partition('=')
                if not separator:
                    continue
                external_settings[key.strip()] = value

    return render_template('settings.html', users=users, external_settings=external_settings)
@bp.route('/quality')
def quality():
    """Render the quality page for 'superadmin' and 'quality' roles.

    Any other session (including one with no role) gets a flashed denial
    message and a redirect to the dashboard.
    """
    permitted_roles = ('superadmin', 'quality')
    if session.get('role') in permitted_roles:
        return render_template('quality.html')

    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/warehouse')
def warehouse():
    """Render the warehouse page for 'superadmin' and 'warehouse' roles.

    Any other session (including one with no role) gets a flashed denial
    message and a redirect to the dashboard.
    """
    permitted_roles = ('superadmin', 'warehouse')
    if session.get('role') in permitted_roles:
        return render_template('warehouse.html')

    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan (POST) and show the 15 most recent scans.

    Only the 'superadmin' and 'scan' roles may use this page. On POST the
    submitted form is written to ``scan1_orders``: when a row with the same
    ``CP_full_code`` already exists it is updated, otherwise a new row is
    inserted. In both request methods the page is then rendered with the 15
    rows having the highest ``Id``.

    Database errors (``mariadb.Error``) are printed and flashed rather than
    raised. Every connection opened here is closed even when a statement
    fails, so a failing query no longer leaks the connection.
    """
    if 'role' not in session or session['role'] not in ['superadmin', 'scan']:
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission.
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        scan_date = request.form.get('date')
        scan_time = request.form.get('time')

        try:
            conn = get_db_connection()
            # The inner try/finally guarantees the connection is released,
            # while an error raised by close() is still handled by the
            # outer except clause.
            try:
                cursor = conn.cursor()

                # Check if the CP_full_code already exists.
                cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,))
                existing_entry = cursor.fetchone()

                if existing_entry:
                    # Update the existing entry.
                    update_query = """
                        UPDATE scan1_orders
                        SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
                        WHERE CP_full_code = ?
                    """
                    cursor.execute(
                        update_query,
                        (operator_code, oc1_code, oc2_code, defect_code, scan_date, scan_time, cp_code),
                    )
                    flash('Existing entry updated successfully.')
                else:
                    # Insert a new entry.
                    insert_query = """
                        INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    """
                    cursor.execute(
                        insert_query,
                        (operator_code, cp_code, oc1_code, oc2_code, defect_code, scan_date, scan_time),
                    )
                    flash('New entry inserted successfully.')

                # Commit the transaction.
                conn.commit()
            finally:
                conn.close()
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")

    # Fetch the latest scan data for display.
    scan_data = []
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
                FROM scan1_orders
                ORDER BY Id DESC
                LIMIT 15
            """)
            scan_data = cursor.fetchall()
        finally:
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")

    return render_template('scan.html', scan_data=scan_data)
@bp.route('/logout')
def logout():
    """Drop the login keys from the session and redirect to the login page."""
    for session_key in ('user', 'role'):
        # A default of None makes the pop a no-op when the key is absent.
        session.pop(session_key, None)
    return redirect(url_for('main.login'))
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Create a user from the posted form (superadmin only).

    Reads ``username``, ``password`` and ``role`` from the form. If the
    username is already taken, nothing is created. Every outcome flashes a
    message and redirects to the settings page.
    """
    settings_url = url_for('main.settings')

    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(settings_url)

    new_name = request.form['username']
    new_password = request.form['password']
    new_role = request.form['role']

    # Refuse duplicates: usernames are looked up before inserting.
    already_there = User.query.filter_by(username=new_name).first()
    if already_there:
        flash('User already exists.')
        return redirect(settings_url)

    # NOTE(review): the password is passed to the model exactly as
    # submitted - confirm whether the User model hashes it.
    db.session.add(User(username=new_name, password=new_password, role=new_role))
    db.session.commit()

    flash('User created successfully.')
    return redirect(settings_url)
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Update an existing user's role and, optionally, password (superadmin only).

    Reads ``user_id``, ``password`` and ``role`` from the form. The password
    is replaced only when a non-empty value was submitted; the role is
    always overwritten. Every outcome flashes a message and redirects to the
    settings page.
    """
    settings_url = url_for('main.settings')

    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(settings_url)

    target_id = request.form['user_id']
    new_password = request.form['password']
    new_role = request.form['role']

    # Look the user up by primary key.
    target = User.query.get(target_id)
    if not target:
        flash('User not found.')
        return redirect(settings_url)

    # An empty password field means "keep the current password".
    if new_password:
        target.password = new_password
    target.role = new_role
    db.session.commit()

    flash('User updated successfully.')
    return redirect(settings_url)
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the user identified by the posted ``user_id`` (superadmin only).

    Every outcome flashes a message and redirects to the settings page.
    """
    settings_url = url_for('main.settings')

    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(settings_url)

    target_id = request.form['user_id']

    # Look the user up by primary key.
    target = User.query.get(target_id)
    if not target:
        flash('User not found.')
        return redirect(settings_url)

    db.session.delete(target)
    db.session.commit()

    flash('User deleted successfully.')
    return redirect(settings_url)
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Write the external database settings to ``external_server.conf`` (superadmin only).

    Reads ``server_domain``, ``port``, ``database_name``, ``username`` and
    ``password`` from the form and stores them as ``key=value`` lines in the
    Flask instance folder, replacing any previous file.

    The input is validated before anything is written:

    * ``port`` must be an integer between 1 and 65535, because the value is
      later converted with ``int()`` when a connection is opened.
    * No value may contain a carriage return or line feed, because the file
      is line-oriented and an embedded line break would add or corrupt
      entries.

    Every outcome flashes a message and redirects to the settings page.
    """
    if 'role' not in session or session['role'] != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    # Get form data.
    server_domain = request.form['server_domain']
    port = request.form['port']
    database_name = request.form['database_name']
    username = request.form['username']
    password = request.form['password']

    try:
        port_number = int(port)
    except ValueError:
        port_number = None
    if port_number is None or not 1 <= port_number <= 65535:
        flash('Invalid port: please enter a number between 1 and 65535.')
        return redirect(url_for('main.settings'))

    text_values = (server_domain, database_name, username, password)
    if any('\n' in value or '\r' in value for value in text_values):
        flash('Invalid settings: values must not contain line breaks.')
        return redirect(url_for('main.settings'))

    # Save data to a file in the instance folder.
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    os.makedirs(os.path.dirname(settings_file), exist_ok=True)
    with open(settings_file, 'w') as f:
        f.write(f"server_domain={server_domain}\n")
        f.write(f"port={port_number}\n")
        f.write(f"database_name={database_name}\n")
        f.write(f"username={username}\n")
        f.write(f"password={password}\n")

    flash('External database settings saved/updated successfully.')
    return redirect(url_for('main.settings'))
@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return one of the predefined ``scan1_orders`` reports as JSON.

    The ``report`` query parameter selects the report:

    * ``"1"`` / ``"2"`` - rows whose ``date`` is on or after the calendar
      date 1 / 5 days ago, showing the CP base code.
    * ``"3"`` / ``"4"`` - the same windows restricted to rows with a
      non-zero ``quality_code``, showing the CP full code.
    * ``"5"`` - every row, showing both CP base and CP full code.

    Any other value yields an empty report without touching the database.
    Rows are ordered by ``date`` then ``time``, newest first.

    Returns:
        A JSON object with ``headers`` (column titles) and ``rows``. An
        ``error`` key is added when the query fails. Requests without a
        logged-in session receive the same shape with an ``error`` message
        and HTTP status 401.
    """
    report = request.args.get('report')
    data = {"headers": [], "rows": []}

    # The report exposes production data, so require a logged-in session
    # like the other pages do.
    if 'user' not in session:
        data["error"] = "Authentication required."
        return jsonify(data), 401

    cp_base = ("CP_base_code", "CP Base Code")
    cp_full = ("CP_full_code", "CP Full Code")
    # report id -> (CP columns, look-back in days or None for no limit,
    #               only rows with non-zero quality_code?, label for the log)
    report_specs = {
        "1": ((cp_base,), 1, False, "last 1 day"),
        "2": ((cp_base,), 5, False, "last 5 days"),
        "3": ((cp_full,), 1, True, "non-zero quality_code, last 1 day"),
        "4": ((cp_full,), 5, True, "non-zero quality_code, last 5 days"),
        "5": ((cp_base, cp_full), None, False, "all rows"),
    }

    spec = report_specs.get(report)
    if spec is not None:
        cp_columns, days, defects_only, label = spec

        # (database column, header shown to the user), in output order.
        # Deriving headers from the selected columns keeps the two in step.
        columns = (
            [("Id", "Id"), ("operator_code", "Operator Code")]
            + list(cp_columns)
            + [
                ("OC1_code", "OC1 Code"),
                ("OC2_code", "OC2 Code"),
                ("quality_code", "Quality Code"),
                ("date", "Date"),
                ("time", "Time"),
                ("approved_quantity", "Approved Quantity"),
                ("rejected_quantity", "Rejected Quantity"),
            ]
        )

        conditions = []
        params = []
        if days is not None:
            cutoff = datetime.now() - timedelta(days=days)
            conditions.append("date >= ?")
            params.append(cutoff.strftime('%Y-%m-%d'))
        if defects_only:
            conditions.append("quality_code != 0")

        # The statement is assembled from the constants above only; the one
        # runtime value (the cut-off date) is bound as a parameter.
        query = "SELECT " + ", ".join(name for name, _ in columns) + " FROM scan1_orders"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " ORDER BY date DESC, time DESC"

        try:
            conn = get_db_connection()
            # Release the connection even when the query fails; an error
            # from close() is still handled by the outer except clause.
            try:
                cursor = conn.cursor()
                cursor.execute(query, tuple(params))
                rows = cursor.fetchall()
            finally:
                conn.close()

            print(f"Fetched rows for report {report} ({label}):", rows)
            data["headers"] = [header for _, header in columns]
            # datetime/timedelta cells are turned into strings before the
            # rows are handed to jsonify; other cell types pass through.
            data["rows"] = [
                [str(cell) if isinstance(cell, (datetime, timedelta)) else cell for cell in row]
                for row in rows
            ]
        except mariadb.Error as e:
            print(f"Error fetching report data: {e}")
            data["error"] = "Error fetching report data."

    print("Data being returned:", data)
    return jsonify(data)
@bp.route('/etichete')
def etichete():
    """Render the labels main page for 'superadmin' and 'etichete' roles.

    Any other session (including one with no role) gets a flashed denial
    message and a redirect to the dashboard.
    """
    permitted_roles = ('superadmin', 'etichete')
    if session.get('role') in permitted_roles:
        return render_template('main_page_etichete.html')

    flash('Access denied: Etichete users only.')
    return redirect(url_for('main.dashboard'))