Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 
 

214 rader
6.4 KiB

  1. #!/usr/bin/env python3
  2. """
  3. Minimal TCP client to probe the laser (EZCAD / Bag2 plugin).
  4. Examples:
  5. python testLaser.py
  6. python testLaser.py "GetMarkStatus;"
  7. python testLaser.py -c "0;GetMarkStatus;;"
  8. python testLaser.py --host 192.168.18.69 --port 45678 --preset bag2-status
  9. Default host/port: 192.168.18.69 / 45678
  10. """
  11. from __future__ import annotations
  12. import argparse
  13. import socket
  14. import sys
  15. DEFAULT_HOST = "192.168.18.69"
  16. DEFAULT_PORT = 45678
  17. # Shortcuts (second column = description)
  18. PRESETS: dict[str, tuple[str, str]] = {
  19. "status": ("GetMarkStatus;", "plain command + semicolon"),
  20. "data": ("GetMarkData;", "plain command + semicolon"),
  21. "count": ("GetMarkedCount;", "plain command + semicolon"),
  22. "bag2-status": ("0;GetMarkStatus;;", "Bag2 line shape (same idea as FPSMS backend)"),
  23. "bag2-data": ("0;GetMarkData;;", "Bag2 line shape"),
  24. "bag2-count": ("0;GetMarkedCount;;", "Bag2 line shape"),
  25. }
  26. SWEEP_PAYLOADS = [
  27. "GetMarkStatus;",
  28. "GetMarkStatus",
  29. "GetMarkStatus\r\n",
  30. "GetMarkData;",
  31. "GetMarkData",
  32. "GetMarkData\r\n",
  33. "GetMarkedCount;",
  34. "GetMarkedCount",
  35. "GetMarkedCount\r\n",
  36. "0;GetMarkStatus;;",
  37. "0;;GetMarkStatus;;",
  38. "0;GetMarkData;;",
  39. "0;;GetMarkData;;",
  40. "0;GetMarkedCount;;",
  41. "0;;GetMarkedCount;;",
  42. ]
  43. def send_and_read(
  44. host: str,
  45. port: int,
  46. payload: str,
  47. *,
  48. encoding: str = "utf-8",
  49. shutdown_write: bool = True,
  50. read_timeout: float = 3.0,
  51. ) -> bytes:
  52. data = payload.encode(encoding)
  53. with socket.create_connection((host, port), timeout=5) as sock:
  54. sock.sendall(data)
  55. if shutdown_write:
  56. try:
  57. sock.shutdown(socket.SHUT_WR)
  58. except OSError:
  59. pass
  60. sock.settimeout(read_timeout)
  61. chunks: list[bytes] = []
  62. while True:
  63. try:
  64. block = sock.recv(8192)
  65. except socket.timeout:
  66. break
  67. if not block:
  68. break
  69. chunks.append(block)
  70. return b"".join(chunks)
  71. def interactive(host: str, port: int, shutdown_write: bool) -> None:
  72. print(f"Host={host} Port={port} (empty line = quit)")
  73. print("Presets: 1=GetMarkStatus; 2=GetMarkData; 3=GetMarkedCount;")
  74. print(" 4=0;GetMarkStatus;; 5=0;GetMarkData;; 6=0;GetMarkedCount;;")
  75. while True:
  76. try:
  77. line = input("> ").strip()
  78. except (EOFError, KeyboardInterrupt):
  79. print()
  80. break
  81. if not line:
  82. break
  83. if line == "1":
  84. line = "GetMarkStatus;"
  85. elif line == "2":
  86. line = "GetMarkData;"
  87. elif line == "3":
  88. line = "GetMarkedCount;"
  89. elif line == "4":
  90. line = "0;GetMarkStatus;;"
  91. elif line == "5":
  92. line = "0;GetMarkData;;"
  93. elif line == "6":
  94. line = "0;GetMarkedCount;;"
  95. try:
  96. raw = send_and_read(host, port, line, shutdown_write=shutdown_write)
  97. except OSError as e:
  98. print(f"ERROR: {e}", file=sys.stderr)
  99. continue
  100. text = raw.decode("utf-8", errors="replace")
  101. print(f"Sent {len(line.encode('utf-8'))} bytes, recv {len(raw)} bytes:")
  102. print(repr(text))
  103. if text.strip():
  104. print(text)
  105. def sweep(host: str, port: int, shutdown_write: bool) -> int:
  106. print(f"Sweep Host={host} Port={port}")
  107. invalid_hits = 0
  108. non_empty = 0
  109. for i, payload in enumerate(SWEEP_PAYLOADS, start=1):
  110. try:
  111. raw = send_and_read(host, port, payload, shutdown_write=shutdown_write)
  112. text = raw.decode("utf-8", errors="replace").strip()
  113. except OSError as e:
  114. print(f"[{i:02}] {payload!r} -> ERROR: {e}")
  115. continue
  116. if text:
  117. non_empty += 1
  118. if "errorinvalid data" in text.lower():
  119. invalid_hits += 1
  120. print(f"[{i:02}] {payload!r} -> {text!r}")
  121. print(f"Done. non-empty={non_empty}, invalid-data={invalid_hits}")
  122. return 0
  123. def main() -> None:
  124. ap = argparse.ArgumentParser(description="Send one TCP command to the laser and print the reply.")
  125. ap.add_argument("--host", default=DEFAULT_HOST, help=f"default {DEFAULT_HOST}")
  126. ap.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"default {DEFAULT_PORT}")
  127. ap.add_argument(
  128. "-c",
  129. "--command",
  130. help="Single command to send (e.g. GetMarkStatus;). If omitted, interactive mode.",
  131. )
  132. ap.add_argument(
  133. "--preset",
  134. choices=sorted(PRESETS.keys()),
  135. help="Use a built-in payload instead of --command",
  136. )
  137. ap.add_argument(
  138. "--no-shutdown",
  139. action="store_true",
  140. help="Do not shutdown(SHUT_WR) after send (some devices differ)",
  141. )
  142. ap.add_argument(
  143. "--sweep",
  144. action="store_true",
  145. help="Try multiple payload variants automatically and print all replies.",
  146. )
  147. ap.add_argument(
  148. "command_positional",
  149. nargs="?",
  150. help="Same as -c (so you can run: python testLaser.py GetMarkStatus;)",
  151. )
  152. args = ap.parse_args()
  153. shutdown_write = not args.no_shutdown
  154. if args.command and args.command_positional:
  155. ap.error("Use either -c/--command or the positional argument, not both")
  156. if args.preset and (args.command or args.command_positional):
  157. ap.error("Use either --preset or a custom command, not both")
  158. if args.sweep and (args.preset or args.command or args.command_positional):
  159. ap.error("Use --sweep alone (without --preset/--command)")
  160. if args.sweep:
  161. raise SystemExit(sweep(args.host, args.port, shutdown_write))
  162. if args.preset:
  163. payload, desc = PRESETS[args.preset]
  164. print(f"preset {args.preset}: {desc}")
  165. elif args.command:
  166. payload = args.command
  167. elif args.command_positional:
  168. payload = args.command_positional
  169. else:
  170. interactive(args.host, args.port, shutdown_write)
  171. return
  172. try:
  173. raw = send_and_read(
  174. args.host,
  175. args.port,
  176. payload,
  177. shutdown_write=shutdown_write,
  178. )
  179. except OSError as e:
  180. print(f"ERROR: {e}", file=sys.stderr)
  181. sys.exit(1)
  182. text = raw.decode("utf-8", errors="replace")
  183. print(f"Sent to {args.host}:{args.port} -> {payload!r}")
  184. print(f"Recv {len(raw)} bytes:")
  185. print(repr(text))
  186. if text.strip():
  187. print("---")
  188. print(text)
  189. if __name__ == "__main__":
  190. main()