Add import labels functionality to labels module

- Created import_labels.py with CSV and Excel file processing
- Implemented validation for order rows and date format handling
- Added import-labels route with preview and save functionality
- Created import_labels.html template with new app UI styling
- Added import card to labels module index
- Added openpyxl to requirements.txt for Excel support
This commit is contained in:
Quality App Developer
2026-02-10 13:28:35 +02:00
parent bb8cd011f5
commit 78033a498a
5 changed files with 721 additions and 2 deletions

View File

@@ -0,0 +1,316 @@
"""
Labels Module - Import Labels Data Functions
Handles CSV/Excel upload and processing for order label data
"""
import csv
import json
import os
import tempfile
import logging
from datetime import datetime
from app.database import get_db
logger = logging.getLogger(__name__)
def validate_order_row(row_data):
"""
Validate a single order row for required fields and data types
Required fields: comanda_productie, cantitate, descr_com_prod
"""
errors = []
warnings = []
# Check required fields
if not row_data.get('comanda_productie', '').strip():
errors.append("Comanda Productie is required")
if not row_data.get('descr_com_prod', '').strip():
errors.append("Descr. Com. Prod is required")
# Validate Cantitate (quantity) - must be integer
cantitate_str = row_data.get('cantitate', '').strip()
if not cantitate_str:
errors.append("Cantitate is required")
else:
try:
cantitate = int(float(cantitate_str))
if cantitate <= 0:
errors.append("Cantitate must be a positive number")
elif cantitate > 999:
warnings.append("Cantitate exceeds 999 (will be truncated)")
except ValueError:
errors.append("Cantitate must be a valid number")
# Validate numeric fields (optional but must be valid if provided)
for field in ['nr_linie_com_client', 'line_number']:
value = row_data.get(field, '').strip()
if value:
try:
num_val = int(value)
if num_val < 0:
warnings.append(f"{field} should be positive")
except ValueError:
errors.append(f"{field} must be a valid number")
# Validate data_livrare (optional date field)
data_livrare = row_data.get('data_livrare', '').strip()
if data_livrare:
try:
date_formats = [
'%Y-%m-%d', # 2024-03-12
'%Y-%m-%d %H:%M:%S', # 2024-03-12 00:00:00 (Excel format)
'%d/%m/%Y', # 12/03/2024
'%m/%d/%Y', # 03/12/2024
'%d.%m.%Y' # 12.03.2024
]
for date_format in date_formats:
try:
datetime.strptime(data_livrare, date_format)
break
except ValueError:
continue
else:
errors.append("data_livrare must be a valid date")
except Exception:
errors.append("data_livrare date format error")
return errors, warnings
def process_csv_file(file_path):
"""
Process a CSV file and return parsed orders data
"""
try:
orders_data = []
errors = []
warnings = []
with open(file_path, 'r', encoding='utf-8') as csvfile:
csv_reader = csv.DictReader(csvfile)
for row_num, row in enumerate(csv_reader, start=1):
# Normalize column names (remove spaces and special characters)
normalized_row = {}
for key, value in row.items():
normalized_key = key.strip().lower().replace(' ', '_').replace('.', '')
normalized_row[normalized_key] = value
# Validate row
row_errors, row_warnings = validate_order_row(normalized_row)
if row_errors:
errors.extend([f"Row {row_num}: {err}" for err in row_errors])
continue
if row_warnings:
warnings.extend([f"Row {row_num}: {warn}" for warn in row_warnings])
# Extract and clean data
try:
cantitate = int(float(normalized_row.get('cantitate', 0)))
nr_linie = normalized_row.get('nr_linie_com_client', '')
nr_linie = int(nr_linie) if nr_linie.strip() else None
line_num = normalized_row.get('line_number', '')
line_num = int(line_num) if line_num.strip() else None
data_livrare = normalized_row.get('data_livrare', '').strip()
if data_livrare:
# Parse and reformat date
date_formats = [
('%Y-%m-%d', '%Y-%m-%d'),
('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'),
('%d/%m/%Y', '%Y-%m-%d'),
('%m/%d/%Y', '%Y-%m-%d'),
('%d.%m.%Y', '%Y-%m-%d')
]
for fmt_in, fmt_out in date_formats:
try:
parsed_date = datetime.strptime(data_livrare, fmt_in)
data_livrare = parsed_date.strftime(fmt_out)
break
except ValueError:
continue
else:
data_livrare = None
order = {
'comanda_productie': normalized_row.get('comanda_productie', '').strip(),
'cod_articol': normalized_row.get('cod_articol', '').strip(),
'descr_com_prod': normalized_row.get('descr_com_prod', '').strip(),
'cantitate': cantitate,
'com_achiz_client': normalized_row.get('com_achiz_client', '').strip(),
'nr_linie_com_client': nr_linie,
'customer_name': normalized_row.get('customer_name', '').strip(),
'customer_article_number': normalized_row.get('customer_article_number', '').strip(),
'open_for_order': normalized_row.get('open_for_order', '').strip(),
'line_number': line_num,
'data_livrare': data_livrare,
'dimensiune': normalized_row.get('dimensiune', '').strip()
}
orders_data.append(order)
except Exception as e:
errors.append(f"Row {row_num}: Error processing row - {str(e)}")
return orders_data, errors, warnings
except Exception as e:
logger.error(f"Error processing CSV file: {e}")
return [], [f"Error reading CSV file: {str(e)}"], []
def process_excel_file(file_path):
"""
Process an Excel file and return parsed orders data
"""
try:
import openpyxl
orders_data = []
errors = []
warnings = []
workbook = openpyxl.load_workbook(file_path, data_only=True)
worksheet = workbook.active
# Get headers from first row
headers = []
for cell in worksheet[1]:
if cell.value:
headers.append(str(cell.value).strip().lower().replace(' ', '_').replace('.', ''))
else:
headers.append('')
# Process data rows
for row_num, row in enumerate(worksheet.iter_rows(min_row=2, values_only=True), start=2):
# Create dictionary for this row
row_dict = {}
for col_idx, value in enumerate(row):
if col_idx < len(headers) and headers[col_idx]:
row_dict[headers[col_idx]] = str(value) if value is not None else ''
if not row_dict or not any(row_dict.values()):
# Skip empty rows
continue
# Validate row
row_errors, row_warnings = validate_order_row(row_dict)
if row_errors:
errors.extend([f"Row {row_num}: {err}" for err in row_errors])
continue
if row_warnings:
warnings.extend([f"Row {row_num}: {warn}" for warn in row_warnings])
# Extract and clean data
try:
cantitate = int(float(row_dict.get('cantitate', 0)))
nr_linie = row_dict.get('nr_linie_com_client', '')
nr_linie = int(nr_linie) if nr_linie.strip() else None
line_num = row_dict.get('line_number', '')
line_num = int(line_num) if line_num.strip() else None
data_livrare = row_dict.get('data_livrare', '').strip()
if data_livrare:
# Parse and reformat date
date_formats = [
('%Y-%m-%d', '%Y-%m-%d'),
('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'),
('%d/%m/%Y', '%Y-%m-%d'),
('%m/%d/%Y', '%Y-%m-%d'),
('%d.%m.%Y', '%Y-%m-%d')
]
for fmt_in, fmt_out in date_formats:
try:
parsed_date = datetime.strptime(data_livrare, fmt_in)
data_livrare = parsed_date.strftime(fmt_out)
break
except ValueError:
continue
else:
data_livrare = None
order = {
'comanda_productie': row_dict.get('comanda_productie', '').strip(),
'cod_articol': row_dict.get('cod_articol', '').strip(),
'descr_com_prod': row_dict.get('descr_com_prod', '').strip(),
'cantitate': cantitate,
'com_achiz_client': row_dict.get('com_achiz_client', '').strip(),
'nr_linie_com_client': nr_linie,
'customer_name': row_dict.get('customer_name', '').strip(),
'customer_article_number': row_dict.get('customer_article_number', '').strip(),
'open_for_order': row_dict.get('open_for_order', '').strip(),
'line_number': line_num,
'data_livrare': data_livrare,
'dimensiune': row_dict.get('dimensiune', '').strip()
}
orders_data.append(order)
except Exception as e:
errors.append(f"Row {row_num}: Error processing row - {str(e)}")
workbook.close()
return orders_data, errors, warnings
except ImportError:
return [], ["openpyxl is required for Excel file processing"], []
except Exception as e:
logger.error(f"Error processing Excel file: {e}")
return [], [f"Error reading Excel file: {str(e)}"], []
def save_orders_to_database(orders_list):
"""
Save orders to the order_for_labels table
Returns tuple of (inserted_count, error_messages)
"""
try:
conn = get_db()
cursor = conn.cursor()
inserted_count = 0
errors = []
for index, order in enumerate(orders_list):
try:
cursor.execute("""
INSERT INTO order_for_labels (
comanda_productie, cod_articol, descr_com_prod, cantitate,
com_achiz_client, nr_linie_com_client, customer_name,
customer_article_number, open_for_order, line_number,
data_livrare, dimensiune, printed_labels
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 0)
""", (
order.get('comanda_productie'),
order.get('cod_articol'),
order.get('descr_com_prod'),
order.get('cantitate'),
order.get('com_achiz_client'),
order.get('nr_linie_com_client'),
order.get('customer_name'),
order.get('customer_article_number'),
order.get('open_for_order'),
order.get('line_number'),
order.get('data_livrare'),
order.get('dimensiune')
))
inserted_count += 1
except Exception as e:
errors.append(f"Order {order.get('comanda_productie', 'UNKNOWN')}: {str(e)}")
conn.commit()
cursor.close()
logger.info(f"Inserted {inserted_count} orders successfully")
return inserted_count, errors
except Exception as e:
logger.error(f"Error saving orders to database: {e}")
return 0, [f"Database error: {str(e)}"]

View File

@@ -2,8 +2,12 @@
Labels Module Routes
Handles label printing pages and API endpoints
"""
from flask import Blueprint, render_template, session, redirect, url_for, jsonify, request
from flask import Blueprint, render_template, session, redirect, url_for, jsonify, request, flash
import logging
import json
import uuid
import os
import tempfile
from app.database import get_db
from .print_module import (
@@ -12,6 +16,11 @@ from .print_module import (
update_order_printed_status,
search_orders_by_cp_code
)
from .import_labels import (
process_csv_file,
process_excel_file,
save_orders_to_database
)
logger = logging.getLogger(__name__)
@@ -54,6 +63,148 @@ def print_lost_labels():
return render_template('modules/labels/print_lost_labels.html')
@labels_bp.route('/import-labels', methods=['GET', 'POST'])
def import_labels():
"""Import labels data from CSV or Excel file"""
if 'user_id' not in session:
return redirect(url_for('main.login'))
if request.method == 'POST':
action = request.form.get('action', 'preview')
if action == 'preview':
# Handle file upload and show preview
if 'file' not in request.files:
flash('No file selected', 'error')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No file selected', 'error')
return redirect(request.url)
filename_lower = file.filename.lower()
# Check file type
if not (filename_lower.endswith('.csv') or filename_lower.endswith('.xlsx') or filename_lower.endswith('.xls')):
flash('Please upload a CSV or Excel file (.csv, .xlsx, .xls)', 'error')
return redirect(request.url)
try:
# Save file temporarily
upload_id = str(uuid.uuid4())
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1])
file.save(temp_file.name)
temp_file.close()
# Process file
if filename_lower.endswith('.csv'):
orders_data, errors, warnings = process_csv_file(temp_file.name)
else:
orders_data, errors, warnings = process_excel_file(temp_file.name)
# Clean up temp file
try:
os.unlink(temp_file.name)
except:
pass
# Save orders data to temp file
temp_data_file = f'/tmp/upload_{upload_id}.json'
with open(temp_data_file, 'w') as f:
json.dump(orders_data, f)
# Store in session
session['upload_id'] = upload_id
session['import_filename'] = file.filename
session.modified = True
# Get headers for preview
database_fields = [
'comanda_productie', 'cod_articol', 'descr_com_prod', 'cantitate',
'data_livrare', 'dimensiune', 'com_achiz_client', 'nr_linie_com_client',
'customer_name', 'customer_article_number', 'open_for_order', 'line_number'
]
headers = [field for field in database_fields if field in orders_data[0].keys()] if orders_data else database_fields
preview_data = orders_data[:10]
# Flash any warnings/errors
for warning in warnings[:5]:
flash(warning, 'warning')
if len(warnings) > 5:
flash(f'... and {len(warnings) - 5} more warnings', 'warning')
for error in errors[:10]:
flash(error, 'error')
if len(errors) > 10:
flash(f'... and {len(errors) - 10} more errors', 'error')
if not orders_data:
flash('No valid data found in file', 'error')
return redirect(request.url)
return render_template('modules/labels/import_labels.html',
preview_data=preview_data,
headers=headers,
show_preview=True,
filename=file.filename,
total_orders=len(orders_data))
except Exception as e:
logger.error(f"Error processing import file: {e}")
flash(f'Error processing file: {str(e)}', 'error')
return redirect(request.url)
elif action == 'save':
# Save data to database
upload_id = session.get('upload_id')
if not upload_id:
flash('No data to save. Please upload a file first.', 'error')
return redirect(url_for('labels.import_labels'))
try:
# Load orders data from temp file
temp_data_file = f'/tmp/upload_{upload_id}.json'
with open(temp_data_file, 'r') as f:
orders_data = json.load(f)
# Save to database
inserted_count, errors = save_orders_to_database(orders_data)
# Clean up
try:
os.unlink(temp_data_file)
except:
pass
session.pop('upload_id', None)
session.pop('import_filename', None)
session.modified = True
# Flash results
if errors:
for error in errors[:5]:
flash(error, 'error')
if len(errors) > 5:
flash(f'... and {len(errors) - 5} more errors', 'error')
flash(f'Imported {inserted_count} orders with {len(errors)} errors', 'warning')
else:
flash(f'Successfully imported {inserted_count} orders for labels', 'success')
return redirect(url_for('labels.import_labels'))
except Exception as e:
logger.error(f"Error saving import data: {e}")
flash(f'Error saving data: {str(e)}', 'error')
return redirect(url_for('labels.import_labels'))
# GET request - show the import form
return render_template('modules/labels/import_labels.html')
@labels_bp.route('/help/<page>', methods=['GET'])
def help(page='index'):
"""Help page for labels module"""