Files
quality_recticel/py_app/app/order_labels.py
2025-10-05 14:32:47 -04:00

433 lines
18 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Order Labels Module - Handles CSV upload and processing for order label generation
Author: Auto-generated module for order management
"""
import mariadb
from flask import current_app, request, render_template, session, redirect, url_for, flash
import csv
import os
import json
import tempfile
from datetime import datetime
def get_db_connection():
"""Get database connection using external server configuration"""
settings_file = current_app.instance_path + '/external_server.conf'
settings = {}
with open(settings_file, 'r') as f:
for line in f:
key, value = line.strip().split('=', 1)
settings[key] = value
return mariadb.connect(
user=settings['username'],
password=settings['password'],
host=settings['server_domain'],
port=int(settings['port']),
database=settings['database_name']
)
def validate_order_row(row_data):
"""
Validate a single order row for required fields and data types
Required fields: Comanda Productie, Cantitate, Descr. Com. Prod
"""
errors = []
warnings = []
# Check required fields
if not row_data.get('comanda_productie', '').strip():
errors.append("Comanda Productie is required")
if not row_data.get('descr_com_prod', '').strip():
errors.append("Descr. Com. Prod is required")
# Validate Cantitate (quantity) - must be integer
cantitate_str = row_data.get('cantitate', '').strip()
if not cantitate_str:
errors.append("Cantitate is required")
else:
try:
cantitate = int(cantitate_str)
if cantitate <= 0:
errors.append("Cantitate must be a positive number")
elif cantitate > 999: # INT(3) limit
warnings.append("Cantitate exceeds 999 (will be truncated)")
except ValueError:
errors.append("Cantitate must be a valid number")
# Validate numeric fields (optional but must be valid if provided)
for field, max_val in [('nr_linie_com_client', 999), ('line_number', 999)]:
value = row_data.get(field, '').strip()
if value:
try:
num_val = int(value)
if num_val < 0:
warnings.append(f"{field} should be positive")
elif num_val > max_val:
warnings.append(f"{field} exceeds {max_val} (will be truncated)")
except ValueError:
errors.append(f"{field} must be a valid number")
# Validate data_livrare (optional date field)
data_livrare = row_data.get('data_livrare', '').strip()
if data_livrare:
try:
# Try to parse common date formats
for date_format in ['%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%d.%m.%Y']:
try:
datetime.strptime(data_livrare, date_format)
break
except ValueError:
continue
else:
errors.append("data_livrare must be a valid date (YYYY-MM-DD, DD/MM/YYYY, MM/DD/YYYY, or DD.MM.YYYY)")
except Exception:
errors.append("data_livrare date format error")
# Validate string length limits
field_limits = {
'comanda_productie': 15,
'cod_articol': 15,
'descr_com_prod': 50,
'com_achiz_client': 25,
'customer_name': 50,
'customer_article_number': 25,
'open_for_order': 25
}
for field, max_len in field_limits.items():
value = row_data.get(field, '').strip()
if value and len(value) > max_len:
warnings.append(f"{field} exceeds {max_len} characters (will be truncated)")
return errors, warnings
def add_order_to_database(order_data):
"""
Add a single order to the order_for_labels table
Returns (success: bool, message: str)
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Prepare data with proper types and limits
# Handle date conversion for data_livrare
data_livrare_value = None
data_livrare_str = order_data.get('data_livrare', '').strip()
if data_livrare_str:
try:
# Try to parse common date formats and convert to YYYY-MM-DD
for date_format in ['%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%d.%m.%Y']:
try:
parsed_date = datetime.strptime(data_livrare_str, date_format)
data_livrare_value = parsed_date.strftime('%Y-%m-%d')
break
except ValueError:
continue
except Exception:
pass # Leave as None if parsing fails
insert_data = {
'comanda_productie': order_data.get('comanda_productie', '').strip()[:15],
'cod_articol': order_data.get('cod_articol', '').strip()[:15] or None,
'descr_com_prod': order_data.get('descr_com_prod', '').strip()[:50],
'cantitate': int(order_data.get('cantitate', 0)),
'data_livrare': data_livrare_value,
'dimensiune': order_data.get('dimensiune', '').strip()[:10] or None,
'com_achiz_client': order_data.get('com_achiz_client', '').strip()[:25] or None,
'nr_linie_com_client': int(order_data.get('nr_linie_com_client', 0)) if order_data.get('nr_linie_com_client', '').strip() else None,
'customer_name': order_data.get('customer_name', '').strip()[:50] or None,
'customer_article_number': order_data.get('customer_article_number', '').strip()[:25] or None,
'open_for_order': order_data.get('open_for_order', '').strip()[:25] or None,
'line_number': int(order_data.get('line_number', 0)) if order_data.get('line_number', '').strip() else None
}
sql = """
INSERT INTO order_for_labels
(comanda_productie, cod_articol, descr_com_prod, cantitate, data_livrare, dimensiune,
com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number, printed_labels)
VALUES (%(comanda_productie)s, %(cod_articol)s, %(descr_com_prod)s, %(cantitate)s, %(data_livrare)s, %(dimensiune)s,
%(com_achiz_client)s, %(nr_linie_com_client)s, %(customer_name)s,
%(customer_article_number)s, %(open_for_order)s, %(line_number)s, 0)
"""
cursor.execute(sql, insert_data)
conn.commit()
conn.close()
return True, f"Successfully added order {insert_data['comanda_productie']}"
except mariadb.Error as e:
return False, f"Database error: {str(e)}"
except ValueError as e:
return False, f"Data validation error: {str(e)}"
except Exception as e:
return False, f"Unexpected error: {str(e)}"
def process_csv_file(file_path):
"""
Process uploaded CSV file and return parsed data with validation
Returns: (orders_data: list, validation_errors: list, validation_warnings: list)
"""
orders_data = []
all_errors = []
all_warnings = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
# Try to detect the CSV dialect
sample = f.read(1024)
f.seek(0)
# Create CSV reader
reader = csv.DictReader(f)
# Map possible column names (case-insensitive)
column_mapping = {
'comanda productie': 'comanda_productie',
'cod articol': 'cod_articol',
'descr. com. prod': 'descr_com_prod',
'cantitate': 'cantitate',
'datalivrare': 'data_livrare',
'data livrare': 'data_livrare',
'dimensiune': 'dimensiune',
'com.achiz.client': 'com_achiz_client',
'nr. linie com. client': 'nr_linie_com_client',
'customer name': 'customer_name',
'customer article number': 'customer_article_number',
'open for order': 'open_for_order',
'line': 'line_number'
}
for row_num, row in enumerate(reader, start=2): # Start at 2 (row 1 is header)
# Normalize column names and create order data
normalized_row = {}
for col_name, col_value in row.items():
if col_name:
col_key = col_name.lower().strip()
mapped_key = column_mapping.get(col_key, col_key.replace(' ', '_').replace('.', '_'))
normalized_row[mapped_key] = col_value.strip() if col_value else ''
# Validate the row
errors, warnings = validate_order_row(normalized_row)
if errors:
all_errors.extend([f"Row {row_num}: {error}" for error in errors])
else:
# Only add valid rows
orders_data.append(normalized_row)
if warnings:
all_warnings.extend([f"Row {row_num}: {warning}" for warning in warnings])
except UnicodeDecodeError:
# Try different encodings
try:
with open(file_path, 'r', encoding='latin-1') as f:
reader = csv.DictReader(f)
# ... repeat the same processing logic
except Exception as e:
all_errors.append(f"File encoding error: {str(e)}")
except Exception as e:
all_errors.append(f"File processing error: {str(e)}")
return orders_data, all_errors, all_warnings
def upload_orders_handler():
# Handle clear table POST
if request.method == 'POST' and request.form.get('clear_table') == '1':
session.pop('orders_csv_data', None)
session.pop('csv_filename', None)
session.pop('orders_csv_filepath', None)
session.pop('orders_validation_errors', None)
session.pop('orders_validation_warnings', None)
session.pop('leftover_orders', None)
session.pop('leftover_description', None)
# Remove the leftovers JSON file if it exists
leftovers_path = os.path.join(current_app.root_path, 'static', 'leftover_orders.json')
try:
if os.path.exists(leftovers_path):
os.remove(leftovers_path)
except Exception:
pass
flash('Leftover orders have been cleared.', 'info')
return redirect(url_for('main.view_orders'))
"""
Main handler for the upload orders functionality
Handles both CSV upload/preview and database insertion
"""
report = None
orders_data = []
validation_errors = []
validation_warnings = []
temp_dir = tempfile.gettempdir()
if request.method == 'POST':
# Handle file upload
file = request.files.get('csv_file')
if file and file.filename.endswith(('.csv', '.CSV')):
try:
# Save uploaded file
temp_path = os.path.join(temp_dir, file.filename)
file.save(temp_path)
# Store file info in session
session['csv_filename'] = file.filename
session['orders_csv_filepath'] = temp_path
# Process the CSV file
orders_data, validation_errors, validation_warnings = process_csv_file(temp_path)
# Store processed data in session
session['orders_csv_data'] = orders_data
session['orders_validation_errors'] = validation_errors
session['orders_validation_warnings'] = validation_warnings
flash(f"📁 File '{file.filename}' uploaded and processed successfully!", "info")
if validation_errors:
flash(f"⚠️ Found {len(validation_errors)} validation errors. Please fix them before importing.", "warning")
if validation_warnings:
flash(f" Found {len(validation_warnings)} warnings. Data will be adjusted automatically.", "info")
except Exception as e:
flash(f"❌ Error processing file: {str(e)}", "error")
# Handle database insertion
elif request.form.get('save_to_database') and 'orders_csv_data' in session:
orders_data = session['orders_csv_data']
if not orders_data:
flash("❌ No valid data to save to database.", "error")
else:
success_count = 0
failed_count = 0
failed_orders = []
for order in orders_data:
success, message = add_order_to_database(order)
if success:
success_count += 1
else:
failed_count += 1
order_with_error = order.copy()
order_with_error['error_message'] = message
failed_orders.append(order_with_error)
# Save leftovers to static/leftover_orders.json if any
if failed_orders:
static_folder = os.path.join(current_app.root_path, 'static')
os.makedirs(static_folder, exist_ok=True)
leftovers_path = os.path.join(static_folder, 'leftover_orders.json')
with open(leftovers_path, 'w', encoding='utf-8') as f:
json.dump(failed_orders, f, ensure_ascii=False, indent=2)
# Create report
report = f"✅ Successfully imported {success_count} orders."
if failed_count > 0:
report += f"{failed_count} orders failed to import."
leftover_description = (
"These orders were not uploaded to the database because they already exist. "
"Check your order list or modify them in the database."
)
session['leftover_orders'] = failed_orders
session['leftover_description'] = leftover_description
for failure in failed_orders[:5]: # Show first 5 failures
report += f"<br>• {failure}"
if len(failed_orders) > 5:
report += f"<br>• ... and {len(failed_orders) - 5} more failures."
# Clear session data after successful import
if success_count > 0:
session.pop('orders_csv_data', None)
session.pop('csv_filename', None)
session.pop('orders_csv_filepath', None)
session.pop('orders_validation_errors', None)
session.pop('orders_validation_warnings', None)
session.pop('leftover_orders', None)
session.pop('leftover_description', None)
flash(report, "success" if failed_count == 0 else "warning")
if failed_count > 0:
# Show leftover orders on the upload_orders page
# Try to reload leftovers from JSON if available
leftovers_path = os.path.join(current_app.root_path, 'static', 'leftover_orders.json')
orders_data = session.get('leftover_orders', [])
try:
if os.path.exists(leftovers_path):
with open(leftovers_path, 'r', encoding='utf-8') as f:
orders_data = json.load(f)
except Exception:
pass
leftover_description = session.get('leftover_description', '')
return render_template('upload_orders.html',
orders=orders_data,
leftover_description=leftover_description,
validation_errors=validation_errors,
validation_warnings=validation_warnings,
report=report)
else:
return redirect(url_for('main.view_orders'))
# Load data from session if available
elif 'orders_csv_data' in session:
orders_data = session['orders_csv_data']
validation_errors = session.get('orders_validation_errors', [])
validation_warnings = session.get('orders_validation_warnings', [])
return render_template('upload_orders.html',
orders=orders_data,
validation_errors=validation_errors,
validation_warnings=validation_warnings,
report=report)
def get_orders_from_database(limit=100):
"""
Retrieve orders from the database for display
Returns list of order dictionaries
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, comanda_productie, cod_articol, descr_com_prod, cantitate,
data_livrare, dimensiune, com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
printed_labels, created_at, updated_at
FROM order_for_labels
ORDER BY created_at DESC
LIMIT %s
""", (limit,))
orders = []
for row in cursor.fetchall():
orders.append({
'id': row[0],
'comanda_productie': row[1],
'cod_articol': row[2],
'descr_com_prod': row[3],
'cantitate': row[4],
'data_livrare': row[5],
'dimensiune': row[6],
'com_achiz_client': row[7],
'nr_linie_com_client': row[8],
'customer_name': row[9],
'customer_article_number': row[10],
'open_for_order': row[11],
'line_number': row[12],
'printed_labels': row[13],
'created_at': row[14],
'updated_at': row[15]
})
conn.close()
return orders
except Exception as e:
print(f"Error retrieving orders: {e}")
return []