503 lines
20 KiB
Python
503 lines
20 KiB
Python
import os
|
|
import mariadb
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app, jsonify
|
|
from .models import User
|
|
from . import db
|
|
from reportlab.lib.pagesizes import letter
|
|
from reportlab.pdfgen import canvas
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
|
import csv
|
|
from .warehouse import add_location
|
|
from .settings import settings_handler, edit_access_roles_handler
|
|
|
|
bp = Blueprint('main', __name__)
|
|
warehouse_bp = Blueprint('warehouse', __name__)
|
|
|
|
@bp.route('/update_role_access/<role>', methods=['POST'])
def update_role_access(role):
    """Forward a role-access update to ``update_role_access_handler``.

    Args:
        role: Role name captured from the URL path.

    Returns:
        Whatever the settings handler returns for this role.
    """
    # Imported at call time, as in the rest of this module's lazy handler imports.
    from .settings import update_role_access_handler as handler

    return handler(role)
|
|
|
|
@bp.route('/store_articles')
def store_articles():
    """Render the ``store_articles.html`` page (no role check in this view)."""
    page = render_template('store_articles.html')
    return page
|
|
|
|
@bp.route('/warehouse_reports')
def warehouse_reports():
    """Render the ``warehouse_reports.html`` page (no role check in this view)."""
    page = render_template('warehouse_reports.html')
    return page
|
|
|
|
def get_db_connection():
    """Open a MariaDB connection described by ``external_server.conf``.

    The file is read from the Flask instance folder and is parsed as one
    ``key=value`` pair per line (only the first ``=`` splits, so values may
    contain ``=``). Blank lines and lines without ``=`` are skipped instead
    of aborting the whole parse.

    Returns:
        The connection object produced by ``mariadb.connect``.

    Raises:
        FileNotFoundError: If ``external_server.conf`` does not exist.
        KeyError: If one of ``username``, ``password``, ``server_domain``,
            ``port`` or ``database_name`` is absent from the file.
        ValueError: If ``port`` is not an integer.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    if not os.path.exists(settings_file):
        raise FileNotFoundError("The external_server.conf file is missing in the instance folder.")

    # Read settings from the configuration file
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            key, separator, value = line.strip().partition('=')
            if not separator:
                # Blank or malformed line: nothing to record.
                continue
            settings[key] = value

    # Create a database connection
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
|
|
|
|
def _lookup_internal_user(username, password, label="Internal DB"):
    """Look up a user in the internal SQLite database (``instance/users.db``).

    Args:
        username: Username to match exactly.
        password: Password to match exactly against the stored value.
        label: Prefix used in diagnostic output so the direct and fallback
            code paths stay distinguishable.

    Returns:
        ``{'username': ..., 'role': ...}`` for a match, otherwise ``None``
        (also when the ``users`` table is missing or the lookup fails).
    """
    import sqlite3

    internal_db_path = os.path.join(os.path.dirname(__file__), '../instance/users.db')
    try:
        conn = sqlite3.connect(internal_db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
            if not cursor.fetchone():
                print("No users table in internal database")
                return None
            cursor.execute(
                "SELECT username, role FROM users WHERE username=? AND password=?",
                (username, password),
            )
            row = cursor.fetchone()
        finally:
            # Always release the handle, even when a query raises.
            conn.close()
    except Exception as e:
        # Best effort: a broken internal DB is treated as "no such user".
        print(f"{label} error:", e)
        return None

    # Only report whether a row matched; never echo credentials.
    print(f"{label} query result:", "match" if row else "no match")
    if row:
        return {'username': row[0], 'role': row[1]}
    return None


def _lookup_external_user(username, password):
    """Look up a user in the external MariaDB database.

    Returns:
        ``{'username': ..., 'role': ...}`` for a match, otherwise ``None``
        (also when the ``users`` table does not exist).

    Raises:
        Exception: Any connection/configuration/query error propagates so the
            caller can fall back to the internal database.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE 'users'")
        if not cursor.fetchone():
            return None
        cursor.execute(
            "SELECT username, role FROM users WHERE username=%s AND password=%s",
            (username, password),
        )
        row = cursor.fetchone()
    finally:
        conn.close()

    print("External DB query result:", "match" if row else "no match")
    if row:
        return {'username': row[0], 'role': row[1]}
    return None


@bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    A username starting with ``#`` is checked only against the internal SQLite
    database, with the ``#`` removed. Any other username is checked against
    the external MariaDB database; the internal database is consulted only
    when that external lookup raises.

    On success ``session['user']`` and ``session['role']`` are set and the
    client is redirected to the dashboard. On failure a message is flashed
    and the login form is rendered again.

    NOTE(review): passwords are compared verbatim with the stored column, so
    they appear to be stored unhashed - confirm and migrate to hashes.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        if username.startswith('#'):
            username_clean = username[1:].strip()
            print(f"Checking internal database for: {username_clean}")
            user = _lookup_internal_user(username_clean, password.strip())
        else:
            try:
                user = _lookup_external_user(username.strip(), password.strip())
            except Exception as e:
                # Deliberately broad: a missing config file, bad settings or a
                # server error all mean "external DB unavailable".
                print("External DB error:", e)
                print("Falling back to internal database")
                user = _lookup_internal_user(
                    username.strip(), password.strip(), label="Internal DB fallback"
                )

        if user:
            session['user'] = user['username']
            session['role'] = user['role']
            print("Logged in as:", session.get('user'), session.get('role'))
            return redirect(url_for('main.dashboard'))

        # The submitted password is intentionally not logged.
        print("Login failed for:", username)
        flash('Invalid credentials. Please try again.')

    return render_template('login.html')
|
|
|
|
@bp.route('/dashboard')
def dashboard():
    """Render the dashboard for a logged-in session, else redirect to login.

    A session counts as logged in when it contains the ``user`` key.
    """
    print("Session user:", session.get('user'), session.get('role'))
    if 'user' in session:
        return render_template('dashboard.html')
    return redirect(url_for('main.login'))
|
|
|
|
@bp.route('/settings')
def settings():
    """Serve the settings page by delegating to ``settings_handler``."""
    response = settings_handler()
    return response
|
|
|
|
# Route for editing access roles (superadmin only)
|
|
@bp.route('/edit_access_roles')
def edit_access_roles():
    """Serve the access-role editor by delegating to ``edit_access_roles_handler``.

    No role check happens in this view itself; presumably the handler
    enforces the superadmin restriction - verify in ``settings``.
    """
    response = edit_access_roles_handler()
    return response
|
|
|
|
@bp.route('/quality')
def quality():
    """Render the quality page for the ``superadmin`` and ``quality`` roles.

    Any other session (including one without a role) gets a flash message and
    a redirect to the dashboard.
    """
    if session.get('role') in ('superadmin', 'quality'):
        return render_template('quality.html')

    flash('Access denied: Quality users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/warehouse')
def warehouse():
    """Render the warehouse main page for ``superadmin`` and ``warehouse`` roles.

    Any other session (including one without a role) gets a flash message and
    a redirect to the dashboard.
    """
    if session.get('role') in ('superadmin', 'warehouse'):
        return render_template('main_page_warehouse.html')

    flash('Access denied: Warehouse users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/scan', methods=['GET', 'POST'])
def scan():
    """Record a scan (POST) and list the 15 newest ``scan1_orders`` rows.

    Access is limited to the ``superadmin`` and ``scan`` roles; everyone else
    is redirected to the dashboard with a flash message.

    On POST the form fields ``operator_code``, ``cp_code``, ``oc1_code``,
    ``oc2_code``, ``defect_code``, ``date`` and ``time`` are read. If a row
    with the same ``CP_full_code`` exists it is updated, otherwise a new row
    is inserted. The success message is flashed only after the commit went
    through. Both GET and POST then render ``scan.html`` with ``scan_data``.

    Database errors (``mariadb.Error``) are flashed to the user rather than
    raised; connections are closed on every path.
    """
    if 'role' not in session or session['role'] not in ['superadmin', 'scan']:
        flash('Access denied: Scan users only.')
        return redirect(url_for('main.dashboard'))

    if request.method == 'POST':
        # Handle form submission
        operator_code = request.form.get('operator_code')
        cp_code = request.form.get('cp_code')
        oc1_code = request.form.get('oc1_code')
        oc2_code = request.form.get('oc2_code')
        defect_code = request.form.get('defect_code')
        scan_date = request.form.get('date')
        scan_time = request.form.get('time')

        conn = None
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            # Check if the CP_full_code already exists
            cursor.execute("SELECT Id FROM scan1_orders WHERE CP_full_code = ?", (cp_code,))
            if cursor.fetchone():
                update_query = """
                    UPDATE scan1_orders
                    SET operator_code = ?, OC1_code = ?, OC2_code = ?, quality_code = ?, date = ?, time = ?
                    WHERE CP_full_code = ?
                """
                cursor.execute(
                    update_query,
                    (operator_code, oc1_code, oc2_code, defect_code, scan_date, scan_time, cp_code),
                )
                success_message = 'Existing entry updated successfully.'
            else:
                insert_query = """
                    INSERT INTO scan1_orders (operator_code, CP_full_code, OC1_code, OC2_code, quality_code, date, time)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """
                cursor.execute(
                    insert_query,
                    (operator_code, cp_code, oc1_code, oc2_code, defect_code, scan_date, scan_time),
                )
                success_message = 'New entry inserted successfully.'

            conn.commit()
            # Report success only once the transaction is durable.
            flash(success_message)
        except mariadb.Error as e:
            print(f"Error saving scan data: {e}")
            flash(f"Error saving scan data: {e}")
        finally:
            if conn is not None:
                conn.close()

    # Fetch the latest scan data for display
    scan_data = []
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT Id, operator_code, CP_base_code, OC1_code, OC2_code, quality_code, date, time, approved_quantity, rejected_quantity
            FROM scan1_orders
            ORDER BY Id DESC
            LIMIT 15
        """)
        scan_data = cursor.fetchall()
    except mariadb.Error as e:
        print(f"Error fetching scan data: {e}")
        flash(f"Error fetching scan data: {e}")
    finally:
        if conn is not None:
            conn.close()

    return render_template('scan.html', scan_data=scan_data)
|
|
|
|
@bp.route('/logout')
def logout():
    """Drop ``user`` and ``role`` from the session and go to the login page."""
    for session_key in ('user', 'role'):
        # pop with a default so an already-empty session is not an error
        session.pop(session_key, None)
    return redirect(url_for('main.login'))
|
|
|
|
@bp.route('/create_user', methods=['POST'])
def create_user():
    """Create a user from the posted ``username``, ``password`` and ``role``.

    Only a session whose role is ``superadmin`` may proceed. Every outcome
    (denied, duplicate username, success) flashes a message and redirects to
    the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    form = request.form
    name = form['username']
    secret = form['password']
    assigned_role = form['role']

    # Refuse to create a second account with the same username.
    duplicate = User.query.filter_by(username=name).first()
    if duplicate:
        flash('User already exists.')
        return redirect(url_for('main.settings'))

    # NOTE(review): the password is handed to the model exactly as submitted;
    # confirm whether ``User`` hashes it.
    db.session.add(User(username=name, password=secret, role=assigned_role))
    db.session.commit()

    flash('User created successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/edit_user', methods=['POST'])
def edit_user():
    """Update the role (and optionally the password) of an existing user.

    Reads ``user_id``, ``password`` and ``role`` from the form. The password
    is replaced only when a non-empty value was submitted; the role is always
    overwritten. Only ``superadmin`` sessions may proceed; every outcome
    flashes a message and redirects to the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    form = request.form
    target_id = form['user_id']
    new_password = form['password']
    new_role = form['role']

    target = User.query.get(target_id)
    if not target:
        flash('User not found.')
        return redirect(url_for('main.settings'))

    # An empty password field means "keep the current password".
    if new_password:
        target.password = new_password
    target.role = new_role
    db.session.commit()

    flash('User updated successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the user identified by the posted ``user_id``.

    Only ``superadmin`` sessions may proceed. Every outcome (denied, unknown
    user, success) flashes a message and redirects to the settings page.
    """
    if session.get('role') != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    target_id = request.form['user_id']
    target = User.query.get(target_id)
    if not target:
        flash('User not found.')
        return redirect(url_for('main.settings'))

    db.session.delete(target)
    db.session.commit()

    flash('User deleted successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
@bp.route('/save_external_db', methods=['POST'])
def save_external_db():
    """Persist the external database settings to ``external_server.conf``.

    Reads ``server_domain``, ``port``, ``database_name``, ``username`` and
    ``password`` from the form and writes them as ``key=value`` lines into
    the Flask instance folder, replacing any previous file.

    The file is line-based, so a value containing a line break would inject
    or truncate entries; such input is rejected. The port must parse as an
    integer because ``get_db_connection`` converts it with ``int()``.

    Only ``superadmin`` sessions may proceed; every outcome flashes a message
    and redirects to the settings page.
    """
    if 'role' not in session or session['role'] != 'superadmin':
        flash('Access denied: Superadmin only.')
        return redirect(url_for('main.settings'))

    # Get form data (order here is the order written to the file)
    field_names = ('server_domain', 'port', 'database_name', 'username', 'password')
    values = {name: request.form[name] for name in field_names}

    if any('\n' in value or '\r' in value for value in values.values()):
        flash('External database settings must not contain line breaks.')
        return redirect(url_for('main.settings'))

    try:
        int(values['port'])
    except ValueError:
        flash('Port must be a whole number.')
        return redirect(url_for('main.settings'))

    # Save data to a file in the instance folder
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    os.makedirs(os.path.dirname(settings_file), exist_ok=True)
    with open(settings_file, 'w') as f:
        for name in field_names:
            f.write(f"{name}={values[name]}\n")

    flash('External database settings saved/updated successfully.')
    return redirect(url_for('main.settings'))
|
|
|
|
def _build_report_query(report):
    """Translate a report id into ``(sql, params, headers, label)``.

    Returns ``None`` for an unknown id. Column names come only from the
    constant table below, never from request data, so composing the SELECT
    list with string formatting is safe here.
    """
    # id -> (CP columns with their headers, look-back days or None, defects only?, label)
    base = ("CP_base_code", "CP Base Code")
    full = ("CP_full_code", "CP Full Code")
    specs = {
        "1": ([base], 1, False, "report 1 (last 1 day)"),
        "2": ([base], 5, False, "report 2 (last 5 days)"),
        "3": ([full], 1, True, "report 3 (non-zero quality_code, last 1 day)"),
        "4": ([full], 5, True, "report 4 (non-zero quality_code, last 5 days)"),
        "5": ([base, full], None, False, "report 5 (all rows)"),
    }
    spec = specs.get(report)
    if spec is None:
        return None
    cp_columns, days, defects_only, label = spec

    columns = (
        ["Id", "operator_code"]
        + [column for column, _ in cp_columns]
        + ["OC1_code", "OC2_code", "quality_code", "date", "time",
           "approved_quantity", "rejected_quantity"]
    )
    headers = (
        ["Id", "Operator Code"]
        + [header for _, header in cp_columns]
        + ["OC1 Code", "OC2 Code", "Quality Code", "Date", "Time",
           "Approved Quantity", "Rejected Quantity"]
    )

    conditions = []
    params = []
    if days is not None:
        conditions.append("date >= ?")
        params.append((datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d'))
    if defects_only:
        conditions.append("quality_code != 0")

    sql = f"SELECT {', '.join(columns)} FROM scan1_orders"
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += " ORDER BY date DESC, time DESC"
    return sql, tuple(params), headers, label


@bp.route('/get_report_data', methods=['GET'])
def get_report_data():
    """Return rows of ``scan1_orders`` for the report named by ``?report=``.

    Report ids:
        ``1`` / ``2``: rows from the last 1 / 5 days (CP base code).
        ``3`` / ``4``: rows from the last 1 / 5 days with a non-zero
            ``quality_code`` (CP full code).
        ``5``: all rows, with both CP base and CP full code.

    Returns:
        JSON ``{"headers": [...], "rows": [...]}``. An unknown report id
        yields empty lists. On a ``mariadb.Error`` an ``"error"`` key is
        added. ``datetime`` and ``timedelta`` cells are converted with
        ``str()``; other cells are passed through unchanged.

    NOTE(review): this view performs no session/role check - confirm that is
    intended.
    """
    report = request.args.get('report')
    data = {"headers": [], "rows": []}
    query = _build_report_query(report)

    conn = None
    try:
        conn = get_db_connection()
        if query is not None:
            sql, params, headers, label = query
            cursor = conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            rows = cursor.fetchall()
            # Log the size only; dumping every row floods stdout on report 5.
            print(f"Fetched {len(rows)} rows for {label}")
            data["headers"] = headers
            data["rows"] = [
                [str(cell) if isinstance(cell, (datetime, timedelta)) else cell for cell in row]
                for row in rows
            ]
    except mariadb.Error as e:
        print(f"Error fetching report data: {e}")
        data["error"] = "Error fetching report data."
    finally:
        if conn is not None:
            conn.close()

    print(f"Returning {len(data['rows'])} rows for report {report!r}")
    return jsonify(data)
|
|
|
|
@bp.route('/etichete')
def etichete():
    """Render the etichete main page for ``superadmin`` and ``etichete`` roles.

    Any other session (including one without a role) gets a flash message and
    a redirect to the dashboard.
    """
    if session.get('role') in ('superadmin', 'etichete'):
        return render_template('main_page_etichete.html')

    flash('Access denied: Etichete users only.')
    return redirect(url_for('main.dashboard'))
|
|
|
|
@bp.route('/upload_data')
def upload_data():
    """Render the ``upload_data.html`` page (no role check in this view)."""
    page = render_template('upload_data.html')
    return page
|
|
|
|
@bp.route('/print_module')
def print_module():
    """Render the ``print_module.html`` page (no role check in this view)."""
    page = render_template('print_module.html')
    return page
|
|
|
|
@bp.route('/label_templates')
def label_templates():
    """Render the ``label_templates.html`` page (no role check in this view)."""
    page = render_template('label_templates.html')
    return page
|
|
|
|
@bp.route('/create_template')
def create_template():
    """Render the ``create_template.html`` page (no role check in this view)."""
    page = render_template('create_template.html')
    return page
|
|
|
|
@bp.route('/edit_template/<int:template_id>')
def edit_template(template_id):
    """Placeholder view: echo which template would be edited.

    Args:
        template_id: Integer id taken from the URL.

    Returns:
        A plain-text message naming the id; no template is loaded or changed.
    """
    # TODO: real editing logic is not implemented yet.
    message = f"Edit template with ID {template_id}"
    return message
|
|
|
|
@bp.route('/delete_template/<int:template_id>', methods=['POST'])
def delete_template(template_id):
    """Placeholder view: echo which template would be deleted.

    Args:
        template_id: Integer id taken from the URL.

    Returns:
        A plain-text message naming the id; nothing is actually deleted.
    """
    # TODO: real deletion logic is not implemented yet.
    message = f"Delete template with ID {template_id}"
    return message
|
|
|
|
@bp.route('/get_tables')
def get_tables():
    """Return a JSON object ``{"tables": [...]}`` with placeholder table names.

    The list is hard-coded; no database is queried yet.
    """
    # TODO: replace with logic that fetches tables from the database.
    placeholder_tables = ['table1', 'table2', 'table3']
    payload = {'tables': placeholder_tables}
    return jsonify(payload)
|
|
|
|
@bp.route('/get_columns')
def get_columns():
    """Return a JSON object ``{"columns": [...]}`` for the ``?table=`` argument.

    The column names are hard-coded placeholders; an absent or empty
    ``table`` argument yields an empty list.
    """
    selected_table = request.args.get('table')
    # TODO: replace with logic that fetches columns for the selected table.
    if selected_table:
        column_names = ['column1', 'column2', 'column3']
    else:
        column_names = []
    return jsonify({'columns': column_names})
|
|
|
|
@bp.route('/save_template', methods=['POST'])
def save_template():
    """Accept a template as JSON and acknowledge it.

    The payload is only printed; nothing is persisted yet.
    """
    payload = request.get_json()
    # TODO: replace with logic that saves the template to the database.
    print(f"Saving template: {payload}")
    acknowledgement = {'message': 'Template saved successfully!'}
    return jsonify(acknowledgement)
|
|
|
|
@bp.route('/generate_pdf', methods=['POST'])
def generate_pdf():
    """Generate a label-template PDF from a JSON description.

    Expected JSON object:
        ``width``: label width in millimetres (default 100).
        ``height``: label height in millimetres (default 50).
        ``columns``: names to list on the label (default empty).

    ``width`` and ``height`` may arrive as numbers or numeric strings; they
    are converted to float before use. The PDF is written to
    ``static/label_templates/label_template.pdf`` under the application root,
    overwriting any previous file.

    Returns:
        JSON with ``message`` and ``pdf_path`` on success, or JSON with
        ``error`` and HTTP 400 when the payload is not an object or the
        dimensions are not positive finite numbers.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object.'}), 400

    try:
        width = float(data.get('width', 100))  # Default width in mm
        height = float(data.get('height', 50))  # Default height in mm
    except (TypeError, ValueError):
        return jsonify({'error': 'width and height must be numbers.'}), 400

    # Chained comparison also rejects NaN and infinity.
    if not (0 < width < float('inf') and 0 < height < float('inf')):
        return jsonify({'error': 'width and height must be positive.'}), 400

    columns = data.get('columns', [])

    # Convert dimensions from mm to points (1 mm = 2.83465 points)
    width_points = width * 2.83465
    height_points = height * 2.83465

    # Ensure the /static/label_templates folder exists
    label_templates_folder = os.path.join(current_app.root_path, 'static', 'label_templates')
    os.makedirs(label_templates_folder, exist_ok=True)

    # Define the path for the PDF file
    pdf_file_path = os.path.join(label_templates_folder, 'label_template.pdf')

    # Create a PDF file whose page is exactly the label size
    c = canvas.Canvas(pdf_file_path, pagesize=(width_points, height_points))

    # Title near the top, then one line per column, 20 points apart
    c.drawString(10, height_points - 20, "Label Template")
    y_position = height_points - 40
    for column in columns:
        c.drawString(10, y_position, f"Column: {column}")
        y_position -= 20

    # Save the PDF
    c.save()

    return jsonify({'message': 'PDF generated successfully!', 'pdf_path': '/static/label_templates/label_template.pdf'})
|
|
|
|
@warehouse_bp.route('/create_locations', methods=['GET', 'POST'])
def create_locations():
    """Handle ``/create_locations`` by delegating to ``create_locations_handler``."""
    # Imported at call time rather than at module import.
    from app.warehouse import create_locations_handler

    response = create_locations_handler()
    return response
|
|
|
|
@warehouse_bp.route('/import_locations_csv', methods=['GET', 'POST'])
def import_locations_csv():
    """Handle ``/import_locations_csv`` by delegating to ``import_locations_csv_handler``."""
    # Imported at call time rather than at module import.
    from app.warehouse import import_locations_csv_handler

    response = import_locations_csv_handler()
    return response