diff --git a/test/pi_arduino_serial/avr_runner.js b/test/pi_arduino_serial/avr_runner.js index 29cdfb5..7b32a96 100644 --- a/test/pi_arduino_serial/avr_runner.js +++ b/test/pi_arduino_serial/avr_runner.js @@ -105,10 +105,33 @@ const usart = new AVRUSART(cpu, usart0Config, CLOCK_HZ); let socket = null; let txBacklog = []; // bytes queued before TCP connects -// Arduino → Pi: forward transmitted bytes +// Pi → Arduino: software RX queue. +// avr8js USART has no internal RX buffer — writeByte() drops the byte if +// rxBusy is true. onRxComplete fires only in the baud-rate-timed path +// (immediate=false), NOT when the Arduino reads UDR. So we drain this +// queue in runBatch() after each instruction batch when rxBusy is false. +let rxQueue = []; + +function tryInjectRx() { + if (rxQueue.length === 0) return; + const byte = rxQueue.shift(); + // writeByte(byte, true) returns false on failure, undefined on success (no return stmt) + if (usart.writeByte(byte, true) === false) { + // USART RX disabled — put it back; runBatch will retry next tick + rxQueue.unshift(byte); + } +} + +// Arduino → Pi: forward transmitted bytes (line-buffered logging) +let _avrTxLineBuf = ''; usart.onByteTransmit = (byte) => { const ch = String.fromCharCode(byte); - process.stdout.write(`[AVR->Pi] ${ch === '\n' ? '\\n\n' : ch}`); + if (ch === '\n') { + if (_avrTxLineBuf.trim()) process.stdout.write(`[AVR->Pi] ${_avrTxLineBuf}\n`); + _avrTxLineBuf = ''; + } else { + _avrTxLineBuf += ch; + } if (socket && !socket.destroyed) { socket.write(Buffer.from([byte])); @@ -127,6 +150,10 @@ function runBatch() { avrInstruction(cpu); cpu.tick(); } + // Drain RX queue: inject next byte if Arduino has already read the previous one + if (rxQueue.length > 0 && !usart.rxBusy) { + tryInjectRx(); + } setImmediate(runBatch); } @@ -154,13 +181,22 @@ function connectToBroker() { } }); - // Pi → Arduino: feed received bytes into USART RX + // Pi → Arduino: feed received bytes into USART RX via queue (line-buffered log) + let _piRxLineBuf = ''; s.on('data', (chunk) => { for (const byte of chunk) { const ch = String.fromCharCode(byte); - process.stdout.write(`[Pi->AVR] ${ch === '\n' ? '\\n\n' : ch}`); - // writeByte(value, immediate=true) bypasses baud-rate timing - usart.writeByte(byte, true); + if (ch === '\n') { + if (_piRxLineBuf.trim()) process.stdout.write(`[Pi->AVR] ${_piRxLineBuf}\n`); + _piRxLineBuf = ''; + } else { + _piRxLineBuf += ch; + } + rxQueue.push(byte); + } + // Inject first byte immediately if USART is free + if (rxQueue.length > 0 && !usart.rxBusy) { + tryInjectRx(); } }); diff --git a/test/pi_arduino_serial/test_pi_arduino_serial.py b/test/pi_arduino_serial/test_pi_arduino_serial.py index b138828..53dce17 100644 --- a/test/pi_arduino_serial/test_pi_arduino_serial.py +++ b/test/pi_arduino_serial/test_pi_arduino_serial.py @@ -14,44 +14,24 @@ The Pi receives that reply and prints "TEST_PASSED". Architecture ------------ - |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| - | Python Test Process (this file) | - | | - | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| | - | | SerialBroker (asyncio) | | - | | | | - | | TCP server :5555 <||||||||||||||> QEMU Pi (ttyAMA0) | | - | | TCP server :5556 <||> avr_runner.js (Arduino UART) | | - | | | | - | | - Bridges bytes Pi <-> Arduino | | - | | - State machine automates Pi serial console: | | - | | - waits for shell prompt | | - | | - disables TTY echo | | - | | - injects Pi Python test script via base64 | | - | | - Asserts "TEST_PASSED" in Pi output | | - | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| | - | | - | Subprocesses: | - | - qemu-system-arm (Raspberry Pi 3B, init=/bin/sh) | - | - node avr_runner.js (ATmega328P emulation) | - |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + Pi ttyAMA0 <-> TCP:15555 <-> [SerialBroker] <-> TCP:15556 <-> avr_runner.js + | + (state machine automates Pi console) Prerequisites ------------- - - qemu-system-arm in PATH + - qemu-system-aarch64 in PATH - node (Node.js >= 18) in PATH - arduino-cli in PATH, with arduino:avr core installed - QEMU images in /img/: - kernel_extracted.img - bcm271~1.dtb + kernel8.img (64-bit ARM64 Pi3 kernel) + bcm271~1.dtb (BCM2710 device tree) 2025-12-04-raspios-trixie-armhf.img Run --- cd python test/pi_arduino_serial/test_pi_arduino_serial.py - -The test may take several minutes while the Pi boots inside QEMU. """ import asyncio @@ -64,7 +44,14 @@ import time from pathlib import Path from typing import Optional -# || Paths ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# Set UTF-8 for Windows console output +if sys.platform == "win32": + if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + if hasattr(sys.stderr, "reconfigure"): + sys.stderr.reconfigure(encoding="utf-8", errors="replace") + +# || Paths |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| REPO_ROOT = Path(__file__).resolve().parent.parent.parent IMG_DIR = REPO_ROOT / "img" TEST_DIR = Path(__file__).resolve().parent @@ -72,46 +59,36 @@ TEST_DIR = Path(__file__).resolve().parent SKETCH_FILE = TEST_DIR / "arduino_sketch.ino" AVR_RUNNER = TEST_DIR / "avr_runner.js" -KERNEL_IMG = IMG_DIR / "kernel_extracted.img" -DTB_FILE = IMG_DIR / "bcm271~1.dtb" # Windows 8.3 short name +KERNEL_IMG = IMG_DIR / "kernel8.img" +DTB_FILE = IMG_DIR / "bcm271~1.dtb" SD_IMAGE = IMG_DIR / "2025-12-04-raspios-trixie-armhf.img" -# || Network ports (must be free) ||||||||||||||||||||||||||||||||||||||||||||||| -BROKER_PI_PORT = 15555 # Broker listens; QEMU Pi connects here -BROKER_AVR_PORT = 15556 # Broker listens; avr_runner.js connects here +# || Network ports ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +BROKER_PI_PORT = 15555 +BROKER_AVR_PORT = 15556 -# || Timeouts |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| -BOOT_TIMEOUT_S = 360 # 6 min -- Pi boot + shell prompt -SCRIPT_TIMEOUT_S = 45 # Script execution after prompt seen +# || Timeouts |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +BOOT_TIMEOUT_S = 120 # 2 min -- Pi boots in ~5-30s with init=/bin/sh +SCRIPT_TIMEOUT_S = 45 # script execution after prompt seen COMPILE_TIMEOUT_S = 120 # arduino-cli -# || Pi console state-machine |||||||||||||||||||||||||||||||||||||||||||||||||| -# States -ST_BOOT = "BOOT" # waiting for shell prompt -ST_SETUP = "SETUP" # sent stty -echo, waiting for next prompt -ST_INJECT = "INJECT" # script injected, waiting for TEST_PASSED / TEST_FAILED +# || Pi console state-machine |||||||||||||||||||||||||||||||||||||||||||||||| +ST_BOOT = "BOOT" +ST_SETUP = "SETUP" +ST_INJECT = "INJECT" ST_DONE = "DONE" -# Patterns that indicate a ready shell prompt PROMPT_BYTES = [b"# ", b"$ "] -# || Pi Python test script |||||||||||||||||||||||||||||||||||||||||||||||||||||| -# This script is base64-encoded and written to /tmp/pi_test.py on the Pi, -# then executed with "python3 /tmp/pi_test.py". -# -# When python3 runs from the serial console, its stdin/stdout ARE ttyAMA0, -# i.e. the same wire the Arduino is connected to. select() lets us poll -# for incoming bytes without busy-waiting. +# || Pi test script (base64-encoded and injected into the Pi shell) |||||||||| _PI_SCRIPT_SRC = b"""\ import sys, os, time, select -# --- send trigger message to Arduino --- sys.stdout.write("HELLO_FROM_PI\\n") sys.stdout.flush() -# --- wait for Arduino reply --- resp = b"" -deadline = time.time() + 15 # 15-second window +deadline = time.time() + 15 while time.time() < deadline: readable, _, _ = select.select([sys.stdin], [], [], 1.0) @@ -128,13 +105,8 @@ sys.stdout.flush() sys.exit(1) """ -# One-liner shell command injected into the Pi console: -# 1. PATH is set explicitly (init=/bin/sh may not load a profile) -# 2. TTY echo is disabled so our typed command doesn't loop back -# 3. The base64-encoded script is decoded and written to /tmp/pi_test.py -# 4. python3 runs it (stdin = ttyAMA0 = Arduino wire) -_PI_B64 = base64.b64encode(_PI_SCRIPT_SRC).decode() -_PI_CMD = ( +_PI_B64 = base64.b64encode(_PI_SCRIPT_SRC).decode() +_PI_CMD = ( "export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" " && stty -echo" f" && printf '%s' '{_PI_B64}' | /usr/bin/base64 -d > /tmp/pi_test.py" @@ -142,16 +114,15 @@ _PI_CMD = ( "\n" ) +# Max bytes to keep in the Pi receive buffer (only need last N bytes for prompts) +_PI_BUF_MAX = 8192 -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| class SerialBroker: """ - Bridges the emulated Pi (QEMU) and the emulated Arduino (avr8js). - - Both sides connect to this broker via separate TCP ports. - All bytes are forwarded transparently in both directions while the - broker also automates the Pi serial console to inject and run the - test script. + TCP broker bridging emulated Pi (QEMU) <-> emulated Arduino (avr_runner.js). + Automates the Pi serial console via a state machine. """ def __init__(self) -> None: @@ -160,65 +131,64 @@ class SerialBroker: self._avr_reader: Optional[asyncio.StreamReader] = None self._avr_writer: Optional[asyncio.StreamWriter] = None - # Accumulate Pi output for pattern matching + # Rolling window of Pi output for prompt matching self._pi_buf: bytearray = bytearray() - # Full traffic log: list of (direction, bytes) - self.traffic: list[tuple[str, bytes]] = [] + # Accumulated Pi output lines for human-readable logging + self._pi_line_buf: str = "" self._state = ST_BOOT - self._prompt_count = 0 self._script_deadline: float = 0.0 - # Signals self.pi_connected = asyncio.Event() self.avr_connected = asyncio.Event() self.result_event = asyncio.Event() self.test_passed = False - # || Server callbacks ||||||||||||||||||||||||||||||||||||||||||||||||||||||| - async def _on_pi_connect(self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - addr = writer.get_extra_info("peername") - print(f"[broker] Pi (QEMU) connected from {addr}") - self._pi_reader = reader - self._pi_writer = writer + # Background relay tasks (kept so we can cancel them) + self._relay_tasks: list[asyncio.Task] = [] + + # || Server callbacks ||||||||||||||||||||||||||||||||||||||||||||||||||| + async def _on_pi_connect(self, r: asyncio.StreamReader, + w: asyncio.StreamWriter) -> None: + print(f"[broker] Pi (QEMU) connected from {w.get_extra_info('peername')}") + self._pi_reader = r + self._pi_writer = w self.pi_connected.set() - async def _on_avr_connect(self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - addr = writer.get_extra_info("peername") - print(f"[broker] Arduino (avr_runner.js) connected from {addr}") - self._avr_reader = reader - self._avr_writer = writer + async def _on_avr_connect(self, r: asyncio.StreamReader, + w: asyncio.StreamWriter) -> None: + print(f"[broker] Arduino (avr_runner.js) connected from {w.get_extra_info('peername')}") + self._avr_reader = r + self._avr_writer = w self.avr_connected.set() - # || Start TCP servers |||||||||||||||||||||||||||||||||||||||||||||||||||||| - async def start(self) -> tuple: + # || Start servers |||||||||||||||||||||||||||||||||||||||||||||||||||||| + async def start(self): pi_srv = await asyncio.start_server( - self._on_pi_connect, "127.0.0.1", BROKER_PI_PORT - ) + self._on_pi_connect, "127.0.0.1", BROKER_PI_PORT) avr_srv = await asyncio.start_server( - self._on_avr_connect, "127.0.0.1", BROKER_AVR_PORT - ) - print(f"[broker] Listening -- Pi port:{BROKER_PI_PORT} " - f"Arduino port:{BROKER_AVR_PORT}") + self._on_avr_connect, "127.0.0.1", BROKER_AVR_PORT) + print(f"[broker] Listening -- Pi:{BROKER_PI_PORT} Arduino:{BROKER_AVR_PORT}") return pi_srv, avr_srv - # || Main relay (call after both sides connected) ||||||||||||||||||||||||||| - async def run(self) -> None: - await asyncio.gather( - self._relay_pi_to_avr(), - self._relay_avr_to_pi(), - self._console_automator(), - ) + # || Start relay tasks ||||||||||||||||||||||||||||||||||||||||||||||||| + def start_relay(self) -> None: + loop = asyncio.get_event_loop() + self._relay_tasks = [ + loop.create_task(self._relay_pi_to_avr()), + loop.create_task(self._relay_avr_to_pi()), + loop.create_task(self._console_automator()), + ] - # || Pi -> Arduino ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + def cancel_relay(self) -> None: + for t in self._relay_tasks: + t.cancel() + + # || Pi -> Arduino ||||||||||||||||||||||||||||||||||||||||||||||||||||||| async def _relay_pi_to_avr(self) -> None: reader = self._pi_reader - if reader is None: + if not reader: return while True: try: @@ -228,17 +198,31 @@ class SerialBroker: if not data: break - self._log("Pi->AVR", data) + # Append to rolling buffer, cap size to avoid unbounded growth self._pi_buf.extend(data) + if len(self._pi_buf) > _PI_BUF_MAX: + del self._pi_buf[:len(self._pi_buf) - _PI_BUF_MAX] - if self._avr_writer and not self._avr_writer.is_closing(): + # Log Pi output as lines (not char-by-char) + self._pi_line_buf += data.decode("utf-8", errors="replace") + while "\n" in self._pi_line_buf: + line, self._pi_line_buf = self._pi_line_buf.split("\n", 1) + stripped = line.strip() + if stripped: + print(f" [Pi->AVR] {stripped}") + + # Only forward Pi output to Arduino once the test script is running. + # Kernel boot messages must not flood the AVR USART RX queue. + if (self._state == ST_INJECT + and self._avr_writer + and not self._avr_writer.is_closing()): self._avr_writer.write(data) await self._avr_writer.drain() - # || Arduino -> Pi ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + # || Arduino -> Pi ||||||||||||||||||||||||||||||||||||||||||||||||||||||| async def _relay_avr_to_pi(self) -> None: reader = self._avr_reader - if reader is None: + if not reader: return while True: try: @@ -248,37 +232,42 @@ class SerialBroker: if not data: break - self._log("AVR->Pi", data) + text = data.decode("utf-8", errors="replace").strip() + if text: + print(f" [AVR->Pi] {text}") if self._pi_writer and not self._pi_writer.is_closing(): self._pi_writer.write(data) await self._pi_writer.drain() - # || Console state machine |||||||||||||||||||||||||||||||||||||||||||||||||| + # || Console state machine ||||||||||||||||||||||||||||||||||||||||||||| async def _console_automator(self) -> None: - boot_deadline = time.monotonic() + BOOT_TIMEOUT_S - last_poke_time = time.monotonic() + boot_deadline = time.monotonic() + BOOT_TIMEOUT_S + last_poke_time = time.monotonic() - # Give QEMU a moment to start emitting serial output, then poke + # Small delay before first poke so QEMU starts outputting await asyncio.sleep(3.0) self._send_to_pi(b"\n") + print(f"[broker] Waiting for shell prompt (boot timeout: {BOOT_TIMEOUT_S}s) ...") while self._state != ST_DONE: - await asyncio.sleep(0.15) - + await asyncio.sleep(0.2) now = time.monotonic() - # || Global boot timeout |||||||||||||||||||||||||||||||||||||||||||| + # Boot timeout if now > boot_deadline: - print("\n[broker] [timeout] BOOT TIMEOUT -- shell prompt never appeared") + elapsed = int(now - (boot_deadline - BOOT_TIMEOUT_S)) + print(f"\n[broker] BOOT TIMEOUT after {elapsed}s -- shell prompt never seen") + print(f"[broker] Last Pi buffer tail: {bytes(self._pi_buf[-64:])!r}") self.test_passed = False self._state = ST_DONE self.result_event.set() return - # || Script-execution timeout (after script was injected) |||||||||| + # Script timeout if self._state == ST_INJECT and self._script_deadline and now > self._script_deadline: - print("\n[broker] [timeout] SCRIPT TIMEOUT -- no result from Pi script") + print("\n[broker] SCRIPT TIMEOUT -- no result from Pi script") + print(f"[broker] Pi buffer tail: {bytes(self._pi_buf[-128:])!r}") self.test_passed = False self._state = ST_DONE self.result_event.set() @@ -286,53 +275,47 @@ class SerialBroker: buf = bytes(self._pi_buf) - # || ST_BOOT: wait for shell prompt ||||||||||||||||||||||||||||||||| if self._state == ST_BOOT: - # Periodically poke the shell to get a fresh prompt - # (in case we missed the initial one) if now - last_poke_time > 8.0: self._send_to_pi(b"\n") last_poke_time = now + elapsed_s = int(now - (boot_deadline - BOOT_TIMEOUT_S)) + print(f"[broker] Still booting... ({elapsed_s}s) buf tail: {buf[-16:]!r}") if self._prompt_seen(buf): - print("\n[broker] [OK] Shell prompt detected") + print("\n[broker] Shell prompt detected!") self._pi_buf.clear() - # Disable echo and set PATH, then wait for next prompt self._send_to_pi( b"export PATH=/usr/local/sbin:/usr/local/bin" b":/usr/sbin:/usr/bin:/sbin:/bin && stty -echo\n" ) self._state = ST_SETUP - self._prompt_count = 0 - # || ST_SETUP: wait for prompt after stty -echo ||||||||||||||||||||| elif self._state == ST_SETUP: if self._prompt_seen(buf) or len(buf) > 10: - # Either got a prompt or the shell already answered await asyncio.sleep(0.3) self._pi_buf.clear() - print("[broker] [OK] Environment set -- injecting Pi test script") + print("[broker] Injecting Pi test script ...") self._send_to_pi(_PI_CMD.encode()) self._state = ST_INJECT self._script_deadline = time.monotonic() + SCRIPT_TIMEOUT_S - # || ST_INJECT: wait for TEST_PASSED / TEST_FAILED ||||||||||||||||| elif self._state == ST_INJECT: if b"TEST_PASSED" in buf: - print("\n[broker] [OK] TEST_PASSED received from Pi!") + print("\n[broker] TEST_PASSED received from Pi!") self.test_passed = True self._state = ST_DONE self.result_event.set() elif b"TEST_FAILED" in buf: snippet = buf.decode("utf-8", errors="replace")[-120:] - print(f"\n[broker] [FAIL] TEST_FAILED received from Pi\n last output: {snippet!r}") + print(f"\n[broker] TEST_FAILED from Pi. Last output: {snippet!r}") self.test_passed = False self._state = ST_DONE self.result_event.set() - # || Helpers |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + # || Helpers ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| def _prompt_seen(self, buf: bytes) -> bool: - tail = buf[-32:] # only check the last 32 bytes + tail = buf[-32:] return any(p in tail for p in PROMPT_BYTES) def _send_to_pi(self, data: bytes) -> None: @@ -340,24 +323,12 @@ class SerialBroker: self._pi_writer.write(data) asyncio.get_event_loop().create_task(self._pi_writer.drain()) - def _log(self, direction: str, data: bytes) -> None: - self.traffic.append((direction, data)) - text = data.decode("utf-8", errors="replace") - for line in text.splitlines(): - stripped = line.strip() - if stripped: - print(f" [{direction}] {stripped}") - -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| # Compilation -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| def compile_arduino_sketch() -> Path: - """Compile arduino_sketch.ino via arduino-cli and return the .hex path.""" print("\n[compile] Compiling Arduino sketch with arduino-cli ...") - - # arduino-cli requires the sketch to live inside a folder whose name - # matches the .ino file (without extension). tmp_root = Path(tempfile.mkdtemp()) sk_dir = tmp_root / "arduino_sketch" build_dir = tmp_root / "build" @@ -366,33 +337,20 @@ def compile_arduino_sketch() -> Path: shutil.copy(SKETCH_FILE, sk_dir / "arduino_sketch.ino") cli = shutil.which("arduino-cli") or "arduino-cli" - cmd = [ - cli, "compile", - "--fqbn", "arduino:avr:uno", - "--output-dir", str(build_dir), - str(sk_dir), - ] + cmd = [cli, "compile", "--fqbn", "arduino:avr:uno", + "--output-dir", str(build_dir), str(sk_dir)] try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=COMPILE_TIMEOUT_S, - ) + result = subprocess.run(cmd, capture_output=True, text=True, + timeout=COMPILE_TIMEOUT_S) except FileNotFoundError: - raise RuntimeError( - "arduino-cli not found in PATH.\n" - "Install: https://arduino.github.io/arduino-cli/\n" - "Then: arduino-cli core install arduino:avr" - ) + raise RuntimeError("arduino-cli not found in PATH") except subprocess.TimeoutExpired: raise RuntimeError("arduino-cli compile timed out") if result.returncode != 0: raise RuntimeError( - f"Compilation failed (exit {result.returncode}):\n" - f" STDOUT: {result.stdout.strip()}\n" + f"Compilation failed:\n STDOUT: {result.stdout.strip()}\n" f" STDERR: {result.stderr.strip()}" ) @@ -401,181 +359,159 @@ def compile_arduino_sketch() -> Path: raise RuntimeError(f"No .hex file produced in {build_dir}") hex_path = hex_files[0] - print(f"[compile] [OK] {hex_path.name} ({hex_path.stat().st_size:,} bytes)") + print(f"[compile] OK {hex_path.name} ({hex_path.stat().st_size:,} bytes)") return hex_path -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| -# QEMU SD overlay (qcow2 thin-copy aligned to 8 GiB power-of-2) -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# QEMU SD overlay +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| def _ensure_sd_overlay() -> Path: - """ - QEMU raspi3b requires the SD card size to be a power of 2. - The Raspbian image (5.29 GiB) does not satisfy this, so we create a - qcow2 overlay that presents the image as 8 GiB without modifying the - original file. The overlay is re-created on every run. - """ - overlay = IMG_DIR / "sd_overlay.qcow2" + overlay = IMG_DIR / "sd_overlay.qcow2" qemu_img = shutil.which("qemu-img") or "C:/Program Files/qemu/qemu-img.exe" - # Always rebuild so stale overlays don't cause issues if overlay.exists(): - overlay.unlink() + try: + overlay.unlink() + print("[qemu-img] Removed old overlay") + except PermissionError: + print("[qemu-img] WARNING: overlay locked by another process, reusing") + return overlay - cmd = [ - qemu_img, "create", - "-f", "qcow2", - "-b", str(SD_IMAGE), - "-F", "raw", - str(overlay), - "8G", - ] - result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) - if result.returncode != 0: - raise RuntimeError( - f"qemu-img overlay creation failed:\n{result.stderr}" - ) - print(f"[qemu-img] SD overlay created: {overlay.name} (8 GiB virtual, " - f"backed by {SD_IMAGE.name})") + cmd = [qemu_img, "create", "-f", "qcow2", + "-b", str(SD_IMAGE), "-F", "raw", str(overlay), "8G"] + r = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if r.returncode != 0: + raise RuntimeError(f"qemu-img failed:\n{r.stderr}") + print(f"[qemu-img] Created {overlay.name} (8G virtual, backed by {SD_IMAGE.name})") return overlay -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| # QEMU command builder -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| def build_qemu_cmd() -> list[str]: - missing = [ - f" {label}: {p}" - for label, p in [("kernel", KERNEL_IMG), ("dtb", DTB_FILE), ("sd", SD_IMAGE)] - if not p.exists() - ] + missing = [f" {lbl}: {p}" for lbl, p in + [("kernel", KERNEL_IMG), ("dtb", DTB_FILE), ("sd", SD_IMAGE)] + if not p.exists()] if missing: - raise RuntimeError("Missing QEMU image files:\n" + "\n".join(missing)) + raise RuntimeError("Missing QEMU images:\n" + "\n".join(missing)) - sd_path = _ensure_sd_overlay() - - # raspi3b is only available in qemu-system-aarch64 on this platform - # (qemu-system-arm only ships raspi0/1ap/2b in this Windows build) + sd_path = _ensure_sd_overlay() qemu_bin = shutil.which("qemu-system-aarch64") or "qemu-system-aarch64" return [ qemu_bin, - "-M", "raspi3b", - "-kernel", str(KERNEL_IMG), - "-dtb", str(DTB_FILE), - "-drive", f"file={sd_path},if=sd,format=qcow2", - # init=/bin/sh -> skip systemd, get a root shell immediately - # rw -> mount root filesystem read-write - "-append", ( - "console=ttyAMA0 " + "-M", "raspi3b", + "-kernel", str(KERNEL_IMG), + "-dtb", str(DTB_FILE), + "-drive", f"file={sd_path},if=sd,format=qcow2", + "-append", ( + "console=ttyAMA0,115200 " "root=/dev/mmcblk0p2 rootwait rw " - "init=/bin/sh " - "dwc_otg.lpm_enable=0" + "init=/bin/sh" ), - "-m", "1G", - "-smp", "4", + "-m", "1G", + "-smp", "4", "-display", "none", - # Connect Pi ttyAMA0 directly to our broker (broker is the TCP server) - "-serial", f"tcp:127.0.0.1:{BROKER_PI_PORT}", + "-serial", f"tcp:127.0.0.1:{BROKER_PI_PORT}", ] -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| -# Subprocess log drainer -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# Subprocess stdout drainer +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| async def drain_log(stream: Optional[asyncio.StreamReader], prefix: str) -> None: if stream is None: return async for raw in stream: line = raw.decode("utf-8", errors="replace").rstrip() if line: - # encode to the console charset, dropping unrepresentable chars - safe = line.encode(sys.stdout.encoding or "ascii", errors="replace").decode(sys.stdout.encoding or "ascii") - print(f"{prefix} {safe}") + print(f"{prefix} {line}") -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| # Main test coroutine -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| async def run_test() -> bool: _banner("Pi <-> Arduino Serial Integration Test") - # 1. Compile --------------------------------------------------------------- + # 1. Compile hex_path = compile_arduino_sketch() - # 2. Start broker ---------------------------------------------------------- + # 2. Start broker broker = SerialBroker() pi_srv, avr_srv = await broker.start() procs: list[asyncio.subprocess.Process] = [] + drain_tasks: list[asyncio.Task] = [] try: - # 3. Start avr_runner.js (Arduino emulation) --------------------------- + # 3. avr_runner.js (Arduino emulation) node_exe = shutil.which("node") or "node" - avr_cmd = [ - node_exe, - str(AVR_RUNNER), - str(hex_path), - "127.0.0.1", - str(BROKER_AVR_PORT), - ] - print(f"\n[avr] Starting Arduino emulator ...\n {' '.join(avr_cmd)}") + avr_cmd = [node_exe, str(AVR_RUNNER), str(hex_path), + "127.0.0.1", str(BROKER_AVR_PORT)] + print(f"\n[avr] {' '.join(avr_cmd[:4])} ...") avr_proc = await asyncio.create_subprocess_exec( *avr_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) procs.append(avr_proc) - asyncio.create_task(drain_log(avr_proc.stdout, "[avr]")) + drain_tasks.append(asyncio.create_task(drain_log(avr_proc.stdout, "[avr]"))) - # 4. Start QEMU (Raspberry Pi emulation) -------------------------------- + # 4. QEMU (Raspberry Pi 3B) qemu_cmd = build_qemu_cmd() - print(f"\n[qemu] Starting Raspberry Pi 3B emulation ...") - print(f" {' '.join(qemu_cmd[:6])} ...") + print(f"\n[qemu] {' '.join(qemu_cmd[:3])} ...") qemu_proc = await asyncio.create_subprocess_exec( *qemu_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) procs.append(qemu_proc) - asyncio.create_task(drain_log(qemu_proc.stdout, "[qemu]")) + drain_tasks.append(asyncio.create_task(drain_log(qemu_proc.stdout, "[qemu]"))) - # 5. Wait for both TCP connections ------------------------------------- - print(f"\n[broker] Waiting for Pi + Arduino TCP connections (30 s) ...") + # 5. Wait for both TCP connections + print(f"\n[broker] Waiting for Pi + Arduino connections (30s) ...") try: await asyncio.wait_for( - asyncio.gather( - broker.pi_connected.wait(), - broker.avr_connected.wait(), - ), + asyncio.gather(broker.pi_connected.wait(), + broker.avr_connected.wait()), timeout=30.0, ) except asyncio.TimeoutError: - print("[broker] [FAIL] Timeout waiting for TCP connections") + print("[broker] TIMEOUT waiting for TCP connections") return False - # 6. Start relay + state machine ---------------------------------------- - print(f"\n[broker] Both sides connected. Boot timeout: {BOOT_TIMEOUT_S} s\n") - asyncio.create_task(broker.run()) + # 6. Start relay + state machine + print(f"\n[broker] Both connected. Starting relay and automator.\n") + broker.start_relay() - # 7. Await test result -------------------------------------------------- + # 7. Wait for test result + total_timeout = BOOT_TIMEOUT_S + SCRIPT_TIMEOUT_S + 15 try: - await asyncio.wait_for( - broker.result_event.wait(), - timeout=BOOT_TIMEOUT_S + SCRIPT_TIMEOUT_S + 10, - ) + await asyncio.wait_for(broker.result_event.wait(), + timeout=total_timeout) except asyncio.TimeoutError: - print("[test] [FAIL] Global timeout -- no result received") + print(f"[test] Global timeout ({total_timeout}s) -- no result") return False return broker.test_passed finally: + # Cancel relay tasks + broker.cancel_relay() + + # Stop servers pi_srv.close() avr_srv.close() - await pi_srv.wait_closed() - await avr_srv.wait_closed() + try: + await asyncio.wait_for(pi_srv.wait_closed(), timeout=3.0) + await asyncio.wait_for(avr_srv.wait_closed(), timeout=3.0) + except asyncio.TimeoutError: + pass + # Terminate subprocesses for p in procs: try: p.terminate() @@ -586,43 +522,44 @@ async def run_test() -> bool: except Exception: pass - # Clean up temp compilation dir + # Cancel drain tasks + for t in drain_tasks: + t.cancel() + + # Remove temp dir try: shutil.rmtree(hex_path.parent.parent, ignore_errors=True) except Exception: pass -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| -# Helpers -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| def _banner(title: str) -> None: bar = "=" * 65 - print(f"\n{bar}\n {title}\n{bar}") + print(f"\n{bar}\n {title}\n{bar}\n") def _print_result(passed: bool) -> None: _banner("Result") if passed: - print(" [PASS] INTEGRATION TEST PASSED\n") + print(" [PASS] INTEGRATION TEST PASSED") + print() print(" Pi -> Arduino : HELLO_FROM_PI") print(" Arduino -> Pi : ACK_FROM_ARDUINO") print(" Pi confirmed : TEST_PASSED") else: print(" [FAIL] INTEGRATION TEST FAILED") - print("\n Troubleshooting hints:") - print(" - Confirm qemu-system-arm, node, and arduino-cli are in PATH.") - print(" - Check that init=/bin/sh produces a '#' prompt on the Pi OS.") - print(" - The SD image may require 'pi'/'raspberry' user for python3.") + print() + print(" Hints:") + print(" - Check that qemu-system-aarch64, node, arduino-cli are in PATH") + print(" - Verify init=/bin/sh gives a '#' prompt on Pi OS") + print(" - Ensure sd_overlay.qcow2 is not locked by another QEMU process") print("=" * 65 + "\n") -# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| def main() -> None: - # Windows requires ProactorEventLoop for subprocess + asyncio if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) - passed = asyncio.run(run_test()) _print_result(passed) sys.exit(0 if passed else 1)