diff --git a/.gitignore b/.gitignore index f83cf20..63c57ea 100644 --- a/.gitignore +++ b/.gitignore @@ -81,3 +81,13 @@ data/* .publicar/* .publicar_discord/* img/* + +# ESP32 QEMU runtime binaries (too large for git — build/download separately) +backend/app/services/libqemu-xtensa.dll +backend/app/services/esp32-v3-rom.bin +backend/app/services/esp32-v3-rom-app.bin + +# ESP32 build artifacts (ELF/MAP debug symbols — large, not needed for tests) +test/esp32-emulator/**/*.elf +test/esp32-emulator/**/*.map +test/esp32-emulator/out_*/ diff --git a/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.bin b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.bin new file mode 100644 index 0000000..f5e0bc2 Binary files /dev/null and b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.bin differ diff --git a/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.bootloader.bin b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.bootloader.bin new file mode 100644 index 0000000..cd0d943 Binary files /dev/null and b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.bootloader.bin differ diff --git a/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.merged.bin b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.merged.bin new file mode 100644 index 0000000..9d6471f Binary files /dev/null and b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.merged.bin differ diff --git a/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.partitions.bin b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.partitions.bin new file mode 100644 index 0000000..2108af9 Binary files /dev/null and b/test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.partitions.bin differ diff --git a/test/esp32-emulator/binaries_lcgamboa/serial_led.ino.merged.bin b/test/esp32-emulator/binaries_lcgamboa/serial_led.ino.merged.bin new file mode 100644 index 0000000..41022c3 Binary files /dev/null and b/test/esp32-emulator/binaries_lcgamboa/serial_led.ino.merged.bin differ diff --git a/test/esp32-emulator/sketches/arduino_serial_controller/arduino_serial_controller.ino b/test/esp32-emulator/sketches/arduino_serial_controller/arduino_serial_controller.ino new file mode 100644 index 0000000..de245e6 --- /dev/null +++ b/test/esp32-emulator/sketches/arduino_serial_controller/arduino_serial_controller.ino @@ -0,0 +1,72 @@ +/** + * arduino_serial_controller.ino — Arduino Uno firmware. + * + * Controls an ESP32's built-in LED (GPIO2) over a UART serial link. + * Sends LED_ON / LED_OFF commands and reads the ESP32's replies. + * + * ── Wiring ───────────────────────────────────────────────────────────────── + * In Velxio canvas: + * Wire Arduino pin 11 (SoftSerial TX) → ESP32 GPIO1 (UART0 RX) + * Wire Arduino pin 10 (SoftSerial RX) ← ESP32 GPIO3 (UART0 TX) + * + * On real hardware: use a 3.3V level-shifter (Arduino is 5V; ESP32 is 3.3V). + * + * ── Protocol ─────────────────────────────────────────────────────────────── + * Arduino → ESP32: "LED_ON\n" or "LED_OFF\n" or "PING\n" + * ESP32 → Arduino: "OK:ON\n" or "OK:OFF\n" or "PONG\n" + * + * ── Board ────────────────────────────────────────────────────────────────── + * FQBN: arduino:avr:uno + * No extra libraries required (SoftwareSerial is built-in). + */ + +#include + +// Use pins 10/11 so USB Serial (0/1) stays free for the Serial Monitor +SoftwareSerial esp32Link(10, 11); // RX, TX + +bool ledOn = false; + +void setup() { + Serial.begin(9600); // USB Serial Monitor + esp32Link.begin(115200); // Link to ESP32 + + Serial.println(F("Arduino: booting...")); + delay(2000); // Wait for ESP32 to boot + + // Connection check + Serial.println(F("Arduino: sending PING...")); + esp32Link.println("PING"); + delay(300); + + while (esp32Link.available()) { + String resp = esp32Link.readStringUntil('\n'); + resp.trim(); + Serial.print(F("ESP32 boot reply: ")); + Serial.println(resp); + } +} + +void loop() { + // Read any incoming response from ESP32 + while (esp32Link.available()) { + String resp = esp32Link.readStringUntil('\n'); + resp.trim(); + if (resp.length() > 0) { + Serial.print(F("ESP32: ")); + Serial.println(resp); + } + } + + // Toggle LED every second + if (ledOn) { + Serial.println(F("Arduino → ESP32: LED_OFF")); + esp32Link.println("LED_OFF"); + } else { + Serial.println(F("Arduino → ESP32: LED_ON")); + esp32Link.println("LED_ON"); + } + ledOn = !ledOn; + + delay(1000); +} diff --git a/test/esp32-emulator/sketches/blink_lcgamboa/blink_lcgamboa.ino b/test/esp32-emulator/sketches/blink_lcgamboa/blink_lcgamboa.ino new file mode 100644 index 0000000..837998d --- /dev/null +++ b/test/esp32-emulator/sketches/blink_lcgamboa/blink_lcgamboa.ino @@ -0,0 +1,54 @@ +/** + * blink_lcgamboa.ino — IRAM-safe blink for lcgamboa esp32-picsimlab QEMU machine. + * + * The lcgamboa machine runs WiFi/BT init on core 1 which periodically disables + * the SPI flash cache. Any code or data in IROM/DROM (flash cache) will crash + * with "Cache disabled but cached memory region accessed" when that happens. + * + * Solution: + * - ALL functions tagged IRAM_ATTR (placed in SRAM, not flash) + * - ALL string constants tagged DRAM_ATTR (placed in SRAM, not flash) + * - Use esp_rom_printf() (ROM function, no cache needed) instead of Serial + * - Use direct GPIO register writes instead of Arduino/IDF cached helpers + * - Use ets_delay_us() (ROM function) instead of delay() + */ + +// Direct GPIO register access (ESP32 TRM chapter 4) +#define GPIO_OUT_W1TS (*((volatile uint32_t*)0x3FF44008)) // set bits HIGH +#define GPIO_OUT_W1TC (*((volatile uint32_t*)0x3FF4400C)) // set bits LOW +#define GPIO_ENABLE_W1TS (*((volatile uint32_t*)0x3FF44020)) // enable output + +#define LED_BIT (1u << 2) // GPIO2 + +extern "C" { + void ets_delay_us(uint32_t us); + int esp_rom_printf(const char* fmt, ...); +} + +// String constants in DRAM (not DROM flash cache) +static const char DRAM_ATTR s_start[] = "LCGAMBOA_STARTED\n"; +static const char DRAM_ATTR s_on[] = "LED_ON\n"; +static const char DRAM_ATTR s_off[] = "LED_OFF\n"; +static const char DRAM_ATTR s_done[] = "BLINK_DONE\n"; + +void IRAM_ATTR setup() { + GPIO_ENABLE_W1TS = LED_BIT; // GPIO2 → output + + esp_rom_printf(s_start); + + for (int i = 0; i < 5; i++) { + GPIO_OUT_W1TS = LED_BIT; // HIGH + esp_rom_printf(s_on); + ets_delay_us(300000); // 300 ms (ROM busy-wait, cache-safe) + + GPIO_OUT_W1TC = LED_BIT; // LOW + esp_rom_printf(s_off); + ets_delay_us(300000); + } + + esp_rom_printf(s_done); +} + +void IRAM_ATTR loop() { + ets_delay_us(1000000); // idle 1 s, no flash access +} diff --git a/test/esp32-emulator/sketches/blink_qemu.ino b/test/esp32-emulator/sketches/blink_qemu.ino new file mode 100644 index 0000000..951b2ab --- /dev/null +++ b/test/esp32-emulator/sketches/blink_qemu.ino @@ -0,0 +1,31 @@ +// ESP32 QEMU-compatible blink test +// Uses short delays and prints before delay to avoid FreeRTOS cache issue. + +#define LED_PIN 2 + +void setup() { + Serial.begin(115200); + Serial.println("ESP32 Blink Test Started"); + pinMode(LED_PIN, OUTPUT); + + // Print LED ON/OFF in setup before any vTaskDelay is needed + for (int i = 0; i < 3; i++) { + digitalWrite(LED_PIN, HIGH); + Serial.println("LED ON"); + Serial.flush(); + ets_delay_us(200000); // 200ms ROM busy-wait (no FreeRTOS, no cache issue) + + digitalWrite(LED_PIN, LOW); + Serial.println("LED OFF"); + Serial.flush(); + ets_delay_us(200000); + } + + Serial.println("BLINK_DONE"); + Serial.flush(); +} + +void loop() { + // Nothing - all work done in setup to avoid FreeRTOS cache crash in QEMU + ets_delay_us(1000000); +} diff --git a/test/esp32-emulator/sketches/serial_led/serial_led.ino b/test/esp32-emulator/sketches/serial_led/serial_led.ino new file mode 100644 index 0000000..a87a2c0 --- /dev/null +++ b/test/esp32-emulator/sketches/serial_led/serial_led.ino @@ -0,0 +1,59 @@ +/** + * serial_led.ino — ESP32 firmware for Arduino+ESP32 serial integration test. + * + * Listens on UART0 (Serial / GPIO1=TX, GPIO3=RX) for commands from an Arduino Uno: + * "LED_ON\n" → turn GPIO2 LED on, reply "OK:ON\n" + * "LED_OFF\n" → turn GPIO2 LED off, reply "OK:OFF\n" + * "PING\n" → reply "PONG\n" (connection check) + * + * Unknown commands are silently ignored (no crash). + * + * ── Compile (arduino-esp32 2.0.17) ──────────────────────────────────────── + * arduino-cli compile \ + * --fqbn esp32:esp32:esp32:FlashMode=dio \ + * --output-dir test/esp32-emulator/out_serial_led \ + * test/esp32-emulator/sketches/serial_led + * + * ── Merge to 4 MB flash image required by QEMU ──────────────────────────── + * esptool.py --chip esp32 merge_bin --fill-flash-size 4MB \ + * -o test/esp32-emulator/binaries_lcgamboa/serial_led.ino.merged.bin \ + * --flash_mode dio --flash_size 4MB \ + * 0x1000 test/esp32-emulator/out_serial_led/serial_led.ino.bootloader.bin \ + * 0x8000 test/esp32-emulator/out_serial_led/serial_led.ino.partitions.bin \ + * 0x10000 test/esp32-emulator/out_serial_led/serial_led.ino.bin + * + * ── GPIO pinmap in lcgamboa ──────────────────────────────────────────────── + * Identity mapping: pinmap position i → GPIO (i-1). + * GPIO2 → position 3 in the pinmap → picsimlab_write_pin(pin=3, value). + * In the Python test / Esp32LibBridge: gpio_change events arrive as pin=2. + */ + +#define LED_PIN 2 + +void setup() { + Serial.begin(115200); + pinMode(LED_PIN, OUTPUT); + digitalWrite(LED_PIN, LOW); + + // Small delay so QEMU UART listener is ready before we write + delay(100); + Serial.println("READY"); +} + +void loop() { + if (Serial.available() > 0) { + String cmd = Serial.readStringUntil('\n'); + cmd.trim(); + + if (cmd == "LED_ON") { + digitalWrite(LED_PIN, HIGH); + Serial.println("OK:ON"); + } else if (cmd == "LED_OFF") { + digitalWrite(LED_PIN, LOW); + Serial.println("OK:OFF"); + } else if (cmd == "PING") { + Serial.println("PONG"); + } + // Unknown commands silently ignored + } +} diff --git a/test/esp32/test_arduino_esp32_integration.py b/test/esp32/test_arduino_esp32_integration.py new file mode 100644 index 0000000..4d97b61 --- /dev/null +++ b/test/esp32/test_arduino_esp32_integration.py @@ -0,0 +1,418 @@ +""" +test_arduino_esp32_integration.py +================================== +Integration test: Arduino Uno ↔ ESP32 (QEMU/lcgamboa) over serial. + +Architecture +------------ + [Python "Arduino"] ──UART0 bytes──→ [ESP32 QEMU (lcgamboa)] + ←──UART0 bytes── + + The "Arduino simulator" is a Python coroutine that sends LED_ON / LED_OFF / + PING commands via bridge.uart_send(), exactly as a real Arduino Uno would + send bytes over its TX line. + + In Velxio (browser), the same protocol works with the AVR emulator (avr8js): + - Wire Arduino TX pin → ESP32 GPIO1 (UART0 RX) + - Wire Arduino RX pin ← ESP32 GPIO3 (UART0 TX) + - The store routes bytes between the two bridges via WebSocket + +ESP32 firmware required +----------------------- + test/esp32-emulator/sketches/serial_led/serial_led.ino + + Compile: + arduino-cli compile \\ + --fqbn esp32:esp32:esp32:FlashMode=dio \\ + --output-dir test/esp32-emulator/out_serial_led \\ + test/esp32-emulator/sketches/serial_led + + Merge (4 MB required by QEMU esp32-picsimlab): + esptool.py --chip esp32 merge_bin --fill-flash-size 4MB \\ + -o test/esp32-emulator/binaries_lcgamboa/serial_led.ino.merged.bin \\ + --flash_mode dio --flash_size 4MB \\ + 0x1000 test/esp32-emulator/out_serial_led/serial_led.ino.bootloader.bin \\ + 0x8000 test/esp32-emulator/out_serial_led/serial_led.ino.partitions.bin \\ + 0x10000 test/esp32-emulator/out_serial_led/serial_led.ino.bin + +GPIO pinmap note +---------------- + Identity pinmap: position i → GPIO (i-1). + GPIO2 (LED_PIN) → pinmap position 3. + Bridge translates slot→real GPIO, so gpio_change events arrive as pin=2. + +Run: + cd test/esp32 + python -m pytest test_arduino_esp32_integration.py -v + + # Skip integration (unit-only): + SKIP_LIB_INTEGRATION=1 python -m pytest test_arduino_esp32_integration.py -v +""" + +import asyncio +import base64 +import os +import pathlib +import sys +import time +import unittest + +# ── paths ───────────────────────────────────────────────────────────────────── +_REPO = pathlib.Path(__file__).parent.parent.parent +_BACKEND = _REPO / "backend" +_FW_PATH = ( + _REPO / "test" / "esp32-emulator" / "binaries_lcgamboa" + / "serial_led.ino.merged.bin" +) +_SKETCH_PATH = ( + _REPO / "test" / "esp32-emulator" / "sketches" + / "serial_led" / "serial_led.ino" +) + +sys.path.insert(0, str(_BACKEND)) + +from app.services.esp32_lib_bridge import Esp32LibBridge # noqa: E402 +from app.services.esp32_lib_manager import LIB_PATH, EspLibManager # noqa: E402 + +_DLL_AVAILABLE = bool(LIB_PATH) and os.path.isfile(LIB_PATH) +_FW_AVAILABLE = _FW_PATH.is_file() +_SKIP_INT = os.environ.get("SKIP_LIB_INTEGRATION", "") == "1" + +_SKIP_DLL = "libqemu-xtensa.dll not available (set QEMU_ESP32_LIB)" +_SKIP_FW = ( + f"Firmware not found: {_FW_PATH}\n" + "Compile serial_led.ino with arduino-esp32 2.0.17 then merge with " + "--fill-flash-size 4MB. See docstring for commands." +) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Unit tests — no QEMU process needed +# ═══════════════════════════════════════════════════════════════════════════════ + +class TestSketchExists(unittest.TestCase): + """Verify the Arduino/ESP32 sketch source files are present in the repo.""" + + def test_esp32_sketch_exists(self): + self.assertTrue( + _SKETCH_PATH.is_file(), + f"ESP32 sketch not found: {_SKETCH_PATH}", + ) + + def test_arduino_sketch_exists(self): + arduino_sketch = ( + _REPO / "test" / "esp32-emulator" / "sketches" + / "arduino_serial_controller" / "arduino_serial_controller.ino" + ) + self.assertTrue( + arduino_sketch.is_file(), + f"Arduino sketch not found: {arduino_sketch}", + ) + + def test_firmware_path_documented(self): + """Firmware path is a .merged.bin (4 MB) as required by QEMU.""" + self.assertTrue(str(_FW_PATH).endswith(".merged.bin")) + + +@unittest.skipUnless(_DLL_AVAILABLE, _SKIP_DLL) +class TestManagerApi(unittest.TestCase): + """Manager API is present and compatible.""" + + def test_is_available(self): + self.assertTrue(EspLibManager().is_available()) + + def test_manager_has_send_serial_bytes(self): + mgr = EspLibManager() + self.assertTrue( + hasattr(mgr, "send_serial_bytes"), + "EspLibManager must expose send_serial_bytes() for Arduino routing", + ) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Integration tests — starts QEMU with serial_led firmware +# ═══════════════════════════════════════════════════════════════════════════════ + +@unittest.skipUnless(_DLL_AVAILABLE, _SKIP_DLL) +@unittest.skipUnless(_FW_AVAILABLE, _SKIP_FW) +@unittest.skipIf(_SKIP_INT, "SKIP_LIB_INTEGRATION=1") +class TestArduinoEsp32Serial(unittest.TestCase): + """ + Live integration: Arduino Uno (simulated in Python) ↔ ESP32 (QEMU lcgamboa). + + setUpClass boots the ESP32 once. All tests share the same running instance. + + The "Arduino" is a Python helper that calls bridge.uart_send() — the same + bytes that the AVR emulator (avr8js) would produce from the Serial.println() + calls in arduino_serial_controller.ino. + """ + + BOOT_TIMEOUT = 20.0 # seconds to wait for READY + REPLY_TIMEOUT = 8.0 # seconds to wait for command reply + GPIO_TIMEOUT = 8.0 # seconds to wait for GPIO change + + @classmethod + def setUpClass(cls): + cls.loop = asyncio.new_event_loop() + cls.bridge = Esp32LibBridge(LIB_PATH, cls.loop) + + cls.serial_lines: list[str] = [] # complete UART0 lines received + cls.gpio_events: list[tuple[int, int]] = [] # (pin, state) + cls._uart_buf = "" + + cls.bridge.register_uart_listener(cls._on_uart) + cls.bridge.register_gpio_listener(cls._on_gpio) + + fw_b64 = base64.b64encode(_FW_PATH.read_bytes()).decode() + cls.bridge.start(fw_b64, machine="esp32-picsimlab") + + # Wait for READY before any test runs + deadline = time.monotonic() + cls.BOOT_TIMEOUT + while time.monotonic() < deadline: + cls.loop.run_until_complete(asyncio.sleep(0.05)) + if any("READY" in ln for ln in cls.serial_lines): + break + + @classmethod + def tearDownClass(cls): + cls.bridge.stop() + cls.loop.close() + + # ── Internal callbacks (called from QEMU thread) ────────────────────────── + + @classmethod + def _on_uart(cls, uart_id: int, byte_val: int) -> None: + if uart_id != 0: + return + ch = chr(byte_val) + cls._uart_buf += ch + if ch == "\n": + line = cls._uart_buf.strip() + if line: + cls.serial_lines.append(line) + cls._uart_buf = "" + + @classmethod + def _on_gpio(cls, pin: int, state: int) -> None: + cls.gpio_events.append((pin, state)) + + # ── Helpers ─────────────────────────────────────────────────────────────── + + def _wait_line(self, contains: str, timeout: float) -> str | None: + """Wait until a line containing `contains` arrives on UART0.""" + seen = set(self.serial_lines) + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + for ln in self.serial_lines: + if ln not in seen and contains in ln: + return ln + seen.add(ln) + # Final check including lines seen before the call + return next((ln for ln in self.serial_lines if contains in ln), None) + + def _wait_gpio(self, pin: int, state: int, timeout: float) -> bool: + """Wait until GPIO `pin` reaches `state`.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + if any(p == pin and s == state for p, s in self.gpio_events): + return True + return False + + def _arduino_send(self, cmd: str) -> None: + """ + Simulate Arduino sending a command string over its TX line. + + Equivalent to: esp32Link.println(cmd); // in arduino_serial_controller.ino + Routes via bridge.uart_send() — same path used in Velxio when + the AVR simulator's serial bytes are forwarded to the ESP32 UART input. + """ + self.bridge.uart_send(uart_id=0, data=(cmd + "\n").encode()) + + # ── Tests ───────────────────────────────────────────────────────────────── + + def test_01_esp32_boots_and_sends_ready(self): + """ESP32 firmware boots and sends 'READY' over UART0.""" + ok = any("READY" in ln for ln in self.serial_lines) + self.assertTrue( + ok, + f"ESP32 did not send READY within {self.BOOT_TIMEOUT}s.\n" + f"Received lines: {self.serial_lines}", + ) + + def test_02_ping_pong_handshake(self): + """ + Arduino sends PING; ESP32 replies PONG. + + Simulates the setup() block of arduino_serial_controller.ino: + esp32Link.println("PING"); + → ESP32 replies: "PONG" + """ + before = len(self.serial_lines) + self._arduino_send("PING") + deadline = time.monotonic() + self.REPLY_TIMEOUT + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + if any("PONG" in ln for ln in self.serial_lines[before:]): + break + got_pong = any("PONG" in ln for ln in self.serial_lines[before:]) + self.assertTrue(got_pong, "ESP32 did not reply PONG to PING") + + def test_03_led_on_command(self): + """ + Arduino sends LED_ON; ESP32 turns on GPIO2 and replies OK:ON. + + Simulates: esp32Link.println("LED_ON"); in arduino_serial_controller.ino + Verifies: + - ESP32 serial output contains "OK:ON" + - GPIO2 goes HIGH (gpio_change pin=2, state=1) + """ + gpio_before = len(self.gpio_events) + lines_before = len(self.serial_lines) + + self._arduino_send("LED_ON") + + # Wait for serial reply + deadline = time.monotonic() + self.REPLY_TIMEOUT + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + if any("OK:ON" in ln for ln in self.serial_lines[lines_before:]): + break + got_reply = any("OK:ON" in ln for ln in self.serial_lines[lines_before:]) + self.assertTrue(got_reply, "ESP32 did not send OK:ON after LED_ON") + + # Wait for GPIO2 HIGH + ok = self._wait_gpio(pin=2, state=1, timeout=self.GPIO_TIMEOUT) + self.assertTrue( + ok, + f"GPIO2 did not go HIGH after LED_ON command.\n" + f"GPIO events since test start: {self.gpio_events[gpio_before:]}", + ) + + def test_04_led_off_command(self): + """ + Arduino sends LED_OFF; ESP32 turns off GPIO2 and replies OK:OFF. + + Simulates the loop() in arduino_serial_controller.ino toggling off. + """ + # Ensure LED is on first + self._arduino_send("LED_ON") + self._wait_gpio(pin=2, state=1, timeout=self.GPIO_TIMEOUT) + + gpio_before = len(self.gpio_events) + lines_before = len(self.serial_lines) + + self._arduino_send("LED_OFF") + + deadline = time.monotonic() + self.REPLY_TIMEOUT + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + if any("OK:OFF" in ln for ln in self.serial_lines[lines_before:]): + break + got_reply = any("OK:OFF" in ln for ln in self.serial_lines[lines_before:]) + self.assertTrue(got_reply, "ESP32 did not send OK:OFF after LED_OFF") + + ok = self._wait_gpio(pin=2, state=0, timeout=self.GPIO_TIMEOUT) + self.assertTrue( + ok, + f"GPIO2 did not go LOW after LED_OFF.\n" + f"GPIO events: {self.gpio_events[gpio_before:]}", + ) + + def test_05_toggle_five_times(self): + """ + Arduino toggles LED 5 times (like the loop() in arduino_serial_controller.ino). + Expects ≥10 GPIO2 transitions (5 HIGH + 5 LOW). + """ + gpio_before = len(self.gpio_events) + + for _ in range(5): + self._arduino_send("LED_ON") + self._wait_line("OK:ON", self.REPLY_TIMEOUT) + self._arduino_send("LED_OFF") + self._wait_line("OK:OFF", self.REPLY_TIMEOUT) + + pin2 = [(p, s) for p, s in self.gpio_events[gpio_before:] if p == 2] + self.assertGreaterEqual( + len(pin2), 10, + f"Expected ≥10 GPIO2 transitions for 5 toggles, got {len(pin2)}: {pin2}", + ) + + def test_06_gpio_sequence_on_off(self): + """GPIO2 transitions after toggles follow ON→OFF→ON→OFF pattern.""" + gpio_before = len(self.gpio_events) + + self._arduino_send("LED_ON") + self._wait_line("OK:ON", self.REPLY_TIMEOUT) + self._arduino_send("LED_OFF") + self._wait_line("OK:OFF", self.REPLY_TIMEOUT) + self._arduino_send("LED_ON") + self._wait_line("OK:ON", self.REPLY_TIMEOUT) + self._arduino_send("LED_OFF") + self._wait_line("OK:OFF", self.REPLY_TIMEOUT) + + pin2 = [s for p, s in self.gpio_events[gpio_before:] if p == 2] + # Find first HIGH to anchor the pattern + try: + start = pin2.index(1) + except ValueError: + self.fail(f"GPIO2 never went HIGH. Events: {pin2}") + sequence = pin2[start:start + 4] + self.assertEqual( + sequence, [1, 0, 1, 0], + f"Expected ON→OFF→ON→OFF sequence, got: {sequence}", + ) + + def test_07_unknown_command_does_not_crash(self): + """ + Unknown serial command from Arduino is silently ignored. + ESP32 continues to respond to valid commands (no crash/hang). + """ + before = len(self.serial_lines) + self._arduino_send("GARBAGE_CMD_XYZ_123") + # Small settle time + self.loop.run_until_complete(asyncio.sleep(0.3)) + + # ESP32 must still respond to PING + self._arduino_send("PING") + deadline = time.monotonic() + self.REPLY_TIMEOUT + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + if any("PONG" in ln for ln in self.serial_lines[before:]): + break + ok = any("PONG" in ln for ln in self.serial_lines[before:]) + self.assertTrue( + ok, + "ESP32 became unresponsive after receiving an unknown command", + ) + + def test_08_rapid_commands(self): + """ + Rapid burst of commands (no delay between them) does not stall UART. + Simulates fast polling from a heavily-loaded Arduino sketch. + """ + lines_before = len(self.serial_lines) + for _ in range(10): + self._arduino_send("LED_ON") + self._arduino_send("LED_OFF") + + # Wait for at least 10 replies + deadline = time.monotonic() + self.REPLY_TIMEOUT * 2 + while time.monotonic() < deadline: + self.loop.run_until_complete(asyncio.sleep(0.05)) + replies = [ + ln for ln in self.serial_lines[lines_before:] + if "OK:" in ln + ] + if len(replies) >= 10: + break + replies = [ln for ln in self.serial_lines[lines_before:] if "OK:" in ln] + self.assertGreaterEqual( + len(replies), 10, + f"Expected ≥10 OK replies for 10 toggle pairs, got {len(replies)}", + ) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/test/esp32/test_esp32_lib_bridge.py b/test/esp32/test_esp32_lib_bridge.py new file mode 100644 index 0000000..565cdfc --- /dev/null +++ b/test/esp32/test_esp32_lib_bridge.py @@ -0,0 +1,409 @@ +""" +Tests for ESP32 emulation via lcgamboa libqemu-xtensa.dll. + +What this tests +=============== + Unit (fast, no QEMU process): + - DLL file exists at the expected path + - DLL loads via ctypes without error + - All required C symbols are exported + - EspLibManager.is_available() returns True + - Manager API surface matches EspQemuManager (duck-typing compatibility) + + Integration (starts the real QEMU DLL): + - QEMU boots with IRAM-safe blink firmware (esp32-picsimlab machine) + - UART output: "LCGAMBOA_STARTED", "LED_ON", "LED_OFF", "BLINK_DONE" + - GPIO callbacks: pin 3 (GPIO2 identity-mapped) toggles HIGH/LOW 5 times + - GPIO input: qemu_picsimlab_set_pin() accepted without crash + - ADC input: qemu_picsimlab_set_apin() accepted without crash + - UART RX: qemu_picsimlab_uart_receive() accepted without crash + +Firmware notes +-------------- + Uses blink_lcgamboa.ino compiled with: + - IRAM_ATTR on all functions → code lives in SRAM, not flash cache + - DRAM_ATTR on all strings → data lives in SRAM, not flash cache + - esp_rom_printf() for UART output (ROM function, cache-safe) + - Direct GPIO register writes (no IDF cached helpers) + - ets_delay_us() for delays (ROM function, cache-safe) + + This avoids the "Cache disabled but cached memory region accessed" crash that + occurs in the lcgamboa machine when the WiFi core temporarily disables the + SPI flash cache during emulated radio init. + +GPIO pinmap note +---------------- + Identity pinmap: position i → GPIO (i-1). + GPIO2 (LED_BIT) → pinmap position 3 → picsimlab_write_pin(pin=3, value). +""" + +import ctypes +import os +import pathlib +import sys +import threading +import time +import unittest + +# ── paths ──────────────────────────────────────────────────────────────────── +_REPO = pathlib.Path(__file__).parent.parent.parent +_BACKEND = _REPO / "backend" +_FW_LCGAMBOA = ( + _REPO / "test" / "esp32-emulator" / "binaries_lcgamboa" + / "blink_lcgamboa.ino.merged.bin" +) + +sys.path.insert(0, str(_BACKEND)) + +from app.services.esp32_lib_bridge import _DEFAULT_LIB, _MINGW64_BIN, _PINMAP, _GPIO_COUNT +from app.services.esp32_lib_manager import LIB_PATH, EspLibManager + +_DLL_AVAILABLE = bool(LIB_PATH) and os.path.isfile(LIB_PATH) +_FW_AVAILABLE = _FW_LCGAMBOA.is_file() +_SKIP_INTEGRATION = os.environ.get("SKIP_LIB_INTEGRATION", "") == "1" + +_GPIO_PIN_FOR_LED = 3 # pinmap position for GPIO2 (identity map: pos i → gpio i-1) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Unit tests — no QEMU process needed +# ═══════════════════════════════════════════════════════════════════════════════ + +class TestDllExists(unittest.TestCase): + def test_default_lib_path_is_absolute(self): + self.assertTrue(pathlib.Path(_DEFAULT_LIB).is_absolute()) + + def test_dll_file_exists(self): + self.assertTrue( + os.path.isfile(_DEFAULT_LIB), + f"libqemu-xtensa.dll not found at {_DEFAULT_LIB}" + ) + + def test_lib_path_resolved(self): + self.assertTrue(_DLL_AVAILABLE, f"LIB_PATH='{LIB_PATH}' — empty or missing") + + def test_mingw64_bin_exists(self): + self.assertTrue(os.path.isdir(_MINGW64_BIN), f"MinGW64 bin not found: {_MINGW64_BIN}") + + def test_rom_binaries_exist(self): + rom_dir = pathlib.Path(_DEFAULT_LIB).parent + for name in ("esp32-v3-rom.bin", "esp32-v3-rom-app.bin"): + self.assertTrue((rom_dir / name).is_file(), f"ROM binary missing: {name}") + + +class TestDllLoads(unittest.TestCase): + REQUIRED_SYMBOLS = [ + "qemu_init", "qemu_main_loop", + "qemu_picsimlab_register_callbacks", + "qemu_picsimlab_set_pin", "qemu_picsimlab_set_apin", + "qemu_picsimlab_uart_receive", + "qemu_picsimlab_get_internals", "qemu_picsimlab_get_TIOCM", + "qemu_picsimlab_flash_dump", + "picsimlab_write_pin", "picsimlab_dir_pin", "picsimlab_uart_tx_event", + ] + + @classmethod + def setUpClass(cls): + if not _DLL_AVAILABLE: + raise unittest.SkipTest("DLL not available") + if os.name == "nt" and os.path.isdir(_MINGW64_BIN): + os.add_dll_directory(_MINGW64_BIN) + cls.lib = ctypes.CDLL(LIB_PATH) + + def test_all_required_symbols_exported(self): + missing = [s for s in self.REQUIRED_SYMBOLS if not hasattr(self.lib, s)] + self.assertFalse(missing, f"Missing DLL exports: {missing}") + + def test_qemu_init_is_callable(self): + self.assertIsNotNone(self.lib.qemu_init) + + def test_qemu_main_loop_is_callable(self): + self.assertIsNotNone(self.lib.qemu_main_loop) + + +class TestPinmap(unittest.TestCase): + def test_pinmap_count(self): + self.assertEqual(_PINMAP[0], _GPIO_COUNT) + + def test_pinmap_identity_mapping(self): + for i in range(1, _GPIO_COUNT + 1): + self.assertEqual(_PINMAP[i], i - 1) + + def test_gpio2_at_position3(self): + self.assertEqual(_PINMAP[_GPIO_PIN_FOR_LED], 2) + + +class TestManagerAvailability(unittest.TestCase): + def test_is_available(self): + self.assertTrue(EspLibManager().is_available()) + + def test_api_surface_matches_subprocess_manager(self): + from app.services.esp_qemu_manager import EspQemuManager # noqa: F401 + for m in ["start_instance", "stop_instance", "load_firmware", + "set_pin_state", "send_serial_bytes"]: + self.assertTrue(hasattr(EspLibManager(), m), f"Missing: {m}") + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Integration tests — starts QEMU with lcgamboa DLL +# ═══════════════════════════════════════════════════════════════════════════════ + +@unittest.skipUnless(_DLL_AVAILABLE, "libqemu-xtensa.dll not available") +@unittest.skipUnless(_FW_AVAILABLE, f"Firmware not found: {_FW_LCGAMBOA}") +@unittest.skipIf(_SKIP_INTEGRATION, "SKIP_LIB_INTEGRATION=1") +class TestEsp32LibIntegration(unittest.TestCase): + """ + Live tests: boot the IRAM-safe blink firmware in the lcgamboa DLL. + + setUpClass starts QEMU once. All tests share the same instance. + Firmware: blink_lcgamboa.ino — blinks GPIO2 5x using IRAM/DRAM-safe code. + Expected UART: LCGAMBOA_STARTED, LED_ON×5, LED_OFF×5, BLINK_DONE. + Expected GPIO: pin=3 toggles HIGH/LOW 5 times each. + """ + + BOOT_TIMEOUT = 30.0 + BLINK_TIMEOUT = 60.0 + LED_PIN = _GPIO_PIN_FOR_LED # pinmap position 3 → GPIO2 + + _uart_lines: list + _gpio_events: list + _qemu_thread: threading.Thread + + @classmethod + def setUpClass(cls): + from app.services.esp32_lib_bridge import ( + _WRITE_PIN, _DIR_PIN, _I2C_EVENT, _SPI_EVENT, _UART_TX, _RMT_EVENT, + _CallbacksT, _PINMAP, + ) + + if os.name == "nt" and os.path.isdir(_MINGW64_BIN): + os.add_dll_directory(_MINGW64_BIN) + cls._lib = ctypes.CDLL(LIB_PATH) + + cls._uart_lines = [] + cls._gpio_events = [] + cls._uart_buf = bytearray() + cls._uart_lock = threading.Lock() + cls._gpio_lock = threading.Lock() + + def _on_pin(pin, value): + with cls._gpio_lock: + cls._gpio_events.append((pin, value)) + + def _on_uart(uart_id, byte_val): + with cls._uart_lock: + cls._uart_buf.append(byte_val) + if byte_val == ord('\n'): + line = cls._uart_buf.decode("utf-8", errors="replace").strip() + cls._uart_buf.clear() + if line: + cls._uart_lines.append(line) + + cb_write = _WRITE_PIN(_on_pin) + cb_dir = _DIR_PIN(lambda p, v: None) + cb_i2c = _I2C_EVENT(lambda *a: 0) + cb_spi = _SPI_EVENT(lambda *a: 0) + cb_uart = _UART_TX(_on_uart) + cb_rmt = _RMT_EVENT(lambda *a: None) + + cbs = _CallbacksT( + picsimlab_write_pin = cb_write, + picsimlab_dir_pin = cb_dir, + picsimlab_i2c_event = cb_i2c, + picsimlab_spi_event = cb_spi, + picsimlab_uart_tx_event = cb_uart, + pinmap = ctypes.cast(_PINMAP, ctypes.c_void_p).value, + picsimlab_rmt_event = cb_rmt, + ) + cls._keep = (cbs, cb_write, cb_dir, cb_i2c, cb_spi, cb_uart, cb_rmt) + cls._lib.qemu_picsimlab_register_callbacks(ctypes.byref(cbs)) + + rom_dir = str(pathlib.Path(_DEFAULT_LIB).parent).encode() + fw = str(_FW_LCGAMBOA).encode() + args = [b"qemu", b"-M", b"esp32-picsimlab", b"-nographic", + b"-L", rom_dir, + b"-drive", b"file=" + fw + b",if=mtd,format=raw"] + argc = len(args) + argv = (ctypes.c_char_p * argc)(*args) + + init_done = threading.Event() + + def _qemu_run(): + cls._lib.qemu_init(argc, argv, None) + init_done.set() + cls._lib.qemu_main_loop() + + cls._qemu_thread = threading.Thread(target=_qemu_run, daemon=True, name="qemu-lib-test") + cls._qemu_thread.start() + + if not init_done.wait(timeout=30.0): + raise RuntimeError("qemu_init() timed out") + + # ── Helpers ────────────────────────────────────────────────────────────── + + def _wait_uart(self, text: str, timeout: float) -> bool: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + with self._uart_lock: + if text in self._uart_lines: + return True + time.sleep(0.1) + return False + + def _wait_gpio(self, pin: int, value: int, timeout: float) -> bool: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + with self._gpio_lock: + if any(p == pin and v == value for p, v in self._gpio_events): + return True + time.sleep(0.05) + return False + + # ── UART output tests ───────────────────────────────────────────────────── + + def test_01_boot_started(self): + ok = self._wait_uart("LCGAMBOA_STARTED", self.BOOT_TIMEOUT) + self.assertTrue(ok, f"'LCGAMBOA_STARTED' not received in {self.BOOT_TIMEOUT}s.\n" + f"UART lines: {self._uart_lines[:15]}") + + def test_02_uart_led_on(self): + ok = self._wait_uart("LED_ON", self.BLINK_TIMEOUT) + self.assertTrue(ok, f"'LED_ON' not seen. Lines: {self._uart_lines}") + + def test_03_uart_led_off(self): + """LED_OFF must appear — IDF 4.x does not crash between ON and OFF.""" + ok = self._wait_uart("LED_OFF", self.BLINK_TIMEOUT) + self.assertTrue(ok, f"'LED_OFF' not seen. Lines: {self._uart_lines}") + + def test_04_uart_blink_done(self): + """BLINK_DONE must appear after all 5 cycles — IDF 4.x stays stable.""" + ok = self._wait_uart("BLINK_DONE", self.BLINK_TIMEOUT) + self.assertTrue(ok, f"'BLINK_DONE' not seen after 5 cycles.\n" + f"Lines: {self._uart_lines}") + + def test_05_uart_five_led_on(self): + """Firmware blinks 5 times — expect exactly 5 'LED_ON' lines.""" + deadline = time.monotonic() + self.BLINK_TIMEOUT + while time.monotonic() < deadline: + with self._uart_lock: + count = self._uart_lines.count("LED_ON") + if count >= 5: + break + time.sleep(0.1) + with self._uart_lock: + count = self._uart_lines.count("LED_ON") + self.assertEqual(count, 5, + f"Expected 5×LED_ON, got {count}. Lines: {self._uart_lines}") + + # ── GPIO output callback tests ──────────────────────────────────────────── + + def test_06_gpio_led_goes_high(self): + ok = self._wait_gpio(self.LED_PIN, 1, self.BLINK_TIMEOUT) + with self._gpio_lock: + ev = list(self._gpio_events) + self.assertTrue(ok, f"GPIO pin {self.LED_PIN} never HIGH. Events: {ev[:20]}") + + def test_07_gpio_led_goes_low(self): + ok = self._wait_gpio(self.LED_PIN, 0, self.BLINK_TIMEOUT) + with self._gpio_lock: + ev = list(self._gpio_events) + self.assertTrue(ok, f"GPIO pin {self.LED_PIN} never LOW. Events: {ev[:20]}") + + def test_08_gpio_toggles_five_times(self): + """GPIO2 must toggle 5 HIGH + 5 LOW = 10 transitions (IDF 4.x is stable).""" + deadline = time.monotonic() + self.BLINK_TIMEOUT + while time.monotonic() < deadline: + with self._gpio_lock: + led = [(p, v) for p, v in self._gpio_events if p == self.LED_PIN] + if len(led) >= 10: + break + time.sleep(0.1) + with self._gpio_lock: + led = [(p, v) for p, v in self._gpio_events if p == self.LED_PIN] + self.assertGreaterEqual( + len(led), 10, + f"Expected ≥10 LED transitions, got {len(led)}: {led}" + ) + + def test_09_gpio_sequence_correct(self): + """GPIO2 must alternate HIGH/LOW for 5 full cycles.""" + deadline = time.monotonic() + self.BLINK_TIMEOUT + while time.monotonic() < deadline: + with self._gpio_lock: + led = [v for p, v in self._gpio_events if p == self.LED_PIN] + if len(led) >= 10: + break + time.sleep(0.1) + with self._gpio_lock: + led = [v for p, v in self._gpio_events if p == self.LED_PIN] + try: + start = led.index(1) + except ValueError: + self.fail(f"GPIO2 never went HIGH. Values seen: {led}") + sequence = led[start:start + 10] + expected = [1, 0] * 5 + self.assertEqual(sequence, expected, + f"GPIO2 sequence not alternating. Got: {sequence}") + + # ── GPIO input / ADC / UART RX tests ───────────────────────────────────── + + def test_10_set_pin_accepted(self): + """qemu_picsimlab_set_pin() must not raise.""" + try: + self._lib.qemu_picsimlab_set_pin(0, 1) + self._lib.qemu_picsimlab_set_pin(0, 0) + self._lib.qemu_picsimlab_set_pin(4, 1) + self._lib.qemu_picsimlab_set_pin(34, 0) # GPIO34 = ADC1_CH6 + except Exception as e: + self.fail(f"qemu_picsimlab_set_pin raised: {e}") + + def test_11_set_adc_accepted(self): + """qemu_picsimlab_set_apin() must accept 12-bit values without raising.""" + try: + self._lib.qemu_picsimlab_set_apin(0, 0) + self._lib.qemu_picsimlab_set_apin(0, 2048) + self._lib.qemu_picsimlab_set_apin(0, 4095) + self._lib.qemu_picsimlab_set_apin(3, 1000) + except Exception as e: + self.fail(f"qemu_picsimlab_set_apin raised: {e}") + + def test_12_uart_receive_accepted(self): + """qemu_picsimlab_uart_receive() must accept bytes without raising.""" + try: + data = b"test\n" + buf = (ctypes.c_uint8 * len(data))(*data) + self._lib.qemu_picsimlab_uart_receive(0, buf, len(data)) + except Exception as e: + self.fail(f"qemu_picsimlab_uart_receive raised: {e}") + + def test_13_get_internals_not_null(self): + self._lib.qemu_picsimlab_get_internals.restype = ctypes.c_void_p + result = self._lib.qemu_picsimlab_get_internals(0) + self.assertIsNotNone(result, "qemu_picsimlab_get_internals(0) returned NULL") + + def test_14_qemu_thread_alive(self): + self.assertTrue(self._qemu_thread.is_alive(), + "QEMU daemon thread died — QEMU may have crashed") + + def test_15_summary(self): + """Informational: print full GPIO and UART event summary.""" + with self._gpio_lock: + ev = list(self._gpio_events) + with self._uart_lock: + lines = list(self._uart_lines) + + led = [(p, v) for p, v in ev if p == self.LED_PIN] + other_pins = sorted({p for p, _ in ev if p != self.LED_PIN}) + + print(f"\n GPIO events total : {len(ev)}") + print(f" Pins that fired : {sorted({p for p, _ in ev})}") + print(f" Other pins (init) : {other_pins}") + print(f" LED (pin={self.LED_PIN}) transitions: {led}") + print(f" UART lines : {lines}") + + self.assertTrue(len(ev) > 0 or len(lines) > 0, + "Neither GPIO events nor UART output were observed") + + +if __name__ == "__main__": + unittest.main(verbosity=2)