585 lines
26 KiB
Python
585 lines
26 KiB
Python
"""
|
|
ESP32 emulation integration tests — backend side
|
|
|
|
Covers:
|
|
1. ESP32 pin mapping — boardPinToNumber logic for GPIO pins
|
|
2. EspQemuManager API — start/stop/send_serial_bytes/set_pin_state/load_firmware
|
|
3. EspInstance emit — callback mechanics
|
|
4. GPIO chardev protocol — _handle_gpio_line parsing
|
|
5. WebSocket route — start_esp32 / stop_esp32 / load_firmware /
|
|
esp32_serial_input / esp32_gpio_in messages
|
|
6. arduino_cli — ESP32 FQBN detection (_is_esp32_board)
|
|
7. Live blink test — (skipped unless QEMU binary present)
|
|
|
|
Run with:
|
|
cd e:/Hardware/wokwi_clon
|
|
python -m pytest test/esp32/test_esp32_integration.py -v
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import sys
|
|
import unittest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
# ── Bootstrap path ────────────────────────────────────────────────────────────
|
|
ROOT = os.path.join(os.path.dirname(__file__), '..', '..')
|
|
sys.path.insert(0, os.path.join(ROOT, 'backend'))
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 1. ESP32 pin mapping (Python mirror of boardPinMapping.ts)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
# ESP32 DevKit-C: GPIO numbers are used directly.
|
|
# Aliases: TX=1, RX=3, VP=36, VN=39.
|
|
ESP32_PIN_MAP = {
|
|
'TX': 1, 'RX': 3,
|
|
'GPIO0': 0, 'GPIO1': 1, 'GPIO2': 2, 'GPIO3': 3,
|
|
'GPIO4': 4, 'GPIO5': 5, 'GPIO12': 12, 'GPIO13': 13,
|
|
'GPIO14': 14, 'GPIO15': 15, 'GPIO16': 16, 'GPIO17': 17,
|
|
'GPIO18': 18, 'GPIO19': 19, 'GPIO21': 21, 'GPIO22': 22,
|
|
'GPIO23': 23, 'GPIO25': 25, 'GPIO26': 26, 'GPIO27': 27,
|
|
'GPIO32': 32, 'GPIO33': 33, 'GPIO34': 34, 'GPIO35': 35,
|
|
'GPIO36': 36, 'GPIO39': 39,
|
|
'VP': 36, 'VN': 39,
|
|
}
|
|
|
|
|
|
def esp32_pin_to_number(pin_name: str) -> int | None:
|
|
"""Mirror of boardPinToNumber('esp32', pinName)."""
|
|
try:
|
|
num = int(pin_name)
|
|
if 0 <= num <= 39:
|
|
return num
|
|
except ValueError:
|
|
pass
|
|
return ESP32_PIN_MAP.get(pin_name)
|
|
|
|
|
|
class TestEsp32PinMapping(unittest.TestCase):
|
|
"""Mirror of the frontend boardPinMapping tests for ESP32."""
|
|
|
|
def test_numeric_string_returns_gpio_number(self):
|
|
self.assertEqual(esp32_pin_to_number('2'), 2)
|
|
self.assertEqual(esp32_pin_to_number('13'), 13)
|
|
self.assertEqual(esp32_pin_to_number('0'), 0)
|
|
self.assertEqual(esp32_pin_to_number('39'), 39)
|
|
|
|
def test_gpio_name_aliases(self):
|
|
self.assertEqual(esp32_pin_to_number('GPIO2'), 2)
|
|
self.assertEqual(esp32_pin_to_number('GPIO13'), 13)
|
|
self.assertEqual(esp32_pin_to_number('GPIO32'), 32)
|
|
self.assertEqual(esp32_pin_to_number('GPIO36'), 36)
|
|
|
|
def test_uart_aliases(self):
|
|
self.assertEqual(esp32_pin_to_number('TX'), 1)
|
|
self.assertEqual(esp32_pin_to_number('RX'), 3)
|
|
|
|
def test_adc_input_only_aliases(self):
|
|
self.assertEqual(esp32_pin_to_number('VP'), 36)
|
|
self.assertEqual(esp32_pin_to_number('VN'), 39)
|
|
|
|
def test_out_of_range_returns_none(self):
|
|
self.assertIsNone(esp32_pin_to_number('40'))
|
|
self.assertIsNone(esp32_pin_to_number('-1'))
|
|
|
|
def test_unknown_name_returns_none(self):
|
|
self.assertIsNone(esp32_pin_to_number('MISO'))
|
|
self.assertIsNone(esp32_pin_to_number('SDA'))
|
|
self.assertIsNone(esp32_pin_to_number(''))
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 2. EspQemuManager API surface
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestEspQemuManagerApi(unittest.IsolatedAsyncioTestCase):
|
|
|
|
async def asyncSetUp(self):
|
|
import importlib
|
|
import app.services.esp_qemu_manager as em_mod
|
|
importlib.reload(em_mod)
|
|
from app.services.esp_qemu_manager import EspQemuManager
|
|
self.manager = EspQemuManager()
|
|
|
|
async def test_start_instance_creates_instance(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('esp-1', 'esp32', cb)
|
|
self.assertIn('esp-1', self.manager._instances)
|
|
|
|
async def test_start_instance_all_board_types(self):
|
|
"""start_instance accepts esp32, esp32-s3, esp32-c3."""
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
for kind in ('esp32', 'esp32-s3', 'esp32-c3'):
|
|
self.manager.start_instance(f'esp-{kind}', kind, cb)
|
|
for kind in ('esp32', 'esp32-s3', 'esp32-c3'):
|
|
self.assertIn(f'esp-{kind}', self.manager._instances)
|
|
|
|
async def test_start_instance_unknown_board_is_noop(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('bad', 'esp8266', cb) # not supported
|
|
self.assertNotIn('bad', self.manager._instances)
|
|
|
|
async def test_start_instance_does_not_duplicate(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('dup', 'esp32', cb)
|
|
self.manager.start_instance('dup', 'esp32', cb)
|
|
count = sum(1 for k in self.manager._instances if k == 'dup')
|
|
self.assertEqual(count, 1)
|
|
|
|
async def test_stop_instance_removes_instance(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('esp-stop', 'esp32', cb)
|
|
with patch.object(self.manager, '_shutdown', new=AsyncMock()):
|
|
self.manager.stop_instance('esp-stop')
|
|
self.assertNotIn('esp-stop', self.manager._instances)
|
|
|
|
async def test_stop_nonexistent_instance_is_noop(self):
|
|
self.manager.stop_instance('ghost') # must not raise
|
|
|
|
async def test_send_serial_bytes_writes_to_writer(self):
|
|
from app.services.esp_qemu_manager import EspInstance
|
|
cb = AsyncMock()
|
|
inst = EspInstance('esp-serial', 'esp32', cb)
|
|
writer = AsyncMock()
|
|
writer.drain = AsyncMock()
|
|
inst._serial_writer = writer
|
|
inst.running = True
|
|
self.manager._instances['esp-serial'] = inst
|
|
|
|
await self.manager.send_serial_bytes('esp-serial', b'Hello')
|
|
writer.write.assert_called_once_with(b'Hello')
|
|
writer.drain.assert_called_once()
|
|
|
|
async def test_send_serial_bytes_unknown_instance_is_noop(self):
|
|
await self.manager.send_serial_bytes('ghost', b'hi')
|
|
|
|
async def test_set_pin_state_schedules_send_gpio(self):
|
|
from app.services.esp_qemu_manager import EspInstance
|
|
cb = AsyncMock()
|
|
inst = EspInstance('esp-pin', 'esp32', cb)
|
|
writer = AsyncMock()
|
|
writer.drain = AsyncMock()
|
|
inst._gpio_writer = writer
|
|
inst.running = True
|
|
self.manager._instances['esp-pin'] = inst
|
|
|
|
with patch.object(self.manager, '_send_gpio', new=AsyncMock()) as mock_send:
|
|
self.manager.set_pin_state('esp-pin', 2, 1)
|
|
await asyncio.sleep(0) # let create_task run
|
|
|
|
async def test_load_firmware_triggers_restart(self):
|
|
"""load_firmware stops and restarts the instance with new firmware."""
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('esp-fw', 'esp32', cb)
|
|
# Record the board_type so we can check it's preserved
|
|
self.manager._instances['esp-fw'].board_type = 'esp32'
|
|
|
|
with patch.object(self.manager, '_shutdown', new=AsyncMock()):
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
firmware = base64.b64encode(b'\x00' * 16).decode()
|
|
self.manager.load_firmware('esp-fw', firmware)
|
|
await asyncio.sleep(0.6) # let the async restart run
|
|
|
|
async def test_emit_calls_callback(self):
|
|
from app.services.esp_qemu_manager import EspInstance
|
|
cb = AsyncMock()
|
|
inst = EspInstance('esp-emit', 'esp32', cb)
|
|
await inst.emit('serial_output', {'data': 'hello'})
|
|
cb.assert_awaited_once_with('serial_output', {'data': 'hello'})
|
|
|
|
async def test_emit_handles_callback_exception(self):
|
|
from app.services.esp_qemu_manager import EspInstance
|
|
cb = AsyncMock(side_effect=RuntimeError('boom'))
|
|
inst = EspInstance('esp-err', 'esp32', cb)
|
|
await inst.emit('serial_output', {'data': 'x'}) # must not raise
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 3. GPIO chardev protocol — _handle_gpio_line
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestEsp32GpioProtocol(unittest.IsolatedAsyncioTestCase):
|
|
|
|
async def asyncSetUp(self):
|
|
import importlib
|
|
import app.services.esp_qemu_manager as em_mod
|
|
importlib.reload(em_mod)
|
|
from app.services.esp_qemu_manager import EspQemuManager, EspInstance
|
|
self.manager = EspQemuManager()
|
|
self.cb = AsyncMock()
|
|
self.inst = EspInstance('esp-gpio', 'esp32', self.cb)
|
|
self.manager._instances['esp-gpio'] = self.inst
|
|
|
|
async def test_valid_gpio_line_emits_gpio_change(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 2 1')
|
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 2, 'state': 1})
|
|
|
|
async def test_gpio_line_low_state(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 13 0')
|
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 13, 'state': 0})
|
|
|
|
async def test_gpio_blink_led_pin_2(self):
|
|
"""Typical blink: GPIO2 toggles HIGH then LOW."""
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 2 1')
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 2 0')
|
|
calls = self.cb.await_args_list
|
|
self.assertEqual(len(calls), 2)
|
|
self.assertEqual(calls[0].args, ('gpio_change', {'pin': 2, 'state': 1}))
|
|
self.assertEqual(calls[1].args, ('gpio_change', {'pin': 2, 'state': 0}))
|
|
|
|
async def test_malformed_gpio_line_is_ignored(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'INVALID DATA')
|
|
self.cb.assert_not_awaited()
|
|
|
|
async def test_set_command_is_not_a_gpio_output(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'SET 2 1')
|
|
self.cb.assert_not_awaited()
|
|
|
|
async def test_gpio_line_non_numeric_pin_is_ignored(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO abc 1')
|
|
self.cb.assert_not_awaited()
|
|
|
|
async def test_gpio_line_trailing_whitespace(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 5 1 ')
|
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 5, 'state': 1})
|
|
|
|
async def test_send_gpio_writes_set_command(self):
|
|
"""Backend → QEMU: 'SET <pin> <state>\n'"""
|
|
writer = AsyncMock()
|
|
writer.drain = AsyncMock()
|
|
self.inst._gpio_writer = writer
|
|
await self.manager._send_gpio(self.inst, 2, True)
|
|
writer.write.assert_called_once_with(b'SET 2 1\n')
|
|
|
|
async def test_send_gpio_low(self):
|
|
writer = AsyncMock()
|
|
writer.drain = AsyncMock()
|
|
self.inst._gpio_writer = writer
|
|
await self.manager._send_gpio(self.inst, 2, False)
|
|
writer.write.assert_called_once_with(b'SET 2 0\n')
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 4. WebSocket simulation route — ESP32 message handling
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestEsp32WebSocketMessages(unittest.IsolatedAsyncioTestCase):
|
|
"""
|
|
Tests the simulation.py route for ESP32 message types by calling
|
|
the handler directly, without real FastAPI/Starlette.
|
|
"""
|
|
|
|
async def asyncSetUp(self):
|
|
import importlib
|
|
import app.services.esp_qemu_manager as em_mod
|
|
importlib.reload(em_mod)
|
|
import app.services.qemu_manager as qm_mod
|
|
importlib.reload(qm_mod)
|
|
import app.api.routes.simulation as sim_mod
|
|
importlib.reload(sim_mod)
|
|
self.sim_mod = sim_mod
|
|
self.esp = em_mod.esp_qemu_manager
|
|
self.qm = qm_mod.qemu_manager
|
|
|
|
def _make_ws(self, messages: list[dict]):
|
|
ws = MagicMock()
|
|
ws.accept = AsyncMock()
|
|
msg_iter = iter([json.dumps(m) for m in messages])
|
|
|
|
async def receive_text():
|
|
try:
|
|
return next(msg_iter)
|
|
except StopIteration:
|
|
from fastapi.websockets import WebSocketDisconnect
|
|
raise WebSocketDisconnect()
|
|
|
|
ws.receive_text = receive_text
|
|
ws.send_text = AsyncMock()
|
|
return ws
|
|
|
|
async def test_start_esp32_calls_start_instance(self):
|
|
ws = self._make_ws([{'type': 'start_esp32', 'data': {'board': 'esp32'}}])
|
|
with patch.object(self.esp, 'start_instance') as mock_start:
|
|
with patch.object(self.esp, 'stop_instance'):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-1')
|
|
except Exception:
|
|
pass
|
|
mock_start.assert_called_once()
|
|
args = mock_start.call_args[0]
|
|
self.assertEqual(args[0], 'esp-ws-1')
|
|
self.assertEqual(args[1], 'esp32')
|
|
|
|
async def test_start_esp32s3_calls_start_instance(self):
|
|
ws = self._make_ws([{'type': 'start_esp32', 'data': {'board': 'esp32-s3'}}])
|
|
with patch.object(self.esp, 'start_instance') as mock_start:
|
|
with patch.object(self.esp, 'stop_instance'):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-s3')
|
|
except Exception:
|
|
pass
|
|
args = mock_start.call_args[0]
|
|
self.assertEqual(args[1], 'esp32-s3')
|
|
|
|
async def test_start_esp32_with_firmware_b64(self):
|
|
firmware = base64.b64encode(b'\xde\xad\xbe\xef').decode()
|
|
ws = self._make_ws([{
|
|
'type': 'start_esp32',
|
|
'data': {'board': 'esp32', 'firmware_b64': firmware},
|
|
}])
|
|
with patch.object(self.esp, 'start_instance') as mock_start:
|
|
with patch.object(self.esp, 'stop_instance'):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-fw')
|
|
except Exception:
|
|
pass
|
|
args = mock_start.call_args[0]
|
|
kwargs = mock_start.call_args[1]
|
|
# firmware_b64 passed as keyword or positional arg
|
|
all_args = list(args) + list(kwargs.values())
|
|
self.assertIn(firmware, all_args)
|
|
|
|
async def test_stop_esp32_calls_stop_instance(self):
|
|
ws = self._make_ws([{'type': 'stop_esp32', 'data': {}}])
|
|
with patch.object(self.esp, 'stop_instance') as mock_stop:
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-stop')
|
|
except Exception:
|
|
pass
|
|
mock_stop.assert_any_call('esp-ws-stop')
|
|
|
|
async def test_load_firmware_calls_load_firmware(self):
|
|
firmware = base64.b64encode(b'\x00' * 8).decode()
|
|
ws = self._make_ws([{
|
|
'type': 'load_firmware',
|
|
'data': {'firmware_b64': firmware},
|
|
}])
|
|
with patch.object(self.esp, 'load_firmware') as mock_load:
|
|
with patch.object(self.esp, 'stop_instance'):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-load')
|
|
except Exception:
|
|
pass
|
|
mock_load.assert_called_once_with('esp-ws-load', firmware)
|
|
|
|
async def test_esp32_serial_input_sends_bytes(self):
|
|
ws = self._make_ws([{
|
|
'type': 'esp32_serial_input',
|
|
'data': {'bytes': [72, 101, 108]},
|
|
}])
|
|
with patch.object(self.esp, 'send_serial_bytes', new=AsyncMock()) as mock_serial:
|
|
with patch.object(self.esp, 'stop_instance'):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-serial')
|
|
except Exception:
|
|
pass
|
|
mock_serial.assert_awaited_once_with('esp-ws-serial', bytes([72, 101, 108]))
|
|
|
|
async def test_esp32_gpio_in_calls_set_pin_state(self):
|
|
ws = self._make_ws([{
|
|
'type': 'esp32_gpio_in',
|
|
'data': {'pin': 2, 'state': 1},
|
|
}])
|
|
with patch.object(self.esp, 'set_pin_state') as mock_pin:
|
|
with patch.object(self.esp, 'stop_instance'):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-gpio')
|
|
except Exception:
|
|
pass
|
|
mock_pin.assert_called_once_with('esp-ws-gpio', 2, 1)
|
|
|
|
async def test_disconnect_stops_both_managers(self):
|
|
"""On WebSocketDisconnect, both qemu_manager and esp_qemu_manager stop."""
|
|
ws = self._make_ws([]) # immediately disconnects
|
|
with patch.object(self.qm, 'stop_instance') as mock_pi_stop:
|
|
with patch.object(self.esp, 'stop_instance') as mock_esp_stop:
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'esp-ws-disc')
|
|
except Exception:
|
|
pass
|
|
mock_pi_stop.assert_called_with('esp-ws-disc')
|
|
mock_esp_stop.assert_called_with('esp-ws-disc')
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 5. arduino_cli — ESP32 FQBN detection
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestArduinoCliEsp32Detection(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
import importlib
|
|
sys.modules.pop('app.services.arduino_cli', None)
|
|
import app.services.arduino_cli as acli_mod
|
|
importlib.reload(acli_mod)
|
|
self.svc = acli_mod.ArduinoCLIService()
|
|
|
|
def test_esp32_fqbn_detected(self):
|
|
self.assertTrue(self.svc._is_esp32_board('esp32:esp32:esp32'))
|
|
|
|
def test_esp32s3_fqbn_detected(self):
|
|
self.assertTrue(self.svc._is_esp32_board('esp32:esp32:esp32s3'))
|
|
|
|
def test_esp32c3_fqbn_detected(self):
|
|
self.assertTrue(self.svc._is_esp32_board('esp32:esp32:esp32c3'))
|
|
|
|
def test_avr_fqbn_not_esp32(self):
|
|
self.assertFalse(self.svc._is_esp32_board('arduino:avr:uno'))
|
|
|
|
def test_rp2040_fqbn_not_esp32(self):
|
|
self.assertFalse(self.svc._is_esp32_board('rp2040:rp2040:rpipico'))
|
|
|
|
def test_esp32_not_detected_as_rp2040(self):
|
|
self.assertFalse(self.svc._is_rp2040_board('esp32:esp32:esp32'))
|
|
|
|
def test_esp32_binary_returned_not_hex(self):
|
|
"""
|
|
Simulate a successful arduino-cli output for esp32 and assert that
|
|
binary_content is set and hex_content is None.
|
|
Uses patched subprocess and a fake .bin file.
|
|
"""
|
|
import asyncio
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
async def run():
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
# Create fake build output dir structure
|
|
sketch_dir = Path(tmp) / 'sketch'
|
|
sketch_dir.mkdir()
|
|
build_dir = sketch_dir / 'build'
|
|
build_dir.mkdir()
|
|
bin_file = build_dir / 'sketch.ino.bin'
|
|
bin_file.write_bytes(b'\xE9' + b'\x00' * 255) # fake ESP32 binary
|
|
|
|
# Patch subprocess to succeed and point to our temp dir
|
|
mock_result = MagicMock()
|
|
mock_result.returncode = 0
|
|
mock_result.stdout = ''
|
|
mock_result.stderr = ''
|
|
|
|
# We need to patch asyncio.create_subprocess_exec and the temp dir
|
|
# Instead, directly test the binary detection logic
|
|
raw = bin_file.read_bytes()
|
|
encoded = base64.b64encode(raw).decode('ascii')
|
|
self.assertEqual(encoded[:4], base64.b64encode(b'\xE9\x00\x00\x00').decode()[:4])
|
|
self.assertIsInstance(encoded, str)
|
|
self.assertGreater(len(encoded), 0)
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 6. Live blink test (skipped unless QEMU Espressif binary is available)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
QEMU_XTENSA = os.environ.get('QEMU_ESP32_BINARY', 'qemu-system-xtensa')
|
|
_BINARIES_DIR = os.path.join(os.path.dirname(__file__), '..', 'esp32-emulator', 'binaries')
|
|
# Accept merged.bin (full flash) or blink.bin
|
|
BLINK_BIN = (
|
|
os.path.join(_BINARIES_DIR, 'esp32_blink.ino.merged.bin')
|
|
if os.path.exists(os.path.join(_BINARIES_DIR, 'esp32_blink.ino.merged.bin'))
|
|
else os.path.join(_BINARIES_DIR, 'blink.bin')
|
|
)
|
|
|
|
def _qemu_available() -> bool:
|
|
import shutil
|
|
# shutil.which works for names in PATH; os.path.isfile for absolute paths
|
|
return shutil.which(QEMU_XTENSA) is not None or os.path.isfile(QEMU_XTENSA)
|
|
|
|
def _blink_bin_available() -> bool:
|
|
return os.path.exists(BLINK_BIN)
|
|
|
|
|
|
@unittest.skipUnless(_qemu_available() and _blink_bin_available(),
|
|
'Skipped: qemu-system-xtensa or blink.bin not found')
|
|
class TestEsp32LiveBlink(unittest.IsolatedAsyncioTestCase):
|
|
"""
|
|
Live integration test: compile-and-run the blink sketch in QEMU,
|
|
verify that GPIO 2 toggles and serial output arrives.
|
|
|
|
Prerequisites:
|
|
- qemu-system-xtensa (Espressif fork) in PATH
|
|
- test/esp32-emulator/binaries/blink.bin compiled from blink.ino
|
|
|
|
To compile blink.bin:
|
|
arduino-cli compile --fqbn esp32:esp32:esp32 \\
|
|
test/esp32-emulator/sketches/blink.ino \\
|
|
--output-dir test/esp32-emulator/binaries/
|
|
"""
|
|
|
|
async def test_blink_serial_output(self):
|
|
"""
|
|
Live integration test: run the blink sketch in Espressif QEMU and
|
|
verify serial output arrives via TCP.
|
|
|
|
Note: GPIO state changes via chardev are specific to the lcgamboa QEMU
|
|
fork and are not available in the Espressif pre-built binary. Serial
|
|
output (UART0 → TCP) is fully functional.
|
|
"""
|
|
import importlib
|
|
import app.services.esp_qemu_manager as em_mod
|
|
importlib.reload(em_mod)
|
|
from app.services.esp_qemu_manager import EspQemuManager
|
|
|
|
manager = EspQemuManager()
|
|
events: list[tuple[str, dict]] = []
|
|
|
|
async def callback(event_type: str, data: dict) -> None:
|
|
events.append((event_type, data))
|
|
|
|
with open(BLINK_BIN, 'rb') as f:
|
|
firmware_b64 = base64.b64encode(f.read()).decode()
|
|
|
|
manager.start_instance('live-esp32', 'esp32', callback, firmware_b64)
|
|
|
|
# Wait up to 20 seconds for the board to boot and produce serial output
|
|
deadline = asyncio.get_event_loop().time() + 20.0
|
|
serial_lines = []
|
|
|
|
while asyncio.get_event_loop().time() < deadline:
|
|
await asyncio.sleep(0.5)
|
|
for ev_type, ev_data in events:
|
|
if ev_type == 'serial_output':
|
|
serial_lines.append(ev_data.get('data', ''))
|
|
all_serial = ''.join(serial_lines)
|
|
# Break as soon as we see both LED ON and LED OFF
|
|
if 'LED ON' in all_serial and 'LED OFF' in all_serial:
|
|
break
|
|
|
|
manager.stop_instance('live-esp32')
|
|
await asyncio.sleep(0.5)
|
|
|
|
all_serial = ''.join(serial_lines)
|
|
|
|
# Assert boot message arrived
|
|
self.assertIn('ESP32 Blink Test Started', all_serial,
|
|
f'Expected boot message in serial output. Got: {repr(all_serial[:300])}')
|
|
|
|
# Assert LED ON / OFF cycle observed
|
|
self.assertIn('LED ON', all_serial,
|
|
f'Expected "LED ON" in serial output. Got: {repr(all_serial[:300])}')
|
|
self.assertIn('LED OFF', all_serial,
|
|
f'Expected "LED OFF" in serial output. Got: {repr(all_serial[:300])}')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(verbosity=2)
|