Przeglądaj źródła

no message

reset-do-picking-order
DESKTOP-064TTA1\Fai LUK 2 tygodni temu
rodzic
commit
c4032dbfdd
3 zmienionych plików z 267 dodań i 107 usunięć
  1. +263
    -105
      python/Bag1.py
  2. +2
    -2
      python/bag1_settings.json
  3. +2
    -0
      python/last_batch_count.txt

+ 263
- 105
python/Bag1.py Wyświetl plik

@@ -9,13 +9,15 @@ Run: python Bag1.py

import json
import os
import select
import socket
import sys
import threading
import time
import tkinter as tk
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from tkinter import messagebox, ttk
from typing import Optional
from typing import Callable, Optional

import requests

@@ -31,14 +33,15 @@ if getattr(sys, "frozen", False):
else:
_SETTINGS_DIR = os.path.dirname(os.path.abspath(__file__))
SETTINGS_FILE = os.path.join(_SETTINGS_DIR, "bag1_settings.json")
LASER_COUNTER_FILE = os.path.join(_SETTINGS_DIR, "last_batch_count.txt")

DEFAULT_SETTINGS = {
"api_ip": "localhost",
"api_port": "8090",
"dabag_ip": "",
"dabag_port": "3008",
"laser_ip": "",
"laser_port": "9100",
"laser_ip": "192.168.17.10",
"laser_port": "45678",
"label_com": "COM3",
}

@@ -83,7 +86,7 @@ def try_printer_connection(printer_name: str, sett: dict) -> bool:
return False
if printer_name == "激光機":
ip = (sett.get("laser_ip") or "").strip()
port_str = (sett.get("laser_port") or "9100").strip()
port_str = (sett.get("laser_port") or "45678").strip()
if not ip:
return False
try:
@@ -111,8 +114,7 @@ def try_printer_connection(printer_name: str, sett: dict) -> bool:
FONT_SIZE = 16
FONT_SIZE_BUTTONS = 15
FONT_SIZE_QTY = 12 # smaller for 數量 under batch no.
FONT_SIZE_ITEM_CODE = 20 # item code (larger for readability)
FONT_SIZE_ITEM_NAME = 26 # item name (bigger than item code)
FONT_SIZE_ITEM = 20 # item code and item name (larger for readability)
FONT_FAMILY = "Microsoft JhengHei UI" # Traditional Chinese; fallback to TkDefaultFont
# Column widths: item code own column; item name at least double, wraps in its column
ITEM_CODE_WRAP = 140 # item code column width (long codes wrap under code only)
@@ -135,7 +137,6 @@ PRINTER_CHECK_MS = 60 * 1000 # 1 minute when printer OK
PRINTER_RETRY_MS = 30 * 1000 # 30 seconds when printer failed
PRINTER_SOCKET_TIMEOUT = 3
DATAFLEX_SEND_TIMEOUT = 10 # seconds when sending ZPL to DataFlex
DATE_AUTO_RESET_SEC = 5 * 60 # 5 minutes: if no manual date change, auto-set to today


def _zpl_escape(s: str) -> str:
@@ -143,26 +144,6 @@ def _zpl_escape(s: str) -> str:
return s.replace("\\", "\\\\").replace("^", "\\^")


def _split_by_word_count(text: str, max_words: int = 8) -> list[str]:
"""Split text into segments of at most max_words (words = non-symbol chars; symbols not counted)."""
segments = []
current = []
count = 0
for c in text:
if c.isalnum() or ("\u4e00" <= c <= "\u9fff") or ("\u3400" <= c <= "\u4dbf"):
count += 1
current.append(c)
if count >= max_words:
segments.append("".join(current))
current = []
count = 0
else:
current.append(c)
if current:
segments.append("".join(current))
return segments if segments else [""]


def generate_zpl_dataflex(
batch_no: str,
item_code: str,
@@ -171,25 +152,25 @@ def generate_zpl_dataflex(
font_bold: str = "E:STXihei.ttf",
) -> str:
"""
Row 1 (from zero): QR code, then item name (rotated 90°).
Row 2: Batch number (left), item code (right).
Generate ZPL for DataFlex label (53 mm media, 90° rotated).
Uses UTF-8 (^CI28) and configurable .TTF fonts for Chinese (e.g. E:STXihei.ttf).
"""
desc = _zpl_escape((item_name or "—").strip())
code = _zpl_escape((item_code or "—").strip())
batch_esc = _zpl_escape((batch_no or "").strip())
batch = _zpl_escape(batch_no.strip())
return f"""^XA
^CI28
^PW700
^LL500
^PW420
^LL780
^PO N
^FO10,20
^BQR,4,7^FDQA,{batch_no}^FS
^FO170,20
^A@R,72,72,{font_regular}^FD{desc}^FS
^FO0,260
^A@R,72,72,{font_regular}^FD{batch_esc}^FS
^FO75,260
^A@R,88,88,{font_bold}^FD{code}^FS
^CI28
^FO70,70
^A@R,60,60,{font_regular}^FD{desc}^FS
^FO220,70
^A@R,50,50,{font_bold}^FD{code}^FS
^FO310,70
^A@R,45,45,{font_bold}^FD批次: {batch}^FS
^FO150,420
^BQN,2,6^FDQA,{batch_no}^FS
^XZ"""


@@ -204,6 +185,162 @@ def send_zpl_to_dataflex(ip: str, port: int, zpl: str) -> None:
sock.close()


def load_laser_last_count() -> tuple[int, Optional[str]]:
"""Load last batch count and date from laser counter file. Returns (count, date_str)."""
if not os.path.exists(LASER_COUNTER_FILE):
return 0, None
try:
with open(LASER_COUNTER_FILE, "r", encoding="utf-8") as f:
lines = f.read().strip().splitlines()
if len(lines) >= 2:
return int(lines[1].strip()), lines[0].strip()
except Exception:
pass
return 0, None


def save_laser_last_count(date_str: str, count: int) -> None:
"""Save laser batch count and date to file."""
try:
with open(LASER_COUNTER_FILE, "w", encoding="utf-8") as f:
f.write(f"{date_str}\n{count}")
except Exception:
pass


LASER_PUSH_INTERVAL = 2 # seconds between pushes (like sample script)


def laser_push_loop(
ip: str,
port: int,
stop_event: threading.Event,
root: tk.Tk,
on_error: Callable[[str], None],
) -> None:
"""
Run in a background thread: persistent connection to EZCAD, push B{yymmdd}{count:03d};;
every LASER_PUSH_INTERVAL seconds. Resets count each new day. Uses counter file.
"""
conn = None
push_count, last_saved_date = load_laser_last_count()
while not stop_event.is_set():
try:
if conn is None:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.settimeout(0.4)
conn.connect((ip, port))
now = datetime.now()
today_str = now.strftime("%y%m%d")
if last_saved_date != today_str:
push_count = 1
last_saved_date = today_str
batch = f"B{today_str}{push_count:03d}"
reply = f"{batch};;"
conn.sendall(reply.encode("utf-8"))
save_laser_last_count(today_str, push_count)
rlist, _, _ = select.select([conn], [], [], 0.4)
if rlist:
data = conn.recv(4096)
if not data:
conn.close()
conn = None
push_count += 1
for _ in range(int(LASER_PUSH_INTERVAL * 2)):
if stop_event.is_set():
break
time.sleep(0.5)
except socket.timeout:
pass
except Exception as e:
if conn:
try:
conn.close()
except Exception:
pass
conn = None
try:
root.after(0, lambda msg=str(e): on_error(msg))
except Exception:
pass
for _ in range(6):
if stop_event.is_set():
break
time.sleep(0.5)
if conn:
try:
conn.close()
except Exception:
pass


def send_job_to_laser(
conn_ref: list,
ip: str,
port: int,
job_order_id: int,
item_code: str,
item_name: str,
) -> tuple[bool, str]:
"""
Send job order number; item code; item name to laser. Keeps connection open for next send.
conn_ref: [socket or None] - reused across calls; closed only when switching printer.
Format: {job_order_id};{item_code};{item_name};;
Returns (success, message).
"""
job_id_str = str(job_order_id) if job_order_id is not None else ""
code_str = (item_code or "").strip().replace(";", ",")
name_str = (item_name or "").strip().replace(";", ",")
reply = f"{job_id_str};{code_str};{name_str};;"
conn = conn_ref[0]
try:
if conn is None:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.settimeout(3.0)
conn.connect((ip, port))
conn_ref[0] = conn
conn.settimeout(3.0)
conn.sendall(reply.encode("utf-8"))
conn.settimeout(0.5)
try:
data = conn.recv(4096)
if data:
ack = data.decode("utf-8", errors="ignore").strip().lower()
if "receive" in ack and "invalid" not in ack:
return True, f"已送出激光機:{job_id_str};{code_str};{name_str}(已確認)"
except socket.timeout:
pass
return True, f"已送出激光機:{job_id_str};{code_str};{name_str}"
except (ConnectionRefusedError, socket.timeout, OSError) as e:
if conn_ref[0] is not None:
try:
conn_ref[0].close()
except Exception:
pass
conn_ref[0] = None
if isinstance(e, ConnectionRefusedError):
return False, f"無法連線至 {ip}:{port},請確認激光機已開機且 IP 正確。"
if isinstance(e, socket.timeout):
return False, f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。"
return False, f"激光機送出失敗:{e}"


def send_job_to_laser_with_retry(
conn_ref: list,
ip: str,
port: int,
job_order_id: int,
item_code: str,
item_name: str,
) -> tuple[bool, str]:
"""Send job to laser; on failure, retry once. Returns (success, message)."""
ok, msg = send_job_to_laser(conn_ref, ip, port, job_order_id, item_code, item_name)
if ok:
return True, msg
ok2, msg2 = send_job_to_laser(conn_ref, ip, port, job_order_id, item_code, item_name)
return ok2, msg2


def format_qty(val) -> str:
"""Format quantity: integer without .0, with thousand separator."""
if val is None:
@@ -218,9 +355,8 @@ def format_qty(val) -> str:


def batch_no(year: int, job_order_id: int) -> str:
"""Batch no.: B + 2-digit year + jobOrderId zero-padded to 6 digits."""
short_year = year % 100
return f"B{short_year:02d}{job_order_id:06d}"
"""Batch no.: B + 4-digit year + jobOrderId zero-padded to 6 digits."""
return f"B{year}{job_order_id:06d}"


def get_font(size: int = FONT_SIZE, bold: bool = False) -> tuple:
@@ -252,33 +388,39 @@ def set_row_highlight(row_frame: tk.Frame, selected: bool) -> None:


def on_job_order_click(jo: dict, batch: str) -> None:
"""Row click handler (highlight already set; no popup)."""
"""Show message and highlight row (keeps printing to selected printer)."""
item_code = jo.get("itemCode") or "—"
item_name = jo.get("itemName") or "—"
messagebox.showinfo(
"工單",
f'已點選:批次 {batch}\n品號 {item_code} {item_name}',
)


def ask_bag_count(parent: tk.Tk) -> Optional[int]:
def ask_laser_count(parent: tk.Tk) -> Optional[int]:
"""
When printer is 打袋機 DataFlex, ask how many bags: +50, +10, +5, +1, C, then 確認送出.
When printer is 激光機, ask how many times to send (like DataFlex).
Returns count (>= 1), or -1 for continuous (C), or None if cancelled.
"""
result: list[Optional[int]] = [None]
result: list = [None]
count_ref = [0]
continuous_ref = [False]

win = tk.Toplevel(parent)
win.title("打袋列印數量")
win.title("激光機送出數量")
win.geometry("420x200")
win.transient(parent)
win.grab_set()
win.configure(bg=BG_TOP)
ttk.Label(win, text="列印多少個袋?", font=get_font(FONT_SIZE)).pack(pady=(12, 4))
count_lbl = tk.Label(win, text="列印數量: 0", font=get_font(FONT_SIZE), bg=BG_TOP)
ttk.Label(win, text="送出多少次?", font=get_font(FONT_SIZE)).pack(pady=(12, 4))
count_lbl = tk.Label(win, text="數量: 0", font=get_font(FONT_SIZE), bg=BG_TOP)
count_lbl.pack(pady=4)

def update_display():
if continuous_ref[0]:
count_lbl.configure(text="列印數量: 連續 (C)")
count_lbl.configure(text="數量: 連續 (C)")
else:
count_lbl.configure(text=f"列印數量: {count_ref[0]}")
count_lbl.configure(text=f"數量: {count_ref[0]}")

def add(n: int):
continuous_ref[0] = False
@@ -293,20 +435,19 @@ def ask_bag_count(parent: tk.Tk) -> Optional[int]:
if continuous_ref[0]:
result[0] = -1
elif count_ref[0] < 1:
messagebox.showwarning("打袋機", "請先按 +50、+10、+5 或 +1 選擇數量。", parent=win)
messagebox.showwarning("激光機", "請先按 +50、+10、+5 或 +1 選擇數量。", parent=win)
return
else:
result[0] = count_ref[0]
win.destroy()

btn_row1 = tk.Frame(win, bg=BG_TOP)
btn_row1.pack(pady=8)
btn_row = tk.Frame(win, bg=BG_TOP)
btn_row.pack(pady=8)
for label, value in [("+50", 50), ("+10", 10), ("+5", 5), ("+1", 1)]:
def make_add(v: int):
return lambda: add(v)
ttk.Button(btn_row1, text=label, command=make_add(value), width=8).pack(side=tk.LEFT, padx=4)
ttk.Button(btn_row1, text="C", command=set_continuous, width=8).pack(side=tk.LEFT, padx=4)

ttk.Button(btn_row, text=label, command=make_add(value), width=8).pack(side=tk.LEFT, padx=4)
ttk.Button(btn_row, text="C", command=set_continuous, width=8).pack(side=tk.LEFT, padx=4)
ttk.Button(win, text="確認送出", command=confirm, width=14).pack(pady=12)
win.protocol("WM_DELETE_WINDOW", win.destroy)
win.wait_window()
@@ -383,8 +524,8 @@ def main() -> None:
status_frame.configure(bg=BG_STATUS_ERROR)
status_lbl.configure(text="連接不到服務器", bg=BG_STATUS_ERROR, fg=FG_STATUS_ERROR)

def set_status_message(msg: str, is_error: bool = False):
"""Show a temporary message on the status bar."""
def set_status_message(msg: str, is_error: bool = False) -> None:
"""Show a message on the status bar."""
if is_error:
status_frame.configure(bg=BG_STATUS_ERROR)
status_lbl.configure(text=msg, bg=BG_STATUS_ERROR, fg=FG_STATUS_ERROR)
@@ -392,6 +533,11 @@ def main() -> None:
status_frame.configure(bg=BG_STATUS_OK)
status_lbl.configure(text=msg, bg=BG_STATUS_OK, fg=FG_STATUS_OK)

# Laser: keep connection open for repeated sends; close when switching away
laser_conn_ref: list = [None]
laser_thread_ref: list = [None]
laser_stop_ref: list = [None]

# Top: left [前一天] [date] [後一天] | right [printer dropdown]
top = tk.Frame(root, padx=12, pady=12, bg=BG_TOP)
top.pack(fill=tk.X)
@@ -399,31 +545,23 @@ def main() -> None:
date_var = tk.StringVar(value=date.today().isoformat())
printer_options = ["打袋機 DataFlex", "標簽機", "激光機"]
printer_var = tk.StringVar(value=printer_options[0])
last_manual_date_change_ref = [time.time()] # track when user last changed date manually

def mark_manual_date_change():
last_manual_date_change_ref[0] = time.time()

def go_prev_day() -> None:
try:
d = date.fromisoformat(date_var.get().strip())
date_var.set((d - timedelta(days=1)).isoformat())
mark_manual_date_change()
load_job_orders(from_user_date_change=True)
except ValueError:
date_var.set(date.today().isoformat())
mark_manual_date_change()
load_job_orders(from_user_date_change=True)

def go_next_day() -> None:
try:
d = date.fromisoformat(date_var.get().strip())
date_var.set((d + timedelta(days=1)).isoformat())
mark_manual_date_change()
load_job_orders(from_user_date_change=True)
except ValueError:
date_var.set(date.today().isoformat())
mark_manual_date_change()
load_job_orders(from_user_date_change=True)

# 前一天 (previous day) with left arrow icon
@@ -440,11 +578,6 @@ def main() -> None:
)
date_entry.pack(side=tk.LEFT, padx=(0, 8), ipady=4)

# Track manual typing in date field as user date change
def on_date_entry_key(event):
mark_manual_date_change()
date_entry.bind("<Key>", on_date_entry_key)

# 後一天 (next day) with right arrow icon
btn_next = ttk.Button(top, text="後一天 ▶", command=go_next_day)
btn_next.pack(side=tk.LEFT, padx=(0, 8))
@@ -498,6 +631,15 @@ def main() -> None:

def on_printer_selection_changed(*args) -> None:
check_printer()
if printer_var.get() != "激光機":
if laser_stop_ref[0] is not None:
laser_stop_ref[0].set()
if laser_conn_ref[0] is not None:
try:
laser_conn_ref[0].close()
except Exception:
pass
laser_conn_ref[0] = None

printer_var.trace_add("write", on_printer_selection_changed)

@@ -668,7 +810,7 @@ def main() -> None:
code_lbl = tk.Label(
row,
text=item_code,
font=get_font(FONT_SIZE_ITEM_CODE),
font=get_font(FONT_SIZE_ITEM),
bg=BG_ROW,
fg="black",
wraplength=ITEM_CODE_WRAP,
@@ -677,11 +819,11 @@ def main() -> None:
)
code_lbl.pack(side=tk.LEFT, anchor=tk.NW, padx=(12, 8))

# Column 3: item name only, bigger font, at least double width, wraps under its own column
# Column 3: item name only, same bigger font, at least double width, wraps under its own column
name_lbl = tk.Label(
row,
text=item_name or "—",
font=get_font(FONT_SIZE_ITEM_NAME),
font=get_font(FONT_SIZE_ITEM),
bg=BG_ROW,
fg="black",
wraplength=ITEM_NAME_WRAP,
@@ -706,25 +848,18 @@ def main() -> None:
if not ip:
messagebox.showerror("打袋機", "請在設定中填寫打袋機 DataFlex 的 IP。")
else:
count = ask_bag_count(root)
if count is not None:
item_code = j.get("itemCode") or "—"
item_name = j.get("itemName") or "—"
zpl = generate_zpl_dataflex(b, item_code, item_name)
n = 100 if count == -1 else count
try:
for i in range(n):
send_zpl_to_dataflex(ip, port, zpl)
if i < n - 1:
time.sleep(2)
msg = f"已送出列印:批次 {b} x {n} 張" if count != -1 else f"已送出列印:批次 {b} x {n} 張 (連續)"
set_status_message(msg, is_error=False)
except ConnectionRefusedError:
set_status_message(f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", is_error=True)
except socket.timeout:
set_status_message(f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", is_error=True)
except OSError as err:
set_status_message(f"列印失敗:{err}", is_error=True)
item_code = j.get("itemCode") or "—"
item_name = j.get("itemName") or "—"
zpl = generate_zpl_dataflex(b, item_code, item_name)
try:
send_zpl_to_dataflex(ip, port, zpl)
messagebox.showinfo("打袋機", f"已送出列印:批次 {b}")
except ConnectionRefusedError:
messagebox.showerror("打袋機", f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。")
except socket.timeout:
messagebox.showerror("打袋機", f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。")
except OSError as err:
messagebox.showerror("打袋機", f"列印失敗:{err}")
elif printer_var.get() == "標簽機":
count = ask_label_count(root)
if count is not None:
@@ -732,7 +867,37 @@ def main() -> None:
msg = "已選擇連續列印標簽"
else:
msg = f"將列印 {count} 張標簽"
set_status_message(msg, is_error=False)
messagebox.showinfo("標簽機", msg)
elif printer_var.get() == "激光機":
ip = (settings.get("laser_ip") or "").strip()
port_str = (settings.get("laser_port") or "45678").strip()
try:
port = int(port_str)
except ValueError:
port = 45678
if not ip:
set_status_message("請在設定中填寫激光機的 IP。", is_error=True)
else:
count = ask_laser_count(root)
if count is not None:
jo_id = j.get("id")
item_code_val = j.get("itemCode") or ""
item_name_val = j.get("itemName") or ""
n = 100 if count == -1 else count
sent = 0
for i in range(n):
ok, msg = send_job_to_laser_with_retry(
laser_conn_ref, ip, port, jo_id, item_code_val, item_name_val
)
if ok:
sent += 1
else:
set_status_message(f"已送出 {sent} 次,第 {sent + 1} 次失敗:{msg}", is_error=True)
break
if i < n - 1:
time.sleep(0.2)
if sent == n:
set_status_message(f"已送出激光機:{sent} 次", is_error=False)
on_job_order_click(j, b)

for w in (row, left, batch_lbl, code_lbl, name_lbl):
@@ -755,13 +920,6 @@ def main() -> None:
if after_id_ref[0] is not None:
root.after_cancel(after_id_ref[0])
after_id_ref[0] = None
# Auto-reset date to today if user hasn't manually changed it recently (for 24x7 use)
if not from_user_date_change:
elapsed = time.time() - last_manual_date_change_ref[0]
today_str = date.today().isoformat()
if elapsed > DATE_AUTO_RESET_SEC and date_var.get().strip() != today_str:
date_var.set(today_str)
from_user_date_change = True # treat as date change to reset selection/scroll
date_str = date_var.get().strip()
try:
plan_start = date.fromisoformat(date_str)


+ 2
- 2
python/bag1_settings.json Wyświetl plik

@@ -3,7 +3,7 @@
"api_port": "8090",
"dabag_ip": "192.168.17.27",
"dabag_port": "3008",
"laser_ip": "192.168.7.77",
"laser_port": "9100",
"laser_ip": "192.168.17.10",
"laser_port": "45678",
"label_com": "COM2"
}

+ 2
- 0
python/last_batch_count.txt Wyświetl plik

@@ -0,0 +1,2 @@
260306
125

Ładowanie…
Anuluj
Zapisz