|
- #!/usr/bin/env python3
- """
- Minimal TCP client to probe the laser (EZCAD / Bag2 plugin).
-
- Examples:
- python testLaser.py
- python testLaser.py "GetMarkStatus;"
- python testLaser.py -c "0;GetMarkStatus;;"
- python testLaser.py --host 192.168.18.69 --port 45678 --preset bag2-status
-
- Default host/port: 192.168.18.69 / 45678
- """
-
- from __future__ import annotations
-
- import argparse
- import socket
- import sys
-
# Endpoint used when --host / --port are not supplied on the command line.
DEFAULT_HOST = "192.168.18.69"
DEFAULT_PORT = 45678

# Description fragments shared by several presets.
_PLAIN_DESC = "plain command + semicolon"
_BAG2_DESC = "Bag2 line shape"

# Shortcuts selectable with --preset: name -> (payload, description).
PRESETS: dict[str, tuple[str, str]] = {
    "status": ("GetMarkStatus;", _PLAIN_DESC),
    "data": ("GetMarkData;", _PLAIN_DESC),
    "count": ("GetMarkedCount;", _PLAIN_DESC),
    "bag2-status": (
        "0;GetMarkStatus;;",
        f"{_BAG2_DESC} (same idea as FPSMS backend)",
    ),
    "bag2-data": ("0;GetMarkData;;", _BAG2_DESC),
    "bag2-count": ("0;GetMarkedCount;;", _BAG2_DESC),
}

# Command names tried by --sweep, in the order they are sent.
_SWEEP_COMMANDS = ("GetMarkStatus", "GetMarkData", "GetMarkedCount")

# Payload variants sent one after another by --sweep.
SWEEP_PAYLOADS = [
    # Bare command with each terminator: ";", nothing, CRLF.
    *(
        f"{command}{terminator}"
        for command in _SWEEP_COMMANDS
        for terminator in (";", "", "\r\n")
    ),
    # "0"-prefixed form with one or two separators before the command.
    *(
        f"0{separator}{command};;"
        for command in _SWEEP_COMMANDS
        for separator in (";", ";;")
    ),
]
-
-
def send_and_read(
    host: str,
    port: int,
    payload: str,
    *,
    encoding: str = "utf-8",
    shutdown_write: bool = True,
    read_timeout: float = 3.0,
    connect_timeout: float = 5.0,
) -> bytes:
    """Send *payload* over a fresh TCP connection and return the raw reply.

    A new connection is opened for every call and closed before returning.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port to connect to.
        payload: Text to send; it is encoded with *encoding* and sent as-is
            (no terminator is appended).
        encoding: Codec used to turn *payload* into bytes.
        shutdown_write: When true, half-close the socket after sending so
            the peer sees end-of-stream on its read side.
        read_timeout: Seconds to wait in each ``recv`` call. The timeout
            applies per call, so the total read time can be longer.
        connect_timeout: Seconds allowed for establishing the connection
            (also in effect while sending).

    Returns:
        All bytes received until the peer closed the connection or a
        ``recv`` call timed out; ``b""`` if nothing arrived.

    Raises:
        OSError: The connection could not be established, sending failed,
            or the connection broke before any reply bytes were received.
    """
    data = payload.encode(encoding)
    with socket.create_connection((host, port), timeout=connect_timeout) as sock:
        sock.sendall(data)
        if shutdown_write:
            try:
                sock.shutdown(socket.SHUT_WR)
            except OSError:
                # Best effort: a failed half-close must not prevent reading
                # whatever the peer already sent.
                pass
        sock.settimeout(read_timeout)
        chunks: list[bytes] = []
        while True:
            try:
                block = sock.recv(8192)
            except socket.timeout:
                # No more data within read_timeout: treat as end of reply.
                break
            except ConnectionError:
                # A peer that resets the connection after replying would
                # otherwise make us throw away the bytes already received.
                if chunks:
                    break
                raise
            if not block:
                # Peer closed its side: the reply is complete.
                break
            chunks.append(block)
        return b"".join(chunks)
-
-
def interactive(host: str, port: int, shutdown_write: bool) -> None:
    """Prompt for commands in a loop, send each one, and print the reply.

    Entering a digit 1-6 sends the matching built-in payload; any other
    non-empty input is sent verbatim (after stripping surrounding
    whitespace). The loop ends on an empty line, EOF, or Ctrl-C at the
    prompt. Socket errors are reported on stderr and the loop continues.

    Args:
        host: Host to connect to for every command.
        port: TCP port to connect to for every command.
        shutdown_write: Passed through to ``send_and_read``.
    """
    # Digit shortcut -> payload actually sent.
    shortcuts = {
        "1": "GetMarkStatus;",
        "2": "GetMarkData;",
        "3": "GetMarkedCount;",
        "4": "0;GetMarkStatus;;",
        "5": "0;GetMarkData;;",
        "6": "0;GetMarkedCount;;",
    }

    print(f"Host={host} Port={port} (empty line = quit)")
    print("Presets: 1=GetMarkStatus; 2=GetMarkData; 3=GetMarkedCount;")
    print(" 4=0;GetMarkStatus;; 5=0;GetMarkData;; 6=0;GetMarkedCount;;")

    while True:
        try:
            entered = input("> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Finish the prompt line before leaving.
            print()
            return
        if not entered:
            return

        # Anything that is not a shortcut digit is sent as typed.
        command = shortcuts.get(entered, entered)

        try:
            reply = send_and_read(host, port, command, shutdown_write=shutdown_write)
        except OSError as e:
            print(f"ERROR: {e}", file=sys.stderr)
            continue

        decoded = reply.decode("utf-8", errors="replace")
        sent_size = len(command.encode("utf-8"))
        print(f"Sent {sent_size} bytes, recv {len(reply)} bytes:")
        print(repr(decoded))
        if decoded.strip():
            print(decoded)
-
-
def sweep(host: str, port: int, shutdown_write: bool) -> int:
    """Send every entry of ``SWEEP_PAYLOADS`` and print one line per reply.

    Each payload uses its own connection. Socket errors are printed for the
    affected payload and do not stop the sweep. A final summary line reports
    how many replies were non-empty and how many contained
    "errorinvalid data" (case-insensitive).

    Args:
        host: Host to connect to.
        port: TCP port to connect to.
        shutdown_write: Passed through to ``send_and_read``.

    Returns:
        Always 0.
    """
    print(f"Sweep Host={host} Port={port}")
    answered = 0
    rejected = 0
    for index, candidate in enumerate(SWEEP_PAYLOADS, start=1):
        try:
            reply = send_and_read(host, port, candidate, shutdown_write=shutdown_write)
        except OSError as e:
            print(f"[{index:02}] {candidate!r} -> ERROR: {e}")
            continue
        else:
            text = reply.decode("utf-8", errors="replace").strip()

        if text:
            answered += 1
        if "errorinvalid data" in text.lower():
            rejected += 1
        print(f"[{index:02}] {candidate!r} -> {text!r}")

    print(f"Done. non-empty={answered}, invalid-data={rejected}")
    return 0
-
-
def main() -> None:
    """Parse the command line and run one of the script's modes.

    Modes, in order of precedence:
      * ``--sweep``: run ``sweep`` and exit with its return value.
      * ``--preset`` / ``-c`` / positional command: send that single payload
        and print the reply; exit with status 1 on a socket error.
      * no command at all: start the interactive prompt.

    Conflicting combinations of these options are rejected via
    ``ArgumentParser.error``.
    """
    parser = argparse.ArgumentParser(
        description="Send one TCP command to the laser and print the reply."
    )
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"default {DEFAULT_HOST}")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"default {DEFAULT_PORT}")
    parser.add_argument(
        "-c",
        "--command",
        help="Single command to send (e.g. GetMarkStatus;). If omitted, interactive mode.",
    )
    parser.add_argument(
        "--preset",
        choices=sorted(PRESETS.keys()),
        help="Use a built-in payload instead of --command",
    )
    parser.add_argument(
        "--no-shutdown",
        action="store_true",
        help="Do not shutdown(SHUT_WR) after send (some devices differ)",
    )
    parser.add_argument(
        "--sweep",
        action="store_true",
        help="Try multiple payload variants automatically and print all replies.",
    )
    parser.add_argument(
        "command_positional",
        nargs="?",
        help="Same as -c (so you can run: python testLaser.py GetMarkStatus;)",
    )
    args = parser.parse_args()

    shutdown_write = not args.no_shutdown
    # The user-supplied command, whichever way it was given (-c wins if set).
    custom = args.command or args.command_positional

    # Reject option combinations that would make the payload ambiguous.
    if args.command and args.command_positional:
        parser.error("Use either -c/--command or the positional argument, not both")
    if args.preset and custom:
        parser.error("Use either --preset or a custom command, not both")
    if args.sweep and (args.preset or custom):
        parser.error("Use --sweep alone (without --preset/--command)")

    if args.sweep:
        raise SystemExit(sweep(args.host, args.port, shutdown_write))

    if args.preset:
        payload, desc = PRESETS[args.preset]
        print(f"preset {args.preset}: {desc}")
    elif custom:
        payload = custom
    else:
        # Nothing to send was given: fall back to the prompt loop.
        interactive(args.host, args.port, shutdown_write)
        return

    try:
        raw = send_and_read(args.host, args.port, payload, shutdown_write=shutdown_write)
    except OSError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    text = raw.decode("utf-8", errors="replace")
    print(f"Sent to {args.host}:{args.port} -> {payload!r}")
    print(f"Recv {len(raw)} bytes:")
    print(repr(text))
    if text.strip():
        print("---")
        print(text)
-
-
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|