Files
quality_recticel/py_app/app/routes.py

460 lines
18 KiB
Python

import os
import mariadb
from datetime import datetime, timedelta
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify
from .models import User
from . import db
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from flask import Blueprint, render_template, request, redirect, url_for, flash
import csv
from .warehouse import add_location
from .settings import settings_handler, edit_access_roles_handler
# Blueprints that the route decorators in this module attach to.
# Each blueprint is created exactly once; a second assignment would silently
# replace the object and discard anything registered on the first one.
bp = Blueprint('main', __name__)
warehouse_bp = Blueprint('warehouse', __name__)
@bp.route('/update_role_access/<role>', methods=['POST'])
def update_role_access(role):
    """Forward a POSTed access update for ``role`` to the settings module."""
    # Imported at call time rather than when this module is loaded.
    from .settings import update_role_access_handler as handle_update

    response = handle_update(role)
    return response
@bp.route('/store_articles')
def store_articles():
    """Render the ``store_articles.html`` page."""
    page = render_template('store_articles.html')
    return page
@bp.route('/warehouse_reports')
def warehouse_reports():
    """Render the ``warehouse_reports.html`` page."""
    page = render_template('warehouse_reports.html')
    return page
def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The file is read from the Flask instance folder and holds one
    ``key=value`` pair per line. Blank lines and lines starting with ``#``
    are ignored, and whitespace around a key is dropped.

    Returns:
        A new connection from ``mariadb.connect``; the caller must close it.

    Raises:
        FileNotFoundError: the configuration file does not exist.
        ValueError: a non-blank line has no ``=`` separator, or the port
            value is not an integer.
        KeyError: a required setting is absent from the file.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file.
    settings = {}
    with open(settings_file, 'r') as f:
        for line_number, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                # A trailing newline or a comment must not break the parser.
                continue
            key, separator, value = line.partition('=')
            if not separator:
                raise ValueError(
                    f"Malformed line {line_number} in external_server.conf: expected key=value."
                )
            # Only the first '=' splits, so values may themselves contain '='.
            settings[key.strip()] = value

    required = ('username', 'password', 'server_domain', 'port', 'database_name')
    missing = [name for name in required if name not in settings]
    if missing:
        raise KeyError(f"Missing settings in external_server.conf: {', '.join(missing)}")

    # Create a database connection.
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate against the external MariaDB.

    On POST the submitted username/password pair is looked up in the
    ``users`` table of the external database. A match stores the username
    and role in the session and redirects to the dashboard; anything else
    flashes an error and re-renders the form.

    Passwords are never written to the log.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = None

        # Only check external MariaDB for user authentication.
        try:
            conn = get_db_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("SHOW TABLES LIKE 'users'")
                if cursor.fetchone():
                    # NOTE(review): the password is compared as stored text;
                    # confirm whether the users table is meant to hold hashes.
                    cursor.execute(
                        "SELECT username, role FROM users WHERE username=%s AND password=%s",
                        (username.strip(), password.strip()),
                    )
                    row = cursor.fetchone()
                    if row:
                        user = {'username': row[0], 'role': row[1]}
            finally:
                # Release the connection even when a query fails.
                conn.close()
        except Exception as e:
            # Any failure (missing config, DB down, bad query) is treated as
            # a failed login rather than a server error.
            print("External DB error:", e)

        if user:
            session['user'] = user['username']
            session['role'] = user['role']
            print("Logged in as:", session.get('user'), session.get('role'))
            return redirect(url_for('main.dashboard'))

        print("Login failed for:", repr(username))
        flash('Invalid credentials. Please try again.')
    return render_template('login.html')
@bp.route('/dashboard')
def dashboard():
    """Show the dashboard to a logged-in user; send anyone else to login."""
    current_user = session.get('user')
    current_role = session.get('role')
    print("Session user:", current_user, current_role)

    if 'user' in session:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
@bp.route('/settings')
def settings():
    """Serve the settings page by delegating to ``settings_handler``."""
    response = settings_handler()
    return response
# Access-role editing page (superadmin only, per the original route comment).
@bp.route('/edit_access_roles')
def edit_access_roles():
    """Delegate the access-role editing page to ``edit_access_roles_handler``."""
    response = edit_access_roles_handler()
    return response
@bp.route('/quality')
def quality():
    """Render the quality page for ``superadmin`` and ``quality`` roles."""
    allowed_roles = ('superadmin', 'quality')
    if session.get('role') in allowed_roles:
        return render_template('quality.html')

    # No role in the session, or a role outside the allowed set.
    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/warehouse')
def warehouse():
    """Render the warehouse main page for ``superadmin`` and ``warehouse`` roles."""
    allowed_roles = ('superadmin', 'warehouse')
    if session.get('role') in allowed_roles:
        return render_template('main_page_warehouse.html')

    # No role in the session, or a role outside the allowed set.
    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan (POST) and show the 15 most recent scans.

    Access is limited to the ``superadmin`` and ``scan`` roles. On POST the
    row in ``scan1_orders`` whose ``CP_full_code`` equals the submitted
    ``cp_code`` is updated if it exists, otherwise a new row is inserted.
    The success message is flashed only after the transaction commits, and
    every connection is closed even when a statement fails.
    """
    if 'role' not in session or session['role'] not in ['superadmin', 'scan']:
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission.
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        date = request.form.get('date')
        time = request.form.get('time')

        try:
            conn = get_db_connection()
            try:
                cursor = conn.cursor()

                # Check if the CP_full_code already exists.
                cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,))
                existing_entry = cursor.fetchone()

                if existing_entry:
                    update_query = """
                        UPDATE scan1_orders
                        SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
                        WHERE CP_full_code = ?
                    """
                    cursor.execute(
                        update_query,
                        (operator_code, oc1_code, oc2_code, defect_code, date, time, cp_code),
                    )
                    success_message = 'Existing entry updated successfully.'
                else:
                    insert_query = """
                        INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    """
                    cursor.execute(
                        insert_query,
                        (operator_code, cp_code, oc1_code, oc2_code, defect_code, date, time),
                    )
                    success_message = 'New entry inserted successfully.'

                # Report success only once the change is actually durable.
                conn.commit()
                flash(success_message)
            finally:
                conn.close()
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")

    # Fetch the latest scan data for display.
    scan_data = []
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
                FROM scan1_orders
                ORDER BY Id DESC
                LIMIT 15
            """)
            scan_data = cursor.fetchall()
        finally:
            conn.close()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")

    return render_template('scan.html', scan_data=scan_data)
@bp.route('/logout')
def logout():
    """Drop the login keys from the session and return to the login page."""
    for session_key in ('user', 'role'):
        # A default of None keeps this safe when the key was never set.
        session.pop(session_key, None)
    return redirect(url_for('main.login'))
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Create a ``User`` row from the posted form (superadmin only).

    Every outcome flashes a message and redirects back to the settings page.
    """
    def back_to_settings(message):
        flash(message)
        return redirect(url_for('main.settings'))

    if session.get('role') != 'superadmin':
        return back_to_settings('Access denied: Superadmin only.')

    form = request.form
    username = form['username']
    password = form['password']
    role = form['role']

    # Refuse to create a second account with the same username.
    already_there = User.query.filter_by(username=username).first()
    if already_there:
        return back_to_settings('User already exists.')

    db.session.add(User(username=username, password=password, role=role))
    db.session.commit()
    return back_to_settings('User created successfully.')
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Update an existing user's role, and password when one is supplied.

    Superadmin only. Every outcome flashes a message and redirects back to
    the settings page.
    """
    def back_to_settings(message):
        flash(message)
        return redirect(url_for('main.settings'))

    if session.get('role') != 'superadmin':
        return back_to_settings('Access denied: Superadmin only.')

    form = request.form
    user_id = form['user_id']
    new_password = form['password']
    new_role = form['role']

    target = User.query.get(user_id)
    if not target:
        return back_to_settings('User not found.')

    # An empty password field means "leave the password unchanged".
    if new_password:
        target.password = new_password
    target.role = new_role
    db.session.commit()
    return back_to_settings('User updated successfully.')
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the user identified by the posted ``user_id`` (superadmin only).

    Every outcome flashes a message and redirects back to the settings page.
    """
    def back_to_settings(message):
        flash(message)
        return redirect(url_for('main.settings'))

    if session.get('role') != 'superadmin':
        return back_to_settings('Access denied: Superadmin only.')

    target = User.query.get(request.form['user_id'])
    if not target:
        return back_to_settings('User not found.')

    db.session.delete(target)
    db.session.commit()
    return back_to_settings('User deleted successfully.')
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Validate and persist the external database settings (superadmin only).

    The settings are written as ``key=value`` lines to
    ``external_server.conf`` in the Flask instance folder. Input that would
    produce an unreadable file is rejected before anything is written:
    values containing line breaks, and ports that are not an integer in the
    range 1-65535.
    """
    if 'role' not in session or session['role'] != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    # Get form data.
    server_domain = request.form['server_domain']
    port = request.form['port']
    database_name = request.form['database_name']
    username = request.form['username']
    password = request.form['password']

    # The file format is line-based, so an embedded line break would inject
    # or corrupt other settings.
    submitted = (server_domain, port, database_name, username, password)
    if any('\n' in value or '\r' in value for value in submitted):
        flash('Invalid settings: values must not contain line breaks.')
        return redirect(url_for('main.settings'))

    # The reader converts the port with int(); reject anything it would choke on.
    try:
        port_number = int(port.strip())
    except ValueError:
        port_number = 0
    if not 1 <= port_number <= 65535:
        flash('Invalid settings: port must be a number between 1 and 65535.')
        return redirect(url_for('main.settings'))

    # Save data to a file in the instance folder.
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    os.makedirs(os.path.dirname(settings_file), exist_ok=True)
    with open(settings_file, 'w') as f:
        f.writelines([
            f"server_domain={server_domain}\n",
            f"port={port_number}\n",
            f"database_name={database_name}\n",
            f"username={username}\n",
            f"password={password}\n",
        ])

    flash('External database settings saved/updated successfully.')
    return redirect(url_for('main.settings'))
@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return one of the predefined ``scan1_orders`` reports as JSON.

    The ``report`` query parameter selects the report:

    * ``1`` / ``2`` - rows from the last 1 / 5 days (CP base code).
    * ``3`` / ``4`` - rows from the last 1 / 5 days with a non-zero
      ``quality_code`` (CP full code).
    * ``5`` - every row, with both CP codes.

    The response is ``{"headers": [...], "rows": [...]}``; an ``error`` key
    is added when the database query fails. An unknown report id yields
    empty headers and rows.
    """
    # report id -> (CP columns to show, look-back window in days, defects only)
    report_specs = {
        "1": (("CP_base_code",), 1, False),
        "2": (("CP_base_code",), 5, False),
        "3": (("CP_full_code",), 1, True),
        "4": (("CP_full_code",), 5, True),
        "5": (("CP_base_code", "CP_full_code"), None, False),
    }
    header_labels = {
        "Id": "Id",
        "operator_code": "Operator Code",
        "CP_base_code": "CP Base Code",
        "CP_full_code": "CP Full Code",
        "OC1_code": "OC1 Code",
        "OC2_code": "OC2 Code",
        "quality_code": "Quality Code",
        "date": "Date",
        "time": "Time",
        "approved_quantity": "Approved Quantity",
        "rejected_quantity": "Rejected Quantity",
    }

    report = request.args.get('report')
    data = {"headers": [], "rows": []}

    spec = report_specs.get(report)
    if spec is None:
        # Unknown or missing report id: nothing to query.
        return jsonify(data)

    cp_columns, days_back, defects_only = spec
    columns = [
        "Id", "operator_code", *cp_columns, "OC1_code", "OC2_code",
        "quality_code", "date", "time", "approved_quantity", "rejected_quantity",
    ]

    # The SQL text is assembled only from the constants above; the single
    # runtime value (the cutoff date) is passed as a bound parameter.
    conditions = []
    params = []
    if days_back is not None:
        cutoff = datetime.now() - timedelta(days=days_back)
        conditions.append("date >= ?")
        params.append(cutoff.strftime('%Y-%m-%d'))
    if defects_only:
        conditions.append("quality_code != 0")

    query = f"SELECT {', '.join(columns)} FROM scan1_orders"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    query += " ORDER BY date DESC, time DESC"

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            if params:
                cursor.execute(query, tuple(params))
            else:
                cursor.execute(query)
            rows = cursor.fetchall()
        finally:
            # Release the connection even when the query fails.
            conn.close()

        print(f"Fetched {len(rows)} rows for report {report}")
        # Headers are derived from the selected columns so they cannot drift
        # out of sync with the query.
        data["headers"] = [header_labels[column] for column in columns]
        # datetime/timedelta cells are converted to text before serialising.
        data["rows"] = [
            [str(cell) if isinstance(cell, (datetime, timedelta)) else cell for cell in row]
            for row in rows
        ]
    except mariadb.Error as e:
        print(f"Error fetching report data: {e}")
        data["error"] = "Error fetching report data."

    return jsonify(data)
@bp.route('/etichete')
def etichete():
    """Render the labels main page for ``superadmin`` and ``etichete`` roles."""
    allowed_roles = ('superadmin', 'etichete')
    if session.get('role') in allowed_roles:
        return render_template('main_page_etichete.html')

    # No role in the session, or a role outside the allowed set.
    flash('Access denied: Etichete users only.')
    return redirect(url_for('main.dashboard'))
@bp.route('/upload_data')
def upload_data():
    """Render the ``upload_data.html`` page."""
    page = render_template('upload_data.html')
    return page
@bp.route('/print_module')
def print_module():
    """Render the ``print_module.html`` page."""
    page = render_template('print_module.html')
    return page
@bp.route('/label_templates')
def label_templates():
    """Render the ``label_templates.html`` page."""
    page = render_template('label_templates.html')
    return page
@bp.route('/create_template')
def create_template():
    """Render the ``create_template.html`` page."""
    page = render_template('create_template.html')
    return page
@bp.route('/edit_template/<int:template_id>')
def edit_template(template_id):
    """Placeholder: echo the id of the template that would be edited."""
    # Editing logic is not implemented yet.
    message = "Edit template with ID {}".format(template_id)
    return message
@bp.route('/delete_template/<int:template_id>', methods=['POST'])
def delete_template(template_id):
    """Placeholder: echo the id of the template that would be deleted."""
    # Deletion logic is not implemented yet.
    message = "Delete template with ID {}".format(template_id)
    return message
@bp.route('/get_tables')
def get_tables():
    """Return a JSON object with a fixed placeholder list of table names."""
    # Stub data until real table discovery is wired in.
    payload = {'tables': ['table1', 'table2', 'table3']}
    return jsonify(payload)
@bp.route('/get_columns')
def get_columns():
    """Return placeholder column names when a ``table`` argument is given."""
    # Stub data until real column discovery is wired in; an absent or empty
    # ``table`` argument yields an empty list.
    if request.args.get('table'):
        names = ['column1', 'column2', 'column3']
    else:
        names = []
    return jsonify({'columns': names})
@bp.route('/save_template', methods=['POST'])
def save_template():
    """Placeholder: log the posted JSON template and report success."""
    template_payload = request.get_json()
    # Persistence is not implemented yet; the payload is only logged.
    print("Saving template: {}".format(template_payload))
    reply = {'message': 'Template saved successfully!'}
    return jsonify(reply)
@bp.route('/generate_pdf', methods=['POST'])
def generate_pdf():
    """Generate a label-template PDF from a JSON description.

    Expected JSON body (all keys optional):
        width:   label width in millimetres (default 100).
        height:  label height in millimetres (default 50).
        columns: list of column names, drawn one per line (default empty).

    The PDF is written to ``static/label_templates/label_template.pdf``
    under the application root, replacing any previous file.

    Returns:
        A JSON message with the public path of the PDF, or a JSON error
        with HTTP status 400 when the body is not a JSON object or a
        dimension/column value is invalid.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object.'}), 400

    try:
        width = float(data.get('width', 100))    # Default width in mm
        height = float(data.get('height', 50))   # Default height in mm
    except (TypeError, ValueError):
        return jsonify({'message': 'width and height must be numbers.'}), 400

    # Written as a chained comparison so NaN and infinity are rejected too.
    upper_bound = float('inf')
    if not (0 < width < upper_bound and 0 < height < upper_bound):
        return jsonify({'message': 'width and height must be positive.'}), 400

    columns = data.get('columns') or []
    if not isinstance(columns, (list, tuple)):
        return jsonify({'message': 'columns must be a list.'}), 400

    # Convert dimensions from mm to points (1 mm = 2.83465 points).
    points_per_mm = 2.83465
    width_points = width * points_per_mm
    height_points = height * points_per_mm

    # Ensure the /static/label_templates folder exists.
    label_templates_folder = os.path.join(current_app.root_path, 'static', 'label_templates')
    os.makedirs(label_templates_folder, exist_ok=True)
    pdf_file_path = os.path.join(label_templates_folder, 'label_template.pdf')

    # Draw the title, then one line per column, moving down the page.
    c = canvas.Canvas(pdf_file_path, pagesize=(width_points, height_points))
    c.drawString(10, height_points - 20, "Label Template")
    y_position = height_points - 40
    for column in columns:
        c.drawString(10, y_position, f"Column: {column}")
        y_position -= 20
    c.save()

    return jsonify({
        'message': 'PDF generated successfully!',
        'pdf_path': '/static/label_templates/label_template.pdf',
    })
@warehouse_bp.route('/create_locations', methods=['GET', 'POST'])
def create_locations():
    """Delegate the create-locations page (GET and POST) to the warehouse module."""
    # Package-relative import, matching the other imports in this module, so
    # the route does not depend on the package being importable as ``app``.
    from .warehouse import create_locations_handler
    return create_locations_handler()
@warehouse_bp.route('/import_locations_csv', methods=['GET', 'POST'])
def import_locations_csv():
    """Delegate the CSV location import page (GET and POST) to the warehouse module."""
    # Package-relative import, matching the other imports in this module, so
    # the route does not depend on the package being importable as ``app``.
    from .warehouse import import_locations_csv_handler
    return import_locations_csv_handler()