+
+ {/* Brand */}
-
-
-
Examples
-
+ {/* Main nav links (desktop) */}
+
-
-
- GitHub
-
+ {/* Right: auth + mobile hamburger */}
+
+ {/* Auth UI */}
+ {user ? (
+
+
- {/* Auth UI */}
- {user ? (
-
-
- {dropdownOpen && (
-
-
setDropdownOpen(false)}
- style={{ display: 'block', padding: '9px 14px', color: '#ccc', textDecoration: 'none', fontSize: 13 }}
- >
- My projects
-
-
-
- Sign out
-
-
- )}
-
- ) : (
-
-
- Sign in
-
-
- Sign up
-
-
- )}
);
diff --git a/frontend/src/components/simulator/SimulatorCanvas.tsx b/frontend/src/components/simulator/SimulatorCanvas.tsx
index 6f44861..badf184 100644
--- a/frontend/src/components/simulator/SimulatorCanvas.tsx
+++ b/frontend/src/components/simulator/SimulatorCanvas.tsx
@@ -33,6 +33,8 @@ export const SimulatorCanvas = () => {
updateComponent,
serialMonitorOpen,
toggleSerialMonitor,
+ connectRemoteSimulator,
+ disconnectRemoteSimulator,
} = useSimulatorStore();
// Wire management from store
@@ -119,6 +121,21 @@ export const SimulatorCanvas = () => {
initSimulator();
}, [initSimulator]);
+ // Connect to Remote Simulator (QEMU) if a Raspberry Pi exists and simulation is running
+ useEffect(() => {
+ const hasPi = components.some(c => c.metadataId === 'raspberry-pi-3');
+ if (running && hasPi) {
+ // Using a random client ID or static one for local development
+ connectRemoteSimulator('local-velxio-client');
+ } else {
+ disconnectRemoteSimulator();
+ }
+
+ return () => {
+ if (!running) disconnectRemoteSimulator();
+ };
+ }, [running, components, connectRemoteSimulator, disconnectRemoteSimulator]);
+
// Attach wheel listener as non-passive so preventDefault() works
useEffect(() => {
const el = canvasRef.current;
diff --git a/frontend/src/main.tsx b/frontend/src/main.tsx
index b73f8b2..f18ba85 100644
--- a/frontend/src/main.tsx
+++ b/frontend/src/main.tsx
@@ -2,6 +2,7 @@ import { createRoot } from 'react-dom/client'
import './index.css'
import './components/components-wokwi/IC74HC595'
import './components/components-wokwi/LogicGateElements'
+import './components/components-wokwi/RaspberryPi3Element'
import App from './App.tsx'
createRoot(document.getElementById('root')!).render(
diff --git a/frontend/src/pages/DocsPage.tsx b/frontend/src/pages/DocsPage.tsx
new file mode 100644
index 0000000..f9e6928
--- /dev/null
+++ b/frontend/src/pages/DocsPage.tsx
@@ -0,0 +1,121 @@
+import React from 'react';
+import { Link } from 'react-router-dom';
+import { AppHeader } from '../components/layout/AppHeader';
+
+const GITHUB_URL = 'https://github.com/davidmonterocrespo24/velxio';
+const DISCORD_URL = 'https://discord.gg/rCScB9cG';
+
+interface DocSection {
+ title: string;
+ items: { label: string; href: string; desc: string }[];
+}
+
+const sections: DocSection[] = [
+ {
+ title: 'Getting Started',
+ items: [
+ { label: 'README', href: `${GITHUB_URL}#readme`, desc: 'Project overview, features, and setup instructions.' },
+ { label: 'Self-Hosting with Docker', href: `${GITHUB_URL}#self-hosting`, desc: 'Run Velxio locally with a single Docker command.' },
+ { label: 'Manual Setup', href: `${GITHUB_URL}#option-c-manual-setup`, desc: 'Set up the frontend and backend manually for development.' },
+ ],
+ },
+ {
+ title: 'Architecture',
+ items: [
+ { label: 'Architecture Overview', href: `${GITHUB_URL}/blob/master/doc/ARCHITECTURE.md`, desc: 'High-level data flow, component system, and simulation loop.' },
+ { label: 'AVR8 Simulation', href: `${GITHUB_URL}#avr8-simulation-arduino-uno--nano--mega`, desc: 'How the ATmega328p emulation works at 16 MHz.' },
+ { label: 'RP2040 Simulation', href: `${GITHUB_URL}#rp2040-simulation-raspberry-pi-pico`, desc: 'Raspberry Pi Pico emulation via rp2040js.' },
+ ],
+ },
+ {
+ title: 'Using the Editor',
+ items: [
+ { label: 'Writing Sketches', href: `${GITHUB_URL}#code-editing`, desc: 'Monaco editor features — autocomplete, multi-file, minimap.' },
+ { label: 'Supported Boards', href: `${GITHUB_URL}#multi-board-support`, desc: 'Arduino Uno, Nano, Mega, and Raspberry Pi Pico.' },
+ { label: 'Serial Monitor', href: `${GITHUB_URL}#serial-monitor`, desc: 'Live TX/RX output with auto baud-rate detection.' },
+ { label: 'Library Manager', href: `${GITHUB_URL}#library-manager`, desc: 'Browse and install the full Arduino library index.' },
+ ],
+ },
+ {
+ title: 'Components & Wiring',
+ items: [
+ { label: 'Component System', href: `${GITHUB_URL}#component-system-48-components`, desc: '48+ electronic components — LEDs, displays, sensors, and more.' },
+ { label: 'Wire System', href: `${GITHUB_URL}#wire-system`, desc: 'Orthogonal routing, segment editing, and signal-type colors.' },
+ ],
+ },
+ {
+ title: 'Contributing',
+ items: [
+ { label: 'Contributing Guide', href: `${GITHUB_URL}#contributing`, desc: 'Bug reports, pull requests, and CLA information.' },
+ { label: 'Open Issues', href: `${GITHUB_URL}/issues`, desc: 'Browse open issues and feature requests on GitHub.' },
+ { label: 'Discord Community', href: DISCORD_URL, desc: 'Ask questions and share projects with the community.' },
+ ],
+ },
+];
+
+const IcoExternal = () => (
+
+);
+
+export const DocsPage: React.FC = () => {
+ return (
+
+
+
+ Documentation
+
+ Resources and guides for using and extending Velxio. Full documentation lives on{' '}
+ GitHub.
+
+
+
+ {sections.map((section) => (
+
+
+ {section.title}
+
+
+
+ ))}
+
+
+
+
+
+
+ Open Editor →
+
+
+
+
+ );
+};
diff --git a/frontend/src/pages/ExamplesPage.tsx b/frontend/src/pages/ExamplesPage.tsx
index 69409e7..1d175b4 100644
--- a/frontend/src/pages/ExamplesPage.tsx
+++ b/frontend/src/pages/ExamplesPage.tsx
@@ -7,6 +7,7 @@
import React from 'react';
import { useNavigate } from 'react-router-dom';
import { ExamplesGallery } from '../components/examples/ExamplesGallery';
+import { AppHeader } from '../components/layout/AppHeader';
import { useEditorStore } from '../store/useEditorStore';
import { useSimulatorStore } from '../store/useSimulatorStore';
import type { ExampleProject } from '../data/examples';
@@ -71,5 +72,10 @@ export const ExamplesPage: React.FC = () => {
navigate('/editor');
};
- return
;
+ return (
+
+ );
};
diff --git a/frontend/src/pages/LandingPage.tsx b/frontend/src/pages/LandingPage.tsx
index afd9ac0..a7c08b6 100644
--- a/frontend/src/pages/LandingPage.tsx
+++ b/frontend/src/pages/LandingPage.tsx
@@ -1,6 +1,5 @@
-import { useState, useRef, useEffect } from 'react';
-import { Link, useNavigate } from 'react-router-dom';
-import { useAuthStore } from '../store/useAuthStore';
+import { Link } from 'react-router-dom';
+import { AppHeader } from '../components/layout/AppHeader';
import '@wokwi/elements';
import './LandingPage.css';
@@ -282,108 +281,11 @@ const IcoSponsor = () => (
);
-/* ── User nav dropdown ────────────────────────────────── */
-const UserMenu: React.FC = () => {
- const user = useAuthStore((s) => s.user);
- const logout = useAuthStore((s) => s.logout);
- const navigate = useNavigate();
- const [open, setOpen] = useState(false);
- const ref = useRef
(null);
-
- useEffect(() => {
- const handler = (e: MouseEvent) => {
- if (ref.current && !ref.current.contains(e.target as Node)) setOpen(false);
- };
- document.addEventListener('mousedown', handler);
- return () => document.removeEventListener('mousedown', handler);
- }, []);
-
- if (!user) return null;
-
- const initials = user.username[0].toUpperCase();
-
- return (
-
-
setOpen((v) => !v)}>
- {user.avatar_url ? (
-
- ) : (
- {initials}
- )}
- {user.username}
-
-
- {open && (
-
-
- {user.avatar_url ? (
-

- ) : (
-
{initials}
- )}
-
-
{user.username}
-
{user.email}
-
-
-
-
setOpen(false)}>
-
- Open Editor
-
-
setOpen(false)}>
-
- My Projects
-
-
-
{ setOpen(false); await logout(); navigate('/'); }}>
-
- Sign out
-
-
- )}
-
- );
-};
-
/* ── Component ────────────────────────────────────────── */
export const LandingPage: React.FC = () => {
- const user = useAuthStore((s) => s.user);
- const isLoading = useAuthStore((s) => s.isLoading);
-
return (
- {/* Nav */}
-
+
{/* Hero */}
diff --git a/frontend/src/services/ComponentRegistry.ts b/frontend/src/services/ComponentRegistry.ts
index 269a8a8..ffff1f5 100644
--- a/frontend/src/services/ComponentRegistry.ts
+++ b/frontend/src/services/ComponentRegistry.ts
@@ -61,6 +61,21 @@ export class ComponentRegistry {
}
const data: ComponentMetadataCollection = await response.json();
+
+ // Inject Raspberry Pi 3 metadata
+ data.components.push({
+ id: 'raspberry-pi-3',
+ tagName: 'wokwi-raspberry-pi-3',
+ name: 'Raspberry Pi 3',
+ category: 'boards',
+ description: 'Raspberry Pi 3 Model B with 40-pin GPIO. Connects to backend QEMU simulator.',
+ thumbnail: '',
+ properties: [],
+ defaultValues: {},
+ pinCount: 40,
+ tags: ['raspberry', 'pi', 'rp3', 'board', 'qemu', 'linux']
+ });
+
this.processMetadata(data.components);
this.loaded = true;
diff --git a/frontend/src/simulation/parts/PartSimulationRegistry.ts b/frontend/src/simulation/parts/PartSimulationRegistry.ts
index d347583..ce14aa9 100644
--- a/frontend/src/simulation/parts/PartSimulationRegistry.ts
+++ b/frontend/src/simulation/parts/PartSimulationRegistry.ts
@@ -46,3 +46,14 @@ class PartRegistry {
}
export const PartSimulationRegistry = new PartRegistry();
+
+// Import store explicitly inside a function to avoid circular dependencies if any,
+// but since we just need it at runtime, we can import it at the top or dynamically.
+import { useSimulatorStore } from '../../store/useSimulatorStore';
+
+PartSimulationRegistry.register('raspberry-pi-3', {
+ onPinStateChange: (pinName: string, state: boolean, _element: HTMLElement) => {
+ // When Arduino changes a pin connected to Raspberry Pi, forward to backend
+ useSimulatorStore.getState().sendRemotePinEvent(pinName, state ? 1 : 0);
+ }
+});
diff --git a/frontend/src/store/useSimulatorStore.ts b/frontend/src/store/useSimulatorStore.ts
index 481b2ad..b22b77b 100644
--- a/frontend/src/store/useSimulatorStore.ts
+++ b/frontend/src/store/useSimulatorStore.ts
@@ -68,6 +68,13 @@ interface SimulatorState {
serialBaudRate: number;
serialMonitorOpen: boolean;
+ // Remote Simulator (Raspberry Pi/QEMU)
+ remoteConnected: boolean;
+ remoteSocket: WebSocket | null;
+ connectRemoteSimulator: (clientId: string) => void;
+ disconnectRemoteSimulator: () => void;
+ sendRemotePinEvent: (pin: string, state: number) => void;
+
// Actions
initSimulator: () => void;
loadHex: (hex: string) => void;
@@ -114,6 +121,9 @@ export const useSimulatorStore = create((set, get) => {
// Create PinManager instance
const pinManager = new PinManager();
+ // Create remote socket reference (cannot be strictly in state without triggering too many renders)
+ // We'll put it in state for simple disconnect, but typically useRef/module level is better.
+
return {
boardType: 'arduino-uno' as BoardType,
boardPosition: { ...DEFAULT_BOARD_POSITION },
@@ -184,6 +194,87 @@ export const useSimulatorStore = create((set, get) => {
serialBaudRate: 0,
serialMonitorOpen: false,
+ remoteConnected: false,
+ remoteSocket: null,
+
+ connectRemoteSimulator: (clientId: string) => {
+ const { remoteSocket } = get();
+ if (remoteSocket) remoteSocket.close();
+
+ const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
+ const API_BASE = import.meta.env.VITE_API_BASE || 'http://localhost:8001/api';
+ const wsUrl = API_BASE.replace(/^https?:/, wsProtocol) + `/simulation/ws/${clientId}`;
+
+ const socket = new WebSocket(wsUrl);
+
+ socket.onopen = () => {
+ console.log('Connected to remote simulator');
+ set({ remoteConnected: true, remoteSocket: socket });
+ };
+
+ socket.onmessage = (event) => {
+ const data = JSON.parse(event.data);
+ console.log("Remote Event:", data);
+
+ if (data.type === 'pin_change') {
+ const { pin, state } = data.data;
+ const { wires, simulator } = get();
+
+ if (!simulator) return;
+
+ // Find if this Pi pin is connected to the Arduino
+ // We need to look for a wire where one end is the Pi and the other is the Arduino
+ // In a real scenario we might have multiple Pi components, so we should match the clientId
+ // Here we assume any Pi component connected to current simulator
+ const wire = wires.find(w =>
+ (w.start.componentId.includes('raspberry-pi') && w.start.pinName === String(pin)) ||
+ (w.end.componentId.includes('raspberry-pi') && w.end.pinName === String(pin))
+ );
+
+ if (wire) {
+ const IsArduinoStart = !wire.start.componentId.includes('raspberry-pi');
+ const targetEndpoint = IsArduinoStart ? wire.start : wire.end;
+
+ // If target is an arduino board pin, set its state
+ if (targetEndpoint.componentId.startsWith('arduino-') || targetEndpoint.componentId === 'raspberry-pi-pico') {
+ // We need boardPinToNumber utility to get the numeric pin
+ // For now, if the pinName is a number, we can use it directly
+ const pinNum = parseInt(targetEndpoint.pinName, 10);
+ if (!isNaN(pinNum)) {
+ simulator.setPinState(pinNum, state);
+ }
+ }
+ }
+ }
+ };
+
+ socket.onclose = () => {
+ console.log('Disconnected from remote simulator');
+ set({ remoteConnected: false, remoteSocket: null });
+ };
+
+ socket.onerror = (error) => {
+ console.error('Remote simulator WS error:', error);
+ };
+ },
+
+ disconnectRemoteSimulator: () => {
+ const { remoteSocket } = get();
+ if (remoteSocket) {
+ remoteSocket.close();
+ }
+ },
+
+ sendRemotePinEvent: (pin: string, state: number) => {
+ const { remoteSocket, remoteConnected } = get();
+ if (remoteConnected && remoteSocket && remoteSocket.readyState === WebSocket.OPEN) {
+ remoteSocket.send(JSON.stringify({
+ type: 'pin_change',
+ data: { pin, state }
+ }));
+ }
+ },
+
setBoardPosition: (pos) => {
set({ boardPosition: pos });
},
diff --git a/test/pi_arduino_serial/arduino_sketch.ino b/test/pi_arduino_serial/arduino_sketch.ino
new file mode 100644
index 0000000..c35ed48
--- /dev/null
+++ b/test/pi_arduino_serial/arduino_sketch.ino
@@ -0,0 +1,48 @@
+/**
+ * arduino_sketch.ino
+ * ------------------
+ * Part of the Pi <-> Arduino Serial Integration Test.
+ *
+ * Protocol (9600 baud, newline-terminated messages):
+ * Pi --> Arduino : "HELLO_FROM_PI"
+ * Arduino --> Pi : "ACK_FROM_ARDUINO"
+ *
+ * The sketch accumulates incoming characters into a line buffer.
+ * When a full line arrives and it contains "HELLO_FROM_PI" it
+ * replies "ACK_FROM_ARDUINO\n". All other input is silently
+ * ignored so kernel boot chatter from the Pi console does not
+ * produce spurious replies.
+ */
+
+static String lineBuffer = "";
+
+void setup() {
+ Serial.begin(9600);
+ // Signal that the Arduino is ready (useful for debugging via a real UART)
+ Serial.println("ARDUINO_READY");
+}
+
+void loop() {
+ while (Serial.available() > 0) {
+ char c = (char)Serial.read();
+
+ if (c == '\r') {
+ // ignore carriage-return (Windows / Pi console may send \r\n)
+ continue;
+ }
+
+ if (c == '\n') {
+ // Process complete line
+ if (lineBuffer.indexOf("HELLO_FROM_PI") >= 0) {
+ Serial.println("ACK_FROM_ARDUINO");
+ }
+ lineBuffer = "";
+ } else {
+ lineBuffer += c;
+ // Safety: prevent unbounded growth from high-volume console noise
+ if (lineBuffer.length() > 256) {
+ lineBuffer = "";
+ }
+ }
+ }
+}
diff --git a/test/pi_arduino_serial/avr_runner.js b/test/pi_arduino_serial/avr_runner.js
new file mode 100644
index 0000000..29cdfb5
--- /dev/null
+++ b/test/pi_arduino_serial/avr_runner.js
@@ -0,0 +1,188 @@
+/**
+ * avr_runner.js
+ * -------------
+ * Node.js ATmega328P (Arduino Uno) emulator using avr8js.
+ *
+ * Loads a compiled Intel HEX firmware file and emulates the CPU at
+ * 16 MHz. The USART (Serial) peripheral is bridged to a TCP socket
+ * so the Python broker can connect and exchange bytes.
+ *
+ * Usage:
+ * node avr_runner.js [broker_host] [broker_port]
+ *
+ * The script acts as a TCP CLIENT. It connects (with retries) to the
+ * Python broker which acts as the server for the Arduino side.
+ *
+ * Data flow:
+ * Pi --> broker:5556 --> avr_runner --> usart.writeByte() --> Arduino RX
+ * Arduino TX --> usart.onByteTransmit --> broker:5556 --> Pi
+ */
+
+'use strict';
+
+const fs = require('fs');
+const net = require('net');
+const path = require('path');
+
+// ── Load avr8js from local wokwi-libs ────────────────────────────────────────
+const AVR8JS_CJS = path.resolve(
+ __dirname, '..', '..', 'wokwi-libs', 'avr8js', 'dist', 'cjs', 'index.js'
+);
+
+let avr8js;
+try {
+ avr8js = require(AVR8JS_CJS);
+} catch (e) {
+ process.stderr.write(`[avr_runner] FATAL: cannot load avr8js from:\n ${AVR8JS_CJS}\n ${e.message}\n`);
+ process.exit(1);
+}
+
+const {
+ CPU,
+ avrInstruction,
+ AVRUSART, usart0Config,
+ AVRTimer, timer0Config, timer1Config, timer2Config,
+} = avr8js;
+
+// ── CLI arguments ─────────────────────────────────────────────────────────────
+const [,, hexFile, brokerHost = '127.0.0.1', brokerPort = '5556'] = process.argv;
+
+if (!hexFile) {
+ process.stderr.write('Usage: node avr_runner.js [broker_host] [broker_port]\n');
+ process.exit(1);
+}
+
+if (!fs.existsSync(hexFile)) {
+ process.stderr.write(`[avr_runner] ERROR: hex file not found: ${hexFile}\n`);
+ process.exit(1);
+}
+
+// ── Intel HEX parser ──────────────────────────────────────────────────────────
+function parseIntelHex(content) {
+ // ATmega328P has 32 KB flash → 0x8000 bytes
+ const flash = new Uint8Array(0x8000);
+
+ for (const rawLine of content.split('\n')) {
+ const line = rawLine.trim();
+ if (!line.startsWith(':') || line.length < 11) continue;
+
+ const bytes = Buffer.from(line.slice(1), 'hex');
+ const byteCount = bytes[0];
+ const addr = (bytes[1] << 8) | bytes[2];
+ const recordType = bytes[3];
+
+ if (recordType === 0x00) { // Data record
+ for (let i = 0; i < byteCount; i++) {
+ if (addr + i < flash.length) {
+ flash[addr + i] = bytes[4 + i];
+ }
+ }
+ }
+ // recordType 0x01 = EOF — nothing to do
+ }
+
+ // AVR instructions are 16-bit little-endian words
+ return new Uint16Array(flash.buffer);
+}
+
+// ── Build the CPU ─────────────────────────────────────────────────────────────
+const CLOCK_HZ = 16_000_000;
+
+const hexContent = fs.readFileSync(hexFile, 'utf8');
+const program = parseIntelHex(hexContent);
+const cpu = new CPU(program);
+
+// Timers are needed for delay() / millis() inside the Arduino sketch
+const timers = [
+ new AVRTimer(cpu, timer0Config),
+ new AVRTimer(cpu, timer1Config),
+ new AVRTimer(cpu, timer2Config),
+];
+
+const usart = new AVRUSART(cpu, usart0Config, CLOCK_HZ);
+
+// ── TCP bridge state ───────────────────────────────────────────────────────────
+let socket = null;
+let txBacklog = []; // bytes queued before TCP connects
+
+// Arduino → Pi: forward transmitted bytes
+usart.onByteTransmit = (byte) => {
+ const ch = String.fromCharCode(byte);
+ process.stdout.write(`[AVR->Pi] ${ch === '\n' ? '\\n\n' : ch}`);
+
+ if (socket && !socket.destroyed) {
+ socket.write(Buffer.from([byte]));
+ } else {
+ txBacklog.push(byte); // buffer until connected
+ }
+};
+
+// ── Simulation loop ───────────────────────────────────────────────────────────
+// Run ~160 000 instructions per Node.js event-loop tick ≈ 10 ms simulated time.
+// setImmediate() yields after each batch so I/O callbacks can fire.
+const BATCH = 160_000;
+
+function runBatch() {
+ for (let i = 0; i < BATCH; i++) {
+ avrInstruction(cpu);
+ cpu.tick();
+ }
+ setImmediate(runBatch);
+}
+
+// ── TCP connection to broker ──────────────────────────────────────────────────
+let retryCount = 0;
+const MAX_RETRIES = 40; // 40 × 500 ms = 20 s
+
+function connectToBroker() {
+ if (retryCount >= MAX_RETRIES) {
+ process.stderr.write('[avr_runner] ERROR: could not connect to broker after max retries\n');
+ process.exit(1);
+ }
+
+ const s = new net.Socket();
+
+ s.connect(parseInt(brokerPort, 10), brokerHost, () => {
+ retryCount = 0;
+ socket = s;
+ process.stdout.write(`[avr_runner] Connected to broker ${brokerHost}:${brokerPort}\n`);
+
+ // Flush bytes queued before connection
+ if (txBacklog.length > 0) {
+ s.write(Buffer.from(txBacklog));
+ txBacklog = [];
+ }
+ });
+
+ // Pi → Arduino: feed received bytes into USART RX
+ s.on('data', (chunk) => {
+ for (const byte of chunk) {
+ const ch = String.fromCharCode(byte);
+ process.stdout.write(`[Pi->AVR] ${ch === '\n' ? '\\n\n' : ch}`);
+ // writeByte(value, immediate=true) bypasses baud-rate timing
+ usart.writeByte(byte, true);
+ }
+ });
+
+ s.on('close', () => {
+ process.stdout.write('[avr_runner] Broker connection closed\n');
+ socket = null;
+ });
+
+ s.on('error', (err) => {
+ if (err.code === 'ECONNREFUSED') {
+ retryCount++;
+ process.stdout.write(`[avr_runner] Broker not ready, retry ${retryCount}/${MAX_RETRIES} ...\n`);
+ setTimeout(connectToBroker, 500);
+ } else {
+ process.stderr.write(`[avr_runner] Socket error: ${err.message}\n`);
+ }
+ });
+}
+
+// ── Start ─────────────────────────────────────────────────────────────────────
+process.stdout.write(`[avr_runner] Loaded: ${path.basename(hexFile)}\n`);
+process.stdout.write(`[avr_runner] ATmega328P @ ${CLOCK_HZ / 1e6} MHz — simulation starting\n`);
+
+connectToBroker();
+runBatch();
diff --git a/test/pi_arduino_serial/test_pi_arduino_serial.py b/test/pi_arduino_serial/test_pi_arduino_serial.py
new file mode 100644
index 0000000..b138828
--- /dev/null
+++ b/test/pi_arduino_serial/test_pi_arduino_serial.py
@@ -0,0 +1,632 @@
+#!/usr/bin/env python3
+"""
+Pi <-> Arduino Serial Integration Test
+=======================================
+
+What this test proves
+---------------------
+Python code running on an **emulated Raspberry Pi 3B** (QEMU) sends the
+string "HELLO_FROM_PI" over its UART serial port (ttyAMA0).
+An **emulated Arduino Uno** (avr8js, via Node.js) receives it and replies
+"ACK_FROM_ARDUINO".
+The Pi receives that reply and prints "TEST_PASSED".
+
+Architecture
+------------
+
+ ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ | Python Test Process (this file) |
+ | |
+ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |
+ | | SerialBroker (asyncio) | |
+ | | | |
+ | | TCP server :5555 <||||||||||||||> QEMU Pi (ttyAMA0) | |
+ | | TCP server :5556 <||> avr_runner.js (Arduino UART) | |
+ | | | |
+ | | - Bridges bytes Pi <-> Arduino | |
+ | | - State machine automates Pi serial console: | |
+ | | - waits for shell prompt | |
+ | | - disables TTY echo | |
+ | | - injects Pi Python test script via base64 | |
+ | | - Asserts "TEST_PASSED" in Pi output | |
+ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |
+ | |
+ | Subprocesses: |
+ | - qemu-system-arm (Raspberry Pi 3B, init=/bin/sh) |
+ | - node avr_runner.js (ATmega328P emulation) |
+ ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+
+Prerequisites
+-------------
+ - qemu-system-arm in PATH
+ - node (Node.js >= 18) in PATH
+ - arduino-cli in PATH, with arduino:avr core installed
+ - QEMU images in /img/:
+ kernel_extracted.img
+ bcm271~1.dtb
+ 2025-12-04-raspios-trixie-armhf.img
+
+Run
+---
+ cd
+ python test/pi_arduino_serial/test_pi_arduino_serial.py
+
+The test may take several minutes while the Pi boots inside QEMU.
+"""
+
+import asyncio
+import base64
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+from pathlib import Path
+from typing import Optional
+
+# || Paths |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+REPO_ROOT = Path(__file__).resolve().parent.parent.parent
+IMG_DIR = REPO_ROOT / "img"
+TEST_DIR = Path(__file__).resolve().parent
+
+SKETCH_FILE = TEST_DIR / "arduino_sketch.ino"
+AVR_RUNNER = TEST_DIR / "avr_runner.js"
+
+KERNEL_IMG = IMG_DIR / "kernel_extracted.img"
+DTB_FILE = IMG_DIR / "bcm271~1.dtb" # Windows 8.3 short name
+SD_IMAGE = IMG_DIR / "2025-12-04-raspios-trixie-armhf.img"
+
+# || Network ports (must be free) |||||||||||||||||||||||||||||||||||||||||||||||
+BROKER_PI_PORT = 15555 # Broker listens; QEMU Pi connects here
+BROKER_AVR_PORT = 15556 # Broker listens; avr_runner.js connects here
+
+# || Timeouts ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+BOOT_TIMEOUT_S = 360 # 6 min -- Pi boot + shell prompt
+SCRIPT_TIMEOUT_S = 45 # Script execution after prompt seen
+COMPILE_TIMEOUT_S = 120 # arduino-cli
+
+# || Pi console state-machine ||||||||||||||||||||||||||||||||||||||||||||||||||
+# States
+ST_BOOT = "BOOT" # waiting for shell prompt
+ST_SETUP = "SETUP" # sent stty -echo, waiting for next prompt
+ST_INJECT = "INJECT" # script injected, waiting for TEST_PASSED / TEST_FAILED
+ST_DONE = "DONE"
+
+# Patterns that indicate a ready shell prompt
+PROMPT_BYTES = [b"# ", b"$ "]
+
+# || Pi Python test script ||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# This script is base64-encoded and written to /tmp/pi_test.py on the Pi,
+# then executed with "python3 /tmp/pi_test.py".
+#
+# When python3 runs from the serial console, its stdin/stdout ARE ttyAMA0,
+# i.e. the same wire the Arduino is connected to. select() lets us poll
+# for incoming bytes without busy-waiting.
+_PI_SCRIPT_SRC = b"""\
+import sys, os, time, select
+
+# --- send trigger message to Arduino ---
+sys.stdout.write("HELLO_FROM_PI\\n")
+sys.stdout.flush()
+
+# --- wait for Arduino reply ---
+resp = b""
+deadline = time.time() + 15 # 15-second window
+
+while time.time() < deadline:
+ readable, _, _ = select.select([sys.stdin], [], [], 1.0)
+ if readable:
+ chunk = os.read(sys.stdin.fileno(), 256)
+ resp += chunk
+ if b"ACK_FROM_ARDUINO" in resp:
+ sys.stdout.write("TEST_PASSED\\n")
+ sys.stdout.flush()
+ sys.exit(0)
+
+sys.stdout.write("TEST_FAILED_TIMEOUT\\n")
+sys.stdout.flush()
+sys.exit(1)
+"""
+
+# One-liner shell command injected into the Pi console:
+# 1. PATH is set explicitly (init=/bin/sh may not load a profile)
+# 2. TTY echo is disabled so our typed command doesn't loop back
+# 3. The base64-encoded script is decoded and written to /tmp/pi_test.py
+# 4. python3 runs it (stdin = ttyAMA0 = Arduino wire)
+_PI_B64 = base64.b64encode(_PI_SCRIPT_SRC).decode()
+_PI_CMD = (
+ "export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+ " && stty -echo"
+ f" && printf '%s' '{_PI_B64}' | /usr/bin/base64 -d > /tmp/pi_test.py"
+ " && /usr/bin/python3 /tmp/pi_test.py"
+ "\n"
+)
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+class SerialBroker:
+ """
+ Bridges the emulated Pi (QEMU) and the emulated Arduino (avr8js).
+
+ Both sides connect to this broker via separate TCP ports.
+ All bytes are forwarded transparently in both directions while the
+ broker also automates the Pi serial console to inject and run the
+ test script.
+ """
+
+ def __init__(self) -> None:
+ self._pi_reader: Optional[asyncio.StreamReader] = None
+ self._pi_writer: Optional[asyncio.StreamWriter] = None
+ self._avr_reader: Optional[asyncio.StreamReader] = None
+ self._avr_writer: Optional[asyncio.StreamWriter] = None
+
+ # Accumulate Pi output for pattern matching
+ self._pi_buf: bytearray = bytearray()
+
+ # Full traffic log: list of (direction, bytes)
+ self.traffic: list[tuple[str, bytes]] = []
+
+ self._state = ST_BOOT
+ self._prompt_count = 0
+ self._script_deadline: float = 0.0
+
+ # Signals
+ self.pi_connected = asyncio.Event()
+ self.avr_connected = asyncio.Event()
+ self.result_event = asyncio.Event()
+ self.test_passed = False
+
+ # || Server callbacks |||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ async def _on_pi_connect(self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter) -> None:
+ addr = writer.get_extra_info("peername")
+ print(f"[broker] Pi (QEMU) connected from {addr}")
+ self._pi_reader = reader
+ self._pi_writer = writer
+ self.pi_connected.set()
+
+ async def _on_avr_connect(self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter) -> None:
+ addr = writer.get_extra_info("peername")
+ print(f"[broker] Arduino (avr_runner.js) connected from {addr}")
+ self._avr_reader = reader
+ self._avr_writer = writer
+ self.avr_connected.set()
+
+ # || Start TCP servers ||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ async def start(self) -> tuple:
+ pi_srv = await asyncio.start_server(
+ self._on_pi_connect, "127.0.0.1", BROKER_PI_PORT
+ )
+ avr_srv = await asyncio.start_server(
+ self._on_avr_connect, "127.0.0.1", BROKER_AVR_PORT
+ )
+ print(f"[broker] Listening -- Pi port:{BROKER_PI_PORT} "
+ f"Arduino port:{BROKER_AVR_PORT}")
+ return pi_srv, avr_srv
+
+ # || Main relay (call after both sides connected) |||||||||||||||||||||||||||
+ async def run(self) -> None:
+ await asyncio.gather(
+ self._relay_pi_to_avr(),
+ self._relay_avr_to_pi(),
+ self._console_automator(),
+ )
+
+ # || Pi -> Arduino |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ async def _relay_pi_to_avr(self) -> None:
+ reader = self._pi_reader
+ if reader is None:
+ return
+ while True:
+ try:
+ data = await reader.read(512)
+ except Exception:
+ break
+ if not data:
+ break
+
+ self._log("Pi->AVR", data)
+ self._pi_buf.extend(data)
+
+ if self._avr_writer and not self._avr_writer.is_closing():
+ self._avr_writer.write(data)
+ await self._avr_writer.drain()
+
+ # || Arduino -> Pi |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ async def _relay_avr_to_pi(self) -> None:
+ reader = self._avr_reader
+ if reader is None:
+ return
+ while True:
+ try:
+ data = await reader.read(512)
+ except Exception:
+ break
+ if not data:
+ break
+
+ self._log("AVR->Pi", data)
+
+ if self._pi_writer and not self._pi_writer.is_closing():
+ self._pi_writer.write(data)
+ await self._pi_writer.drain()
+
+ # || Console state machine ||||||||||||||||||||||||||||||||||||||||||||||||||
+ async def _console_automator(self) -> None:
+ boot_deadline = time.monotonic() + BOOT_TIMEOUT_S
+ last_poke_time = time.monotonic()
+
+ # Give QEMU a moment to start emitting serial output, then poke
+ await asyncio.sleep(3.0)
+ self._send_to_pi(b"\n")
+
+ while self._state != ST_DONE:
+ await asyncio.sleep(0.15)
+
+ now = time.monotonic()
+
+ # || Global boot timeout ||||||||||||||||||||||||||||||||||||||||||||
+ if now > boot_deadline:
+ print("\n[broker] [timeout] BOOT TIMEOUT -- shell prompt never appeared")
+ self.test_passed = False
+ self._state = ST_DONE
+ self.result_event.set()
+ return
+
+ # || Script-execution timeout (after script was injected) ||||||||||
+ if self._state == ST_INJECT and self._script_deadline and now > self._script_deadline:
+ print("\n[broker] [timeout] SCRIPT TIMEOUT -- no result from Pi script")
+ self.test_passed = False
+ self._state = ST_DONE
+ self.result_event.set()
+ return
+
+ buf = bytes(self._pi_buf)
+
+ # || ST_BOOT: wait for shell prompt |||||||||||||||||||||||||||||||||
+ if self._state == ST_BOOT:
+ # Periodically poke the shell to get a fresh prompt
+ # (in case we missed the initial one)
+ if now - last_poke_time > 8.0:
+ self._send_to_pi(b"\n")
+ last_poke_time = now
+
+ if self._prompt_seen(buf):
+ print("\n[broker] [OK] Shell prompt detected")
+ self._pi_buf.clear()
+ # Disable echo and set PATH, then wait for next prompt
+ self._send_to_pi(
+ b"export PATH=/usr/local/sbin:/usr/local/bin"
+ b":/usr/sbin:/usr/bin:/sbin:/bin && stty -echo\n"
+ )
+ self._state = ST_SETUP
+ self._prompt_count = 0
+
+ # || ST_SETUP: wait for prompt after stty -echo |||||||||||||||||||||
+ elif self._state == ST_SETUP:
+ if self._prompt_seen(buf) or len(buf) > 10:
+ # Either got a prompt or the shell already answered
+ await asyncio.sleep(0.3)
+ self._pi_buf.clear()
+ print("[broker] [OK] Environment set -- injecting Pi test script")
+ self._send_to_pi(_PI_CMD.encode())
+ self._state = ST_INJECT
+ self._script_deadline = time.monotonic() + SCRIPT_TIMEOUT_S
+
+ # || ST_INJECT: wait for TEST_PASSED / TEST_FAILED |||||||||||||||||
+ elif self._state == ST_INJECT:
+ if b"TEST_PASSED" in buf:
+ print("\n[broker] [OK] TEST_PASSED received from Pi!")
+ self.test_passed = True
+ self._state = ST_DONE
+ self.result_event.set()
+ elif b"TEST_FAILED" in buf:
+ snippet = buf.decode("utf-8", errors="replace")[-120:]
+ print(f"\n[broker] [FAIL] TEST_FAILED received from Pi\n last output: {snippet!r}")
+ self.test_passed = False
+ self._state = ST_DONE
+ self.result_event.set()
+
+ # || Helpers ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ def _prompt_seen(self, buf: bytes) -> bool:
+ tail = buf[-32:] # only check the last 32 bytes
+ return any(p in tail for p in PROMPT_BYTES)
+
+ def _send_to_pi(self, data: bytes) -> None:
+ if self._pi_writer and not self._pi_writer.is_closing():
+ self._pi_writer.write(data)
+ asyncio.get_event_loop().create_task(self._pi_writer.drain())
+
+ def _log(self, direction: str, data: bytes) -> None:
+ self.traffic.append((direction, data))
+ text = data.decode("utf-8", errors="replace")
+ for line in text.splitlines():
+ stripped = line.strip()
+ if stripped:
+ print(f" [{direction}] {stripped}")
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# Compilation
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+def compile_arduino_sketch() -> Path:
+ """Compile arduino_sketch.ino via arduino-cli and return the .hex path."""
+ print("\n[compile] Compiling Arduino sketch with arduino-cli ...")
+
+ # arduino-cli requires the sketch to live inside a folder whose name
+ # matches the .ino file (without extension).
+ tmp_root = Path(tempfile.mkdtemp())
+ sk_dir = tmp_root / "arduino_sketch"
+ build_dir = tmp_root / "build"
+ sk_dir.mkdir()
+ build_dir.mkdir()
+ shutil.copy(SKETCH_FILE, sk_dir / "arduino_sketch.ino")
+
+ cli = shutil.which("arduino-cli") or "arduino-cli"
+ cmd = [
+ cli, "compile",
+ "--fqbn", "arduino:avr:uno",
+ "--output-dir", str(build_dir),
+ str(sk_dir),
+ ]
+
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=COMPILE_TIMEOUT_S,
+ )
+ except FileNotFoundError:
+ raise RuntimeError(
+ "arduino-cli not found in PATH.\n"
+ "Install: https://arduino.github.io/arduino-cli/\n"
+ "Then: arduino-cli core install arduino:avr"
+ )
+ except subprocess.TimeoutExpired:
+ raise RuntimeError("arduino-cli compile timed out")
+
+ if result.returncode != 0:
+ raise RuntimeError(
+ f"Compilation failed (exit {result.returncode}):\n"
+ f" STDOUT: {result.stdout.strip()}\n"
+ f" STDERR: {result.stderr.strip()}"
+ )
+
+ hex_files = sorted(build_dir.glob("*.hex"))
+ if not hex_files:
+ raise RuntimeError(f"No .hex file produced in {build_dir}")
+
+ hex_path = hex_files[0]
+ print(f"[compile] [OK] {hex_path.name} ({hex_path.stat().st_size:,} bytes)")
+ return hex_path
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# QEMU SD overlay (qcow2 thin-copy aligned to 8 GiB power-of-2)
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+def _ensure_sd_overlay() -> Path:
+ """
+ QEMU raspi3b requires the SD card size to be a power of 2.
+ The Raspbian image (5.29 GiB) does not satisfy this, so we create a
+ qcow2 overlay that presents the image as 8 GiB without modifying the
+ original file. The overlay is re-created on every run.
+ """
+ overlay = IMG_DIR / "sd_overlay.qcow2"
+ qemu_img = shutil.which("qemu-img") or "C:/Program Files/qemu/qemu-img.exe"
+
+ # Always rebuild so stale overlays don't cause issues
+ if overlay.exists():
+ overlay.unlink()
+
+ cmd = [
+ qemu_img, "create",
+ "-f", "qcow2",
+ "-b", str(SD_IMAGE),
+ "-F", "raw",
+ str(overlay),
+ "8G",
+ ]
+ result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
+ if result.returncode != 0:
+ raise RuntimeError(
+ f"qemu-img overlay creation failed:\n{result.stderr}"
+ )
+ print(f"[qemu-img] SD overlay created: {overlay.name} (8 GiB virtual, "
+ f"backed by {SD_IMAGE.name})")
+ return overlay
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# QEMU command builder
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+def build_qemu_cmd() -> list[str]:
+ missing = [
+ f" {label}: {p}"
+ for label, p in [("kernel", KERNEL_IMG), ("dtb", DTB_FILE), ("sd", SD_IMAGE)]
+ if not p.exists()
+ ]
+ if missing:
+ raise RuntimeError("Missing QEMU image files:\n" + "\n".join(missing))
+
+ sd_path = _ensure_sd_overlay()
+
+ # raspi3b is only available in qemu-system-aarch64 on this platform
+ # (qemu-system-arm only ships raspi0/1ap/2b in this Windows build)
+ qemu_bin = shutil.which("qemu-system-aarch64") or "qemu-system-aarch64"
+
+ return [
+ qemu_bin,
+ "-M", "raspi3b",
+ "-kernel", str(KERNEL_IMG),
+ "-dtb", str(DTB_FILE),
+ "-drive", f"file={sd_path},if=sd,format=qcow2",
+ # init=/bin/sh -> skip systemd, get a root shell immediately
+ # rw -> mount root filesystem read-write
+ "-append", (
+ "console=ttyAMA0 "
+ "root=/dev/mmcblk0p2 rootwait rw "
+ "init=/bin/sh "
+ "dwc_otg.lpm_enable=0"
+ ),
+ "-m", "1G",
+ "-smp", "4",
+ "-display", "none",
+ # Connect Pi ttyAMA0 directly to our broker (broker is the TCP server)
+ "-serial", f"tcp:127.0.0.1:{BROKER_PI_PORT}",
+ ]
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# Subprocess log drainer
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+async def drain_log(stream: Optional[asyncio.StreamReader], prefix: str) -> None:
+ if stream is None:
+ return
+ async for raw in stream:
+ line = raw.decode("utf-8", errors="replace").rstrip()
+ if line:
+ # encode to the console charset, dropping unrepresentable chars
+ safe = line.encode(sys.stdout.encoding or "ascii", errors="replace").decode(sys.stdout.encoding or "ascii")
+ print(f"{prefix} {safe}")
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# Main test coroutine
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+async def run_test() -> bool:
+ _banner("Pi <-> Arduino Serial Integration Test")
+
+ # 1. Compile ---------------------------------------------------------------
+ hex_path = compile_arduino_sketch()
+
+ # 2. Start broker ----------------------------------------------------------
+ broker = SerialBroker()
+ pi_srv, avr_srv = await broker.start()
+
+ procs: list[asyncio.subprocess.Process] = []
+
+ try:
+ # 3. Start avr_runner.js (Arduino emulation) ---------------------------
+ node_exe = shutil.which("node") or "node"
+ avr_cmd = [
+ node_exe,
+ str(AVR_RUNNER),
+ str(hex_path),
+ "127.0.0.1",
+ str(BROKER_AVR_PORT),
+ ]
+ print(f"\n[avr] Starting Arduino emulator ...\n {' '.join(avr_cmd)}")
+ avr_proc = await asyncio.create_subprocess_exec(
+ *avr_cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ )
+ procs.append(avr_proc)
+ asyncio.create_task(drain_log(avr_proc.stdout, "[avr]"))
+
+ # 4. Start QEMU (Raspberry Pi emulation) --------------------------------
+ qemu_cmd = build_qemu_cmd()
+ print(f"\n[qemu] Starting Raspberry Pi 3B emulation ...")
+ print(f" {' '.join(qemu_cmd[:6])} ...")
+ qemu_proc = await asyncio.create_subprocess_exec(
+ *qemu_cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ )
+ procs.append(qemu_proc)
+ asyncio.create_task(drain_log(qemu_proc.stdout, "[qemu]"))
+
+ # 5. Wait for both TCP connections -------------------------------------
+ print(f"\n[broker] Waiting for Pi + Arduino TCP connections (30 s) ...")
+ try:
+ await asyncio.wait_for(
+ asyncio.gather(
+ broker.pi_connected.wait(),
+ broker.avr_connected.wait(),
+ ),
+ timeout=30.0,
+ )
+ except asyncio.TimeoutError:
+ print("[broker] [FAIL] Timeout waiting for TCP connections")
+ return False
+
+ # 6. Start relay + state machine ----------------------------------------
+ print(f"\n[broker] Both sides connected. Boot timeout: {BOOT_TIMEOUT_S} s\n")
+ asyncio.create_task(broker.run())
+
+ # 7. Await test result --------------------------------------------------
+ try:
+ await asyncio.wait_for(
+ broker.result_event.wait(),
+ timeout=BOOT_TIMEOUT_S + SCRIPT_TIMEOUT_S + 10,
+ )
+ except asyncio.TimeoutError:
+ print("[test] [FAIL] Global timeout -- no result received")
+ return False
+
+ return broker.test_passed
+
+ finally:
+ pi_srv.close()
+ avr_srv.close()
+ await pi_srv.wait_closed()
+ await avr_srv.wait_closed()
+
+ for p in procs:
+ try:
+ p.terminate()
+ await asyncio.wait_for(p.wait(), timeout=5.0)
+ except Exception:
+ try:
+ p.kill()
+ except Exception:
+ pass
+
+ # Clean up temp compilation dir
+ try:
+ shutil.rmtree(hex_path.parent.parent, ignore_errors=True)
+ except Exception:
+ pass
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+# Helpers
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+def _banner(title: str) -> None:
+ bar = "=" * 65
+ print(f"\n{bar}\n {title}\n{bar}")
+
+
+def _print_result(passed: bool) -> None:
+ _banner("Result")
+ if passed:
+ print(" [PASS] INTEGRATION TEST PASSED\n")
+ print(" Pi -> Arduino : HELLO_FROM_PI")
+ print(" Arduino -> Pi : ACK_FROM_ARDUINO")
+ print(" Pi confirmed : TEST_PASSED")
+ else:
+ print(" [FAIL] INTEGRATION TEST FAILED")
+ print("\n Troubleshooting hints:")
+ print(" - Confirm qemu-system-arm, node, and arduino-cli are in PATH.")
+ print(" - Check that init=/bin/sh produces a '#' prompt on the Pi OS.")
+ print(" - The SD image may require 'pi'/'raspberry' user for python3.")
+ print("=" * 65 + "\n")
+
+
+# |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+def main() -> None:
+ # Windows requires ProactorEventLoop for subprocess + asyncio
+ if sys.platform == "win32":
+ asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
+
+ passed = asyncio.run(run_test())
+ _print_result(passed)
+ sys.exit(0 if passed else 1)
+
+
+if __name__ == "__main__":
+ main()