#!/usr/bin/env python3
"""
Minimal TCP client to probe the laser (EZCAD / Bag2 plugin).

Each command opens a fresh TCP connection, sends one payload, optionally
half-closes the write side, and prints whatever the device sends back.

Examples:
  python testLaser.py
  python testLaser.py "GetMarkStatus;"
  python testLaser.py -c "0;GetMarkStatus;;"
  python testLaser.py --host 192.168.18.69 --port 45678 --preset bag2-status

Default host/port: 192.168.18.69 / 45678
"""

from __future__ import annotations

import argparse
import socket
import sys

DEFAULT_HOST = "192.168.18.69"
DEFAULT_PORT = 45678

# Shortcuts (second column = description)
PRESETS: dict[str, tuple[str, str]] = {
    "status": ("GetMarkStatus;", "plain command + semicolon"),
    "data": ("GetMarkData;", "plain command + semicolon"),
    "count": ("GetMarkedCount;", "plain command + semicolon"),
    "bag2-status": ("0;GetMarkStatus;;", "Bag2 line shape (same idea as FPSMS backend)"),
    "bag2-data": ("0;GetMarkData;;", "Bag2 line shape"),
    "bag2-count": ("0;GetMarkedCount;;", "Bag2 line shape"),
}

# Payload variants tried by --sweep: each command with and without a
# trailing semicolon / CRLF, plus two "0;..." prefixed shapes.
SWEEP_PAYLOADS = [
    "GetMarkStatus;",
    "GetMarkStatus",
    "GetMarkStatus\r\n",
    "GetMarkData;",
    "GetMarkData",
    "GetMarkData\r\n",
    "GetMarkedCount;",
    "GetMarkedCount",
    "GetMarkedCount\r\n",
    "0;GetMarkStatus;;",
    "0;;GetMarkStatus;;",
    "0;GetMarkData;;",
    "0;;GetMarkData;;",
    "0;GetMarkedCount;;",
    "0;;GetMarkedCount;;",
]

# Numeric shortcuts accepted at the interactive prompt ("1".."6"), taken
# from PRESETS so the payload strings are defined in exactly one place.
_INTERACTIVE_SHORTCUTS: dict[str, str] = {
    str(number): PRESETS[key][0]
    for number, key in enumerate(
        ("status", "data", "count", "bag2-status", "bag2-data", "bag2-count"),
        start=1,
    )
}


def _decode_reply(raw: bytes) -> str:
    """Decode a device reply as UTF-8, replacing undecodable bytes."""
    return raw.decode("utf-8", errors="replace")


def send_and_read(
    host: str,
    port: int,
    payload: str,
    *,
    encoding: str = "utf-8",
    shutdown_write: bool = True,
    read_timeout: float = 3.0,
    connect_timeout: float = 5.0,
) -> bytes:
    """Send *payload* over a new TCP connection and return the raw reply.

    Args:
        host: Device host name or IP address.
        port: Device TCP port.
        payload: Text to send; encoded with *encoding*.
        encoding: Codec used to encode *payload*.
        shutdown_write: If true, half-close the write side after sending.
        read_timeout: Seconds to wait for each ``recv`` before giving up.
        connect_timeout: Seconds allowed for connecting and sending.

    Returns:
        All bytes received until the peer closes the connection or a read
        times out (possibly empty).

    Raises:
        OSError: If connecting, sending, or receiving fails for a reason
            other than a read timeout.
    """
    data = payload.encode(encoding)
    chunks: list[bytes] = []
    with socket.create_connection((host, port), timeout=connect_timeout) as sock:
        sock.sendall(data)
        if shutdown_write:
            try:
                sock.shutdown(socket.SHUT_WR)
            except OSError:
                # Deliberately best-effort: a failed half-close must not
                # prevent us from reading whatever the peer replies.
                pass
        sock.settimeout(read_timeout)
        while True:
            try:
                block = sock.recv(8192)
            except socket.timeout:
                # Silence for read_timeout seconds: treat the reply as complete.
                break
            if not block:
                # Peer closed the connection.
                break
            chunks.append(block)
    return b"".join(chunks)


def interactive(host: str, port: int, shutdown_write: bool) -> None:
    """Run a prompt loop that sends each entered line and prints the reply.

    The inputs ``1``..``6`` expand to the built-in payloads; an empty line,
    EOF, or Ctrl-C ends the loop. Connection errors are reported on stderr
    and the loop continues.
    """
    print(f"Host={host} Port={port} (empty line = quit)")
    print("Presets: 1=GetMarkStatus; 2=GetMarkData; 3=GetMarkedCount;")
    print(" 4=0;GetMarkStatus;; 5=0;GetMarkData;; 6=0;GetMarkedCount;;")
    while True:
        try:
            line = input("> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not line:
            break
        # Expand a numeric shortcut; anything else is sent verbatim.
        line = _INTERACTIVE_SHORTCUTS.get(line, line)
        try:
            raw = send_and_read(host, port, line, shutdown_write=shutdown_write)
        except OSError as e:
            print(f"ERROR: {e}", file=sys.stderr)
            continue
        text = _decode_reply(raw)
        print(f"Sent {len(line.encode('utf-8'))} bytes, recv {len(raw)} bytes:")
        print(repr(text))
        if text.strip():
            print(text)


def sweep(
    host: str,
    port: int,
    shutdown_write: bool,
    payloads: list[str] | None = None,
) -> int:
    """Send every payload variant in turn and print each reply.

    Args:
        host: Device host name or IP address.
        port: Device TCP port.
        shutdown_write: Passed through to ``send_and_read``.
        payloads: Payloads to try; defaults to ``SWEEP_PAYLOADS``.

    Returns:
        Process exit code: ``1`` if at least one payload was tried and every
        attempt failed with ``OSError`` (the device was never reached),
        otherwise ``0``.
    """
    if payloads is None:
        payloads = SWEEP_PAYLOADS
    print(f"Sweep Host={host} Port={port}")
    invalid_hits = 0
    non_empty = 0
    failures = 0
    for i, payload in enumerate(payloads, start=1):
        try:
            raw = send_and_read(host, port, payload, shutdown_write=shutdown_write)
        except OSError as e:
            failures += 1
            print(f"[{i:02}] {payload!r} -> ERROR: {e}")
            continue
        text = _decode_reply(raw).strip()
        if text:
            non_empty += 1
        if "errorinvalid data" in text.lower():
            invalid_hits += 1
        print(f"[{i:02}] {payload!r} -> {text!r}")
    print(f"Done. non-empty={non_empty}, invalid-data={invalid_hits}")
    # Match single-command mode, which exits 1 when the device is unreachable.
    if payloads and failures == len(payloads):
        return 1
    return 0


def main() -> None:
    """Parse command-line arguments and run sweep, one-shot, or interactive mode.

    Exits with status 2 on conflicting arguments (via argparse), 1 when a
    one-shot command cannot reach the device, and with the sweep's return
    code in ``--sweep`` mode.
    """
    ap = argparse.ArgumentParser(description="Send one TCP command to the laser and print the reply.")
    ap.add_argument("--host", default=DEFAULT_HOST, help=f"default {DEFAULT_HOST}")
    ap.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"default {DEFAULT_PORT}")
    ap.add_argument(
        "-c",
        "--command",
        help="Single command to send (e.g. GetMarkStatus;). If omitted, interactive mode.",
    )
    ap.add_argument(
        "--preset",
        choices=sorted(PRESETS.keys()),
        help="Use a built-in payload instead of --command",
    )
    ap.add_argument(
        "--no-shutdown",
        action="store_true",
        help="Do not shutdown(SHUT_WR) after send (some devices differ)",
    )
    ap.add_argument(
        "--sweep",
        action="store_true",
        help="Try multiple payload variants automatically and print all replies.",
    )
    ap.add_argument(
        "command_positional",
        nargs="?",
        # Quoted in the example: an unquoted ';' is consumed by the shell.
        help='Same as -c (so you can run: python testLaser.py "GetMarkStatus;")',
    )
    args = ap.parse_args()
    shutdown_write = not args.no_shutdown

    # Compare against None, not truthiness, so an explicit empty string
    # still counts as "a command was given".
    if args.command is not None and args.command_positional is not None:
        ap.error("Use either -c/--command or the positional argument, not both")
    custom = args.command if args.command is not None else args.command_positional
    if args.preset is not None and custom is not None:
        ap.error("Use either --preset or a custom command, not both")
    if args.sweep and (args.preset is not None or custom is not None):
        ap.error("Use --sweep alone (without --preset/--command)")

    if args.sweep:
        raise SystemExit(sweep(args.host, args.port, shutdown_write))

    if args.preset is not None:
        payload, desc = PRESETS[args.preset]
        print(f"preset {args.preset}: {desc}")
    elif custom is not None:
        payload = custom
    else:
        interactive(args.host, args.port, shutdown_write)
        return

    try:
        raw = send_and_read(
            args.host,
            args.port,
            payload,
            shutdown_write=shutdown_write,
        )
    except OSError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    text = _decode_reply(raw)
    print(f"Sent to {args.host}:{args.port} -> {payload!r}")
    print(f"Recv {len(raw)} bytes:")
    print(repr(text))
    if text.strip():
        print("---")
        print(text)


if __name__ == "__main__":
    main()