Files
quality_recticel/py_app/app/order_labels.py
2025-09-17 19:12:30 +03:00

339 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Order Labels Module - Handles CSV upload and processing for order label generation
Author: Auto-generated module for order management
"""
import mariadb
from flask import current_app, request, render_template, session, redirect, url_for, flash
import csv
import os
import tempfile
from datetime import datetime
def get_db_connection():
    """
    Open a MariaDB connection using the external server configuration.

    Settings are read from ``external_server.conf`` inside the Flask
    instance folder. The file holds one ``key=value`` pair per line; blank
    lines, lines starting with ``#`` and lines without ``=`` are ignored.

    Returns:
        A new ``mariadb`` connection. The caller is responsible for closing it.

    Raises:
        OSError: the settings file cannot be opened.
        KeyError: a required setting (username, password, server_domain,
            port, database_name) is missing from the file.
        ValueError: the ``port`` setting is not an integer.
        mariadb.Error: the connection attempt fails.
    """
    settings_file = os.path.join(current_app.instance_path, 'external_server.conf')
    settings = {}
    with open(settings_file, 'r') as f:
        for line in f:
            entry = line.strip()
            # A stray blank line or comment must not abort the whole load
            # with an unpacking error.
            if not entry or entry.startswith('#') or '=' not in entry:
                continue
            # Split on the first '=' only, so values may contain '='.
            key, value = entry.split('=', 1)
            settings[key] = value
    return mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        host=settings['server_domain'],
        port=int(settings['port']),
        database=settings['database_name'],
    )
def validate_order_row(row_data):
    """
    Validate a single order row for required fields and data types.

    Required fields: Comanda Productie, Cantitate, Descr. Com. Prod.
    Optional numeric fields (nr_linie_com_client, line_number) must be
    integers when provided. Over-long text fields and out-of-range numbers
    produce warnings rather than errors.

    Args:
        row_data: Mapping of normalized column keys to cell values. Values
            may be strings, ``None`` or other scalars; they are converted
            to stripped text before being checked.

    Returns:
        Tuple ``(errors, warnings)`` of two lists of message strings. A row
        is importable only when ``errors`` is empty.
    """
    def text(field):
        # A key that is present but holds None (or a non-string such as an
        # int) must not crash validation with an AttributeError.
        raw = row_data.get(field)
        return '' if raw is None else str(raw).strip()

    errors = []
    warnings = []

    # Required text fields
    if not text('comanda_productie'):
        errors.append("Comanda Productie is required")
    if not text('descr_com_prod'):
        errors.append("Descr. Com. Prod is required")

    # Cantitate (quantity) is required and must be a positive integer
    cantitate_str = text('cantitate')
    if not cantitate_str:
        errors.append("Cantitate is required")
    else:
        try:
            cantitate = int(cantitate_str)
        except ValueError:
            errors.append("Cantitate must be a valid number")
        else:
            if cantitate <= 0:
                errors.append("Cantitate must be a positive number")
            elif cantitate > 999:  # INT(3) limit
                warnings.append("Cantitate exceeds 999 (will be truncated)")

    # Optional numeric fields: must be valid integers if provided
    for field, max_val in (('nr_linie_com_client', 999), ('line_number', 999)):
        value = text(field)
        if not value:
            continue
        try:
            num_val = int(value)
        except ValueError:
            errors.append(f"{field} must be a valid number")
            continue
        if num_val < 0:
            warnings.append(f"{field} should be positive")
        elif num_val > max_val:
            warnings.append(f"{field} exceeds {max_val} (will be truncated)")

    # String length limits
    field_limits = {
        'comanda_productie': 15,
        'cod_articol': 15,
        'descr_com_prod': 50,
        'com_achiz_client': 25,
        'customer_name': 50,
        'customer_article_number': 25,
        'open_for_order': 25,
    }
    for field, max_len in field_limits.items():
        if len(text(field)) > max_len:
            warnings.append(f"{field} exceeds {max_len} characters (will be truncated)")

    return errors, warnings
def add_order_to_database(order_data):
    """
    Add a single order to the order_for_labels table.

    Text fields are stripped and cut to their column width; optional fields
    that end up empty are stored as NULL. ``printed_labels`` is always
    inserted as 0.

    Args:
        order_data: Mapping of normalized column keys to cell values.

    Returns:
        Tuple ``(success: bool, message: str)``. Failures are reported
        through the message; no exception is propagated to the caller.
    """
    def text(field, max_len):
        # Tolerate keys that are present but hold None or a non-string.
        raw = order_data.get(field)
        return ('' if raw is None else str(raw)).strip()[:max_len]

    def optional_int(field):
        raw = order_data.get(field)
        cleaned = '' if raw is None else str(raw).strip()
        return int(cleaned) if cleaned else None

    conn = None
    try:
        # Prepare data with proper types and limits before connecting, so
        # bad data never costs a database connection.
        insert_data = {
            'comanda_productie': text('comanda_productie', 15),
            'cod_articol': text('cod_articol', 15) or None,
            'descr_com_prod': text('descr_com_prod', 50),
            'cantitate': int(order_data.get('cantitate', 0)),
            'com_achiz_client': text('com_achiz_client', 25) or None,
            'nr_linie_com_client': optional_int('nr_linie_com_client'),
            'customer_name': text('customer_name', 50) or None,
            'customer_article_number': text('customer_article_number', 25) or None,
            'open_for_order': text('open_for_order', 25) or None,
            'line_number': optional_int('line_number'),
        }
        sql = """
            INSERT INTO order_for_labels
            (comanda_productie, cod_articol, descr_com_prod, cantitate,
            com_achiz_client, nr_linie_com_client, customer_name,
            customer_article_number, open_for_order, line_number, printed_labels)
            VALUES (%(comanda_productie)s, %(cod_articol)s, %(descr_com_prod)s, %(cantitate)s,
            %(com_achiz_client)s, %(nr_linie_com_client)s, %(customer_name)s,
            %(customer_article_number)s, %(open_for_order)s, %(line_number)s, 0)
        """
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(sql, insert_data)
        conn.commit()
        return True, f"Successfully added order {insert_data['comanda_productie']}"
    except mariadb.Error as e:
        return False, f"Database error: {str(e)}"
    except ValueError as e:
        return False, f"Data validation error: {str(e)}"
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"
    finally:
        # Close on every path; previously a failed execute/commit leaked
        # the connection.
        if conn is not None:
            conn.close()
def process_csv_file(file_path):
    """
    Process an uploaded CSV file and return parsed data with validation.

    The file is read as UTF-8 (a leading byte-order mark is tolerated). If
    it is not valid UTF-8, parsing restarts from scratch as latin-1. Header
    names are matched case-insensitively against the known column names;
    unknown headers are kept under a key derived from the header text.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        Tuple ``(orders_data, validation_errors, validation_warnings)``.
        ``orders_data`` holds only rows that validated without errors; the
        two message lists are prefixed with the row number (the header is
        row 1). File-level failures are reported as an entry in
        ``validation_errors`` rather than raised.
    """
    # Map possible column names (case-insensitive) to internal keys
    column_mapping = {
        'comanda productie': 'comanda_productie',
        'cod articol': 'cod_articol',
        'descr. com. prod': 'descr_com_prod',
        'cantitate': 'cantitate',
        'com.achiz.client': 'com_achiz_client',
        'nr. linie com. client': 'nr_linie_com_client',
        'customer name': 'customer_name',
        'customer article number': 'customer_article_number',
        'open for order': 'open_for_order',
        'line': 'line_number',
    }
    orders_data = []
    all_errors = []
    all_warnings = []

    def parse(encoding):
        # newline='' lets the csv module handle line endings itself,
        # including newlines embedded in quoted fields.
        with open(file_path, 'r', encoding=encoding, newline='') as f:
            reader = csv.DictReader(f)
            for row_num, row in enumerate(reader, start=2):  # row 1 is header
                normalized_row = {}
                for col_name, col_value in row.items():
                    # Surplus cells beyond the header arrive under a None key.
                    if not col_name:
                        continue
                    col_key = col_name.lower().strip()
                    mapped_key = column_mapping.get(
                        col_key, col_key.replace(' ', '_').replace('.', '_')
                    )
                    normalized_row[mapped_key] = col_value.strip() if col_value else ''
                errors, warnings = validate_order_row(normalized_row)
                if errors:
                    all_errors.extend(f"Row {row_num}: {error}" for error in errors)
                else:
                    # Only add valid rows
                    orders_data.append(normalized_row)
                if warnings:
                    all_warnings.extend(f"Row {row_num}: {warning}" for warning in warnings)

    try:
        # utf-8-sig strips a BOM (common in spreadsheet exports) that would
        # otherwise corrupt the first header name.
        parse('utf-8-sig')
    except UnicodeDecodeError:
        # Drop anything collected before the decode failure, then actually
        # re-run the same processing with the fallback encoding.
        orders_data.clear()
        all_errors.clear()
        all_warnings.clear()
        try:
            parse('latin-1')
        except Exception as e:
            all_errors.append(f"File encoding error: {str(e)}")
    except Exception as e:
        all_errors.append(f"File processing error: {str(e)}")
    return orders_data, all_errors, all_warnings
def _discard_temp_csv(path):
    """Best-effort removal of a previously stored temporary CSV file."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        # Already gone or not removable; cleanup must never break a request.
        pass


def upload_orders_handler():
    """
    Main handler for the upload orders functionality.

    Handles three situations:
      * POST with a ``csv_file`` upload: the file is saved to a temporary
        location, parsed and validated, and the result is kept in the
        session for preview.
      * POST with ``save_to_database`` set and preview data in the session:
        every previewed order is inserted, a summary is flashed and the
        client is redirected back to the upload page.
      * Any other request with preview data in the session: the stored
        preview is shown again.

    Returns:
        The rendered ``upload_orders.html`` template, or a redirect
        response after a database import.
    """
    report = None
    orders_data = []
    validation_errors = []
    validation_warnings = []
    temp_dir = tempfile.gettempdir()

    if request.method == 'POST':
        # Handle file upload
        file = request.files.get('csv_file')
        if file and file.filename.lower().endswith('.csv'):
            try:
                # A new upload supersedes any earlier one from this session.
                _discard_temp_csv(session.pop('orders_csv_filepath', None))
                # Never build the path from the client-supplied filename:
                # it may contain path separators and can collide with other
                # users' uploads. Use a unique server-chosen name instead.
                fd, temp_path = tempfile.mkstemp(prefix='orders_', suffix='.csv', dir=temp_dir)
                os.close(fd)
                file.save(temp_path)
                # Store file info in session
                session['csv_filename'] = file.filename
                session['orders_csv_filepath'] = temp_path
                # Process the CSV file
                orders_data, validation_errors, validation_warnings = process_csv_file(temp_path)
                # Store processed data in session
                session['orders_csv_data'] = orders_data
                session['orders_validation_errors'] = validation_errors
                session['orders_validation_warnings'] = validation_warnings
                flash(f"📁 File '{file.filename}' uploaded and processed successfully!", "info")
                if validation_errors:
                    flash(f"⚠️ Found {len(validation_errors)} validation errors. Please fix them before importing.", "warning")
                if validation_warnings:
                    flash(f" Found {len(validation_warnings)} warnings. Data will be adjusted automatically.", "info")
            except Exception as e:
                flash(f"❌ Error processing file: {str(e)}", "error")
        # Handle database insertion
        elif request.form.get('save_to_database') and 'orders_csv_data' in session:
            orders_data = session['orders_csv_data']
            if not orders_data:
                flash("❌ No valid data to save to database.", "error")
            else:
                success_count = 0
                failed_orders = []
                for order in orders_data:
                    success, message = add_order_to_database(order)
                    if success:
                        success_count += 1
                    else:
                        failed_orders.append(f"{order.get('comanda_productie', 'Unknown')}: {message}")
                failed_count = len(failed_orders)
                # Create report
                report = f"✅ Successfully imported {success_count} orders."
                if failed_count > 0:
                    # Leading space keeps the two sentences from running together.
                    report += f" {failed_count} orders failed to import."
                    for failure in failed_orders[:5]:  # Show first 5 failures
                        report += f"<br>• {failure}"
                    if failed_count > 5:
                        report += f"<br>• ... and {failed_count - 5} more failures."
                # Clear session data after successful import
                if success_count > 0:
                    session.pop('orders_csv_data', None)
                    session.pop('csv_filename', None)
                    # Nothing references the temp file once the key is gone.
                    _discard_temp_csv(session.pop('orders_csv_filepath', None))
                    session.pop('orders_validation_errors', None)
                    session.pop('orders_validation_warnings', None)
                flash(report, "success" if failed_count == 0 else "warning")
                return redirect(url_for('main.upload_orders') + '#imported')
    # Load data from session if available
    elif 'orders_csv_data' in session:
        orders_data = session['orders_csv_data']
        validation_errors = session.get('orders_validation_errors', [])
        validation_warnings = session.get('orders_validation_warnings', [])

    return render_template('upload_orders.html',
                           orders=orders_data,
                           validation_errors=validation_errors,
                           validation_warnings=validation_warnings,
                           report=report)
def get_orders_from_database(limit=100):
    """
    Retrieve the most recently created orders from the database for display.

    Args:
        limit: Maximum number of rows to return (newest first).

    Returns:
        List of order dictionaries keyed by column name. On any failure the
        error is printed and an empty list is returned.
    """
    # Must stay in the same order as the SELECT list below.
    columns = (
        'id', 'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
        'com_achiz_client', 'nr_linie_com_client', 'customer_name',
        'customer_article_number', 'open_for_order', 'line_number',
        'printed_labels', 'created_at', 'updated_at',
    )
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
            com_achiz_client, nr_linie_com_client, customer_name,
            customer_article_number, open_for_order, line_number,
            printed_labels, created_at, updated_at
            FROM order_for_labels
            ORDER BY created_at DESC
            LIMIT %s
        """, (limit,))
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    except Exception as e:
        # Deliberate best-effort: the display page gets an empty list
        # instead of an error.
        print(f"Error retrieving orders: {e}")
        return []
    finally:
        # Close on every path; previously a failed query leaked the
        # connection.
        if conn is not None:
            conn.close()