From c4032dbfdd1721159f9be82d10e95c1f20ec71f6 Mon Sep 17 00:00:00 2001 From: "DESKTOP-064TTA1\\Fai LUK" Date: Sat, 7 Mar 2026 00:53:30 +0800 Subject: [PATCH] no message --- python/Bag1.py | 368 ++++++++++++++++++++++++++---------- python/bag1_settings.json | 4 +- python/last_batch_count.txt | 2 + 3 files changed, 267 insertions(+), 107 deletions(-) create mode 100644 python/last_batch_count.txt diff --git a/python/Bag1.py b/python/Bag1.py index d26590a..cb06d53 100644 --- a/python/Bag1.py +++ b/python/Bag1.py @@ -9,13 +9,15 @@ Run: python Bag1.py import json import os +import select import socket import sys +import threading import time import tkinter as tk -from datetime import date, timedelta +from datetime import date, datetime, timedelta from tkinter import messagebox, ttk -from typing import Optional +from typing import Callable, Optional import requests @@ -31,14 +33,15 @@ if getattr(sys, "frozen", False): else: _SETTINGS_DIR = os.path.dirname(os.path.abspath(__file__)) SETTINGS_FILE = os.path.join(_SETTINGS_DIR, "bag1_settings.json") +LASER_COUNTER_FILE = os.path.join(_SETTINGS_DIR, "last_batch_count.txt") DEFAULT_SETTINGS = { "api_ip": "localhost", "api_port": "8090", "dabag_ip": "", "dabag_port": "3008", - "laser_ip": "", - "laser_port": "9100", + "laser_ip": "192.168.17.10", + "laser_port": "45678", "label_com": "COM3", } @@ -83,7 +86,7 @@ def try_printer_connection(printer_name: str, sett: dict) -> bool: return False if printer_name == "激光機": ip = (sett.get("laser_ip") or "").strip() - port_str = (sett.get("laser_port") or "9100").strip() + port_str = (sett.get("laser_port") or "45678").strip() if not ip: return False try: @@ -111,8 +114,7 @@ def try_printer_connection(printer_name: str, sett: dict) -> bool: FONT_SIZE = 16 FONT_SIZE_BUTTONS = 15 FONT_SIZE_QTY = 12 # smaller for 數量 under batch no. -FONT_SIZE_ITEM_CODE = 20 # item code (larger for readability) -FONT_SIZE_ITEM_NAME = 26 # item name (bigger than item code) +FONT_SIZE_ITEM = 20 # item code and item name (larger for readability) FONT_FAMILY = "Microsoft JhengHei UI" # Traditional Chinese; fallback to TkDefaultFont # Column widths: item code own column; item name at least double, wraps in its column ITEM_CODE_WRAP = 140 # item code column width (long codes wrap under code only) @@ -135,7 +137,6 @@ PRINTER_CHECK_MS = 60 * 1000 # 1 minute when printer OK PRINTER_RETRY_MS = 30 * 1000 # 30 seconds when printer failed PRINTER_SOCKET_TIMEOUT = 3 DATAFLEX_SEND_TIMEOUT = 10 # seconds when sending ZPL to DataFlex -DATE_AUTO_RESET_SEC = 5 * 60 # 5 minutes: if no manual date change, auto-set to today def _zpl_escape(s: str) -> str: @@ -143,26 +144,6 @@ def _zpl_escape(s: str) -> str: return s.replace("\\", "\\\\").replace("^", "\\^") -def _split_by_word_count(text: str, max_words: int = 8) -> list[str]: - """Split text into segments of at most max_words (words = non-symbol chars; symbols not counted).""" - segments = [] - current = [] - count = 0 - for c in text: - if c.isalnum() or ("\u4e00" <= c <= "\u9fff") or ("\u3400" <= c <= "\u4dbf"): - count += 1 - current.append(c) - if count >= max_words: - segments.append("".join(current)) - current = [] - count = 0 - else: - current.append(c) - if current: - segments.append("".join(current)) - return segments if segments else [""] - - def generate_zpl_dataflex( batch_no: str, item_code: str, @@ -171,25 +152,25 @@ def generate_zpl_dataflex( font_bold: str = "E:STXihei.ttf", ) -> str: """ - Row 1 (from zero): QR code, then item name (rotated 90°). - Row 2: Batch number (left), item code (right). + Generate ZPL for DataFlex label (53 mm media, 90° rotated). + Uses UTF-8 (^CI28) and configurable .TTF fonts for Chinese (e.g. E:STXihei.ttf). """ desc = _zpl_escape((item_name or "—").strip()) code = _zpl_escape((item_code or "—").strip()) - batch_esc = _zpl_escape((batch_no or "").strip()) + batch = _zpl_escape(batch_no.strip()) return f"""^XA -^CI28 -^PW700 -^LL500 +^PW420 +^LL780 ^PO N -^FO10,20 -^BQR,4,7^FDQA,{batch_no}^FS -^FO170,20 -^A@R,72,72,{font_regular}^FD{desc}^FS -^FO0,260 -^A@R,72,72,{font_regular}^FD{batch_esc}^FS -^FO75,260 -^A@R,88,88,{font_bold}^FD{code}^FS +^CI28 +^FO70,70 +^A@R,60,60,{font_regular}^FD{desc}^FS +^FO220,70 +^A@R,50,50,{font_bold}^FD{code}^FS +^FO310,70 +^A@R,45,45,{font_bold}^FD批次: {batch}^FS +^FO150,420 +^BQN,2,6^FDQA,{batch_no}^FS ^XZ""" @@ -204,6 +185,162 @@ def send_zpl_to_dataflex(ip: str, port: int, zpl: str) -> None: sock.close() +def load_laser_last_count() -> tuple[int, Optional[str]]: + """Load last batch count and date from laser counter file. Returns (count, date_str).""" + if not os.path.exists(LASER_COUNTER_FILE): + return 0, None + try: + with open(LASER_COUNTER_FILE, "r", encoding="utf-8") as f: + lines = f.read().strip().splitlines() + if len(lines) >= 2: + return int(lines[1].strip()), lines[0].strip() + except Exception: + pass + return 0, None + + +def save_laser_last_count(date_str: str, count: int) -> None: + """Save laser batch count and date to file.""" + try: + with open(LASER_COUNTER_FILE, "w", encoding="utf-8") as f: + f.write(f"{date_str}\n{count}") + except Exception: + pass + + +LASER_PUSH_INTERVAL = 2 # seconds between pushes (like sample script) + + +def laser_push_loop( + ip: str, + port: int, + stop_event: threading.Event, + root: tk.Tk, + on_error: Callable[[str], None], +) -> None: + """ + Run in a background thread: persistent connection to EZCAD, push B{yymmdd}{count:03d};; + every LASER_PUSH_INTERVAL seconds. Resets count each new day. Uses counter file. + """ + conn = None + push_count, last_saved_date = load_laser_last_count() + while not stop_event.is_set(): + try: + if conn is None: + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.settimeout(0.4) + conn.connect((ip, port)) + now = datetime.now() + today_str = now.strftime("%y%m%d") + if last_saved_date != today_str: + push_count = 1 + last_saved_date = today_str + batch = f"B{today_str}{push_count:03d}" + reply = f"{batch};;" + conn.sendall(reply.encode("utf-8")) + save_laser_last_count(today_str, push_count) + rlist, _, _ = select.select([conn], [], [], 0.4) + if rlist: + data = conn.recv(4096) + if not data: + conn.close() + conn = None + push_count += 1 + for _ in range(int(LASER_PUSH_INTERVAL * 2)): + if stop_event.is_set(): + break + time.sleep(0.5) + except socket.timeout: + pass + except Exception as e: + if conn: + try: + conn.close() + except Exception: + pass + conn = None + try: + root.after(0, lambda msg=str(e): on_error(msg)) + except Exception: + pass + for _ in range(6): + if stop_event.is_set(): + break + time.sleep(0.5) + if conn: + try: + conn.close() + except Exception: + pass + + +def send_job_to_laser( + conn_ref: list, + ip: str, + port: int, + job_order_id: int, + item_code: str, + item_name: str, +) -> tuple[bool, str]: + """ + Send job order number; item code; item name to laser. Keeps connection open for next send. + conn_ref: [socket or None] - reused across calls; closed only when switching printer. + Format: {job_order_id};{item_code};{item_name};; + Returns (success, message). + """ + job_id_str = str(job_order_id) if job_order_id is not None else "" + code_str = (item_code or "").strip().replace(";", ",") + name_str = (item_name or "").strip().replace(";", ",") + reply = f"{job_id_str};{code_str};{name_str};;" + conn = conn_ref[0] + try: + if conn is None: + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.settimeout(3.0) + conn.connect((ip, port)) + conn_ref[0] = conn + conn.settimeout(3.0) + conn.sendall(reply.encode("utf-8")) + conn.settimeout(0.5) + try: + data = conn.recv(4096) + if data: + ack = data.decode("utf-8", errors="ignore").strip().lower() + if "receive" in ack and "invalid" not in ack: + return True, f"已送出激光機:{job_id_str};{code_str};{name_str}(已確認)" + except socket.timeout: + pass + return True, f"已送出激光機:{job_id_str};{code_str};{name_str}" + except (ConnectionRefusedError, socket.timeout, OSError) as e: + if conn_ref[0] is not None: + try: + conn_ref[0].close() + except Exception: + pass + conn_ref[0] = None + if isinstance(e, ConnectionRefusedError): + return False, f"無法連線至 {ip}:{port},請確認激光機已開機且 IP 正確。" + if isinstance(e, socket.timeout): + return False, f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。" + return False, f"激光機送出失敗:{e}" + + +def send_job_to_laser_with_retry( + conn_ref: list, + ip: str, + port: int, + job_order_id: int, + item_code: str, + item_name: str, +) -> tuple[bool, str]: + """Send job to laser; on failure, retry once. Returns (success, message).""" + ok, msg = send_job_to_laser(conn_ref, ip, port, job_order_id, item_code, item_name) + if ok: + return True, msg + ok2, msg2 = send_job_to_laser(conn_ref, ip, port, job_order_id, item_code, item_name) + return ok2, msg2 + + def format_qty(val) -> str: """Format quantity: integer without .0, with thousand separator.""" if val is None: @@ -218,9 +355,8 @@ def format_qty(val) -> str: def batch_no(year: int, job_order_id: int) -> str: - """Batch no.: B + 2-digit year + jobOrderId zero-padded to 6 digits.""" - short_year = year % 100 - return f"B{short_year:02d}{job_order_id:06d}" + """Batch no.: B + 4-digit year + jobOrderId zero-padded to 6 digits.""" + return f"B{year}{job_order_id:06d}" def get_font(size: int = FONT_SIZE, bold: bool = False) -> tuple: @@ -252,33 +388,39 @@ def set_row_highlight(row_frame: tk.Frame, selected: bool) -> None: def on_job_order_click(jo: dict, batch: str) -> None: - """Row click handler (highlight already set; no popup).""" + """Show message and highlight row (keeps printing to selected printer).""" + item_code = jo.get("itemCode") or "—" + item_name = jo.get("itemName") or "—" + messagebox.showinfo( + "工單", + f'已點選:批次 {batch}\n品號 {item_code} {item_name}', + ) -def ask_bag_count(parent: tk.Tk) -> Optional[int]: +def ask_laser_count(parent: tk.Tk) -> Optional[int]: """ - When printer is 打袋機 DataFlex, ask how many bags: +50, +10, +5, +1, C, then 確認送出. + When printer is 激光機, ask how many times to send (like DataFlex). Returns count (>= 1), or -1 for continuous (C), or None if cancelled. """ - result: list[Optional[int]] = [None] + result: list = [None] count_ref = [0] continuous_ref = [False] win = tk.Toplevel(parent) - win.title("打袋列印數量") + win.title("激光機送出數量") win.geometry("420x200") win.transient(parent) win.grab_set() win.configure(bg=BG_TOP) - ttk.Label(win, text="列印多少個袋?", font=get_font(FONT_SIZE)).pack(pady=(12, 4)) - count_lbl = tk.Label(win, text="列印數量: 0", font=get_font(FONT_SIZE), bg=BG_TOP) + ttk.Label(win, text="送出多少次?", font=get_font(FONT_SIZE)).pack(pady=(12, 4)) + count_lbl = tk.Label(win, text="數量: 0", font=get_font(FONT_SIZE), bg=BG_TOP) count_lbl.pack(pady=4) def update_display(): if continuous_ref[0]: - count_lbl.configure(text="列印數量: 連續 (C)") + count_lbl.configure(text="數量: 連續 (C)") else: - count_lbl.configure(text=f"列印數量: {count_ref[0]}") + count_lbl.configure(text=f"數量: {count_ref[0]}") def add(n: int): continuous_ref[0] = False @@ -293,20 +435,19 @@ def ask_bag_count(parent: tk.Tk) -> Optional[int]: if continuous_ref[0]: result[0] = -1 elif count_ref[0] < 1: - messagebox.showwarning("打袋機", "請先按 +50、+10、+5 或 +1 選擇數量。", parent=win) + messagebox.showwarning("激光機", "請先按 +50、+10、+5 或 +1 選擇數量。", parent=win) return else: result[0] = count_ref[0] win.destroy() - btn_row1 = tk.Frame(win, bg=BG_TOP) - btn_row1.pack(pady=8) + btn_row = tk.Frame(win, bg=BG_TOP) + btn_row.pack(pady=8) for label, value in [("+50", 50), ("+10", 10), ("+5", 5), ("+1", 1)]: def make_add(v: int): return lambda: add(v) - ttk.Button(btn_row1, text=label, command=make_add(value), width=8).pack(side=tk.LEFT, padx=4) - ttk.Button(btn_row1, text="C", command=set_continuous, width=8).pack(side=tk.LEFT, padx=4) - + ttk.Button(btn_row, text=label, command=make_add(value), width=8).pack(side=tk.LEFT, padx=4) + ttk.Button(btn_row, text="C", command=set_continuous, width=8).pack(side=tk.LEFT, padx=4) ttk.Button(win, text="確認送出", command=confirm, width=14).pack(pady=12) win.protocol("WM_DELETE_WINDOW", win.destroy) win.wait_window() @@ -383,8 +524,8 @@ def main() -> None: status_frame.configure(bg=BG_STATUS_ERROR) status_lbl.configure(text="連接不到服務器", bg=BG_STATUS_ERROR, fg=FG_STATUS_ERROR) - def set_status_message(msg: str, is_error: bool = False): - """Show a temporary message on the status bar.""" + def set_status_message(msg: str, is_error: bool = False) -> None: + """Show a message on the status bar.""" if is_error: status_frame.configure(bg=BG_STATUS_ERROR) status_lbl.configure(text=msg, bg=BG_STATUS_ERROR, fg=FG_STATUS_ERROR) @@ -392,6 +533,11 @@ def main() -> None: status_frame.configure(bg=BG_STATUS_OK) status_lbl.configure(text=msg, bg=BG_STATUS_OK, fg=FG_STATUS_OK) + # Laser: keep connection open for repeated sends; close when switching away + laser_conn_ref: list = [None] + laser_thread_ref: list = [None] + laser_stop_ref: list = [None] + # Top: left [前一天] [date] [後一天] | right [printer dropdown] top = tk.Frame(root, padx=12, pady=12, bg=BG_TOP) top.pack(fill=tk.X) @@ -399,31 +545,23 @@ def main() -> None: date_var = tk.StringVar(value=date.today().isoformat()) printer_options = ["打袋機 DataFlex", "標簽機", "激光機"] printer_var = tk.StringVar(value=printer_options[0]) - last_manual_date_change_ref = [time.time()] # track when user last changed date manually - - def mark_manual_date_change(): - last_manual_date_change_ref[0] = time.time() def go_prev_day() -> None: try: d = date.fromisoformat(date_var.get().strip()) date_var.set((d - timedelta(days=1)).isoformat()) - mark_manual_date_change() load_job_orders(from_user_date_change=True) except ValueError: date_var.set(date.today().isoformat()) - mark_manual_date_change() load_job_orders(from_user_date_change=True) def go_next_day() -> None: try: d = date.fromisoformat(date_var.get().strip()) date_var.set((d + timedelta(days=1)).isoformat()) - mark_manual_date_change() load_job_orders(from_user_date_change=True) except ValueError: date_var.set(date.today().isoformat()) - mark_manual_date_change() load_job_orders(from_user_date_change=True) # 前一天 (previous day) with left arrow icon @@ -440,11 +578,6 @@ def main() -> None: ) date_entry.pack(side=tk.LEFT, padx=(0, 8), ipady=4) - # Track manual typing in date field as user date change - def on_date_entry_key(event): - mark_manual_date_change() - date_entry.bind("", on_date_entry_key) - # 後一天 (next day) with right arrow icon btn_next = ttk.Button(top, text="後一天 ▶", command=go_next_day) btn_next.pack(side=tk.LEFT, padx=(0, 8)) @@ -498,6 +631,15 @@ def main() -> None: def on_printer_selection_changed(*args) -> None: check_printer() + if printer_var.get() != "激光機": + if laser_stop_ref[0] is not None: + laser_stop_ref[0].set() + if laser_conn_ref[0] is not None: + try: + laser_conn_ref[0].close() + except Exception: + pass + laser_conn_ref[0] = None printer_var.trace_add("write", on_printer_selection_changed) @@ -668,7 +810,7 @@ def main() -> None: code_lbl = tk.Label( row, text=item_code, - font=get_font(FONT_SIZE_ITEM_CODE), + font=get_font(FONT_SIZE_ITEM), bg=BG_ROW, fg="black", wraplength=ITEM_CODE_WRAP, @@ -677,11 +819,11 @@ def main() -> None: ) code_lbl.pack(side=tk.LEFT, anchor=tk.NW, padx=(12, 8)) - # Column 3: item name only, bigger font, at least double width, wraps under its own column + # Column 3: item name only, same bigger font, at least double width, wraps under its own column name_lbl = tk.Label( row, text=item_name or "—", - font=get_font(FONT_SIZE_ITEM_NAME), + font=get_font(FONT_SIZE_ITEM), bg=BG_ROW, fg="black", wraplength=ITEM_NAME_WRAP, @@ -706,25 +848,18 @@ def main() -> None: if not ip: messagebox.showerror("打袋機", "請在設定中填寫打袋機 DataFlex 的 IP。") else: - count = ask_bag_count(root) - if count is not None: - item_code = j.get("itemCode") or "—" - item_name = j.get("itemName") or "—" - zpl = generate_zpl_dataflex(b, item_code, item_name) - n = 100 if count == -1 else count - try: - for i in range(n): - send_zpl_to_dataflex(ip, port, zpl) - if i < n - 1: - time.sleep(2) - msg = f"已送出列印:批次 {b} x {n} 張" if count != -1 else f"已送出列印:批次 {b} x {n} 張 (連續)" - set_status_message(msg, is_error=False) - except ConnectionRefusedError: - set_status_message(f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", is_error=True) - except socket.timeout: - set_status_message(f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", is_error=True) - except OSError as err: - set_status_message(f"列印失敗:{err}", is_error=True) + item_code = j.get("itemCode") or "—" + item_name = j.get("itemName") or "—" + zpl = generate_zpl_dataflex(b, item_code, item_name) + try: + send_zpl_to_dataflex(ip, port, zpl) + messagebox.showinfo("打袋機", f"已送出列印:批次 {b}") + except ConnectionRefusedError: + messagebox.showerror("打袋機", f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。") + except socket.timeout: + messagebox.showerror("打袋機", f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。") + except OSError as err: + messagebox.showerror("打袋機", f"列印失敗:{err}") elif printer_var.get() == "標簽機": count = ask_label_count(root) if count is not None: @@ -732,7 +867,37 @@ def main() -> None: msg = "已選擇連續列印標簽" else: msg = f"將列印 {count} 張標簽" - set_status_message(msg, is_error=False) + messagebox.showinfo("標簽機", msg) + elif printer_var.get() == "激光機": + ip = (settings.get("laser_ip") or "").strip() + port_str = (settings.get("laser_port") or "45678").strip() + try: + port = int(port_str) + except ValueError: + port = 45678 + if not ip: + set_status_message("請在設定中填寫激光機的 IP。", is_error=True) + else: + count = ask_laser_count(root) + if count is not None: + jo_id = j.get("id") + item_code_val = j.get("itemCode") or "" + item_name_val = j.get("itemName") or "" + n = 100 if count == -1 else count + sent = 0 + for i in range(n): + ok, msg = send_job_to_laser_with_retry( + laser_conn_ref, ip, port, jo_id, item_code_val, item_name_val + ) + if ok: + sent += 1 + else: + set_status_message(f"已送出 {sent} 次,第 {sent + 1} 次失敗:{msg}", is_error=True) + break + if i < n - 1: + time.sleep(0.2) + if sent == n: + set_status_message(f"已送出激光機:{sent} 次", is_error=False) on_job_order_click(j, b) for w in (row, left, batch_lbl, code_lbl, name_lbl): @@ -755,13 +920,6 @@ def main() -> None: if after_id_ref[0] is not None: root.after_cancel(after_id_ref[0]) after_id_ref[0] = None - # Auto-reset date to today if user hasn't manually changed it recently (for 24x7 use) - if not from_user_date_change: - elapsed = time.time() - last_manual_date_change_ref[0] - today_str = date.today().isoformat() - if elapsed > DATE_AUTO_RESET_SEC and date_var.get().strip() != today_str: - date_var.set(today_str) - from_user_date_change = True # treat as date change to reset selection/scroll date_str = date_var.get().strip() try: plan_start = date.fromisoformat(date_str) diff --git a/python/bag1_settings.json b/python/bag1_settings.json index 37f71ef..a580c57 100644 --- a/python/bag1_settings.json +++ b/python/bag1_settings.json @@ -3,7 +3,7 @@ "api_port": "8090", "dabag_ip": "192.168.17.27", "dabag_port": "3008", - "laser_ip": "192.168.7.77", - "laser_port": "9100", + "laser_ip": "192.168.17.10", + "laser_port": "45678", "label_com": "COM2" } \ No newline at end of file diff --git a/python/last_batch_count.txt b/python/last_batch_count.txt new file mode 100644 index 0000000..38f698e --- /dev/null +++ b/python/last_batch_count.txt @@ -0,0 +1,2 @@ +260306 +125 \ No newline at end of file