Files
quality_recticel/py_app/app/daily_mirror.py
Quality System Admin 87469ecb8e starting daily mirror
2025-10-25 02:15:54 +03:00

1016 lines
40 KiB
Python

"""
Daily Mirror Module - Business Intelligence and Production Reporting
Quality Recticel Application
This module provides comprehensive daily production reporting and analytics,
including order tracking, quality control metrics, and historical analysis.
"""
from flask import Blueprint, request, jsonify, render_template, flash, redirect, url_for, session, current_app
from datetime import datetime, timedelta, date
import json
import pandas as pd
import os
from werkzeug.utils import secure_filename
from app.print_module import get_db_connection
from app.daily_mirror_db_setup import DailyMirrorDatabase
# Create the Blueprint that groups the Daily Mirror routes; every route
# registered on it is served under the '/daily_mirror' URL prefix.
daily_mirror_bp = Blueprint('daily_mirror', __name__, url_prefix='/daily_mirror')
def check_daily_mirror_access():
    """Gate page routes behind a logged-in admin or superadmin session.

    Returns:
        A redirect response (after flashing a message) when the visitor is
        not logged in or lacks the required role; ``None`` when access is
        granted.
    """
    logged_in = 'user' in session
    if not logged_in:
        flash('Please log in to access this page.')
        return redirect(url_for('main.login'))

    # Only admin-level roles may use Daily Mirror.
    permitted_roles = ['superadmin', 'admin']
    current_role = session.get('role', '')
    if current_role in permitted_roles:
        return None  # Access granted

    flash('Access denied: Admin privileges required for Daily Mirror.')
    return redirect(url_for('main.dashboard'))
def check_daily_mirror_api_access():
    """Gate JSON API routes behind a logged-in admin or superadmin session.

    Returns:
        A ``(json_response, status)`` tuple with status 401 when nobody is
        logged in or 403 when the role is insufficient; ``None`` when access
        is granted.
    """
    if 'user' not in session:
        return jsonify({'error': 'Authentication required'}), 401

    permitted_roles = ['superadmin', 'admin']
    if session.get('role', '') in permitted_roles:
        return None  # Access granted

    return jsonify({'error': 'Admin privileges required'}), 403
class DailyMirrorManager:
    """Main class for managing Daily Mirror functionality.

    Aggregates figures from the ``order_for_labels`` and
    ``dm_production_orders`` tables into per-day report dictionaries and
    derives trend series from those dictionaries.
    """

    def __init__(self):
        self.module_name = "daily_mirror"
        self.module_display_name = "Daily Mirror"
        self.module_description = "Business Intelligence and Production Reporting"

    @staticmethod
    def _close_quietly(*resources):
        """Close each non-None cursor/connection, logging (not raising) close errors."""
        for resource in resources:
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_error:
                print(f"Error closing database resource: {close_error}")

    def get_daily_production_data(self, report_date):
        """Get comprehensive daily production data for a specific date.

        Args:
            report_date: The day to report on; it is passed as the query
                parameter compared against ``DATE(created_at)`` and
                ``DATE(data_planificare)``.

        Returns:
            A dict with the keys ``date``, ``orders_quantity``,
            ``production_launched``, ``production_finished``,
            ``orders_delivered``, ``operators`` and ``production_efficiency``,
            or ``None`` when any database step fails.
        """
        conn = None
        cursor = None
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            # Initialize report data structure
            report_data = {
                'date': report_date,
                'orders_quantity': 0,
                'production_launched': 0,
                'production_finished': 0,
                'orders_delivered': 0,
                'operators': {
                    'active_operators': 0,
                    'operator_performance': []
                },
                'production_efficiency': {
                    'launch_rate': 0,
                    'completion_rate': 0,
                    'delivery_rate': 0
                }
            }

            # Get orders data from order_for_labels table
            # NOTE(review): this method uses '?' placeholders while
            # get_historical_data uses '%s'; confirm the driver accepts both.
            cursor.execute("""
                SELECT COUNT(*) as total_orders,
                       SUM(cantitate) as total_quantity,
                       SUM(printed_labels) as total_printed,
                       COUNT(DISTINCT customer_name) as unique_customers
                FROM order_for_labels
                WHERE DATE(created_at) = ?
            """, (report_date,))
            orders_row = cursor.fetchone()
            if orders_row:
                report_data['orders_quantity'] = orders_row[1] or 0
                report_data['production_launched'] = orders_row[0] or 0

            # Get production data from dm_production_orders if available
            cursor.execute("""
                SELECT COUNT(*) as total_production,
                       SUM(CASE WHEN production_status = 'FINISHED' THEN 1 ELSE 0 END) as finished_production,
                       SUM(CASE WHEN end_of_quilting IS NOT NULL THEN 1 ELSE 0 END) as quilting_done,
                       SUM(CASE WHEN end_of_sewing IS NOT NULL THEN 1 ELSE 0 END) as sewing_done,
                       COUNT(DISTINCT customer_code) as unique_customers
                FROM dm_production_orders
                WHERE DATE(data_planificare) = ?
            """, (report_date,))
            production_row = cursor.fetchone()
            if production_row:
                # Keep the larger of the two "launched" counts.
                report_data['production_launched'] = max(
                    report_data['production_launched'], production_row[0] or 0
                )
                report_data['production_finished'] = production_row[1] or 0
                report_data['orders_delivered'] = production_row[3] or 0  # Use sewing_done as delivery proxy

            # Get operator count (the CASE picks the first non-NULL of the
            # t1/t2/t3 operator names for each row).
            cursor.execute("""
                SELECT COUNT(DISTINCT CASE
                           WHEN t1_operator_name IS NOT NULL THEN t1_operator_name
                           WHEN t2_operator_name IS NOT NULL THEN t2_operator_name
                           WHEN t3_operator_name IS NOT NULL THEN t3_operator_name
                       END) as active_operators
                FROM dm_production_orders
                WHERE DATE(data_planificare) = ?
            """, (report_date,))
            operator_row = cursor.fetchone()
            if operator_row:
                report_data['operators']['active_operators'] = operator_row[0] or 0

            # Calculate efficiency metrics (guarded against division by zero)
            launched = report_data['production_launched']
            if launched > 0:
                report_data['production_efficiency'] = {
                    'launch_rate': 100,  # All launched orders are 100% launched
                    'completion_rate': (report_data['production_finished'] / launched) * 100,
                    'delivery_rate': (report_data['orders_delivered'] / launched) * 100
                }

            return report_data
        except Exception as e:
            print(f"Error getting daily production data: {e}")
            return None
        finally:
            # Release the cursor and connection on both success and failure.
            self._close_quietly(cursor, conn)

    def get_historical_data(self, start_date, end_date):
        """Get historical production data for a date range.

        Args:
            start_date: Lower bound of the ``BETWEEN`` filter.
            end_date: Upper bound of the ``BETWEEN`` filter.

        Returns:
            A list of per-day dicts (newest date first) with the keys
            ``date``, ``orders_quantity``, ``production_launched``,
            ``production_finished``, ``orders_delivered`` and
            ``unique_customers``. An empty list is returned on any error.
        """
        conn = None
        cursor = None
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            # Get daily aggregated data for the date range
            cursor.execute("""
                SELECT DATE(created_at) as report_date,
                       COUNT(*) as orders_count,
                       SUM(cantitate) as total_quantity,
                       SUM(printed_labels) as total_printed,
                       COUNT(DISTINCT customer_name) as unique_customers
                FROM order_for_labels
                WHERE DATE(created_at) BETWEEN %s AND %s
                GROUP BY DATE(created_at)
                ORDER BY report_date DESC
            """, (start_date, end_date))
            orders_data = {}
            for row in cursor.fetchall():
                # NOTE(review): 'production_launched' here is the sum of
                # printed labels, whereas get_daily_production_data uses the
                # order row count; confirm which is intended.
                orders_data[str(row[0])] = {
                    'orders_count': row[1] or 0,
                    'orders_quantity': row[2] or 0,
                    'production_launched': row[3] or 0,
                    'unique_customers': row[4] or 0
                }

            # Get production data from dm_production_orders if available
            cursor.execute("""
                SELECT DATE(data_planificare) as production_date,
                       COUNT(*) as total_production,
                       SUM(CASE WHEN production_status = 'FINISHED' THEN 1 ELSE 0 END) as finished_production,
                       SUM(CASE WHEN end_of_sewing IS NOT NULL THEN 1 ELSE 0 END) as sewing_done,
                       COUNT(DISTINCT customer_code) as unique_customers
                FROM dm_production_orders
                WHERE DATE(data_planificare) BETWEEN %s AND %s
                GROUP BY DATE(data_planificare)
                ORDER BY production_date DESC
            """, (start_date, end_date))
            production_data = {}
            for row in cursor.fetchall():
                production_data[str(row[0])] = {
                    'production_launched': row[1] or 0,
                    'production_finished': row[2] or 0,
                    'orders_delivered': row[3] or 0,  # Use sewing_done as delivery proxy
                    'unique_customers': row[4] or 0
                }

            # Combine data by date; a day may appear in only one of the sources
            empty_orders = {
                'orders_count': 0, 'orders_quantity': 0,
                'production_launched': 0, 'unique_customers': 0
            }
            empty_production = {
                'production_launched': 0, 'production_finished': 0,
                'orders_delivered': 0, 'unique_customers': 0
            }
            all_dates = set(orders_data) | set(production_data)
            history_data = []
            for date_str in sorted(all_dates, reverse=True):
                orders_info = orders_data.get(date_str, empty_orders)
                production_info = production_data.get(date_str, empty_production)
                history_data.append({
                    'date': date_str,
                    'orders_quantity': orders_info['orders_quantity'],
                    'production_launched': max(orders_info['production_launched'],
                                               production_info['production_launched']),
                    'production_finished': production_info['production_finished'],
                    'orders_delivered': production_info['orders_delivered'],
                    'unique_customers': max(orders_info['unique_customers'],
                                            production_info['unique_customers'])
                })
            return history_data
        except Exception as e:
            print(f"Error getting historical data: {e}")
            return []
        finally:
            # Release the cursor and connection on both success and failure.
            self._close_quietly(cursor, conn)

    def generate_trend_analysis(self, history_data):
        """Build per-day trend series from historical data.

        Args:
            history_data: A list of per-day dicts as produced by
                ``get_historical_data``.

        Returns:
            A dict with three parallel lists (``orders_quantity``,
            ``production_efficiency``, ``daily_performance``) in the same
            order as ``history_data``. ``None`` is returned when fewer than
            two days are supplied or when an entry cannot be processed.
        """
        try:
            if not history_data or len(history_data) < 2:
                return None

            trends = {
                'orders_quantity': [],
                'production_efficiency': [],
                'daily_performance': []
            }
            for day in history_data:
                trends['orders_quantity'].append({
                    'date': day['date'],
                    'value': day['orders_quantity']
                })

                # Calculate efficiency rates as a percentage of the ordered quantity
                orders_qty = day['orders_quantity']
                if orders_qty > 0:
                    launch_rate = round((day['production_launched'] / orders_qty * 100), 1)
                    completion_rate = round((day['production_finished'] / orders_qty * 100), 1)
                    delivery_rate = round((day['orders_delivered'] / orders_qty * 100), 1)
                else:
                    launch_rate = completion_rate = delivery_rate = 0
                trends['production_efficiency'].append({
                    'date': day['date'],
                    'launch_rate': launch_rate,
                    'completion_rate': completion_rate,
                    'delivery_rate': delivery_rate
                })

                trends['daily_performance'].append({
                    'date': day['date'],
                    'orders_quantity': day['orders_quantity'],
                    'production_launched': day['production_launched'],
                    'production_finished': day['production_finished'],
                    'orders_delivered': day['orders_delivered']
                })
            return trends
        except Exception as e:
            print(f"Error generating trend analysis: {e}")
            return None
# Initialize the Daily Mirror manager: a single module-level instance that the
# route handlers in this module use for report and history queries.
daily_mirror_manager = DailyMirrorManager()
# Route handler functions
@daily_mirror_bp.route('/main')
def daily_mirror_main_route():
    """Render the Daily Mirror hub page with today's quick statistics.

    Redirects according to ``check_daily_mirror_access`` when access is
    denied, and to the main dashboard when rendering fails.
    """
    denied = check_daily_mirror_access()
    if denied:
        return denied

    try:
        from datetime import datetime

        # Today's date doubles as the default report date and the stats key.
        current_day = datetime.now().strftime('%Y-%m-%d')
        stats_for_today = daily_mirror_manager.get_daily_production_data(current_day)
        module_info = {
            'name': daily_mirror_manager.module_name,
            'display_name': daily_mirror_manager.module_display_name,
            'description': daily_mirror_manager.module_description
        }
        return render_template(
            'daily_mirror_main.html',
            today=current_day,
            quick_stats=stats_for_today,
            module_info=module_info,
        )
    except Exception as e:
        print(f"Error loading Daily Mirror main page: {e}")
        flash('Error loading Daily Mirror main page.', 'error')
        return redirect(url_for('main.dashboard'))
@daily_mirror_bp.route('/')
def daily_mirror_route():
    """Render the Daily Mirror report generation page.

    The template receives today's date (``YYYY-MM-DD``) as its default
    value. On failure the user is sent back to the Daily Mirror hub.
    """
    denied = check_daily_mirror_access()
    if denied:
        return denied

    try:
        from datetime import datetime

        current_day = datetime.now().strftime('%Y-%m-%d')
        return render_template('daily_mirror.html', today=current_day)
    except Exception as e:
        print(f"Error loading Daily Mirror report page: {e}")
        flash('Error loading Daily Mirror report page.', 'error')
        return redirect(url_for('daily_mirror.daily_mirror_main_route'))
@daily_mirror_bp.route('/history')
def daily_mirror_history_route():
    """Render the Daily Mirror history and trend analysis page.

    The template is pre-filled with a date window covering the last 30
    days. On failure the user is sent back to the Daily Mirror hub.
    """
    denied = check_daily_mirror_access()
    if denied:
        return denied

    try:
        from datetime import datetime, timedelta

        day_format = '%Y-%m-%d'
        window_end = datetime.now()
        window_start = window_end - timedelta(days=30)
        return render_template(
            'daily_mirror_history.html',
            start_date=window_start.strftime(day_format),
            end_date=window_end.strftime(day_format),
        )
    except Exception as e:
        print(f"Error loading Daily Mirror history page: {e}")
        flash('Error loading Daily Mirror history page.', 'error')
        return redirect(url_for('daily_mirror.daily_mirror_main_route'))
def _build_database_failure(message, status_code, is_ajax):
    """Report an upload failure as JSON (AJAX) or as flash + redirect to the form."""
    if is_ajax:
        return jsonify({'error': message}), status_code
    flash(message, 'error')
    return redirect(request.url)


@daily_mirror_bp.route('/build_database', methods=['GET', 'POST'])
def daily_mirror_build_database():
    """Daily Mirror - Build Database: upload Excel files to populate tables.

    GET renders the upload form. POST validates the uploaded Excel file and
    the selected target table, stores the upload in a unique temporary file,
    and hands that file to the matching ``DailyMirrorDatabase`` importer.

    AJAX requests (``X-Requested-With: XMLHttpRequest``) receive JSON; other
    requests get flash messages, with validation failures redirecting back
    to the form and a completed import re-rendering it.
    """
    access_check = check_daily_mirror_access()
    if access_check:
        return access_check

    if request.method == 'POST':
        import tempfile  # only the upload path needs it

        is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
        try:
            # Check if a file was uploaded (a missing filename counts as none)
            uploaded = request.files.get('excel_file')
            if uploaded is None or not uploaded.filename:
                return _build_database_failure('No file selected.', 400, is_ajax)

            lowered_name = uploaded.filename.lower()
            if not lowered_name.endswith(('.xlsx', '.xls')):
                return _build_database_failure(
                    'Please upload an Excel file (.xlsx or .xls).', 400, is_ajax)

            target_table = request.form.get('target_table', '')
            if not target_table:
                return _build_database_failure('Please select a target table.', 400, is_ajax)

            # Use a unique temp file: a shared '/tmp/upload_<name>' path lets
            # concurrent uploads with the same filename clobber or delete each
            # other's data. The validated extension is kept as the suffix.
            suffix = '.xlsx' if lowered_name.endswith('.xlsx') else '.xls'
            fd, temp_path = tempfile.mkstemp(prefix='upload_', suffix=suffix)
            os.close(fd)
            try:
                uploaded.save(temp_path)

                # Parse once up front so an unreadable workbook is reported
                # as a client error before the database is touched.
                try:
                    pd.read_excel(temp_path)
                except Exception as excel_error:
                    return _build_database_failure(
                        f'Error reading Excel file: {str(excel_error)}', 400, is_ajax)

                dm_db = DailyMirrorDatabase()
                if not dm_db.connect():
                    return _build_database_failure('Database connection failed.', 500, is_ajax)
                try:
                    importers = {
                        'production_data': dm_db.import_production_data,
                        'orders_data': dm_db.import_orders_data,
                        'delivery_data': dm_db.import_delivery_data,
                    }
                    importer = importers.get(target_table)
                    if importer is None:
                        return _build_database_failure(
                            f'Unknown target table: {target_table}', 400, is_ajax)

                    result = importer(temp_path)
                    if not result:
                        return _build_database_failure('Import failed.', 500, is_ajax)

                    error_count = result.get('error_count', 0)
                    total_rows = result.get('total_rows', 0)
                    created_rows = result.get('created_count', 0)
                    updated_rows = result.get('updated_count', 0)
                    if is_ajax:
                        return jsonify({
                            'total_rows': total_rows,
                            'created_rows': created_rows,
                            'updated_rows': updated_rows,
                            'error_count': error_count
                        })
                    # Non-AJAX uploads previously finished silently; tell the
                    # user what happened before the form is rendered again.
                    flash(
                        f'Import completed: {total_rows} rows processed, '
                        f'{created_rows} created, {updated_rows} updated, '
                        f'{error_count} errors.',
                        'success'
                    )
                finally:
                    dm_db.disconnect()
            finally:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        except Exception as e:
            print(f"Error processing uploaded file: {e}")
            if is_ajax:
                return jsonify({'error': f'Error processing file: {str(e)}'}), 500
            flash(f'Error processing file: {str(e)}', 'error')

    # For GET requests (and non-AJAX POSTs that fall through), show the upload form
    try:
        # Tables offered in the dropdown (customized for our database schema)
        available_tables = [
            {
                'name': 'production_data',
                'display': 'Production Data (Comenzi Productie)',
                'description': 'Production orders with timeline, quality stages, and machine data'
            },
            {
                'name': 'orders_data',
                'display': 'Orders Data (Vizual. Artic. Comenzi Deschise)',
                'description': 'Open orders with customer, article, and delivery information'
            },
            {
                'name': 'delivery_data',
                'display': 'Delivery Data (Articole livrate)',
                'description': 'Shipped and delivered orders with dates and quantities'
            }
        ]
        return render_template('daily_mirror_build_database.html',
                               available_tables=available_tables)
    except Exception as e:
        print(f"Error loading Build Database page: {e}")
        flash('Error loading Build Database page.', 'error')
        return redirect(url_for('daily_mirror.daily_mirror_main_route'))
@daily_mirror_bp.route('/api/data', methods=['GET'])
def api_daily_mirror_data():
    """Return the daily production report as JSON.

    Reads the optional ``date`` query parameter (defaults to today) and
    answers 500 with an ``error`` payload when the report cannot be built.
    """
    denied = check_daily_mirror_api_access()
    if denied:
        return denied

    try:
        fallback_day = datetime.now().strftime('%Y-%m-%d')
        report_date = request.args.get('date', fallback_day)
        print(f"DEBUG: Getting daily mirror data for date: {report_date}")

        # The manager signals failure by returning None.
        report_data = daily_mirror_manager.get_daily_production_data(report_date)
        if report_data is not None:
            print(f"DEBUG: Daily mirror data retrieved successfully for {report_date}")
            return jsonify(report_data)
        return jsonify({'error': 'Failed to retrieve daily production data'}), 500
    except Exception as e:
        print(f"Error getting daily mirror data: {e}")
        return jsonify({'error': str(e)}), 500
@daily_mirror_bp.route('/api/history_data', methods=['GET'])
def api_daily_mirror_history_data():
    """API endpoint to get historical daily production data.

    Query parameters ``start_date`` and ``end_date`` select the range; when
    either is missing the last 30 days are used.

    Returns:
        JSON with ``history`` (list of per-day dicts), ``total_days`` and,
        when at least two days are available, ``trends``. Errors are
        reported as ``{'error': ...}`` with status 500.
    """
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    try:
        from datetime import datetime, timedelta

        # Get date range parameters
        start_date = request.args.get('start_date')
        end_date = request.args.get('end_date')
        if not start_date or not end_date:
            # Default to last 30 days
            end_date_obj = datetime.now()
            start_date_obj = end_date_obj - timedelta(days=30)
            start_date = start_date_obj.strftime('%Y-%m-%d')
            end_date = end_date_obj.strftime('%Y-%m-%d')
        print(f"DEBUG: Getting daily mirror history from {start_date} to {end_date}")

        # Use the manager to get historical data
        history_result = daily_mirror_manager.get_historical_data(start_date, end_date)
        if history_result is None:
            return jsonify({'error': 'Failed to retrieve historical data'}), 500

        # The manager returns a plain list of day dicts; indexing it with
        # string keys raised TypeError. Wrap it in the payload shape this
        # endpoint emits, while still accepting an already-built dict.
        if isinstance(history_result, dict):
            payload = history_result
            payload.setdefault('history', [])
            payload.setdefault('total_days', len(payload['history']))
        else:
            history = list(history_result)
            payload = {'history': history, 'total_days': len(history)}

        # Generate trend analysis (None when fewer than two days exist)
        trends = daily_mirror_manager.generate_trend_analysis(payload['history'])
        if trends:
            payload['trends'] = trends

        print(f"DEBUG: Retrieved {payload['total_days']} days of history data")
        return jsonify(payload)
    except Exception as e:
        print(f"Error getting daily mirror history data: {e}")
        return jsonify({'error': str(e)}), 500
# =============================================
# TUNE DATABASE ROUTES
# =============================================
@daily_mirror_bp.route('/tune/production')
def tune_production_data():
    """Serve the page used to edit and update production order records."""
    denied = check_daily_mirror_access()
    # A truthy result is the redirect produced by the access check.
    return denied if denied else render_template('daily_mirror_tune_production.html')
@daily_mirror_bp.route('/tune/orders')
def tune_orders_data():
    """Serve the page used to edit and update customer order records."""
    denied = check_daily_mirror_access()
    # A truthy result is the redirect produced by the access check.
    return denied if denied else render_template('daily_mirror_tune_orders.html')
@daily_mirror_bp.route('/api/tune/orders_data', methods=['GET'])
def api_get_orders_data():
    """API endpoint to get a filtered, paginated page of orders for editing.

    Query parameters: ``page`` and ``per_page`` (integers, clamped to at
    least 1), ``search`` (substring match over several text columns),
    ``status`` and ``customer`` (exact matches).

    Returns:
        JSON with ``success``, ``data``, ``total_records``, ``page``,
        ``per_page``, ``total_pages``, ``customers`` and ``statuses``.
        Failures return ``{'success': False, 'error': ...}`` with status
        400 (bad pagination values) or 500.
    """
    # JSON endpoint: answer 401/403 JSON instead of a flash + redirect.
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    # Get pagination parameters; clamping avoids a negative OFFSET and a
    # division by zero when total_pages is computed.
    try:
        page = max(1, int(request.args.get('page', 1)))
        per_page = max(1, int(request.args.get('per_page', 50)))
    except (TypeError, ValueError):
        return jsonify({'success': False,
                        'error': 'page and per_page must be integers'}), 400

    db = DailyMirrorDatabase()
    connected = False
    try:
        if not db.connect():
            return jsonify({'success': False, 'error': 'Database connection failed.'}), 500
        connected = True
        cursor = db.connection.cursor()

        search = request.args.get('search', '').strip()
        status_filter = request.args.get('status', '').strip()
        customer_filter = request.args.get('customer', '').strip()

        # Build WHERE clause for filters; values are always bound as
        # parameters, only fixed SQL fragments are interpolated.
        where_conditions = []
        params = []
        if search:
            where_conditions.append("""
                (order_id LIKE ? OR customer_name LIKE ? OR article_code LIKE ? OR
                 article_description LIKE ? OR client_order LIKE ?)
            """)
            search_param = f'%{search}%'
            params.extend([search_param] * 5)
        if status_filter:
            where_conditions.append("order_status = ?")
            params.append(status_filter)
        if customer_filter:
            where_conditions.append("customer_code = ?")
            params.append(customer_filter)
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        # Get total count
        count_query = f"SELECT COUNT(*) FROM dm_orders {where_clause}"
        cursor.execute(count_query, params)
        total_records = cursor.fetchone()[0]

        # Get paginated data
        offset = (page - 1) * per_page
        data_query = f"""
            SELECT id, order_id, customer_code, customer_name, client_order,
                   article_code, article_description, quantity_requested,
                   delivery_date, order_status, priority, product_group, order_date
            FROM dm_orders {where_clause}
            ORDER BY order_date DESC, order_id
            LIMIT ? OFFSET ?
        """
        cursor.execute(data_query, params + [per_page, offset])

        # Format data for JSON response
        data = [
            {
                'id': record[0],
                'order_id': record[1],
                'customer_code': record[2],
                'customer_name': record[3],
                'client_order': record[4],
                'article_code': record[5],
                'article_description': record[6],
                'quantity_requested': record[7],
                'delivery_date': record[8].strftime('%Y-%m-%d') if record[8] else '',
                'order_status': record[9],
                'priority': record[10],
                'product_group': record[11],
                'order_date': record[12].strftime('%Y-%m-%d') if record[12] else ''
            }
            for record in cursor.fetchall()
        ]

        # Get unique customers for filter dropdown
        cursor.execute("SELECT DISTINCT customer_code, customer_name FROM dm_orders ORDER BY customer_name")
        customers = [{'code': row[0], 'name': row[1]} for row in cursor.fetchall()]

        # Get unique statuses for filter dropdown
        cursor.execute("SELECT DISTINCT order_status FROM dm_orders WHERE order_status IS NOT NULL ORDER BY order_status")
        statuses = [row[0] for row in cursor.fetchall()]

        return jsonify({
            'success': True,
            'data': data,
            'total_records': total_records,
            'page': page,
            'per_page': per_page,
            'total_pages': (total_records + per_page - 1) // per_page,
            'customers': customers,
            'statuses': statuses
        })
    except Exception as e:
        current_app.logger.error(f"Error getting orders data: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Always release the connection once it has been opened.
        if connected:
            try:
                db.disconnect()
            except Exception as close_error:
                current_app.logger.error(f"Error closing orders connection: {close_error}")
@daily_mirror_bp.route('/api/tune/orders_data/<int:record_id>', methods=['PUT'])
def api_update_orders_data(record_id):
    """API endpoint to update one ``dm_orders`` record.

    Args:
        record_id: Primary key (``id``) of the row to update, from the URL.

    The request body must be a JSON object containing every editable field;
    empty ``delivery_date`` / ``order_date`` values are stored as NULL.

    Returns:
        ``{'success': True, 'message': ...}`` on success, otherwise
        ``{'success': False, 'error': ...}`` with status 400 (invalid or
        incomplete body) or 500.
    """
    # JSON endpoint: answer 401/403 JSON instead of a flash + redirect.
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    # Reject a missing/invalid body or missing fields as a client error
    # instead of letting a TypeError/KeyError surface as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'A JSON object body is required'}), 400
    required_fields = (
        'customer_code', 'customer_name', 'client_order',
        'article_code', 'article_description', 'quantity_requested',
        'delivery_date', 'order_status', 'priority', 'product_group',
        'order_date',
    )
    missing = [field for field in required_fields if field not in data]
    if missing:
        return jsonify({'success': False,
                        'error': f"Missing fields: {', '.join(missing)}"}), 400

    db = DailyMirrorDatabase()
    connected = False
    try:
        if not db.connect():
            return jsonify({'success': False, 'error': 'Database connection failed.'}), 500
        connected = True
        cursor = db.connection.cursor()

        # Update the record
        update_query = """
            UPDATE dm_orders SET
                customer_code = ?, customer_name = ?, client_order = ?,
                article_code = ?, article_description = ?, quantity_requested = ?,
                delivery_date = ?, order_status = ?, priority = ?, product_group = ?,
                order_date = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        cursor.execute(update_query, (
            data['customer_code'], data['customer_name'], data['client_order'],
            data['article_code'], data['article_description'], data['quantity_requested'],
            data['delivery_date'] or None,  # empty date -> NULL
            data['order_status'], data['priority'], data['product_group'],
            data['order_date'] or None,  # empty date -> NULL
            record_id
        ))
        db.connection.commit()
        return jsonify({'success': True, 'message': 'Order updated successfully'})
    except Exception as e:
        current_app.logger.error(f"Error updating orders record: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Always release the connection once it has been opened.
        if connected:
            try:
                db.disconnect()
            except Exception as close_error:
                current_app.logger.error(f"Error closing orders connection: {close_error}")
@daily_mirror_bp.route('/tune/delivery')
def tune_delivery_data():
    """Serve the page used to edit and update delivery records."""
    denied = check_daily_mirror_access()
    # A truthy result is the redirect produced by the access check.
    return denied if denied else render_template('daily_mirror_tune_delivery.html')
@daily_mirror_bp.route('/api/tune/delivery_data', methods=['GET'])
def api_get_delivery_data():
    """API endpoint to get a filtered, paginated page of deliveries for editing.

    Query parameters: ``page`` and ``per_page`` (integers, clamped to at
    least 1), ``search`` (substring match over several text columns),
    ``status`` and ``customer`` (exact matches).

    Returns:
        JSON with ``success``, ``data``, ``total_records``, ``page``,
        ``per_page``, ``total_pages``, ``customers`` and ``statuses``.
        Failures return ``{'success': False, 'error': ...}`` with status
        400 (bad pagination values) or 500.
    """
    # JSON endpoint: answer 401/403 JSON instead of a flash + redirect.
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    # Get pagination parameters; clamping avoids a negative OFFSET and a
    # division by zero when total_pages is computed.
    try:
        page = max(1, int(request.args.get('page', 1)))
        per_page = max(1, int(request.args.get('per_page', 50)))
    except (TypeError, ValueError):
        return jsonify({'success': False,
                        'error': 'page and per_page must be integers'}), 400

    db = DailyMirrorDatabase()
    connected = False
    try:
        if not db.connect():
            return jsonify({'success': False, 'error': 'Database connection failed.'}), 500
        connected = True
        cursor = db.connection.cursor()

        search = request.args.get('search', '').strip()
        status_filter = request.args.get('status', '').strip()
        customer_filter = request.args.get('customer', '').strip()

        # Build WHERE clause for filters; values are always bound as
        # parameters, only fixed SQL fragments are interpolated.
        where_conditions = []
        params = []
        if search:
            where_conditions.append("""
                (shipment_id LIKE ? OR customer_name LIKE ? OR article_code LIKE ? OR
                 article_description LIKE ? OR order_id LIKE ?)
            """)
            search_param = f'%{search}%'
            params.extend([search_param] * 5)
        if status_filter:
            where_conditions.append("delivery_status = ?")
            params.append(status_filter)
        if customer_filter:
            where_conditions.append("customer_code = ?")
            params.append(customer_filter)
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        # Get total count
        count_query = f"SELECT COUNT(*) FROM dm_deliveries {where_clause}"
        cursor.execute(count_query, params)
        total_records = cursor.fetchone()[0]

        # Get paginated data
        offset = (page - 1) * per_page
        data_query = f"""
            SELECT id, shipment_id, order_id, customer_code, customer_name,
                   article_code, article_description, quantity_delivered,
                   shipment_date, delivery_date, delivery_status, total_value
            FROM dm_deliveries {where_clause}
            ORDER BY shipment_date DESC, shipment_id
            LIMIT ? OFFSET ?
        """
        cursor.execute(data_query, params + [per_page, offset])

        # Format data for JSON response
        data = [
            {
                'id': record[0],
                'shipment_id': record[1],
                'order_id': record[2],
                'customer_code': record[3],
                'customer_name': record[4],
                'article_code': record[5],
                'article_description': record[6],
                'quantity_delivered': record[7],
                'shipment_date': record[8].strftime('%Y-%m-%d') if record[8] else '',
                'delivery_date': record[9].strftime('%Y-%m-%d') if record[9] else '',
                'delivery_status': record[10],
                'total_value': float(record[11]) if record[11] else 0.0
            }
            for record in cursor.fetchall()
        ]

        # Get unique customers for filter dropdown
        cursor.execute("SELECT DISTINCT customer_code, customer_name FROM dm_deliveries ORDER BY customer_name")
        customers = [{'code': row[0], 'name': row[1]} for row in cursor.fetchall()]

        # Get unique statuses for filter dropdown
        cursor.execute("SELECT DISTINCT delivery_status FROM dm_deliveries WHERE delivery_status IS NOT NULL ORDER BY delivery_status")
        statuses = [row[0] for row in cursor.fetchall()]

        return jsonify({
            'success': True,
            'data': data,
            'total_records': total_records,
            'page': page,
            'per_page': per_page,
            'total_pages': (total_records + per_page - 1) // per_page,
            'customers': customers,
            'statuses': statuses
        })
    except Exception as e:
        current_app.logger.error(f"Error getting delivery data: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Always release the connection once it has been opened.
        if connected:
            try:
                db.disconnect()
            except Exception as close_error:
                current_app.logger.error(f"Error closing delivery connection: {close_error}")
@daily_mirror_bp.route('/api/tune/delivery_data/<int:record_id>', methods=['PUT'])
def api_update_delivery_data(record_id):
    """API endpoint to update one ``dm_deliveries`` record.

    Args:
        record_id: Primary key (``id``) of the row to update, from the URL.

    The request body must be a JSON object containing every editable field;
    empty ``shipment_date`` / ``delivery_date`` values are stored as NULL.

    Returns:
        ``{'success': True, 'message': ...}`` on success, otherwise
        ``{'success': False, 'error': ...}`` with status 400 (invalid or
        incomplete body) or 500.
    """
    # JSON endpoint: answer 401/403 JSON instead of a flash + redirect.
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    # Reject a missing/invalid body or missing fields as a client error
    # instead of letting a TypeError/KeyError surface as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'A JSON object body is required'}), 400
    required_fields = (
        'customer_code', 'customer_name', 'order_id',
        'article_code', 'article_description', 'quantity_delivered',
        'shipment_date', 'delivery_date', 'delivery_status', 'total_value',
    )
    missing = [field for field in required_fields if field not in data]
    if missing:
        return jsonify({'success': False,
                        'error': f"Missing fields: {', '.join(missing)}"}), 400

    db = DailyMirrorDatabase()
    connected = False
    try:
        if not db.connect():
            return jsonify({'success': False, 'error': 'Database connection failed.'}), 500
        connected = True
        cursor = db.connection.cursor()

        # Update the record
        update_query = """
            UPDATE dm_deliveries SET
                customer_code = ?, customer_name = ?, order_id = ?,
                article_code = ?, article_description = ?, quantity_delivered = ?,
                shipment_date = ?, delivery_date = ?, delivery_status = ?,
                total_value = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        cursor.execute(update_query, (
            data['customer_code'], data['customer_name'], data['order_id'],
            data['article_code'], data['article_description'], data['quantity_delivered'],
            data['shipment_date'] or None,  # empty date -> NULL
            data['delivery_date'] or None,  # empty date -> NULL
            data['delivery_status'], data['total_value'],
            record_id
        ))
        db.connection.commit()
        return jsonify({'success': True, 'message': 'Delivery record updated successfully'})
    except Exception as e:
        current_app.logger.error(f"Error updating delivery record: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Always release the connection once it has been opened.
        if connected:
            try:
                db.disconnect()
            except Exception as close_error:
                current_app.logger.error(f"Error closing delivery connection: {close_error}")
@daily_mirror_bp.route('/api/tune/production_data', methods=['GET'])
def api_get_production_data():
    """API endpoint to get a filtered, paginated page of production orders.

    Query parameters: ``page`` and ``per_page`` (integers, clamped to at
    least 1), ``search`` (substring match on production order, customer
    code and article code), ``status`` and ``customer`` (exact matches).

    Returns:
        JSON with ``records``, ``total``, ``page``, ``per_page`` and
        ``total_pages``. Failures return ``{'error': ...}`` with status 400
        (bad pagination values) or 500.
    """
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    # Get pagination parameters; clamping avoids a negative OFFSET and a
    # division by zero when total_pages is computed.
    try:
        page = max(1, int(request.args.get('page', 1)))
        per_page = max(1, int(request.args.get('per_page', 50)))
    except (TypeError, ValueError):
        return jsonify({'error': 'page and per_page must be integers'}), 400

    search = request.args.get('search', '')
    filter_status = request.args.get('status', '')
    filter_customer = request.args.get('customer', '')

    dm_db = DailyMirrorDatabase()
    connected = False
    try:
        if not dm_db.connect():
            return jsonify({'error': 'Database connection failed.'}), 500
        connected = True
        cursor = dm_db.connection.cursor()

        # Build the query with filters; values are always bound as
        # parameters, only fixed SQL fragments are interpolated.
        where_conditions = []
        params = []
        if search:
            where_conditions.append("(production_order LIKE %s OR customer_code LIKE %s OR article_code LIKE %s)")
            params.extend([f"%{search}%"] * 3)
        if filter_status:
            where_conditions.append("production_status = %s")
            params.append(filter_status)
        if filter_customer:
            where_conditions.append("customer_code = %s")
            params.append(filter_customer)
        where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""

        # Get total count
        count_query = f"SELECT COUNT(*) FROM dm_production_orders {where_clause}"
        cursor.execute(count_query, params)
        total_records = cursor.fetchone()[0]

        # Get paginated data
        offset = (page - 1) * per_page
        data_query = f"""
            SELECT id, production_order, customer_code, customer_name, client_order,
                   article_code, article_description, quantity_requested, delivery_date,
                   production_status, end_of_quilting, end_of_sewing, machine_code,
                   data_planificare
            FROM dm_production_orders {where_clause}
            ORDER BY data_planificare DESC, production_order
            LIMIT %s OFFSET %s
        """
        cursor.execute(data_query, params + [per_page, offset])

        # Date-like columns are stringified so they serialize to JSON.
        records = [
            {
                'id': row[0],
                'production_order': row[1],
                'customer_code': row[2],
                'customer_name': row[3],
                'client_order': row[4],
                'article_code': row[5],
                'article_description': row[6],
                'quantity_requested': row[7],
                'delivery_date': str(row[8]) if row[8] else None,
                'production_status': row[9],
                'end_of_quilting': str(row[10]) if row[10] else None,
                'end_of_sewing': str(row[11]) if row[11] else None,
                'machine_code': row[12],
                'data_planificare': str(row[13]) if row[13] else None
            }
            for row in cursor.fetchall()
        ]

        return jsonify({
            'records': records,
            'total': total_records,
            'page': page,
            'per_page': per_page,
            'total_pages': (total_records + per_page - 1) // per_page
        })
    except Exception as e:
        print(f"Error getting production data: {e}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection even when a query fails.
        if connected:
            try:
                dm_db.disconnect()
            except Exception as close_error:
                print(f"Error closing production connection: {close_error}")
@daily_mirror_bp.route('/api/tune/production_data/<int:record_id>', methods=['PUT'])
def api_update_production_data(record_id):
    """API endpoint to update one ``dm_production_orders`` record.

    Args:
        record_id: Primary key (``id``) of the row to update, from the URL.

    The request body must be a JSON object; fields absent from it are
    written as NULL, and an empty ``delivery_date`` is stored as NULL.

    Returns:
        ``{'success': True, 'message': ...}`` on success, otherwise
        ``{'error': ...}`` with status 400 (invalid body) or 500.
    """
    access_check = check_daily_mirror_api_access()
    if access_check:
        return access_check

    # A missing or non-object body would otherwise fail on ``data.get``
    # and be reported as a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'A JSON object body is required'}), 400

    dm_db = DailyMirrorDatabase()
    connected = False
    try:
        if not dm_db.connect():
            return jsonify({'error': 'Database connection failed.'}), 500
        connected = True
        cursor = dm_db.connection.cursor()

        update_query = """
            UPDATE dm_production_orders SET
                customer_code = %s, customer_name = %s, client_order = %s,
                article_code = %s, article_description = %s, quantity_requested = %s,
                delivery_date = %s, production_status = %s, machine_code = %s,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """
        cursor.execute(update_query, (
            data.get('customer_code'),
            data.get('customer_name'),
            data.get('client_order'),
            data.get('article_code'),
            data.get('article_description'),
            data.get('quantity_requested'),
            data.get('delivery_date') or None,  # empty date -> NULL
            data.get('production_status'),
            data.get('machine_code'),
            record_id
        ))
        dm_db.connection.commit()
        return jsonify({'success': True, 'message': 'Production record updated successfully'})
    except Exception as e:
        print(f"Error updating production data: {e}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection even when the update fails.
        if connected:
            try:
                dm_db.disconnect()
            except Exception as close_error:
                print(f"Error closing production connection: {close_error}")