feat: enhance AVR USART RX handling with software queue and improve logging

pull/47/head
David Montero Crespo 2026-03-12 22:11:06 -03:00
parent b89f85e5f7
commit a07c76470e
2 changed files with 252 additions and 279 deletions

View File

@ -105,10 +105,33 @@ const usart = new AVRUSART(cpu, usart0Config, CLOCK_HZ);
let socket = null;
let txBacklog = []; // bytes queued before TCP connects
// Arduino → Pi: forward transmitted bytes
// Pi → Arduino: software RX queue.
// avr8js USART has no internal RX buffer — writeByte() drops the byte if
// rxBusy is true. onRxComplete fires only in the baud-rate-timed path
// (immediate=false), NOT when the Arduino reads UDR. So we drain this
// queue in runBatch() after each instruction batch when rxBusy is false.
let rxQueue = [];
function tryInjectRx() {
if (rxQueue.length === 0) return;
const byte = rxQueue.shift();
// writeByte(byte, true) returns false on failure, undefined on success (no return stmt)
if (usart.writeByte(byte, true) === false) {
// USART RX disabled — put it back; runBatch will retry next tick
rxQueue.unshift(byte);
}
}
// Arduino → Pi: forward transmitted bytes (line-buffered logging)
let _avrTxLineBuf = '';
usart.onByteTransmit = (byte) => {
const ch = String.fromCharCode(byte);
process.stdout.write(`[AVR->Pi] ${ch === '\n' ? '\\n\n' : ch}`);
if (ch === '\n') {
if (_avrTxLineBuf.trim()) process.stdout.write(`[AVR->Pi] ${_avrTxLineBuf}\n`);
_avrTxLineBuf = '';
} else {
_avrTxLineBuf += ch;
}
if (socket && !socket.destroyed) {
socket.write(Buffer.from([byte]));
@ -127,6 +150,10 @@ function runBatch() {
avrInstruction(cpu);
cpu.tick();
}
// Drain RX queue: inject next byte if Arduino has already read the previous one
if (rxQueue.length > 0 && !usart.rxBusy) {
tryInjectRx();
}
setImmediate(runBatch);
}
@ -154,13 +181,22 @@ function connectToBroker() {
}
});
// Pi → Arduino: feed received bytes into USART RX
// Pi → Arduino: feed received bytes into USART RX via queue (line-buffered log)
let _piRxLineBuf = '';
s.on('data', (chunk) => {
for (const byte of chunk) {
const ch = String.fromCharCode(byte);
process.stdout.write(`[Pi->AVR] ${ch === '\n' ? '\\n\n' : ch}`);
// writeByte(value, immediate=true) bypasses baud-rate timing
usart.writeByte(byte, true);
if (ch === '\n') {
if (_piRxLineBuf.trim()) process.stdout.write(`[Pi->AVR] ${_piRxLineBuf}\n`);
_piRxLineBuf = '';
} else {
_piRxLineBuf += ch;
}
rxQueue.push(byte);
}
// Inject first byte immediately if USART is free
if (rxQueue.length > 0 && !usart.rxBusy) {
tryInjectRx();
}
});

View File

@ -14,44 +14,24 @@ The Pi receives that reply and prints "TEST_PASSED".
Architecture
------------
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Python Test Process (this file) |
| |
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |
| | SerialBroker (asyncio) | |
| | | |
| | TCP server :5555 <||||||||||||||> QEMU Pi (ttyAMA0) | |
| | TCP server :5556 <||> avr_runner.js (Arduino UART) | |
| | | |
| | - Bridges bytes Pi <-> Arduino | |
| | - State machine automates Pi serial console: | |
| | - waits for shell prompt | |
| | - disables TTY echo | |
| | - injects Pi Python test script via base64 | |
| | - Asserts "TEST_PASSED" in Pi output | |
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |
| |
| Subprocesses: |
| - qemu-system-arm (Raspberry Pi 3B, init=/bin/sh) |
| - node avr_runner.js (ATmega328P emulation) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Pi ttyAMA0 <-> TCP:15555 <-> [SerialBroker] <-> TCP:15556 <-> avr_runner.js
|
(state machine automates Pi console)
Prerequisites
-------------
- qemu-system-arm in PATH
- qemu-system-aarch64 in PATH
- node (Node.js >= 18) in PATH
- arduino-cli in PATH, with arduino:avr core installed
- QEMU images in <repo>/img/:
kernel_extracted.img
bcm271~1.dtb
kernel8.img (64-bit ARM64 Pi3 kernel)
bcm271~1.dtb (BCM2710 device tree)
2025-12-04-raspios-trixie-armhf.img
Run
---
cd <repo>
python test/pi_arduino_serial/test_pi_arduino_serial.py
The test may take several minutes while the Pi boots inside QEMU.
"""
import asyncio
@ -64,7 +44,14 @@ import time
from pathlib import Path
from typing import Optional
# || Paths |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Set UTF-8 for Windows console output
if sys.platform == "win32":
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
# || Paths ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
IMG_DIR = REPO_ROOT / "img"
TEST_DIR = Path(__file__).resolve().parent
@ -72,46 +59,36 @@ TEST_DIR = Path(__file__).resolve().parent
SKETCH_FILE = TEST_DIR / "arduino_sketch.ino"
AVR_RUNNER = TEST_DIR / "avr_runner.js"
KERNEL_IMG = IMG_DIR / "kernel_extracted.img"
DTB_FILE = IMG_DIR / "bcm271~1.dtb" # Windows 8.3 short name
KERNEL_IMG = IMG_DIR / "kernel8.img"
DTB_FILE = IMG_DIR / "bcm271~1.dtb"
SD_IMAGE = IMG_DIR / "2025-12-04-raspios-trixie-armhf.img"
# || Network ports (must be free) |||||||||||||||||||||||||||||||||||||||||||||||
BROKER_PI_PORT = 15555 # Broker listens; QEMU Pi connects here
BROKER_AVR_PORT = 15556 # Broker listens; avr_runner.js connects here
# || Network ports |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BROKER_PI_PORT = 15555
BROKER_AVR_PORT = 15556
# || Timeouts ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BOOT_TIMEOUT_S = 360 # 6 min -- Pi boot + shell prompt
SCRIPT_TIMEOUT_S = 45 # Script execution after prompt seen
# || Timeouts ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BOOT_TIMEOUT_S = 120 # 2 min -- Pi boots in ~5-30s with init=/bin/sh
SCRIPT_TIMEOUT_S = 45 # script execution after prompt seen
COMPILE_TIMEOUT_S = 120 # arduino-cli
# || Pi console state-machine ||||||||||||||||||||||||||||||||||||||||||||||||||
# States
ST_BOOT = "BOOT" # waiting for shell prompt
ST_SETUP = "SETUP" # sent stty -echo, waiting for next prompt
ST_INJECT = "INJECT" # script injected, waiting for TEST_PASSED / TEST_FAILED
# || Pi console state-machine ||||||||||||||||||||||||||||||||||||||||||||||||
ST_BOOT = "BOOT"
ST_SETUP = "SETUP"
ST_INJECT = "INJECT"
ST_DONE = "DONE"
# Patterns that indicate a ready shell prompt
PROMPT_BYTES = [b"# ", b"$ "]
# || Pi Python test script ||||||||||||||||||||||||||||||||||||||||||||||||||||||
# This script is base64-encoded and written to /tmp/pi_test.py on the Pi,
# then executed with "python3 /tmp/pi_test.py".
#
# When python3 runs from the serial console, its stdin/stdout ARE ttyAMA0,
# i.e. the same wire the Arduino is connected to. select() lets us poll
# for incoming bytes without busy-waiting.
# || Pi test script (base64-encoded and injected into the Pi shell) ||||||||||
_PI_SCRIPT_SRC = b"""\
import sys, os, time, select
# --- send trigger message to Arduino ---
sys.stdout.write("HELLO_FROM_PI\\n")
sys.stdout.flush()
# --- wait for Arduino reply ---
resp = b""
deadline = time.time() + 15 # 15-second window
deadline = time.time() + 15
while time.time() < deadline:
readable, _, _ = select.select([sys.stdin], [], [], 1.0)
@ -128,13 +105,8 @@ sys.stdout.flush()
sys.exit(1)
"""
# One-liner shell command injected into the Pi console:
# 1. PATH is set explicitly (init=/bin/sh may not load a profile)
# 2. TTY echo is disabled so our typed command doesn't loop back
# 3. The base64-encoded script is decoded and written to /tmp/pi_test.py
# 4. python3 runs it (stdin = ttyAMA0 = Arduino wire)
_PI_B64 = base64.b64encode(_PI_SCRIPT_SRC).decode()
_PI_CMD = (
_PI_B64 = base64.b64encode(_PI_SCRIPT_SRC).decode()
_PI_CMD = (
"export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
" && stty -echo"
f" && printf '%s' '{_PI_B64}' | /usr/bin/base64 -d > /tmp/pi_test.py"
@ -142,16 +114,15 @@ _PI_CMD = (
"\n"
)
# Max bytes to keep in the Pi receive buffer (only need last N bytes for prompts)
_PI_BUF_MAX = 8192
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class SerialBroker:
"""
Bridges the emulated Pi (QEMU) and the emulated Arduino (avr8js).
Both sides connect to this broker via separate TCP ports.
All bytes are forwarded transparently in both directions while the
broker also automates the Pi serial console to inject and run the
test script.
TCP broker bridging emulated Pi (QEMU) <-> emulated Arduino (avr_runner.js).
Automates the Pi serial console via a state machine.
"""
def __init__(self) -> None:
@ -160,65 +131,64 @@ class SerialBroker:
self._avr_reader: Optional[asyncio.StreamReader] = None
self._avr_writer: Optional[asyncio.StreamWriter] = None
# Accumulate Pi output for pattern matching
# Rolling window of Pi output for prompt matching
self._pi_buf: bytearray = bytearray()
# Full traffic log: list of (direction, bytes)
self.traffic: list[tuple[str, bytes]] = []
# Accumulated Pi output lines for human-readable logging
self._pi_line_buf: str = ""
self._state = ST_BOOT
self._prompt_count = 0
self._script_deadline: float = 0.0
# Signals
self.pi_connected = asyncio.Event()
self.avr_connected = asyncio.Event()
self.result_event = asyncio.Event()
self.test_passed = False
# || Server callbacks |||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _on_pi_connect(self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
addr = writer.get_extra_info("peername")
print(f"[broker] Pi (QEMU) connected from {addr}")
self._pi_reader = reader
self._pi_writer = writer
# Background relay tasks (kept so we can cancel them)
self._relay_tasks: list[asyncio.Task] = []
# || Server callbacks |||||||||||||||||||||||||||||||||||||||||||||||||||
async def _on_pi_connect(self, r: asyncio.StreamReader,
w: asyncio.StreamWriter) -> None:
print(f"[broker] Pi (QEMU) connected from {w.get_extra_info('peername')}")
self._pi_reader = r
self._pi_writer = w
self.pi_connected.set()
async def _on_avr_connect(self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
addr = writer.get_extra_info("peername")
print(f"[broker] Arduino (avr_runner.js) connected from {addr}")
self._avr_reader = reader
self._avr_writer = writer
async def _on_avr_connect(self, r: asyncio.StreamReader,
w: asyncio.StreamWriter) -> None:
print(f"[broker] Arduino (avr_runner.js) connected from {w.get_extra_info('peername')}")
self._avr_reader = r
self._avr_writer = w
self.avr_connected.set()
# || Start TCP servers ||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def start(self) -> tuple:
# || Start servers ||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def start(self):
pi_srv = await asyncio.start_server(
self._on_pi_connect, "127.0.0.1", BROKER_PI_PORT
)
self._on_pi_connect, "127.0.0.1", BROKER_PI_PORT)
avr_srv = await asyncio.start_server(
self._on_avr_connect, "127.0.0.1", BROKER_AVR_PORT
)
print(f"[broker] Listening -- Pi port:{BROKER_PI_PORT} "
f"Arduino port:{BROKER_AVR_PORT}")
self._on_avr_connect, "127.0.0.1", BROKER_AVR_PORT)
print(f"[broker] Listening -- Pi:{BROKER_PI_PORT} Arduino:{BROKER_AVR_PORT}")
return pi_srv, avr_srv
# || Main relay (call after both sides connected) |||||||||||||||||||||||||||
async def run(self) -> None:
await asyncio.gather(
self._relay_pi_to_avr(),
self._relay_avr_to_pi(),
self._console_automator(),
)
# || Start relay tasks |||||||||||||||||||||||||||||||||||||||||||||||||
def start_relay(self) -> None:
loop = asyncio.get_event_loop()
self._relay_tasks = [
loop.create_task(self._relay_pi_to_avr()),
loop.create_task(self._relay_avr_to_pi()),
loop.create_task(self._console_automator()),
]
# || Pi -> Arduino |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def cancel_relay(self) -> None:
for t in self._relay_tasks:
t.cancel()
# || Pi -> Arduino |||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _relay_pi_to_avr(self) -> None:
reader = self._pi_reader
if reader is None:
if not reader:
return
while True:
try:
@ -228,17 +198,31 @@ class SerialBroker:
if not data:
break
self._log("Pi->AVR", data)
# Append to rolling buffer, cap size to avoid unbounded growth
self._pi_buf.extend(data)
if len(self._pi_buf) > _PI_BUF_MAX:
del self._pi_buf[:len(self._pi_buf) - _PI_BUF_MAX]
if self._avr_writer and not self._avr_writer.is_closing():
# Log Pi output as lines (not char-by-char)
self._pi_line_buf += data.decode("utf-8", errors="replace")
while "\n" in self._pi_line_buf:
line, self._pi_line_buf = self._pi_line_buf.split("\n", 1)
stripped = line.strip()
if stripped:
print(f" [Pi->AVR] {stripped}")
# Only forward Pi output to Arduino once the test script is running.
# Kernel boot messages must not flood the AVR USART RX queue.
if (self._state == ST_INJECT
and self._avr_writer
and not self._avr_writer.is_closing()):
self._avr_writer.write(data)
await self._avr_writer.drain()
# || Arduino -> Pi |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# || Arduino -> Pi |||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _relay_avr_to_pi(self) -> None:
reader = self._avr_reader
if reader is None:
if not reader:
return
while True:
try:
@ -248,37 +232,42 @@ class SerialBroker:
if not data:
break
self._log("AVR->Pi", data)
text = data.decode("utf-8", errors="replace").strip()
if text:
print(f" [AVR->Pi] {text}")
if self._pi_writer and not self._pi_writer.is_closing():
self._pi_writer.write(data)
await self._pi_writer.drain()
# || Console state machine ||||||||||||||||||||||||||||||||||||||||||||||||||
# || Console state machine |||||||||||||||||||||||||||||||||||||||||||||
async def _console_automator(self) -> None:
boot_deadline = time.monotonic() + BOOT_TIMEOUT_S
last_poke_time = time.monotonic()
boot_deadline = time.monotonic() + BOOT_TIMEOUT_S
last_poke_time = time.monotonic()
# Give QEMU a moment to start emitting serial output, then poke
# Small delay before first poke so QEMU starts outputting
await asyncio.sleep(3.0)
self._send_to_pi(b"\n")
print(f"[broker] Waiting for shell prompt (boot timeout: {BOOT_TIMEOUT_S}s) ...")
while self._state != ST_DONE:
await asyncio.sleep(0.15)
await asyncio.sleep(0.2)
now = time.monotonic()
# || Global boot timeout ||||||||||||||||||||||||||||||||||||||||||||
# Boot timeout
if now > boot_deadline:
print("\n[broker] [timeout] BOOT TIMEOUT -- shell prompt never appeared")
elapsed = int(now - (boot_deadline - BOOT_TIMEOUT_S))
print(f"\n[broker] BOOT TIMEOUT after {elapsed}s -- shell prompt never seen")
print(f"[broker] Last Pi buffer tail: {bytes(self._pi_buf[-64:])!r}")
self.test_passed = False
self._state = ST_DONE
self.result_event.set()
return
# || Script-execution timeout (after script was injected) ||||||||||
# Script timeout
if self._state == ST_INJECT and self._script_deadline and now > self._script_deadline:
print("\n[broker] [timeout] SCRIPT TIMEOUT -- no result from Pi script")
print("\n[broker] SCRIPT TIMEOUT -- no result from Pi script")
print(f"[broker] Pi buffer tail: {bytes(self._pi_buf[-128:])!r}")
self.test_passed = False
self._state = ST_DONE
self.result_event.set()
@ -286,53 +275,47 @@ class SerialBroker:
buf = bytes(self._pi_buf)
# || ST_BOOT: wait for shell prompt |||||||||||||||||||||||||||||||||
if self._state == ST_BOOT:
# Periodically poke the shell to get a fresh prompt
# (in case we missed the initial one)
if now - last_poke_time > 8.0:
self._send_to_pi(b"\n")
last_poke_time = now
elapsed_s = int(now - (boot_deadline - BOOT_TIMEOUT_S))
print(f"[broker] Still booting... ({elapsed_s}s) buf tail: {buf[-16:]!r}")
if self._prompt_seen(buf):
print("\n[broker] [OK] Shell prompt detected")
print("\n[broker] Shell prompt detected!")
self._pi_buf.clear()
# Disable echo and set PATH, then wait for next prompt
self._send_to_pi(
b"export PATH=/usr/local/sbin:/usr/local/bin"
b":/usr/sbin:/usr/bin:/sbin:/bin && stty -echo\n"
)
self._state = ST_SETUP
self._prompt_count = 0
# || ST_SETUP: wait for prompt after stty -echo |||||||||||||||||||||
elif self._state == ST_SETUP:
if self._prompt_seen(buf) or len(buf) > 10:
# Either got a prompt or the shell already answered
await asyncio.sleep(0.3)
self._pi_buf.clear()
print("[broker] [OK] Environment set -- injecting Pi test script")
print("[broker] Injecting Pi test script ...")
self._send_to_pi(_PI_CMD.encode())
self._state = ST_INJECT
self._script_deadline = time.monotonic() + SCRIPT_TIMEOUT_S
# || ST_INJECT: wait for TEST_PASSED / TEST_FAILED |||||||||||||||||
elif self._state == ST_INJECT:
if b"TEST_PASSED" in buf:
print("\n[broker] [OK] TEST_PASSED received from Pi!")
print("\n[broker] TEST_PASSED received from Pi!")
self.test_passed = True
self._state = ST_DONE
self.result_event.set()
elif b"TEST_FAILED" in buf:
snippet = buf.decode("utf-8", errors="replace")[-120:]
print(f"\n[broker] [FAIL] TEST_FAILED received from Pi\n last output: {snippet!r}")
print(f"\n[broker] TEST_FAILED from Pi. Last output: {snippet!r}")
self.test_passed = False
self._state = ST_DONE
self.result_event.set()
# || Helpers ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# || Helpers |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _prompt_seen(self, buf: bytes) -> bool:
tail = buf[-32:] # only check the last 32 bytes
tail = buf[-32:]
return any(p in tail for p in PROMPT_BYTES)
def _send_to_pi(self, data: bytes) -> None:
@ -340,24 +323,12 @@ class SerialBroker:
self._pi_writer.write(data)
asyncio.get_event_loop().create_task(self._pi_writer.drain())
def _log(self, direction: str, data: bytes) -> None:
self.traffic.append((direction, data))
text = data.decode("utf-8", errors="replace")
for line in text.splitlines():
stripped = line.strip()
if stripped:
print(f" [{direction}] {stripped}")
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Compilation
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def compile_arduino_sketch() -> Path:
"""Compile arduino_sketch.ino via arduino-cli and return the .hex path."""
print("\n[compile] Compiling Arduino sketch with arduino-cli ...")
# arduino-cli requires the sketch to live inside a folder whose name
# matches the .ino file (without extension).
tmp_root = Path(tempfile.mkdtemp())
sk_dir = tmp_root / "arduino_sketch"
build_dir = tmp_root / "build"
@ -366,33 +337,20 @@ def compile_arduino_sketch() -> Path:
shutil.copy(SKETCH_FILE, sk_dir / "arduino_sketch.ino")
cli = shutil.which("arduino-cli") or "arduino-cli"
cmd = [
cli, "compile",
"--fqbn", "arduino:avr:uno",
"--output-dir", str(build_dir),
str(sk_dir),
]
cmd = [cli, "compile", "--fqbn", "arduino:avr:uno",
"--output-dir", str(build_dir), str(sk_dir)]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=COMPILE_TIMEOUT_S,
)
result = subprocess.run(cmd, capture_output=True, text=True,
timeout=COMPILE_TIMEOUT_S)
except FileNotFoundError:
raise RuntimeError(
"arduino-cli not found in PATH.\n"
"Install: https://arduino.github.io/arduino-cli/\n"
"Then: arduino-cli core install arduino:avr"
)
raise RuntimeError("arduino-cli not found in PATH")
except subprocess.TimeoutExpired:
raise RuntimeError("arduino-cli compile timed out")
if result.returncode != 0:
raise RuntimeError(
f"Compilation failed (exit {result.returncode}):\n"
f" STDOUT: {result.stdout.strip()}\n"
f"Compilation failed:\n STDOUT: {result.stdout.strip()}\n"
f" STDERR: {result.stderr.strip()}"
)
@ -401,181 +359,159 @@ def compile_arduino_sketch() -> Path:
raise RuntimeError(f"No .hex file produced in {build_dir}")
hex_path = hex_files[0]
print(f"[compile] [OK] {hex_path.name} ({hex_path.stat().st_size:,} bytes)")
print(f"[compile] OK {hex_path.name} ({hex_path.stat().st_size:,} bytes)")
return hex_path
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# QEMU SD overlay (qcow2 thin-copy aligned to 8 GiB power-of-2)
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# QEMU SD overlay
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _ensure_sd_overlay() -> Path:
"""
QEMU raspi3b requires the SD card size to be a power of 2.
The Raspbian image (5.29 GiB) does not satisfy this, so we create a
qcow2 overlay that presents the image as 8 GiB without modifying the
original file. The overlay is re-created on every run.
"""
overlay = IMG_DIR / "sd_overlay.qcow2"
overlay = IMG_DIR / "sd_overlay.qcow2"
qemu_img = shutil.which("qemu-img") or "C:/Program Files/qemu/qemu-img.exe"
# Always rebuild so stale overlays don't cause issues
if overlay.exists():
overlay.unlink()
try:
overlay.unlink()
print("[qemu-img] Removed old overlay")
except PermissionError:
print("[qemu-img] WARNING: overlay locked by another process, reusing")
return overlay
cmd = [
qemu_img, "create",
"-f", "qcow2",
"-b", str(SD_IMAGE),
"-F", "raw",
str(overlay),
"8G",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(
f"qemu-img overlay creation failed:\n{result.stderr}"
)
print(f"[qemu-img] SD overlay created: {overlay.name} (8 GiB virtual, "
f"backed by {SD_IMAGE.name})")
cmd = [qemu_img, "create", "-f", "qcow2",
"-b", str(SD_IMAGE), "-F", "raw", str(overlay), "8G"]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if r.returncode != 0:
raise RuntimeError(f"qemu-img failed:\n{r.stderr}")
print(f"[qemu-img] Created {overlay.name} (8G virtual, backed by {SD_IMAGE.name})")
return overlay
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# QEMU command builder
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def build_qemu_cmd() -> list[str]:
missing = [
f" {label}: {p}"
for label, p in [("kernel", KERNEL_IMG), ("dtb", DTB_FILE), ("sd", SD_IMAGE)]
if not p.exists()
]
missing = [f" {lbl}: {p}" for lbl, p in
[("kernel", KERNEL_IMG), ("dtb", DTB_FILE), ("sd", SD_IMAGE)]
if not p.exists()]
if missing:
raise RuntimeError("Missing QEMU image files:\n" + "\n".join(missing))
raise RuntimeError("Missing QEMU images:\n" + "\n".join(missing))
sd_path = _ensure_sd_overlay()
# raspi3b is only available in qemu-system-aarch64 on this platform
# (qemu-system-arm only ships raspi0/1ap/2b in this Windows build)
sd_path = _ensure_sd_overlay()
qemu_bin = shutil.which("qemu-system-aarch64") or "qemu-system-aarch64"
return [
qemu_bin,
"-M", "raspi3b",
"-kernel", str(KERNEL_IMG),
"-dtb", str(DTB_FILE),
"-drive", f"file={sd_path},if=sd,format=qcow2",
# init=/bin/sh -> skip systemd, get a root shell immediately
# rw -> mount root filesystem read-write
"-append", (
"console=ttyAMA0 "
"-M", "raspi3b",
"-kernel", str(KERNEL_IMG),
"-dtb", str(DTB_FILE),
"-drive", f"file={sd_path},if=sd,format=qcow2",
"-append", (
"console=ttyAMA0,115200 "
"root=/dev/mmcblk0p2 rootwait rw "
"init=/bin/sh "
"dwc_otg.lpm_enable=0"
"init=/bin/sh"
),
"-m", "1G",
"-smp", "4",
"-m", "1G",
"-smp", "4",
"-display", "none",
# Connect Pi ttyAMA0 directly to our broker (broker is the TCP server)
"-serial", f"tcp:127.0.0.1:{BROKER_PI_PORT}",
"-serial", f"tcp:127.0.0.1:{BROKER_PI_PORT}",
]
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Subprocess log drainer
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Subprocess stdout drainer
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def drain_log(stream: Optional[asyncio.StreamReader], prefix: str) -> None:
if stream is None:
return
async for raw in stream:
line = raw.decode("utf-8", errors="replace").rstrip()
if line:
# encode to the console charset, dropping unrepresentable chars
safe = line.encode(sys.stdout.encoding or "ascii", errors="replace").decode(sys.stdout.encoding or "ascii")
print(f"{prefix} {safe}")
print(f"{prefix} {line}")
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Main test coroutine
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def run_test() -> bool:
_banner("Pi <-> Arduino Serial Integration Test")
# 1. Compile ---------------------------------------------------------------
# 1. Compile
hex_path = compile_arduino_sketch()
# 2. Start broker ----------------------------------------------------------
# 2. Start broker
broker = SerialBroker()
pi_srv, avr_srv = await broker.start()
procs: list[asyncio.subprocess.Process] = []
drain_tasks: list[asyncio.Task] = []
try:
# 3. Start avr_runner.js (Arduino emulation) ---------------------------
# 3. avr_runner.js (Arduino emulation)
node_exe = shutil.which("node") or "node"
avr_cmd = [
node_exe,
str(AVR_RUNNER),
str(hex_path),
"127.0.0.1",
str(BROKER_AVR_PORT),
]
print(f"\n[avr] Starting Arduino emulator ...\n {' '.join(avr_cmd)}")
avr_cmd = [node_exe, str(AVR_RUNNER), str(hex_path),
"127.0.0.1", str(BROKER_AVR_PORT)]
print(f"\n[avr] {' '.join(avr_cmd[:4])} ...")
avr_proc = await asyncio.create_subprocess_exec(
*avr_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
procs.append(avr_proc)
asyncio.create_task(drain_log(avr_proc.stdout, "[avr]"))
drain_tasks.append(asyncio.create_task(drain_log(avr_proc.stdout, "[avr]")))
# 4. Start QEMU (Raspberry Pi emulation) --------------------------------
# 4. QEMU (Raspberry Pi 3B)
qemu_cmd = build_qemu_cmd()
print(f"\n[qemu] Starting Raspberry Pi 3B emulation ...")
print(f" {' '.join(qemu_cmd[:6])} ...")
print(f"\n[qemu] {' '.join(qemu_cmd[:3])} ...")
qemu_proc = await asyncio.create_subprocess_exec(
*qemu_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
procs.append(qemu_proc)
asyncio.create_task(drain_log(qemu_proc.stdout, "[qemu]"))
drain_tasks.append(asyncio.create_task(drain_log(qemu_proc.stdout, "[qemu]")))
# 5. Wait for both TCP connections -------------------------------------
print(f"\n[broker] Waiting for Pi + Arduino TCP connections (30 s) ...")
# 5. Wait for both TCP connections
print(f"\n[broker] Waiting for Pi + Arduino connections (30s) ...")
try:
await asyncio.wait_for(
asyncio.gather(
broker.pi_connected.wait(),
broker.avr_connected.wait(),
),
asyncio.gather(broker.pi_connected.wait(),
broker.avr_connected.wait()),
timeout=30.0,
)
except asyncio.TimeoutError:
print("[broker] [FAIL] Timeout waiting for TCP connections")
print("[broker] TIMEOUT waiting for TCP connections")
return False
# 6. Start relay + state machine ----------------------------------------
print(f"\n[broker] Both sides connected. Boot timeout: {BOOT_TIMEOUT_S} s\n")
asyncio.create_task(broker.run())
# 6. Start relay + state machine
print(f"\n[broker] Both connected. Starting relay and automator.\n")
broker.start_relay()
# 7. Await test result --------------------------------------------------
# 7. Wait for test result
total_timeout = BOOT_TIMEOUT_S + SCRIPT_TIMEOUT_S + 15
try:
await asyncio.wait_for(
broker.result_event.wait(),
timeout=BOOT_TIMEOUT_S + SCRIPT_TIMEOUT_S + 10,
)
await asyncio.wait_for(broker.result_event.wait(),
timeout=total_timeout)
except asyncio.TimeoutError:
print("[test] [FAIL] Global timeout -- no result received")
print(f"[test] Global timeout ({total_timeout}s) -- no result")
return False
return broker.test_passed
finally:
# Cancel relay tasks
broker.cancel_relay()
# Stop servers
pi_srv.close()
avr_srv.close()
await pi_srv.wait_closed()
await avr_srv.wait_closed()
try:
await asyncio.wait_for(pi_srv.wait_closed(), timeout=3.0)
await asyncio.wait_for(avr_srv.wait_closed(), timeout=3.0)
except asyncio.TimeoutError:
pass
# Terminate subprocesses
for p in procs:
try:
p.terminate()
@ -586,43 +522,44 @@ async def run_test() -> bool:
except Exception:
pass
# Clean up temp compilation dir
# Cancel drain tasks
for t in drain_tasks:
t.cancel()
# Remove temp dir
try:
shutil.rmtree(hex_path.parent.parent, ignore_errors=True)
except Exception:
pass
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Helpers
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _banner(title: str) -> None:
bar = "=" * 65
print(f"\n{bar}\n {title}\n{bar}")
print(f"\n{bar}\n {title}\n{bar}\n")
def _print_result(passed: bool) -> None:
_banner("Result")
if passed:
print(" [PASS] INTEGRATION TEST PASSED\n")
print(" [PASS] INTEGRATION TEST PASSED")
print()
print(" Pi -> Arduino : HELLO_FROM_PI")
print(" Arduino -> Pi : ACK_FROM_ARDUINO")
print(" Pi confirmed : TEST_PASSED")
else:
print(" [FAIL] INTEGRATION TEST FAILED")
print("\n Troubleshooting hints:")
print(" - Confirm qemu-system-arm, node, and arduino-cli are in PATH.")
print(" - Check that init=/bin/sh produces a '#' prompt on the Pi OS.")
print(" - The SD image may require 'pi'/'raspberry' user for python3.")
print()
print(" Hints:")
print(" - Check that qemu-system-aarch64, node, arduino-cli are in PATH")
print(" - Verify init=/bin/sh gives a '#' prompt on Pi OS")
print(" - Ensure sd_overlay.qcow2 is not locked by another QEMU process")
print("=" * 65 + "\n")
# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def main() -> None:
# Windows requires ProactorEventLoop for subprocess + asyncio
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
passed = asyncio.run(run_test())
_print_result(passed)
sys.exit(0 if passed else 1)