From 23fa4f3209e8b6c99a27ef4fe7d4eda6cd761faf Mon Sep 17 00:00:00 2001 From: "vluk@2fi-solutions.com.hk" Date: Tue, 21 Apr 2026 22:28:47 +0800 Subject: [PATCH] no message --- python/testLaser.py | 213 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 python/testLaser.py diff --git a/python/testLaser.py b/python/testLaser.py new file mode 100644 index 0000000..8c1c69b --- /dev/null +++ b/python/testLaser.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +""" +Minimal TCP client to probe the laser (EZCAD / Bag2 plugin). + +Examples: + python testLaser.py + python testLaser.py "GetMarkStatus;" + python testLaser.py -c "0;GetMarkStatus;;" + python testLaser.py --host 192.168.18.69 --port 45678 --preset bag2-status + +Default host/port: 192.168.18.69 / 45678 +""" + +from __future__ import annotations + +import argparse +import socket +import sys + +DEFAULT_HOST = "192.168.18.69" +DEFAULT_PORT = 45678 + +# Shortcuts (second column = description) +PRESETS: dict[str, tuple[str, str]] = { + "status": ("GetMarkStatus;", "plain command + semicolon"), + "data": ("GetMarkData;", "plain command + semicolon"), + "count": ("GetMarkedCount;", "plain command + semicolon"), + "bag2-status": ("0;GetMarkStatus;;", "Bag2 line shape (same idea as FPSMS backend)"), + "bag2-data": ("0;GetMarkData;;", "Bag2 line shape"), + "bag2-count": ("0;GetMarkedCount;;", "Bag2 line shape"), +} + +SWEEP_PAYLOADS = [ + "GetMarkStatus;", + "GetMarkStatus", + "GetMarkStatus\r\n", + "GetMarkData;", + "GetMarkData", + "GetMarkData\r\n", + "GetMarkedCount;", + "GetMarkedCount", + "GetMarkedCount\r\n", + "0;GetMarkStatus;;", + "0;;GetMarkStatus;;", + "0;GetMarkData;;", + "0;;GetMarkData;;", + "0;GetMarkedCount;;", + "0;;GetMarkedCount;;", +] + + +def send_and_read( + host: str, + port: int, + payload: str, + *, + encoding: str = "utf-8", + shutdown_write: bool = True, + read_timeout: float = 3.0, +) -> bytes: + data = payload.encode(encoding) + with socket.create_connection((host, port), timeout=5) as sock: + sock.sendall(data) + if shutdown_write: + try: + sock.shutdown(socket.SHUT_WR) + except OSError: + pass + sock.settimeout(read_timeout) + chunks: list[bytes] = [] + while True: + try: + block = sock.recv(8192) + except socket.timeout: + break + if not block: + break + chunks.append(block) + return b"".join(chunks) + + +def interactive(host: str, port: int, shutdown_write: bool) -> None: + print(f"Host={host} Port={port} (empty line = quit)") + print("Presets: 1=GetMarkStatus; 2=GetMarkData; 3=GetMarkedCount;") + print(" 4=0;GetMarkStatus;; 5=0;GetMarkData;; 6=0;GetMarkedCount;;") + while True: + try: + line = input("> ").strip() + except (EOFError, KeyboardInterrupt): + print() + break + if not line: + break + if line == "1": + line = "GetMarkStatus;" + elif line == "2": + line = "GetMarkData;" + elif line == "3": + line = "GetMarkedCount;" + elif line == "4": + line = "0;GetMarkStatus;;" + elif line == "5": + line = "0;GetMarkData;;" + elif line == "6": + line = "0;GetMarkedCount;;" + + try: + raw = send_and_read(host, port, line, shutdown_write=shutdown_write) + except OSError as e: + print(f"ERROR: {e}", file=sys.stderr) + continue + text = raw.decode("utf-8", errors="replace") + print(f"Sent {len(line.encode('utf-8'))} bytes, recv {len(raw)} bytes:") + print(repr(text)) + if text.strip(): + print(text) + + +def sweep(host: str, port: int, shutdown_write: bool) -> int: + print(f"Sweep Host={host} Port={port}") + invalid_hits = 0 + non_empty = 0 + for i, payload in enumerate(SWEEP_PAYLOADS, start=1): + try: + raw = send_and_read(host, port, payload, shutdown_write=shutdown_write) + text = raw.decode("utf-8", errors="replace").strip() + except OSError as e: + print(f"[{i:02}] {payload!r} -> ERROR: {e}") + continue + if text: + non_empty += 1 + if "errorinvalid data" in text.lower(): + invalid_hits += 1 + print(f"[{i:02}] {payload!r} -> {text!r}") + print(f"Done. non-empty={non_empty}, invalid-data={invalid_hits}") + return 0 + + +def main() -> None: + ap = argparse.ArgumentParser(description="Send one TCP command to the laser and print the reply.") + ap.add_argument("--host", default=DEFAULT_HOST, help=f"default {DEFAULT_HOST}") + ap.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"default {DEFAULT_PORT}") + ap.add_argument( + "-c", + "--command", + help="Single command to send (e.g. GetMarkStatus;). If omitted, interactive mode.", + ) + ap.add_argument( + "--preset", + choices=sorted(PRESETS.keys()), + help="Use a built-in payload instead of --command", + ) + ap.add_argument( + "--no-shutdown", + action="store_true", + help="Do not shutdown(SHUT_WR) after send (some devices differ)", + ) + ap.add_argument( + "--sweep", + action="store_true", + help="Try multiple payload variants automatically and print all replies.", + ) + ap.add_argument( + "command_positional", + nargs="?", + help="Same as -c (so you can run: python testLaser.py GetMarkStatus;)", + ) + args = ap.parse_args() + + shutdown_write = not args.no_shutdown + + if args.command and args.command_positional: + ap.error("Use either -c/--command or the positional argument, not both") + if args.preset and (args.command or args.command_positional): + ap.error("Use either --preset or a custom command, not both") + if args.sweep and (args.preset or args.command or args.command_positional): + ap.error("Use --sweep alone (without --preset/--command)") + + if args.sweep: + raise SystemExit(sweep(args.host, args.port, shutdown_write)) + + if args.preset: + payload, desc = PRESETS[args.preset] + print(f"preset {args.preset}: {desc}") + elif args.command: + payload = args.command + elif args.command_positional: + payload = args.command_positional + else: + interactive(args.host, args.port, shutdown_write) + return + + try: + raw = send_and_read( + args.host, + args.port, + payload, + shutdown_write=shutdown_write, + ) + except OSError as e: + print(f"ERROR: {e}", file=sys.stderr) + sys.exit(1) + text = raw.decode("utf-8", errors="replace") + print(f"Sent to {args.host}:{args.port} -> {payload!r}") + print(f"Recv {len(raw)} bytes:") + print(repr(text)) + if text.strip(): + print("---") + print(text) + + +if __name__ == "__main__": + main()