Sfoglia il codice sorgente

no message

stable1
[email protected] 2 settimane fa
parent
commit
23fa4f3209
1 ha cambiato i file con 213 aggiunte e 0 eliminazioni
  1. +213
    -0
      python/testLaser.py

+ 213
- 0
python/testLaser.py Vedi File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Minimal TCP client to probe the laser (EZCAD / Bag2 plugin).

Examples:
python testLaser.py
python testLaser.py "GetMarkStatus;"
python testLaser.py -c "0;GetMarkStatus;;"
python testLaser.py --host 192.168.18.69 --port 45678 --preset bag2-status

Default host/port: 192.168.18.69 / 45678
"""

from __future__ import annotations

import argparse
import socket
import sys

DEFAULT_HOST = "192.168.18.69"
DEFAULT_PORT = 45678

# Shortcuts (second column = description)
PRESETS: dict[str, tuple[str, str]] = {
"status": ("GetMarkStatus;", "plain command + semicolon"),
"data": ("GetMarkData;", "plain command + semicolon"),
"count": ("GetMarkedCount;", "plain command + semicolon"),
"bag2-status": ("0;GetMarkStatus;;", "Bag2 line shape (same idea as FPSMS backend)"),
"bag2-data": ("0;GetMarkData;;", "Bag2 line shape"),
"bag2-count": ("0;GetMarkedCount;;", "Bag2 line shape"),
}

SWEEP_PAYLOADS = [
"GetMarkStatus;",
"GetMarkStatus",
"GetMarkStatus\r\n",
"GetMarkData;",
"GetMarkData",
"GetMarkData\r\n",
"GetMarkedCount;",
"GetMarkedCount",
"GetMarkedCount\r\n",
"0;GetMarkStatus;;",
"0;;GetMarkStatus;;",
"0;GetMarkData;;",
"0;;GetMarkData;;",
"0;GetMarkedCount;;",
"0;;GetMarkedCount;;",
]


def send_and_read(
host: str,
port: int,
payload: str,
*,
encoding: str = "utf-8",
shutdown_write: bool = True,
read_timeout: float = 3.0,
) -> bytes:
data = payload.encode(encoding)
with socket.create_connection((host, port), timeout=5) as sock:
sock.sendall(data)
if shutdown_write:
try:
sock.shutdown(socket.SHUT_WR)
except OSError:
pass
sock.settimeout(read_timeout)
chunks: list[bytes] = []
while True:
try:
block = sock.recv(8192)
except socket.timeout:
break
if not block:
break
chunks.append(block)
return b"".join(chunks)


def interactive(host: str, port: int, shutdown_write: bool) -> None:
print(f"Host={host} Port={port} (empty line = quit)")
print("Presets: 1=GetMarkStatus; 2=GetMarkData; 3=GetMarkedCount;")
print(" 4=0;GetMarkStatus;; 5=0;GetMarkData;; 6=0;GetMarkedCount;;")
while True:
try:
line = input("> ").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not line:
break
if line == "1":
line = "GetMarkStatus;"
elif line == "2":
line = "GetMarkData;"
elif line == "3":
line = "GetMarkedCount;"
elif line == "4":
line = "0;GetMarkStatus;;"
elif line == "5":
line = "0;GetMarkData;;"
elif line == "6":
line = "0;GetMarkedCount;;"

try:
raw = send_and_read(host, port, line, shutdown_write=shutdown_write)
except OSError as e:
print(f"ERROR: {e}", file=sys.stderr)
continue
text = raw.decode("utf-8", errors="replace")
print(f"Sent {len(line.encode('utf-8'))} bytes, recv {len(raw)} bytes:")
print(repr(text))
if text.strip():
print(text)


def sweep(host: str, port: int, shutdown_write: bool) -> int:
print(f"Sweep Host={host} Port={port}")
invalid_hits = 0
non_empty = 0
for i, payload in enumerate(SWEEP_PAYLOADS, start=1):
try:
raw = send_and_read(host, port, payload, shutdown_write=shutdown_write)
text = raw.decode("utf-8", errors="replace").strip()
except OSError as e:
print(f"[{i:02}] {payload!r} -> ERROR: {e}")
continue
if text:
non_empty += 1
if "errorinvalid data" in text.lower():
invalid_hits += 1
print(f"[{i:02}] {payload!r} -> {text!r}")
print(f"Done. non-empty={non_empty}, invalid-data={invalid_hits}")
return 0


def main() -> None:
ap = argparse.ArgumentParser(description="Send one TCP command to the laser and print the reply.")
ap.add_argument("--host", default=DEFAULT_HOST, help=f"default {DEFAULT_HOST}")
ap.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"default {DEFAULT_PORT}")
ap.add_argument(
"-c",
"--command",
help="Single command to send (e.g. GetMarkStatus;). If omitted, interactive mode.",
)
ap.add_argument(
"--preset",
choices=sorted(PRESETS.keys()),
help="Use a built-in payload instead of --command",
)
ap.add_argument(
"--no-shutdown",
action="store_true",
help="Do not shutdown(SHUT_WR) after send (some devices differ)",
)
ap.add_argument(
"--sweep",
action="store_true",
help="Try multiple payload variants automatically and print all replies.",
)
ap.add_argument(
"command_positional",
nargs="?",
help="Same as -c (so you can run: python testLaser.py GetMarkStatus;)",
)
args = ap.parse_args()

shutdown_write = not args.no_shutdown

if args.command and args.command_positional:
ap.error("Use either -c/--command or the positional argument, not both")
if args.preset and (args.command or args.command_positional):
ap.error("Use either --preset or a custom command, not both")
if args.sweep and (args.preset or args.command or args.command_positional):
ap.error("Use --sweep alone (without --preset/--command)")

if args.sweep:
raise SystemExit(sweep(args.host, args.port, shutdown_write))

if args.preset:
payload, desc = PRESETS[args.preset]
print(f"preset {args.preset}: {desc}")
elif args.command:
payload = args.command
elif args.command_positional:
payload = args.command_positional
else:
interactive(args.host, args.port, shutdown_write)
return

try:
raw = send_and_read(
args.host,
args.port,
payload,
shutdown_write=shutdown_write,
)
except OSError as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
text = raw.decode("utf-8", errors="replace")
print(f"Sent to {args.host}:{args.port} -> {payload!r}")
print(f"Recv {len(raw)} bytes:")
print(repr(text))
if text.strip():
print("---")
print(text)


if __name__ == "__main__":
main()

Caricamento…
Annulla
Salva