feat: enhance ESP32 emulation with GPIO pinmap and improved QEMU initialization handling

pull/47/head
David Montero Crespo 2026-03-14 12:05:35 -03:00
parent c957190061
commit 4a7c9e2e55
11 changed files with 1576 additions and 160 deletions

View File

@ -37,6 +37,9 @@ async def simulation_websocket(websocket: WebSocket, client_id: str):
payload = json.dumps({'type': event_type, 'data': data}) payload = json.dumps({'type': event_type, 'data': data})
await manager.send(client_id, payload) await manager.send(client_id, payload)
def _use_lib() -> bool:
return esp_lib_manager.is_available()
try: try:
while True: while True:
raw = await websocket.receive_text() raw = await websocket.receive_text()
@ -44,6 +47,7 @@ async def simulation_websocket(websocket: WebSocket, client_id: str):
msg_type: str = message.get('type', '') msg_type: str = message.get('type', '')
msg_data: dict = message.get('data', {}) msg_data: dict = message.get('data', {})
# ── Raspberry Pi ─────────────────────────────────────────────
if msg_type == 'start_pi': if msg_type == 'start_pi':
board = msg_data.get('board', 'raspberry-pi-3') board = msg_data.get('board', 'raspberry-pi-3')
qemu_manager.start_instance(client_id, board, qemu_callback) qemu_manager.start_instance(client_id, board, qemu_callback)
@ -52,28 +56,20 @@ async def simulation_websocket(websocket: WebSocket, client_id: str):
qemu_manager.stop_instance(client_id) qemu_manager.stop_instance(client_id)
elif msg_type == 'serial_input': elif msg_type == 'serial_input':
# bytes: list[int] — characters typed by the user / sent by Arduino
raw_bytes: list[int] = msg_data.get('bytes', []) raw_bytes: list[int] = msg_data.get('bytes', [])
if raw_bytes: if raw_bytes:
await qemu_manager.send_serial_bytes(client_id, bytes(raw_bytes)) await qemu_manager.send_serial_bytes(client_id, bytes(raw_bytes))
elif msg_type == 'gpio_in': elif msg_type in ('gpio_in', 'pin_change'):
# External (e.g. Arduino) drives a Pi GPIO pin
pin = msg_data.get('pin', 0) pin = msg_data.get('pin', 0)
state = msg_data.get('state', 0) state = msg_data.get('state', 0)
qemu_manager.set_pin_state(client_id, pin, state) qemu_manager.set_pin_state(client_id, pin, state)
elif msg_type == 'pin_change': # ── ESP32 lifecycle ──────────────────────────────────────────
# Legacy alias for gpio_in
pin = msg_data.get('pin', 0)
state = msg_data.get('state', 0)
qemu_manager.set_pin_state(client_id, pin, state)
# ── ESP32 messages ──────────────────────────────────────────────
elif msg_type == 'start_esp32': elif msg_type == 'start_esp32':
board = msg_data.get('board', 'esp32') board = msg_data.get('board', 'esp32')
firmware_b64 = msg_data.get('firmware_b64') firmware_b64 = msg_data.get('firmware_b64')
if esp_lib_manager.is_available(): if _use_lib():
esp_lib_manager.start_instance(client_id, board, qemu_callback, firmware_b64) esp_lib_manager.start_instance(client_id, board, qemu_callback, firmware_b64)
else: else:
esp_qemu_manager.start_instance(client_id, board, qemu_callback, firmware_b64) esp_qemu_manager.start_instance(client_id, board, qemu_callback, firmware_b64)
@ -85,34 +81,97 @@ async def simulation_websocket(websocket: WebSocket, client_id: str):
elif msg_type == 'load_firmware': elif msg_type == 'load_firmware':
firmware_b64 = msg_data.get('firmware_b64', '') firmware_b64 = msg_data.get('firmware_b64', '')
if firmware_b64: if firmware_b64:
if esp_lib_manager.is_available(): if _use_lib():
esp_lib_manager.load_firmware(client_id, firmware_b64) esp_lib_manager.load_firmware(client_id, firmware_b64)
else: else:
esp_qemu_manager.load_firmware(client_id, firmware_b64) esp_qemu_manager.load_firmware(client_id, firmware_b64)
# ── ESP32 serial (UART 0 / 1 / 2) ───────────────────────────
elif msg_type == 'esp32_serial_input': elif msg_type == 'esp32_serial_input':
raw_bytes: list[int] = msg_data.get('bytes', []) raw_bytes = msg_data.get('bytes', [])
uart_id = int(msg_data.get('uart', 0))
if raw_bytes: if raw_bytes:
if esp_lib_manager.is_available(): if _use_lib():
await esp_lib_manager.send_serial_bytes(client_id, bytes(raw_bytes)) await esp_lib_manager.send_serial_bytes(
client_id, bytes(raw_bytes), uart_id
)
else: else:
await esp_qemu_manager.send_serial_bytes(client_id, bytes(raw_bytes)) await esp_qemu_manager.send_serial_bytes(
client_id, bytes(raw_bytes)
)
# ── ESP32 GPIO input (from connected component / button) ──────
elif msg_type == 'esp32_gpio_in': elif msg_type == 'esp32_gpio_in':
pin = msg_data.get('pin', 0) pin = msg_data.get('pin', 0)
state = msg_data.get('state', 0) state = msg_data.get('state', 0)
if esp_lib_manager.is_available(): if _use_lib():
esp_lib_manager.set_pin_state(client_id, pin, state) esp_lib_manager.set_pin_state(client_id, pin, state)
else: else:
esp_qemu_manager.set_pin_state(client_id, pin, state) esp_qemu_manager.set_pin_state(client_id, pin, state)
# ── ESP32 ADC (analog input from potentiometer, sensor, etc.) ─
elif msg_type == 'esp32_adc_set':
# Frontend sends {channel: int, millivolts: int}
# or {channel: int, raw: int} for direct 12-bit value
channel = int(msg_data.get('channel', 0))
if 'millivolts' in msg_data:
if _use_lib():
esp_lib_manager.set_adc(
client_id, channel, int(msg_data['millivolts'])
)
elif 'raw' in msg_data:
if _use_lib():
esp_lib_manager.set_adc_raw(
client_id, channel, int(msg_data['raw'])
)
# ── ESP32 I2C device simulation ───────────────────────────────
elif msg_type == 'esp32_i2c_response':
# Frontend configures what an I2C device at addr returns
# {addr: int, response: int}
addr = int(msg_data.get('addr', 0))
resp = int(msg_data.get('response', 0))
if _use_lib():
esp_lib_manager.set_i2c_response(client_id, addr, resp)
# ── ESP32 SPI device simulation ───────────────────────────────
elif msg_type == 'esp32_spi_response':
# {response: int} — byte to return as MISO
resp = int(msg_data.get('response', 0xFF))
if _use_lib():
esp_lib_manager.set_spi_response(client_id, resp)
# ── ESP32 UART 1 / 2 input ────────────────────────────────────
elif msg_type == 'esp32_uart1_input':
raw_bytes = msg_data.get('bytes', [])
if raw_bytes and _use_lib():
await esp_lib_manager.send_serial_bytes(
client_id, bytes(raw_bytes), uart_id=1
)
elif msg_type == 'esp32_uart2_input':
raw_bytes = msg_data.get('bytes', [])
if raw_bytes and _use_lib():
await esp_lib_manager.send_serial_bytes(
client_id, bytes(raw_bytes), uart_id=2
)
# ── ESP32 status query ────────────────────────────────────────
elif msg_type == 'esp32_status':
if _use_lib():
status = esp_lib_manager.get_status(client_id)
await manager.send(
client_id,
json.dumps({'type': 'esp32_status', 'data': status})
)
except WebSocketDisconnect: except WebSocketDisconnect:
manager.disconnect(client_id) manager.disconnect(client_id)
qemu_manager.stop_instance(client_id) qemu_manager.stop_instance(client_id)
esp_lib_manager.stop_instance(client_id) esp_lib_manager.stop_instance(client_id)
esp_qemu_manager.stop_instance(client_id) esp_qemu_manager.stop_instance(client_id)
except Exception as e: except Exception as exc:
logger.error('WebSocket error for %s: %s', client_id, e) logger.error('WebSocket error for %s: %s', client_id, exc)
manager.disconnect(client_id) manager.disconnect(client_id)
qemu_manager.stop_instance(client_id) qemu_manager.stop_instance(client_id)
esp_lib_manager.stop_instance(client_id) esp_lib_manager.stop_instance(client_id)

View File

@ -262,13 +262,29 @@ class ArduinoCLIService:
try: try:
# Run compilation using subprocess.run in a thread (Windows compatible) # Run compilation using subprocess.run in a thread (Windows compatible)
cmd = [ # ESP32 lcgamboa emulator requires DIO flash mode and
self.cli_path, # IRAM-safe interrupt placement to avoid cache errors.
"compile", # Force these at compile time for all ESP32 targets.
"--fqbn", board_fqbn, cmd = [self.cli_path, "compile", "--fqbn", board_fqbn]
"--output-dir", str(build_dir), if self._is_esp32_board(board_fqbn):
str(sketch_dir) # FlashMode=dio: required by esp32-picsimlab QEMU machine
] # IRAM_ATTR on all interrupt handlers prevents cache crashes
# when WiFi emulation disables the SPI flash cache on core 1.
fqbn_dio = board_fqbn
if 'FlashMode' not in board_fqbn:
fqbn_dio = board_fqbn + ':FlashMode=dio'
cmd[2] = '--fqbn'
cmd.insert(3, fqbn_dio)
cmd = cmd[:4] # trim accidental duplicates
cmd = [self.cli_path, "compile", "--fqbn", fqbn_dio,
"--build-property",
"build.extra_flags=-DARDUINO_ESP32_LCGAMBOA=1",
"--output-dir", str(build_dir),
str(sketch_dir)]
else:
cmd = [self.cli_path, "compile", "--fqbn", board_fqbn,
"--output-dir", str(build_dir),
str(sketch_dir)]
print(f"Running command: {' '.join(cmd)}") print(f"Running command: {' '.join(cmd)}")
# Use subprocess.run in a thread for Windows compatibility # Use subprocess.run in a thread for Windows compatibility

View File

@ -1,7 +1,8 @@
""" """
Esp32LibBridge ESP32 emulation via lcgamboa QEMU shared library (libqemu-xtensa.dll). Esp32LibBridge ESP32 emulation via lcgamboa QEMU shared library (libqemu-xtensa.dll).
Enables full GPIO, ADC, UART, and WiFi emulation using the PICSimLab callback bridge. Enables full GPIO, ADC, UART, I2C, SPI, RMT, LEDC/PWM, and WiFi emulation
using the PICSimLab callback bridge.
C API exposed by the library: C API exposed by the library:
qemu_init(argc, argv, envp) qemu_init(argc, argv, envp)
@ -11,15 +12,30 @@ C API exposed by the library:
qemu_picsimlab_set_pin(pin: int, value: int) qemu_picsimlab_set_pin(pin: int, value: int)
qemu_picsimlab_set_apin(channel: int, value: int) qemu_picsimlab_set_apin(channel: int, value: int)
qemu_picsimlab_uart_receive(id: int, buf: bytes, size: int) qemu_picsimlab_uart_receive(id: int, buf: bytes, size: int)
qemu_picsimlab_get_internals(type: int) -> void*
qemu_picsimlab_get_TIOCM() -> int
callbacks_t struct (from hw/xtensa/esp32_picsimlab.c): callbacks_t struct (from hw/xtensa/esp32_picsimlab.c):
void (*picsimlab_write_pin)(int pin, int value); void (*picsimlab_write_pin)(int pin, int value)
void (*picsimlab_dir_pin)(int pin, int value); void (*picsimlab_dir_pin)(int pin, int value)
int (*picsimlab_i2c_event)(uint8_t id, uint8_t addr, uint16_t event); int (*picsimlab_i2c_event)(uint8_t id, uint8_t addr, uint16_t event)
uint8_t (*picsimlab_spi_event)(uint8_t id, uint16_t event); uint8_t (*picsimlab_spi_event)(uint8_t id, uint16_t event)
void (*picsimlab_uart_tx_event)(uint8_t id, uint8_t value); void (*picsimlab_uart_tx_event)(uint8_t id, uint8_t value)
const short int *pinmap; const short int *pinmap
void (*picsimlab_rmt_event)(uint8_t channel, uint32_t config0, uint32_t value); void (*picsimlab_rmt_event)(uint8_t channel, uint32_t config0, uint32_t value)
I2C event flags (picsimlab convention):
0x0000 = idle / stop
0x0100 = start + address phase (READ if bit0=1 of addr)
0x0200 = write data byte (byte in bits 7:0)
0x0300 = read request (must return byte to place on SDA)
SPI event flags:
High byte = control flags, low byte = MOSI data
RMT item encoding (value param):
level0<<31 | duration0<<16 | level1<<15 | duration1
duration units = RMT clock ticks (typ. 12.5 ns at 80 MHz APB)
""" """
import asyncio import asyncio
import base64 import base64
@ -38,6 +54,19 @@ _MINGW64_BIN = r"C:\msys64\mingw64\bin"
# Default DLL path: same directory as this module (copied there after build) # Default DLL path: same directory as this module (copied there after build)
_DEFAULT_LIB = str(pathlib.Path(__file__).parent / "libqemu-xtensa.dll") _DEFAULT_LIB = str(pathlib.Path(__file__).parent / "libqemu-xtensa.dll")
# ── GPIO pinmap ──────────────────────────────────────────────────────────────
# pinmap[0] = total number of pin slots (40 for ESP32)
# pinmap[i] = GPIO number for QEMU IRQ slot i (identity mapping: slot i → GPIO i-1)
# When GPIO N changes: callback fires with slot=i where pinmap[i]==N.
_GPIO_COUNT = 40
_PINMAP = (ctypes.c_int16 * (_GPIO_COUNT + 1))(
_GPIO_COUNT, # pinmap[0] = slot count
*range(_GPIO_COUNT), # pinmap[1..40] = GPIO 0..39
)
# Input-only GPIOs on ESP32-WROOM-32 (cannot be driven as output by firmware)
_INPUT_ONLY_GPIOS = frozenset({34, 35, 36, 39})
# ── Callback function types ───────────────────────────────────────────────── # ── Callback function types ─────────────────────────────────────────────────
_WRITE_PIN = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int) _WRITE_PIN = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int)
_DIR_PIN = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int) _DIR_PIN = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int)
@ -64,114 +93,271 @@ class Esp32LibBridge:
Wraps one libqemu-xtensa.dll instance for a single ESP32 board. Wraps one libqemu-xtensa.dll instance for a single ESP32 board.
The QEMU event loop runs in a daemon thread so it does not block asyncio. The QEMU event loop runs in a daemon thread so it does not block asyncio.
GPIO and UART callbacks are injected back into the asyncio event loop via All async callbacks are dispatched into the asyncio event loop via
call_soon_threadsafe(), keeping the asyncio side thread-safe. call_soon_threadsafe(), keeping the asyncio side thread-safe.
GPIO listeners receive (gpio_num, value) where gpio_num is the real
ESP32 GPIO number (0-39), automatically translated from QEMU IRQ slots.
I2C/SPI handlers are *synchronous* (called from QEMU thread) and must
return the response byte immediately. Register async notifications
separately via the manager layer.
""" """
def __init__(self, lib_path: str, loop: asyncio.AbstractEventLoop): def __init__(self, lib_path: str, loop: asyncio.AbstractEventLoop):
# On Windows, add MinGW64/bin to DLL search path so glib2/gcrypt deps are found
if os.name == 'nt' and os.path.isdir(_MINGW64_BIN): if os.name == 'nt' and os.path.isdir(_MINGW64_BIN):
os.add_dll_directory(_MINGW64_BIN) os.add_dll_directory(_MINGW64_BIN)
self._lib: ctypes.CDLL = ctypes.CDLL(lib_path) self._lib: ctypes.CDLL = ctypes.CDLL(lib_path)
self._loop: asyncio.AbstractEventLoop = loop self._loop: asyncio.AbstractEventLoop = loop
self._thread: threading.Thread | None = None self._thread: threading.Thread | None = None
self._callbacks_ref: _CallbacksT | None = None # keep alive (GC guard) self._callbacks_ref: _CallbacksT | None = None # GC guard
self._firmware_path: str | None = None self._firmware_path: str | None = None
self._gpio_listeners: list = [] # fn(pin: int, state: int)
self._uart_listeners: list = [] # fn(uart_id: int, byte_val: int)
# ── Listener registration ──────────────────────────────────────────────── # ── Listener/handler lists ────────────────────────────────────────
self._gpio_listeners: list = [] # fn(gpio_num: int, value: int)
self._dir_listeners: list = [] # fn(gpio_num: int, direction: int)
self._uart_listeners: list = [] # fn(uart_id: int, byte_val: int)
self._i2c_handlers: list = [] # sync fn(bus, addr, event) -> int
self._spi_handlers: list = [] # sync fn(bus, event) -> int
self._rmt_listeners: list = [] # fn(channel: int, config0: int, value: int)
# GPIO direction state: gpio_num → 0 (input) | 1 (output)
self._gpio_dir: dict[int, int] = {}
# ── Listener registration ─────────────────────────────────────────────
def register_gpio_listener(self, fn) -> None: def register_gpio_listener(self, fn) -> None:
"""fn(gpio_num: int, value: int) — GPIO output changed."""
self._gpio_listeners.append(fn) self._gpio_listeners.append(fn)
def register_dir_listener(self, fn) -> None:
"""fn(gpio_num: int, direction: int) — GPIO direction changed (0=in, 1=out)."""
self._dir_listeners.append(fn)
def register_uart_listener(self, fn) -> None: def register_uart_listener(self, fn) -> None:
"""fn(uart_id: int, byte_val: int) — UART TX byte from ESP32."""
self._uart_listeners.append(fn) self._uart_listeners.append(fn)
# ── Lifecycle ──────────────────────────────────────────────────────────── def register_i2c_handler(self, fn) -> None:
"""Sync fn(bus_id, addr, event) -> int — I2C event (called from QEMU thread)."""
self._i2c_handlers.append(fn)
def register_spi_handler(self, fn) -> None:
"""Sync fn(bus_id, event) -> int — SPI event (called from QEMU thread)."""
self._spi_handlers.append(fn)
def register_rmt_listener(self, fn) -> None:
"""fn(channel: int, config0: int, value: int) — RMT pulse event."""
self._rmt_listeners.append(fn)
# ── Lifecycle ─────────────────────────────────────────────────────────
def start(self, firmware_b64: str, machine: str = 'esp32-picsimlab') -> None: def start(self, firmware_b64: str, machine: str = 'esp32-picsimlab') -> None:
"""Decode firmware, init QEMU, start event loop in daemon thread.""" """Decode firmware, init QEMU, start event loop in daemon thread."""
# Write firmware to temp file
fw_bytes = base64.b64decode(firmware_b64) fw_bytes = base64.b64decode(firmware_b64)
tmp = tempfile.NamedTemporaryFile(suffix='.bin', delete=False) tmp = tempfile.NamedTemporaryFile(suffix='.bin', delete=False)
tmp.write(fw_bytes) tmp.write(fw_bytes)
tmp.close() tmp.close()
self._firmware_path = tmp.name self._firmware_path = tmp.name
# Build argv (bytes) # ROM directory: esp32-v3-rom.bin lives beside the DLL
rom_dir = str(pathlib.Path(_DEFAULT_LIB).parent).encode()
args_bytes = [ args_bytes = [
b'qemu', b'qemu',
b'-M', machine.encode(), b'-M', machine.encode(),
b'-nographic', b'-nographic',
b'-L', rom_dir,
b'-drive', f'file={self._firmware_path},if=mtd,format=raw'.encode(), b'-drive', f'file={self._firmware_path},if=mtd,format=raw'.encode(),
] ]
argc = len(args_bytes) argc = len(args_bytes)
argv = (ctypes.c_char_p * argc)(*args_bytes) argv = (ctypes.c_char_p * argc)(*args_bytes)
# Build and register callbacks BEFORE qemu_init
cbs = _CallbacksT( cbs = _CallbacksT(
picsimlab_write_pin = _WRITE_PIN(self._on_pin_change), picsimlab_write_pin = _WRITE_PIN(self._on_pin_change),
picsimlab_dir_pin = _DIR_PIN(lambda _p, _v: None), picsimlab_dir_pin = _DIR_PIN(self._on_dir_change),
picsimlab_i2c_event = _I2C_EVENT(lambda *_a: 0), picsimlab_i2c_event = _I2C_EVENT(self._on_i2c_event),
picsimlab_spi_event = _SPI_EVENT(lambda *_a: 0), picsimlab_spi_event = _SPI_EVENT(self._on_spi_event),
picsimlab_uart_tx_event = _UART_TX(self._on_uart_tx), picsimlab_uart_tx_event = _UART_TX(self._on_uart_tx),
pinmap = None, pinmap = ctypes.cast(_PINMAP, ctypes.c_void_p).value,
picsimlab_rmt_event = _RMT_EVENT(lambda *_a: None), picsimlab_rmt_event = _RMT_EVENT(self._on_rmt_event),
) )
self._callbacks_ref = cbs # prevent GC while QEMU is running self._callbacks_ref = cbs
self._lib.qemu_picsimlab_register_callbacks(ctypes.byref(cbs)) self._lib.qemu_picsimlab_register_callbacks(ctypes.byref(cbs))
# Initialize QEMU (sets up machine, loads firmware) # qemu_init() and qemu_main_loop() MUST run in the same thread (BQL)
self._lib.qemu_init(argc, argv, None) self._init_done = threading.Event()
self._init_error: str | None = None
def _qemu_thread() -> None:
try:
self._lib.qemu_init(argc, argv, None)
except Exception as exc:
self._init_error = str(exc)
finally:
self._init_done.set()
if self._init_error is None:
self._lib.qemu_main_loop()
# Run QEMU event loop in a daemon thread
self._thread = threading.Thread( self._thread = threading.Thread(
target=self._lib.qemu_main_loop, target=_qemu_thread,
daemon=True, daemon=True,
name=f'qemu-esp32-{machine}', name=f'qemu-esp32-{machine}',
) )
self._thread.start() self._thread.start()
if not self._init_done.wait(timeout=30.0):
raise TimeoutError('qemu_init() did not complete within 30 s')
if self._init_error:
raise RuntimeError(f'qemu_init() failed: {self._init_error}')
logger.info('lcgamboa QEMU started: machine=%s firmware=%s', machine, self._firmware_path) logger.info('lcgamboa QEMU started: machine=%s firmware=%s', machine, self._firmware_path)
def stop(self) -> None: def stop(self) -> None:
"""Terminate the QEMU instance and clean up firmware temp file.""" """Terminate the QEMU instance and clean up."""
try: try:
self._lib.qemu_cleanup() self._lib.qemu_cleanup()
except Exception as e: except Exception as exc:
logger.debug('qemu_cleanup: %s', e) logger.debug('qemu_cleanup: %s', exc)
self._callbacks_ref = None self._callbacks_ref = None
if self._firmware_path and os.path.exists(self._firmware_path): if self._firmware_path and os.path.exists(self._firmware_path):
try: try:
os.unlink(self._firmware_path) os.unlink(self._firmware_path)
except Exception: except OSError:
pass pass
self._firmware_path = None self._firmware_path = None
logger.info('Esp32LibBridge stopped') logger.info('Esp32LibBridge stopped')
# ── GPIO / ADC / UART control ──────────────────────────────────────────── @property
def is_alive(self) -> bool:
"""Return True if the QEMU daemon thread is still running."""
return self._thread is not None and self._thread.is_alive()
def set_pin(self, pin: int, value: int) -> None: # ── GPIO / ADC / UART control ─────────────────────────────────────────
"""Drive a digital GPIO pin (from an external source, e.g. connected Arduino)."""
self._lib.qemu_picsimlab_set_pin(pin, value)
def set_adc(self, channel: int, value: int) -> None: def set_pin(self, gpio_num: int, value: int) -> None:
"""Set ADC channel (0-9). value is 12-bit raw (0-4095).""" """Drive a GPIO input from outside (e.g. button press, connected component).
self._lib.qemu_picsimlab_set_apin(channel, value) gpio_num is the real ESP32 GPIO number (0-39)."""
# Identity pinmap: slot = gpio_num + 1
slot = gpio_num + 1
self._lib.qemu_picsimlab_set_pin(slot, value)
def set_adc(self, channel: int, millivolts: int) -> None:
"""Set ADC channel voltage. channel 0-9, millivolts 0-3300."""
raw = int(millivolts * 4095 / 3300)
self._lib.qemu_picsimlab_set_apin(channel, max(0, min(4095, raw)))
def set_adc_raw(self, channel: int, raw: int) -> None:
"""Set ADC channel directly with 12-bit raw value (0-4095)."""
self._lib.qemu_picsimlab_set_apin(channel, max(0, min(4095, raw)))
def uart_send(self, uart_id: int, data: bytes) -> None: def uart_send(self, uart_id: int, data: bytes) -> None:
"""Send bytes to the ESP32's UART (simulated RX).""" """Send bytes to the ESP32's UART RX (simulated serial input)."""
buf = (ctypes.c_uint8 * len(data))(*data) buf = (ctypes.c_uint8 * len(data))(*data)
self._lib.qemu_picsimlab_uart_receive(uart_id, buf, len(data)) self._lib.qemu_picsimlab_uart_receive(uart_id, buf, len(data))
# ── Internal callbacks (called from QEMU thread) ───────────────────────── def get_gpio_direction(self, gpio_num: int) -> int:
"""Return last known direction for gpio_num: 0=input, 1=output."""
return self._gpio_dir.get(gpio_num, 0)
def _on_pin_change(self, pin: int, value: int) -> None: # ── LEDC / PWM introspection ──────────────────────────────────────────
"""Called by QEMU whenever the ESP32 drives a GPIO output."""
def get_ledc_duty(self, channel: int) -> int | None:
"""
Read LEDC channel duty cycle via qemu_picsimlab_get_internals(0).
Returns raw 32-bit duty register value, or None if unavailable.
channel 0-15 maps to LEDC channels 0-15 (low-speed + high-speed).
"""
try:
self._lib.qemu_picsimlab_get_internals.restype = ctypes.c_void_p
ptr = self._lib.qemu_picsimlab_get_internals(0)
if ptr is None:
return None
arr = (ctypes.c_uint32 * 16).from_address(ptr)
return int(arr[channel]) if 0 <= channel < 16 else None
except Exception:
return None
def get_tiocm(self) -> int:
"""Read UART modem control lines bitmask (TIOCM_*)."""
try:
self._lib.qemu_picsimlab_get_TIOCM.restype = ctypes.c_int
return int(self._lib.qemu_picsimlab_get_TIOCM())
except Exception:
return 0
# ── Static helpers ────────────────────────────────────────────────────
@staticmethod
def decode_rmt_item(value: int) -> tuple[int, int, int, int]:
"""
Decode a 32-bit RMT item into (level0, duration0, level1, duration1).
Bit layout: level0[31] | duration0[30:16] | level1[15] | duration1[14:0]
Durations are in RMT clock ticks (12.5 ns per tick at 80 MHz APB).
"""
level0 = (value >> 31) & 1
duration0 = (value >> 16) & 0x7FFF
level1 = (value >> 15) & 1
duration1 = value & 0x7FFF
return level0, duration0, level1, duration1
# ── Internal callbacks (called from QEMU thread) ──────────────────────
def _slot_to_gpio(self, slot: int) -> int:
"""Translate QEMU IRQ slot index to ESP32 GPIO number via pinmap."""
if 1 <= slot <= _GPIO_COUNT:
return int(_PINMAP[slot])
return slot
def _on_pin_change(self, slot: int, value: int) -> None:
"""GPIO output changed — translate slot→GPIO, dispatch to async listeners."""
gpio = self._slot_to_gpio(slot)
for fn in self._gpio_listeners: for fn in self._gpio_listeners:
self._loop.call_soon_threadsafe(fn, pin, value) self._loop.call_soon_threadsafe(fn, gpio, value)
def _on_dir_change(self, slot: int, direction: int) -> None:
"""GPIO direction changed (0=input, 1=output)."""
gpio = self._slot_to_gpio(slot)
self._gpio_dir[gpio] = direction
for fn in self._dir_listeners:
self._loop.call_soon_threadsafe(fn, gpio, direction)
def _on_i2c_event(self, bus_id: int, addr: int, event: int) -> int:
"""
I2C bus event synchronous, called from QEMU thread.
Calls all registered sync handlers; returns last non-zero response byte.
"""
response = 0
for fn in self._i2c_handlers:
try:
resp = fn(bus_id, addr, event)
if resp:
response = resp & 0xFF
except Exception as exc:
logger.debug('i2c_handler error: %s', exc)
return response
def _on_spi_event(self, bus_id: int, event: int) -> int:
"""
SPI bus event synchronous, called from QEMU thread.
Returns MISO byte (0xFF = idle bus).
"""
response = 0xFF
for fn in self._spi_handlers:
try:
resp = fn(bus_id, event)
if resp is not None:
response = resp & 0xFF
except Exception as exc:
logger.debug('spi_handler error: %s', exc)
return response
def _on_uart_tx(self, uart_id: int, byte_val: int) -> None: def _on_uart_tx(self, uart_id: int, byte_val: int) -> None:
"""Called by QEMU for each byte the ESP32 transmits on UART.""" """UART TX byte transmitted by ESP32 firmware."""
for fn in self._uart_listeners: for fn in self._uart_listeners:
self._loop.call_soon_threadsafe(fn, uart_id, byte_val) self._loop.call_soon_threadsafe(fn, uart_id, byte_val)
def _on_rmt_event(self, channel: int, config0: int, value: int) -> None:
"""RMT pulse event — used for NeoPixel/WS2812, IR remotes, etc."""
for fn in self._rmt_listeners:
self._loop.call_soon_threadsafe(fn, channel, config0, value)

View File

@ -3,10 +3,24 @@ EspLibManager — ESP32 emulation via lcgamboa libqemu-xtensa.dll.
Exposes the same public API as EspQemuManager so simulation.py can Exposes the same public API as EspQemuManager so simulation.py can
transparently switch between the two backends: transparently switch between the two backends:
- DLL available full GPIO + ADC + UART + WiFi (this module) - DLL available full GPIO + ADC + UART + I2C + SPI + RMT + WiFi (this module)
- DLL missing serial-only via subprocess (esp_qemu_manager.py) - DLL missing serial-only via subprocess (esp_qemu_manager.py)
Activation: set environment variable QEMU_ESP32_LIB to the DLL path. Activation: set environment variable QEMU_ESP32_LIB to the DLL path,
or place libqemu-xtensa.dll in the same directory as this module.
Events emitted via callback(event_type, data):
system {event: 'booting'|'booted'|'crash'|'reboot'}
serial_output {data: str, uart: int} UART 0/1/2 text
gpio_change {pin: int, state: int} real GPIO number (0-39)
gpio_dir {pin: int, dir: int} 0=input 1=output
i2c_event {bus: int, addr: int, event: int, response: int}
spi_event {bus: int, event: int, response: int}
rmt_event {channel: int, config0: int, value: int,
level0: int, dur0: int, level1: int, dur1: int}
ws2812_update {channel: int, pixels: list[{r,g,b}]}
ledc_update {channel: int, duty: int, duty_pct: float}
error {message: str}
""" """
import asyncio import asyncio
import logging import logging
@ -17,49 +31,160 @@ from .esp32_lib_bridge import Esp32LibBridge, _DEFAULT_LIB
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Path to libqemu-xtensa.dll — env var takes priority, then auto-detect beside this module # DLL path: env var takes priority, then auto-detect beside this module
LIB_PATH: str = os.environ.get('QEMU_ESP32_LIB', '') or ( LIB_PATH: str = os.environ.get('QEMU_ESP32_LIB', '') or (
_DEFAULT_LIB if os.path.isfile(_DEFAULT_LIB) else '' _DEFAULT_LIB if os.path.isfile(_DEFAULT_LIB) else ''
) )
EventCallback = Callable[[str, dict], Awaitable[None]] EventCallback = Callable[[str, dict], Awaitable[None]]
# lcgamboa machine names (esp32-picsimlab has the GPIO callback bridge) # lcgamboa machine names
_MACHINE: dict[str, str] = { _MACHINE: dict[str, str] = {
'esp32': 'esp32-picsimlab', 'esp32': 'esp32-picsimlab',
'esp32-s3': 'esp32s3-picsimlab', 'esp32-s3': 'esp32s3-picsimlab',
'esp32-c3': 'esp32c3-picsimlab', 'esp32-c3': 'esp32c3-picsimlab',
} }
# ── WS2812 / NeoPixel RMT decoder ────────────────────────────────────────────
# WS2812 timing at 80 MHz APB (12.5 ns per tick):
# Bit 1: high ~64 ticks (800 ns), low ~36 ticks (450 ns)
# Bit 0: high ~32 ticks (400 ns), low ~68 ticks (850 ns)
# Threshold: high pulse > 48 ticks → bit 1
_WS2812_HIGH_THRESHOLD = 48
class _UartBuffer:
"""Accumulates bytes per UART channel, flushes on newline or size limit."""
def __init__(self, uart_id: int, flush_size: int = 256):
self.uart_id = uart_id
self.flush_size = flush_size
self._buf: bytearray = bytearray()
def feed(self, byte_val: int) -> str | None:
"""Add one byte. Returns decoded string if a flush occurred, else None."""
self._buf.append(byte_val)
if byte_val == ord('\n') or len(self._buf) >= self.flush_size:
text = self._buf.decode('utf-8', errors='replace')
self._buf.clear()
return text
return None
def flush(self) -> str | None:
"""Force-flush any remaining bytes."""
if self._buf:
text = self._buf.decode('utf-8', errors='replace')
self._buf.clear()
return text
return None
class _RmtDecoder:
"""
Per-channel RMT bit accumulator for WS2812 NeoPixel decoding.
Each RMT item encodes two pulses (level0/dur0 and level1/dur1).
High pulses are classified as bit 1 or bit 0 based on duration.
Every 24 bits assemble one GRB pixel, converted to RGB on output.
A reset pulse (both durations == 0) signals end-of-frame.
"""
def __init__(self, channel: int):
self.channel = channel
self._bits: list[int] = []
self._pixels: list[dict] = []
def feed(self, config0: int, value: int) -> list[dict] | None:
"""
Process one RMT item. Returns list of {r,g,b} dicts when a full
NeoPixel frame ends, else None.
"""
level0, dur0, level1, dur1 = Esp32LibBridge.decode_rmt_item(value)
# Reset pulse (end of frame)
if dur0 == 0 and dur1 == 0:
pixels = self._consume_pixels()
self._bits.clear()
return pixels or None
# Classify the high pulse (level0=1 carries the bit)
if level0 == 1 and dur0 > 0:
self._bits.append(1 if dur0 > _WS2812_HIGH_THRESHOLD else 0)
# Every 24 bits → one pixel (WS2812 GRB order)
while len(self._bits) >= 24:
g = self._byte(0)
r = self._byte(8)
b = self._byte(16)
self._pixels.append({'r': r, 'g': g, 'b': b})
self._bits = self._bits[24:]
return None
def _byte(self, offset: int) -> int:
val = 0
for i in range(8):
val = (val << 1) | self._bits[offset + i]
return val
def _consume_pixels(self) -> list[dict]:
pix = list(self._pixels)
self._pixels.clear()
return pix
class _InstanceState: class _InstanceState:
"""Tracks one running ESP32 instance.""" """Runtime state for one running ESP32 instance."""
def __init__(self, bridge: Esp32LibBridge, callback: EventCallback, board_type: str): def __init__(self, bridge: Esp32LibBridge, callback: EventCallback, board_type: str):
self.bridge = bridge self.bridge = bridge
self.callback = callback self.callback = callback
self.board_type = board_type self.board_type = board_type
self.reboot_count = 0
self.crashed = False
# Per-UART buffers (0=main Serial, 1=Serial1, 2=Serial2)
self.uart_bufs: dict[int, _UartBuffer] = {
0: _UartBuffer(0),
1: _UartBuffer(1),
2: _UartBuffer(2),
}
# Per-RMT-channel decoders (lazy init)
self.rmt_decoders: dict[int, _RmtDecoder] = {}
# I2C device simulation: 7-bit addr → response byte
self.i2c_responses: dict[int, int] = {}
# SPI MISO byte returned during transfers
self.spi_response: int = 0xFF
self._CRASH_STR = 'Cache disabled but cached memory region accessed'
self._REBOOT_STR = 'Rebooting...'
class EspLibManager: class EspLibManager:
""" """
Manager for ESP32 emulation via libqemu-xtensa.dll. Manager for ESP32 emulation via libqemu-xtensa.dll.
Uses Esp32LibBridge to load the lcgamboa QEMU shared library. Translates raw hardware callbacks from Esp32LibBridge into rich
GPIO, ADC, UART, and WiFi events are delivered via the callback WebSocket events for the Velxio frontend, including:
function registered at start_instance(). GPIO changes with real GPIO numbers (not QEMU slot indices)
GPIO direction tracking
UART output on all 3 UARTs with crash/reboot detection
I2C / SPI bus events with configurable device simulation
RMT pulse events + automatic WS2812 NeoPixel decoding
LEDC/PWM duty cycle polling
""" """
def __init__(self): def __init__(self):
self._instances: dict[str, _InstanceState] = {} self._instances: dict[str, _InstanceState] = {}
# ── Availability check ─────────────────────────────────────────────────── # ── Availability ──────────────────────────────────────────────────────
@staticmethod @staticmethod
def is_available() -> bool: def is_available() -> bool:
"""Return True if the DLL path is configured and the file exists."""
return bool(LIB_PATH) and os.path.isfile(LIB_PATH) return bool(LIB_PATH) and os.path.isfile(LIB_PATH)
# ── Public API (mirrors EspQemuManager) ───────────────────────────────── # ── Public API ────────────────────────────────────────────────────────
def start_instance( def start_instance(
self, self,
@ -72,64 +197,135 @@ class EspLibManager:
logger.warning('start_instance: %s already running', client_id) logger.warning('start_instance: %s already running', client_id)
return return
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
bridge = Esp32LibBridge(LIB_PATH, loop) bridge = Esp32LibBridge(LIB_PATH, loop)
state = _InstanceState(bridge, callback, board_type)
# ── GPIO listener → emit gpio_change events ──────────────────────────
async def _on_gpio(pin: int, state: int) -> None:
await callback('gpio_change', {'pin': pin, 'state': state})
# ── UART listener → accumulate bytes, emit serial_output ─────────────
uart_buf: bytearray = bytearray()
async def _on_uart(uart_id: int, byte_val: int) -> None:
if uart_id == 0:
uart_buf.append(byte_val)
# Flush on newline or if buffer gets large
if byte_val == ord('\n') or len(uart_buf) >= 256:
text = uart_buf.decode('utf-8', errors='replace')
uart_buf.clear()
await callback('serial_output', {'data': text})
bridge.register_gpio_listener(
lambda p, s: loop.call_soon_threadsafe(
lambda: asyncio.ensure_future(_on_gpio(p, s), loop=loop)
)
)
bridge.register_uart_listener(
lambda i, b: loop.call_soon_threadsafe(
lambda: asyncio.ensure_future(_on_uart(i, b), loop=loop)
)
)
machine = _MACHINE.get(board_type, 'esp32-picsimlab')
state = _InstanceState(bridge, callback, board_type)
self._instances[client_id] = state self._instances[client_id] = state
# ── GPIO output ───────────────────────────────────────────────────
async def _on_gpio(gpio: int, value: int) -> None:
await callback('gpio_change', {'pin': gpio, 'state': value})
# ── GPIO direction ────────────────────────────────────────────────
async def _on_dir(gpio: int, direction: int) -> None:
await callback('gpio_dir', {'pin': gpio, 'dir': direction})
# ── UART (all channels) ───────────────────────────────────────────
async def _on_uart(uart_id: int, byte_val: int) -> None:
buf = state.uart_bufs.get(uart_id)
if buf is None:
return
text = buf.feed(byte_val)
if text is None:
return
# Crash / reboot detection (UART 0 only)
if uart_id == 0:
if state._CRASH_STR in text and not state.crashed:
state.crashed = True
await callback('system', {
'event': 'crash',
'reason': 'cache_error',
'reboot': state.reboot_count,
})
if state._REBOOT_STR in text:
state.crashed = False
state.reboot_count += 1
await callback('system', {
'event': 'reboot',
'count': state.reboot_count,
})
await callback('serial_output', {'data': text, 'uart': uart_id})
# ── I2C sync handler (called from QEMU thread) ────────────────────
def _i2c_sync(bus_id: int, addr: int, event: int) -> int:
resp = state.i2c_responses.get(addr, 0)
async def _notify() -> None:
await callback('i2c_event', {
'bus': bus_id, 'addr': addr,
'event': event, 'response': resp,
})
loop.call_soon_threadsafe(
lambda: asyncio.ensure_future(_notify(), loop=loop)
)
return resp
# ── SPI sync handler (called from QEMU thread) ────────────────────
def _spi_sync(bus_id: int, event: int) -> int:
resp = state.spi_response
async def _notify() -> None:
await callback('spi_event', {
'bus': bus_id, 'event': event, 'response': resp,
})
loop.call_soon_threadsafe(
lambda: asyncio.ensure_future(_notify(), loop=loop)
)
return resp
# ── RMT + WS2812 decoder ──────────────────────────────────────────
async def _on_rmt(channel: int, config0: int, value: int) -> None:
if channel not in state.rmt_decoders:
state.rmt_decoders[channel] = _RmtDecoder(channel)
level0, dur0, level1, dur1 = Esp32LibBridge.decode_rmt_item(value)
await callback('rmt_event', {
'channel': channel, 'config0': config0, 'value': value,
'level0': level0, 'dur0': dur0,
'level1': level1, 'dur1': dur1,
})
pixels = state.rmt_decoders[channel].feed(config0, value)
if pixels:
await callback('ws2812_update', {
'channel': channel,
'pixels': pixels,
})
# ── Helper: wrap async fn for call_soon_threadsafe ────────────────
def _async_wrap(coro_fn):
def _caller(*args):
asyncio.ensure_future(coro_fn(*args), loop=loop)
return _caller
bridge.register_gpio_listener(_async_wrap(_on_gpio))
bridge.register_dir_listener(_async_wrap(_on_dir))
bridge.register_uart_listener(_async_wrap(_on_uart))
bridge.register_i2c_handler(_i2c_sync)
bridge.register_spi_handler(_spi_sync)
bridge.register_rmt_listener(_async_wrap(_on_rmt))
asyncio.ensure_future(callback('system', {'event': 'booting'})) asyncio.ensure_future(callback('system', {'event': 'booting'}))
machine = _MACHINE.get(board_type, 'esp32-picsimlab')
if firmware_b64: if firmware_b64:
try: try:
bridge.start(firmware_b64, machine) bridge.start(firmware_b64, machine)
asyncio.ensure_future(callback('system', {'event': 'booted'})) asyncio.ensure_future(callback('system', {'event': 'booted'}))
except Exception as e: except Exception as exc:
logger.error('start_instance %s: bridge.start failed: %s', client_id, e) logger.error('start_instance %s: bridge.start failed: %s', client_id, exc)
self._instances.pop(client_id, None) self._instances.pop(client_id, None)
asyncio.ensure_future(callback('error', {'message': str(e)})) asyncio.ensure_future(callback('error', {'message': str(exc)}))
else: else:
# No firmware yet — instance registered, waiting for load_firmware()
logger.info('start_instance %s: no firmware, waiting for load_firmware()', client_id) logger.info('start_instance %s: no firmware, waiting for load_firmware()', client_id)
def stop_instance(self, client_id: str) -> None: def stop_instance(self, client_id: str) -> None:
state = self._instances.pop(client_id, None) state = self._instances.pop(client_id, None)
if state: if not state:
try: return
state.bridge.stop() for buf in state.uart_bufs.values():
except Exception as e: remaining = buf.flush()
logger.warning('stop_instance %s: %s', client_id, e) if remaining:
asyncio.ensure_future(
state.callback('serial_output', {'data': remaining, 'uart': buf.uart_id})
)
try:
state.bridge.stop()
except Exception as exc:
logger.warning('stop_instance %s: %s', client_id, exc)
def load_firmware(self, client_id: str, firmware_b64: str) -> None: def load_firmware(self, client_id: str, firmware_b64: str) -> None:
"""Hot-reload firmware: stop current bridge, start fresh with new firmware.""" """Hot-reload firmware: stop current bridge, restart with new firmware."""
state = self._instances.get(client_id) state = self._instances.get(client_id)
if not state: if not state:
logger.warning('load_firmware: no instance %s', client_id) logger.warning('load_firmware: no instance %s', client_id)
@ -144,17 +340,83 @@ class EspLibManager:
asyncio.create_task(_restart()) asyncio.create_task(_restart())
def set_pin_state(self, client_id: str, pin: int | str, state: int) -> None: # ── GPIO / ADC / UART control ─────────────────────────────────────────
"""Drive a GPIO pin from an external board (e.g. Arduino output → ESP32 input)."""
inst = self._instances.get(client_id)
if inst:
inst.bridge.set_pin(int(pin), state)
async def send_serial_bytes(self, client_id: str, data: bytes) -> None: def set_pin_state(self, client_id: str, pin: int | str, state_val: int) -> None:
"""Send bytes to the ESP32 UART0 RX (user serial input).""" """Drive a GPIO input pin (real GPIO number 0-39)."""
inst = self._instances.get(client_id) inst = self._instances.get(client_id)
if inst: if inst:
inst.bridge.uart_send(0, data) inst.bridge.set_pin(int(pin), state_val)
async def send_serial_bytes(
self, client_id: str, data: bytes, uart_id: int = 0
) -> None:
"""Send bytes to ESP32 UART RX (uart_id 0/1/2)."""
inst = self._instances.get(client_id)
if inst:
inst.bridge.uart_send(uart_id, data)
def set_adc(self, client_id: str, channel: int, millivolts: int) -> None:
"""Set ADC channel voltage in millivolts (0-3300 mV → 0-4095 raw)."""
inst = self._instances.get(client_id)
if inst:
inst.bridge.set_adc(channel, millivolts)
def set_adc_raw(self, client_id: str, channel: int, raw: int) -> None:
"""Set ADC channel with 12-bit raw value (0-4095)."""
inst = self._instances.get(client_id)
if inst:
inst.bridge.set_adc_raw(channel, raw)
# ── I2C / SPI device simulation ───────────────────────────────────────
def set_i2c_response(self, client_id: str, addr: int, response_byte: int) -> None:
"""Configure the byte returned when ESP32 reads from I2C address addr."""
inst = self._instances.get(client_id)
if inst:
inst.i2c_responses[addr] = response_byte & 0xFF
def set_spi_response(self, client_id: str, response_byte: int) -> None:
"""Configure the MISO byte returned during SPI transfers."""
inst = self._instances.get(client_id)
if inst:
inst.spi_response = response_byte & 0xFF
# ── LEDC / PWM polling ────────────────────────────────────────────────
async def poll_ledc(self, client_id: str) -> None:
"""
Read all 16 LEDC channels and emit ledc_update for any with non-zero duty.
Call periodically (e.g. every 50 ms) for PWM-driven LEDs/servos.
"""
inst = self._instances.get(client_id)
if not inst:
return
for ch in range(16):
duty = inst.bridge.get_ledc_duty(ch)
if duty is not None and duty > 0:
duty_pct = round(duty / 8192 * 100, 1)
await inst.callback('ledc_update', {
'channel': ch,
'duty': duty,
'duty_pct': duty_pct,
})
# ── Status ────────────────────────────────────────────────────────────
def get_status(self, client_id: str) -> dict:
"""Return runtime status for a client instance."""
inst = self._instances.get(client_id)
if not inst:
return {'running': False}
return {
'running': True,
'alive': inst.bridge.is_alive,
'board': inst.board_type,
'reboot_count': inst.reboot_count,
'crashed': inst.crashed,
'gpio_dir': dict(inst.bridge._gpio_dir),
}
esp_lib_manager = EspLibManager() esp_lib_manager = EspLibManager()

560
docs/ESP32_EMULATION.md Normal file
View File

@ -0,0 +1,560 @@
# ESP32 Emulation — Documentación Técnica
> Estado: **Funcional** · Backend completo · Frontend parcial
> Motor: **lcgamboa/qemu-8.1.3** · Plataforma: **arduino-esp32 2.0.17 (IDF 4.4.x)**
---
## 1. Arquitectura general
```
Usuario (browser)
└── WebSocket (/ws/{client_id})
└── simulation.py (FastAPI router)
├── EspLibManager ← backend con DLL (GPIO, WiFi, I2C, SPI, RMT…)
└── EspQemuManager ← fallback solo-UART via subprocess
[QEMU_ESP32_LIB=libqemu-xtensa.dll]
Esp32LibBridge (ctypes)
libqemu-xtensa.dll ← lcgamboa fork de QEMU 8.1.3
Machine: esp32-picsimlab
┌──────────┴──────────┐
CPU Xtensa LX6 periféricos emulados
(dual-core) GPIO · ADC · UART · I2C · SPI
RMT · LEDC · Timer · WiFi · Flash
```
El sistema selecciona backend automáticamente:
- **DLL disponible**`EspLibManager` (GPIO completo + todos los periféricos)
- **DLL ausente**`EspQemuManager` (solo UART serial via TCP, subprocess QEMU)
Activación de DLL: colocar `libqemu-xtensa.dll` en `backend/app/services/` o setear:
```bash
QEMU_ESP32_LIB=C:/ruta/a/libqemu-xtensa.dll uvicorn app.main:app
```
---
## 2. Componentes del sistema
### 2.1 `libqemu-xtensa.dll`
Compilada desde el fork [lcgamboa/qemu](https://github.com/lcgamboa/qemu) rama `qemu-8.1.3`.
**Dependencias en runtime (Windows):**
```
C:\msys64\mingw64\bin\
libglib-2.0-0.dll
libgcrypt-20.dll
libgpg-error-0.dll
libslirp-0.dll
libintl-8.dll
libpcre2-8-0.dll
(y ~15 DLLs más de MinGW64)
```
El bridge las registra automáticamente con `os.add_dll_directory()`.
**ROM binaries requeridas** (deben estar en la misma carpeta que la DLL):
```
backend/app/services/
libqemu-xtensa.dll ← motor principal
esp32-v3-rom.bin ← ROM de boot del ESP32 (copiar de esp-qemu)
esp32-v3-rom-app.bin ← ROM de aplicación
```
**Cómo obtener los ROM binaries:**
```bash
# Desde instalación de Espressif QEMU:
copy "C:\esp-qemu\qemu\share\qemu\esp32-v3-rom.bin" backend\app\services\
copy "C:\esp-qemu\qemu\share\qemu\esp32-v3-rom-app.bin" backend\app\services\
```
**Exports de la DLL:**
```c
void qemu_init(int argc, char** argv, char** envp)
void qemu_main_loop(void)
void qemu_cleanup(void)
void qemu_picsimlab_register_callbacks(callbacks_t* cbs)
void qemu_picsimlab_set_pin(int slot, int value) // GPIO input
void qemu_picsimlab_set_apin(int channel, int value) // ADC input (0-4095)
void qemu_picsimlab_uart_receive(int id, uint8_t* buf, int size)
void* qemu_picsimlab_get_internals(int type) // LEDC duty array
int qemu_picsimlab_get_TIOCM(void) // UART modem lines
```
**Struct de callbacks C:**
```c
typedef struct {
void (*picsimlab_write_pin)(int pin, int value); // GPIO output changed
void (*picsimlab_dir_pin)(int pin, int value); // GPIO direction changed
int (*picsimlab_i2c_event)(uint8_t id, uint8_t addr, uint16_t event);
uint8_t (*picsimlab_spi_event)(uint8_t id, uint16_t event);
void (*picsimlab_uart_tx_event)(uint8_t id, uint8_t value);
const short int *pinmap; // slot → GPIO number mapping
void (*picsimlab_rmt_event)(uint8_t ch, uint32_t config0, uint32_t value);
} callbacks_t;
```
---
### 2.2 GPIO Pinmap
```python
# Identity mapping: QEMU IRQ slot i → GPIO number i-1
_PINMAP = (ctypes.c_int16 * 41)(
40, # pinmap[0] = count
*range(40) # pinmap[1..40] = GPIO 0..39
)
```
Cuando GPIO N cambia, QEMU llama `picsimlab_write_pin(slot=N+1, value)`.
El bridge traduce automáticamente slot → GPIO real antes de notificar listeners.
**GPIOs input-only en ESP32-WROOM-32:** `{34, 35, 36, 39}` — no pueden ser output.
---
### 2.3 `Esp32LibBridge` (Python ctypes)
Archivo: `backend/app/services/esp32_lib_bridge.py`
```python
bridge = Esp32LibBridge(lib_path, asyncio_loop)
# Registrar listeners (async, llamados desde asyncio)
bridge.register_gpio_listener(fn) # fn(gpio_num: int, value: int)
bridge.register_dir_listener(fn) # fn(gpio_num: int, direction: int)
bridge.register_uart_listener(fn) # fn(uart_id: int, byte_val: int)
bridge.register_rmt_listener(fn) # fn(channel: int, config0: int, value: int)
# Registrar handlers I2C/SPI (sync, llamados desde thread QEMU)
bridge.register_i2c_handler(fn) # fn(bus, addr, event) -> int
bridge.register_spi_handler(fn) # fn(bus, event) -> int
# Control
bridge.start(firmware_b64, machine='esp32-picsimlab')
bridge.stop()
bridge.is_alive # bool
# GPIO / ADC / UART
bridge.set_pin(gpio_num, value) # Drive GPIO input (usa GPIO real 0-39)
bridge.set_adc(channel, millivolts) # ADC en mV (0-3300)
bridge.set_adc_raw(channel, raw) # ADC en raw 12-bit (0-4095)
bridge.uart_send(uart_id, data) # Enviar bytes al UART RX del ESP32
# LEDC/PWM
bridge.get_ledc_duty(channel) # canal 0-15 → raw duty | None
bridge.get_tiocm() # UART modem lines bitmask
# Helper
bridge.decode_rmt_item(value) # → (level0, dur0, level1, dur1)
```
**Threading crítico:**
`qemu_init()` y `qemu_main_loop()` **deben correr en el mismo thread** (BQL — Big QEMU Lock es thread-local). El bridge los ejecuta en un único daemon thread y usa `threading.Event` para sincronizar el inicio:
```python
# Correcto:
def _qemu_thread():
lib.qemu_init(argc, argv, None) # init + init_done.set()
lib.qemu_main_loop() # bloquea indefinidamente
# Incorrecto:
lib.qemu_init(...) # en thread A
lib.qemu_main_loop() # en thread B ← crash: "qemu_mutex_unlock_iothread assertion failed"
```
---
### 2.4 `EspLibManager` (Python)
Archivo: `backend/app/services/esp32_lib_manager.py`
Convierte callbacks de hardware en **eventos WebSocket** para el frontend:
| Evento emitido | Datos | Cuándo |
|----------------|-------|--------|
| `system` | `{event: 'booting'\|'booted'\|'crash'\|'reboot', ...}` | Ciclo de vida |
| `serial_output` | `{data: str, uart: 0\|1\|2}` | UART TX del ESP32 |
| `gpio_change` | `{pin: int, state: 0\|1}` | GPIO output cambia |
| `gpio_dir` | `{pin: int, dir: 0\|1}` | GPIO cambia dirección |
| `i2c_event` | `{bus, addr, event, response}` | Transacción I2C |
| `spi_event` | `{bus, event, response}` | Transacción SPI |
| `rmt_event` | `{channel, config0, value, level0, dur0, level1, dur1}` | Pulso RMT |
| `ws2812_update` | `{channel, pixels: [{r,g,b}]}` | Frame NeoPixel completo |
| `ledc_update` | `{channel, duty, duty_pct}` | PWM duty cycle |
| `error` | `{message: str}` | Error de boot |
**Detección de crash y reboot:**
```python
# El firmware imprime en UART cuando crashea:
"Cache disabled but cached memory region accessed" → event: crash
"Rebooting..." → event: reboot
```
**API pública del manager:**
```python
manager = esp_lib_manager # singleton
manager.start_instance(client_id, board_type, callback, firmware_b64)
manager.stop_instance(client_id)
manager.load_firmware(client_id, firmware_b64) # hot-reload
manager.set_pin_state(client_id, gpio_num, value) # GPIO input
manager.set_adc(client_id, channel, millivolts)
manager.set_adc_raw(client_id, channel, raw)
await manager.send_serial_bytes(client_id, data, uart_id=0)
manager.set_i2c_response(client_id, addr, byte) # Simular dispositivo I2C
manager.set_spi_response(client_id, byte) # Simular dispositivo SPI
await manager.poll_ledc(client_id) # Leer PWM (llamar periódicamente)
manager.get_status(client_id) # → dict con runtime state
```
---
### 2.5 `simulation.py` — Mensajes WebSocket
**Frontend → Backend (mensajes entrantes):**
| Tipo | Datos | Acción |
|------|-------|--------|
| `start_esp32` | `{board, firmware_b64?}` | Iniciar emulación |
| `stop_esp32` | `{}` | Detener |
| `load_firmware` | `{firmware_b64}` | Hot-reload firmware |
| `esp32_gpio_in` | `{pin, state}` | Drive GPIO input (GPIO real 0-39) |
| `esp32_serial_input` | `{bytes: [int], uart: 0}` | Enviar serial al ESP32 |
| `esp32_uart1_input` | `{bytes: [int]}` | UART1 RX |
| `esp32_uart2_input` | `{bytes: [int]}` | UART2 RX |
| `esp32_adc_set` | `{channel, millivolts?}` o `{channel, raw?}` | Setear ADC |
| `esp32_i2c_response` | `{addr, response}` | Configurar respuesta I2C |
| `esp32_spi_response` | `{response}` | Configurar MISO SPI |
| `esp32_status` | `{}` | Query estado runtime |
---
## 3. Firmware — Requisitos para lcgamboa
### 3.1 Versión de plataforma requerida
**✅ Usar: arduino-esp32 2.x (IDF 4.4.x)**
**❌ No usar: arduino-esp32 3.x (IDF 5.x)**
```bash
# Instalar/cambiar a 2.x:
arduino-cli core install esp32:esp32@2.0.17
```
**Por qué:** El WiFi emulado de lcgamboa (core 1) desactiva la caché SPI flash periódicamente. En IDF 5.x esto provoca un crash cuando las interrupciones del core 0 intentan ejecutar código desde IROM (flash cache). En IDF 4.4.x el comportamiento de la caché es diferente y compatible.
**Mensaje de crash (IDF 5.x):**
```
Guru Meditation Error: Core / panic'ed (Cache error).
Cache disabled but cached memory region accessed
EXCCAUSE: 0x00000007
```
### 3.2 Imagen de flash
La imagen debe ser un archivo binario completo de **4 MB** (formato merged flash):
```bash
# Compilar con DIO flash mode:
arduino-cli compile --fqbn esp32:esp32:esp32:FlashMode=dio \
--output-dir build/ sketch/
# Crear imagen 4MB completa (¡obligatorio! QEMU requiere 2/4/8/16 MB exactos):
esptool --chip esp32 merge_bin \
--fill-flash-size 4MB \ # ← sin esto QEMU falla con "only 2,4,8,16 MB supported"
-o firmware.merged.bin \
--flash_mode dio --flash_size 4MB \
0x1000 bootloader.bin \
0x8000 partitions.bin \
0x10000 app.bin
```
El backend (`arduino_cli.py`) fuerza `FlashMode=dio` automáticamente para todos los targets `esp32:*`.
### 3.3 Sketch compatible con lcgamboa (ejemplo mínimo)
Para sketches que necesiten máxima compatibilidad (sin Arduino framework):
```cpp
// GPIO directo vía registros (evita código en flash en ISRs)
#define GPIO_OUT_W1TS (*((volatile uint32_t*)0x3FF44008))
#define GPIO_OUT_W1TC (*((volatile uint32_t*)0x3FF4400C))
#define GPIO_ENABLE_W1TS (*((volatile uint32_t*)0x3FF44020))
#define LED_BIT (1u << 2) // GPIO2
// Funciones ROM (siempre en IRAM, nunca crashean)
extern "C" {
void ets_delay_us(uint32_t us);
int esp_rom_printf(const char* fmt, ...);
}
// Strings en DRAM (no en flash)
static const char DRAM_ATTR s_on[] = "LED_ON\n";
static const char DRAM_ATTR s_off[] = "LED_OFF\n";
void IRAM_ATTR setup() {
GPIO_ENABLE_W1TS = LED_BIT;
for (int i = 0; i < 5; i++) {
GPIO_OUT_W1TS = LED_BIT;
esp_rom_printf(s_on);
ets_delay_us(300000); // 300 ms
GPIO_OUT_W1TC = LED_BIT;
esp_rom_printf(s_off);
ets_delay_us(300000);
}
}
void IRAM_ATTR loop() { ets_delay_us(1000000); }
```
**Sketches Arduino normales** (con `Serial.print`, `delay`, `digitalWrite`) también funcionan correctamente con IDF 4.4.x.
---
## 4. WiFi emulada
lcgamboa implementa una WiFi simulada con SSIDs hardcoded:
```cpp
// Solo estas redes están disponibles en la emulación:
WiFi.begin("PICSimLabWifi", ""); // sin contraseña
WiFi.begin("Espressif", "");
```
El ESP32 emulado puede:
- Escanear redes (`WiFi.scanNetworks()`) → devuelve las dos SSIDs
- Conectar y obtener IP (`192.168.4.x`)
- Abrir sockets TCP/UDP (via SLIRP — NAT hacia el host)
- Usar `HTTPClient`, `WebServer`, etc.
**Limitaciones:**
- No hay forma de configurar las SSIDs o contraseñas desde Python
- La IP del "router" virtual es `10.0.2.2` (host Windows)
- El ESP32 emulado es accesible en `localhost:PORT` via port forwarding SLIRP
---
## 5. I2C emulado
El callback I2C es **síncrono** — QEMU espera la respuesta antes de continuar:
```python
# Protocolo de eventos I2C (campo `event`):
0x0100 # START + dirección (READ si bit0 de addr=1)
0x0200 # WRITE byte (byte en bits 7:0 del event)
0x0300 # READ request (el callback debe retornar el byte a poner en SDA)
0x0000 # STOP / idle
```
**Simular un sensor I2C** (ej. temperatura):
```python
# Configurar qué byte devuelve el ESP32 cuando lee la dirección 0x48:
esp_lib_manager.set_i2c_response(client_id, addr=0x48, response_byte=75)
# → analogRead equivalente: el firmware leerá 75 de ese registro
```
Desde WebSocket:
```json
{"type": "esp32_i2c_response", "data": {"addr": 72, "response": 75}}
```
---
## 6. RMT / NeoPixel (WS2812)
El evento RMT lleva un item de 32 bits codificado así:
```
bit31: level0 | bits[30:16]: duration0 | bit15: level1 | bits[14:0]: duration1
```
El `_RmtDecoder` acumula bits y decodifica frames WS2812 (24 bits por LED en orden GRB):
```python
# Threshold de bit: pulso alto > 48 ticks (a 80 MHz APB = ~600 ns) → bit 1
_WS2812_HIGH_THRESHOLD = 48
# Bit 1: high ~64 ticks (800 ns), low ~36 ticks (450 ns)
# Bit 0: high ~32 ticks (400 ns), low ~68 ticks (850 ns)
```
El evento emitido al frontend:
```json
{
"type": "ws2812_update",
"data": {
"channel": 0,
"pixels": [
{"r": 255, "g": 0, "b": 0},
{"r": 0, "g": 255, "b": 0}
]
}
}
```
---
## 7. LEDC / PWM
`qemu_picsimlab_get_internals(0)` retorna un puntero a un array de 16 `uint32_t` con el duty cycle de cada canal LEDC. Llamar periódicamente (cada ~50 ms):
```python
await esp_lib_manager.poll_ledc(client_id)
# Emite: {"type": "ledc_update", "data": {"channel": 0, "duty": 4096, "duty_pct": 50.0}}
```
El duty máximo típico es 8192 (timer de 13 bits). Para brillo de LED: `duty_pct / 100`.
---
## 8. Compilación de la DLL
### 8.1 Requisitos
- **MSYS2** instalado en `C:\msys64`
- Paquetes MINGW64: `gcc glib2 libgcrypt libslirp pixman ninja meson python git`
```bash
pacman -S mingw-w64-x86_64-{gcc,glib2,libgcrypt,libslirp,pixman,ninja,meson,python,git}
```
### 8.2 Proceso de build
```bash
# 1. Configurar (en MSYS2 MINGW64):
cd wokwi-libs/qemu-lcgamboa
./configure \
--target-list=xtensa-softmmu \
--disable-werror --enable-tcg \
--enable-gcrypt --enable-slirp \
--enable-iconv --without-default-features
# 2. Compilar el binario principal:
ninja -j$(nproc) qemu-system-xtensa.exe
# 3. Relinkar como DLL (script automatizado):
bash build_qemu_step4.sh
# → genera libqemu-xtensa.dll en build/
# → la copia a backend/app/services/
```
### 8.3 Detalle del relink como DLL
El proceso extrae el comando de link de `build.ninja`, elimina `softmmu_main.c.obj` (que contiene `main()`), y agrega flags de DLL:
```bash
cc -m64 -mcx16 -shared \
-Wl,--export-all-symbols \
-Wl,--allow-multiple-definition \
-o libqemu-xtensa.dll \
@dll_link.rsp # todos los .obj excepto softmmu_main
```
### 8.4 Verificar exports
```bash
objdump -p libqemu-xtensa.dll | grep -i "qemu_picsimlab\|qemu_init\|qemu_main"
# Debe mostrar:
# qemu_init
# qemu_main_loop
# qemu_cleanup
# qemu_picsimlab_register_callbacks
# qemu_picsimlab_set_pin
# qemu_picsimlab_set_apin
# qemu_picsimlab_uart_receive
# qemu_picsimlab_get_internals
# qemu_picsimlab_get_TIOCM
```
### 8.5 Parche requerido en scripts/symlink-install-tree.py
Windows no permite crear symlinks sin privilegios de administrador. El script de QEMU falla con `WinError 1314`. Parche aplicado:
```python
# En scripts/symlink-install-tree.py, dentro del loop de symlinks:
if os.name == 'nt':
if not os.path.exists(source):
continue
import shutil
try:
shutil.copy2(source, bundle_dest)
except Exception as copy_err:
print(f'error copying {source}: {copy_err}', file=sys.stderr)
continue
```
---
## 9. Tests
Archivo: `test/esp32/test_esp32_lib_bridge.py`
```bash
# Ejecutar todos los tests:
backend/venv/Scripts/python.exe -m pytest test/esp32/test_esp32_lib_bridge.py -v
# Resultado esperado: 28 passed en ~13 segundos
```
**Grupos de tests:**
| Grupo | Tests | Qué verifica |
|-------|-------|--------------|
| `TestDllExists` | 5 | Rutas de DLL, ROM binaries, MinGW64 |
| `TestDllLoads` | 3 | Carga de DLL, symbols exportados |
| `TestPinmap` | 3 | Estructura del pinmap, GPIO2 en slot 3 |
| `TestManagerAvailability` | 2 | `is_available()`, API surface |
| `TestEsp32LibIntegration` | 15 | QEMU real con firmware blink: boot, UART, GPIO, ADC, SPI, I2C |
**Firmware de test:** `test/esp32-emulator/binaries_lcgamboa/blink_lcgamboa.ino.merged.bin`
Compilado con arduino-esp32 2.0.17, DIO flash mode, imagen 4MB completa.
---
## 10. Limitaciones conocidas (no solucionables sin modificar QEMU)
| Limitación | Causa | Workaround |
|------------|-------|------------|
| **Una sola instancia ESP32 por proceso** | QEMU usa estado global en variables estáticas | Lanzar múltiples procesos Python |
| **WiFi solo con SSIDs hardcoded** | lcgamboa codifica "PICSimLabWifi" y "Espressif" en C | Modificar y recompilar la DLL |
| **Sin BLE / Bluetooth Classic** | No implementado en lcgamboa | No disponible |
| **Sin touch capacitivo** | `touchRead()` no tiene callback en picsimlab | No disponible |
| **Sin DAC** | GPIO25/GPIO26 analógico no expuesto por picsimlab | No disponible |
| **Flash fija en 4MB** | Hardcoded en la machine esp32-picsimlab | Recompilar DLL |
| **arduino-esp32 3.x causa crash** | IDF 5.x maneja caché diferente al WiFi emulado | Usar 2.x (IDF 4.4.x) |
---
## 11. Pendiente en el frontend
Los eventos son emitidos por el backend pero el frontend aún no los consume:
| Evento | Componente frontend a crear |
|--------|-----------------------------|
| `ws2812_update` | `NeoPixel.tsx` — strip de LEDs RGB |
| `ledc_update` | Modificar `LED.tsx` para brillo variable |
| `gpio_change` | Conectar al `PinManager` del ESP32 (análogo al AVR) |
| `gpio_dir` | Mostrar dirección de pin en el inspector |
| `i2c_event` | Sensores I2C simulados (SSD1306, BME280, etc.) |
| `spi_event` | Displays SPI (ILI9341 ya implementado para AVR) |
| `system: crash` | Notificación en la UI + botón de restart |
| `system: reboot` | Indicador de reinicio en el canvas |
---
## 12. Variables de entorno
| Variable | Valor | Efecto |
|----------|-------|--------|
| `QEMU_ESP32_LIB` | ruta a `libqemu-xtensa.dll` | Fuerza ruta de DLL (override auto-detect) |
| `QEMU_ESP32_BINARY` | ruta a `qemu-system-xtensa.exe` | Fallback subprocess (sin DLL) |
Si `QEMU_ESP32_LIB` no está seteado, el sistema busca `libqemu-xtensa.dll` en la misma carpeta que `esp32_lib_bridge.py`.

View File

@ -5,6 +5,8 @@ interface LEDProps {
id?: string; id?: string;
color?: 'red' | 'green' | 'blue' | 'yellow' | 'white' | 'orange'; color?: 'red' | 'green' | 'blue' | 'yellow' | 'white' | 'orange';
value?: boolean; value?: boolean;
/** PWM brightness 0.01.0. When set, overrides value for intermediate brightness. */
brightness?: number;
label?: string; label?: string;
x?: number; x?: number;
y?: number; y?: number;
@ -15,6 +17,7 @@ export const LED = ({
id, id,
color = 'red', color = 'red',
value = false, value = false,
brightness,
label, label,
x = 0, x = 0,
y = 0, y = 0,
@ -24,14 +27,20 @@ export const LED = ({
useEffect(() => { useEffect(() => {
if (ledRef.current) { if (ledRef.current) {
// Set properties directly on DOM element (Web Component API) const el = ledRef.current as any;
(ledRef.current as any).value = value; // If brightness given, use it (wokwi-led supports 0.01.0 via `value` float)
(ledRef.current as any).color = color; if (brightness !== undefined) {
if (label) { el.value = brightness > 0;
(ledRef.current as any).label = label; // wokwi-led doesn't natively support float brightness; we simulate via opacity
(ledRef.current as HTMLElement).style.opacity = String(brightness);
} else {
el.value = value;
(ledRef.current as HTMLElement).style.opacity = '';
} }
el.color = color;
if (label) el.label = label;
} }
}, [value, color, label]); }, [value, brightness, color, label]);
useEffect(() => { useEffect(() => {
if (ledRef.current && onPinClick) { if (ledRef.current && onPinClick) {

View File

@ -0,0 +1,126 @@
/**
* NeoPixel WS2812 LED strip component.
*
* Receives pixel data from the ESP32 backend via `ws2812_update` events.
* Each pixel is an RGB object { r, g, b } (0255 each).
*
* For ESP32 boards, the canvas element must have id="ws2812-{boardId}-{channel}"
* so the store can dispatch CustomEvents to it.
*
* Usage:
* <NeoPixel id="ws2812-esp32-0" count={8} x={100} y={200} />
*
* When no pixels have arrived yet the strip shows dimmed placeholder circles.
*/
import { useEffect, useRef, useState } from 'react';
export interface NeoPixelPixel { r: number; g: number; b: number }
interface NeoPixelProps {
id?: string;
/** Number of LEDs in the strip (used for layout when pixels array is empty) */
count?: number;
x?: number;
y?: number;
/** Layout direction */
direction?: 'horizontal' | 'vertical';
onPinClick?: (pinName: string) => void;
}
const LED_SIZE = 16;
const LED_GAP = 4;
function drawPixels(
canvas: HTMLCanvasElement,
pixels: NeoPixelPixel[],
numLeds: number,
direction: 'horizontal' | 'vertical',
): void {
const ctx = canvas.getContext('2d');
if (!ctx) return;
ctx.clearRect(0, 0, canvas.width, canvas.height);
for (let i = 0; i < numLeds; i++) {
const px = pixels[i] ?? { r: 20, g: 20, b: 20 };
const cx = direction === 'horizontal'
? LED_SIZE / 2 + i * (LED_SIZE + LED_GAP)
: LED_SIZE / 2;
const cy = direction === 'vertical'
? LED_SIZE / 2 + i * (LED_SIZE + LED_GAP)
: LED_SIZE / 2;
const r = LED_SIZE / 2;
const gradient = ctx.createRadialGradient(cx, cy, 1, cx, cy, r);
const color = `rgb(${px.r},${px.g},${px.b})`;
gradient.addColorStop(0, color);
gradient.addColorStop(1, 'rgba(0,0,0,0.6)');
ctx.beginPath();
ctx.arc(cx, cy, r, 0, Math.PI * 2);
ctx.fillStyle = gradient;
ctx.fill();
ctx.strokeStyle = 'rgba(255,255,255,0.2)';
ctx.lineWidth = 1;
ctx.stroke();
}
}
export const NeoPixel = ({
id,
count = 8,
x = 0,
y = 0,
direction = 'horizontal',
onPinClick,
}: NeoPixelProps) => {
const canvasRef = useRef<HTMLCanvasElement>(null);
const [pixels, setPixels] = useState<NeoPixelPixel[]>([]);
const numLeds = Math.max(count, pixels.length);
// Initial draw (placeholder)
useEffect(() => {
if (canvasRef.current) {
drawPixels(canvasRef.current, pixels, numLeds, direction);
}
}, [pixels, numLeds, direction]);
// Listen for ws2812-pixels CustomEvents dispatched by useSimulatorStore
useEffect(() => {
const canvas = canvasRef.current;
if (!canvas || !id) return;
const handler = (e: Event) => {
const detail = (e as CustomEvent<{ pixels: NeoPixelPixel[] }>).detail;
setPixels(detail.pixels);
};
canvas.addEventListener('ws2812-pixels', handler);
return () => canvas.removeEventListener('ws2812-pixels', handler);
}, [id]);
const w = direction === 'horizontal'
? numLeds * (LED_SIZE + LED_GAP) - LED_GAP
: LED_SIZE;
const h = direction === 'vertical'
? numLeds * (LED_SIZE + LED_GAP) - LED_GAP
: LED_SIZE;
return (
<canvas
id={id}
ref={canvasRef}
width={w}
height={h}
style={{
position: 'absolute',
left: `${x}px`,
top: `${y}px`,
cursor: onPinClick ? 'pointer' : 'default',
borderRadius: '4px',
background: '#111',
}}
onClick={() => onPinClick?.('DIN')}
title="WS2812 NeoPixel Strip"
/>
);
};

View File

@ -50,6 +50,10 @@ export const SimulatorCanvas = () => {
const oscilloscopeOpen = useOscilloscopeStore((s) => s.open); const oscilloscopeOpen = useOscilloscopeStore((s) => s.open);
const toggleOscilloscope = useOscilloscopeStore((s) => s.toggleOscilloscope); const toggleOscilloscope = useOscilloscopeStore((s) => s.toggleOscilloscope);
// ESP32 crash notification
const esp32CrashBoardId = useSimulatorStore((s) => s.esp32CrashBoardId);
const dismissEsp32Crash = useSimulatorStore((s) => s.dismissEsp32Crash);
// Board picker modal // Board picker modal
const [showBoardPicker, setShowBoardPicker] = useState(false); const [showBoardPicker, setShowBoardPicker] = useState(false);
@ -779,6 +783,27 @@ export const SimulatorCanvas = () => {
return ( return (
<div className="simulator-canvas-container"> <div className="simulator-canvas-container">
{/* ESP32 crash notification */}
{esp32CrashBoardId && (
<div style={{
position: 'absolute', top: 8, left: '50%', transform: 'translateX(-50%)',
zIndex: 1000, background: '#c0392b', color: '#fff',
padding: '8px 16px', borderRadius: 6, display: 'flex', alignItems: 'center',
gap: 12, fontSize: 13, boxShadow: '0 2px 8px rgba(0,0,0,0.4)',
}}>
<span>ESP32 crash detected on board <strong>{esp32CrashBoardId}</strong> cache error (IDF incompatibility)</span>
<button
onClick={dismissEsp32Crash}
style={{
background: 'transparent', border: '1px solid rgba(255,255,255,0.6)',
color: '#fff', borderRadius: 4, padding: '2px 8px', cursor: 'pointer',
}}
>
Dismiss
</button>
</div>
)}
{/* Main Canvas */} {/* Main Canvas */}
<div className="simulator-canvas"> <div className="simulator-canvas">
<div className="canvas-header"> <div className="canvas-header">

View File

@ -9,12 +9,20 @@
* { type: 'start_esp32', data: { board: BoardKind, firmware_b64?: string } } * { type: 'start_esp32', data: { board: BoardKind, firmware_b64?: string } }
* { type: 'stop_esp32' } * { type: 'stop_esp32' }
* { type: 'load_firmware', data: { firmware_b64: string } } * { type: 'load_firmware', data: { firmware_b64: string } }
* { type: 'esp32_serial_input', data: { bytes: number[] } } * { type: 'esp32_serial_input', data: { bytes: number[], uart?: number } }
* { type: 'esp32_gpio_in', data: { pin: number, state: 0 | 1 } } * { type: 'esp32_gpio_in', data: { pin: number, state: 0 | 1 } }
* { type: 'esp32_adc_set', data: { channel: number, millivolts: number } }
* { type: 'esp32_i2c_response', data: { addr: number, response: number } }
* { type: 'esp32_spi_response', data: { response: number } }
* *
* Backend Frontend * Backend Frontend
* { type: 'serial_output', data: { data: string } } * { type: 'serial_output', data: { data: string, uart?: number } }
* { type: 'gpio_change', data: { pin: number, state: 0 | 1 } } * { type: 'gpio_change', data: { pin: number, state: 0 | 1 } }
* { type: 'gpio_dir', data: { pin: number, dir: 0 | 1 } }
* { type: 'ledc_update', data: { channel: number, duty: number, duty_pct: number } }
* { type: 'ws2812_update', data: { channel: number, pixels: [number, number, number][] } }
* { type: 'i2c_event', data: { addr: number, data: number } }
* { type: 'spi_event', data: { data: number } }
* { type: 'system', data: { event: string, ... } } * { type: 'system', data: { event: string, ... } }
* { type: 'error', data: { message: string } } * { type: 'error', data: { message: string } }
*/ */
@ -24,17 +32,26 @@ import type { BoardKind } from '../types/board';
const API_BASE = (): string => const API_BASE = (): string =>
(import.meta.env.VITE_API_BASE as string | undefined) ?? 'http://localhost:8001/api'; (import.meta.env.VITE_API_BASE as string | undefined) ?? 'http://localhost:8001/api';
export interface Ws2812Pixel { r: number; g: number; b: number }
export interface LedcUpdate { channel: number; duty: number; duty_pct: number }
export class Esp32Bridge { export class Esp32Bridge {
readonly boardId: string; readonly boardId: string;
readonly boardKind: BoardKind; readonly boardKind: BoardKind;
// Callbacks wired up by useSimulatorStore // Callbacks wired up by useSimulatorStore
onSerialData: ((char: string) => void) | null = null; onSerialData: ((char: string, uart?: number) => void) | null = null;
onPinChange: ((gpioPin: number, state: boolean) => void) | null = null; onPinChange: ((gpioPin: number, state: boolean) => void) | null = null;
onConnected: (() => void) | null = null; onPinDir: ((gpioPin: number, dir: 0 | 1) => void) | null = null;
onDisconnected: (() => void) | null = null; onLedcUpdate: ((update: LedcUpdate) => void) | null = null;
onError: ((msg: string) => void) | null = null; onWs2812Update: ((channel: number, pixels: Ws2812Pixel[]) => void) | null = null;
onSystemEvent: ((event: string, data: Record<string, unknown>) => void) | null = null; onI2cEvent: ((addr: number, data: number) => void) | null = null;
onSpiEvent: ((data: number) => void) | null = null;
onConnected: (() => void) | null = null;
onDisconnected: (() => void) | null = null;
onError: ((msg: string) => void) | null = null;
onSystemEvent: ((event: string, data: Record<string, unknown>) => void) | null = null;
onCrash: ((data: Record<string, unknown>) => void) | null = null;
private socket: WebSocket | null = null; private socket: WebSocket | null = null;
private _connected = false; private _connected = false;
@ -63,7 +80,6 @@ export class Esp32Bridge {
socket.onopen = () => { socket.onopen = () => {
this._connected = true; this._connected = true;
this.onConnected?.(); this.onConnected?.();
// Boot the ESP32 via QEMU, optionally with pre-loaded firmware
this._send({ this._send({
type: 'start_esp32', type: 'start_esp32',
data: { data: {
@ -84,8 +100,9 @@ export class Esp32Bridge {
switch (msg.type) { switch (msg.type) {
case 'serial_output': { case 'serial_output': {
const text = (msg.data.data as string) ?? ''; const text = (msg.data.data as string) ?? '';
const uart = msg.data.uart as number | undefined;
if (this.onSerialData) { if (this.onSerialData) {
for (const ch of text) this.onSerialData(ch); for (const ch of text) this.onSerialData(ch, uart);
} }
break; break;
} }
@ -95,9 +112,42 @@ export class Esp32Bridge {
this.onPinChange?.(pin, state); this.onPinChange?.(pin, state);
break; break;
} }
case 'system': case 'gpio_dir': {
this.onSystemEvent?.(msg.data.event as string, msg.data); const pin = msg.data.pin as number;
const dir = msg.data.dir as 0 | 1;
this.onPinDir?.(pin, dir);
break; break;
}
case 'ledc_update': {
this.onLedcUpdate?.(msg.data as unknown as LedcUpdate);
break;
}
case 'ws2812_update': {
const channel = msg.data.channel as number;
const raw = msg.data.pixels as [number, number, number][];
const pixels: Ws2812Pixel[] = raw.map(([r, g, b]) => ({ r, g, b }));
this.onWs2812Update?.(channel, pixels);
break;
}
case 'i2c_event': {
const addr = msg.data.addr as number;
const data = msg.data.data as number;
this.onI2cEvent?.(addr, data);
break;
}
case 'spi_event': {
const data = msg.data.data as number;
this.onSpiEvent?.(data);
break;
}
case 'system': {
const evt = msg.data.event as string;
if (evt === 'crash') {
this.onCrash?.(msg.data);
}
this.onSystemEvent?.(evt, msg.data);
break;
}
case 'error': case 'error':
this.onError?.(msg.data.message as string); this.onError?.(msg.data.message as string);
break; break;
@ -135,15 +185,15 @@ export class Esp32Bridge {
} }
} }
/** Send a byte to the ESP32 UART0 */ /** Send a byte to the ESP32 UART0 (or UART1/2) */
sendSerialByte(byte: number): void { sendSerialByte(byte: number, uart = 0): void {
this._send({ type: 'esp32_serial_input', data: { bytes: [byte] } }); this._send({ type: 'esp32_serial_input', data: { bytes: [byte], uart } });
} }
/** Send multiple bytes at once */ /** Send multiple bytes at once */
sendSerialBytes(bytes: number[]): void { sendSerialBytes(bytes: number[], uart = 0): void {
if (bytes.length === 0) return; if (bytes.length === 0) return;
this._send({ type: 'esp32_serial_input', data: { bytes } }); this._send({ type: 'esp32_serial_input', data: { bytes, uart } });
} }
/** Drive a GPIO pin from an external source (e.g. connected Arduino) */ /** Drive a GPIO pin from an external source (e.g. connected Arduino) */
@ -151,6 +201,21 @@ export class Esp32Bridge {
this._send({ type: 'esp32_gpio_in', data: { pin: gpioPin, state: state ? 1 : 0 } }); this._send({ type: 'esp32_gpio_in', data: { pin: gpioPin, state: state ? 1 : 0 } });
} }
/** Set an ADC channel voltage (millivolts, 03300) */
setAdc(channel: number, millivolts: number): void {
this._send({ type: 'esp32_adc_set', data: { channel, millivolts } });
}
/** Configure the byte an I2C device at addr returns */
setI2cResponse(addr: number, response: number): void {
this._send({ type: 'esp32_i2c_response', data: { addr, response } });
}
/** Configure the MISO byte returned during an SPI transaction */
setSpiResponse(response: number): void {
this._send({ type: 'esp32_spi_response', data: { response } });
}
private _send(payload: unknown): void { private _send(payload: unknown): void {
if (this.socket && this.socket.readyState === WebSocket.OPEN) { if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(payload)); this.socket.send(JSON.stringify(payload));

View File

@ -105,6 +105,10 @@ interface SimulatorState {
disconnectRemoteSimulator: () => void; disconnectRemoteSimulator: () => void;
sendRemotePinEvent: (pin: string, state: number) => void; sendRemotePinEvent: (pin: string, state: number) => void;
// ── ESP32 crash notification ─────────────────────────────────────────────
esp32CrashBoardId: string | null;
dismissEsp32Crash: () => void;
// ── Components ────────────────────────────────────────────────────────── // ── Components ──────────────────────────────────────────────────────────
components: Component[]; components: Component[];
addComponent: (component: Component) => void; addComponent: (component: Component) => void;
@ -251,8 +255,31 @@ export const useSimulatorStore = create<SimulatorState>((set, get) => {
} else if (isEsp32Kind(boardKind)) { } else if (isEsp32Kind(boardKind)) {
const bridge = new Esp32Bridge(id, boardKind); const bridge = new Esp32Bridge(id, boardKind);
bridge.onSerialData = serialCallback; bridge.onSerialData = serialCallback;
bridge.onPinChange = (_gpioPin, _state) => { bridge.onPinChange = (gpioPin, state) => {
// Cross-board routing handled in SimulatorCanvas const boardPm = pinManagerMap.get(id);
if (boardPm) boardPm.triggerPinChange(gpioPin, state);
};
bridge.onCrash = () => {
set({ esp32CrashBoardId: id });
};
bridge.onLedcUpdate = (update) => {
// Route LEDC duty cycles to PinManager as PWM.
// LEDC channel N drives a GPIO; the mapping is firmware-defined.
const boardPm = pinManagerMap.get(id);
if (boardPm && typeof boardPm.updatePwm === 'function') {
boardPm.updatePwm(update.channel, update.duty_pct);
}
};
bridge.onWs2812Update = (channel, pixels) => {
// Forward WS2812 pixel data to any DOM element with id=`ws2812-{id}-{channel}`
// (set by NeoPixel components rendered in SimulatorCanvas).
// We fire a custom event that NeoPixel components can listen to.
const eventTarget = document.getElementById(`ws2812-${id}-${channel}`);
if (eventTarget) {
eventTarget.dispatchEvent(
new CustomEvent('ws2812-pixels', { detail: { pixels } })
);
}
}; };
esp32BridgeMap.set(id, bridge); esp32BridgeMap.set(id, bridge);
} else { } else {
@ -487,6 +514,9 @@ export const useSimulatorStore = create<SimulatorState>((set, get) => {
remoteConnected: false, remoteConnected: false,
remoteSocket: null, remoteSocket: null,
esp32CrashBoardId: null,
dismissEsp32Crash: () => set({ esp32CrashBoardId: null }),
setBoardType: (type: BoardType) => { setBoardType: (type: BoardType) => {
const { activeBoardId, running, stopSimulation } = get(); const { activeBoardId, running, stopSimulation } = get();
if (running) stopSimulation(); if (running) stopSimulation();

78
test_dll_minimal.py Normal file
View File

@ -0,0 +1,78 @@
"""Minimal test: start lcgamboa QEMU DLL with blink firmware and collect output."""
import ctypes, os, sys, threading, time, pathlib
MINGW = r"C:\msys64\mingw64\bin"
DLL = r"E:\Hardware\wokwi_clon\backend\app\services\libqemu-xtensa.dll"
FW = r"E:\Hardware\wokwi_clon\test\esp32-emulator\binaries\esp32_blink.ino.merged.bin"
sys.path.insert(0, r"E:\Hardware\wokwi_clon\backend")
from app.services.esp32_lib_bridge import (
_WRITE_PIN, _DIR_PIN, _I2C_EVENT, _SPI_EVENT, _UART_TX, _RMT_EVENT,
_CallbacksT, _PINMAP,
)
print("=== Loading DLL ===")
os.add_dll_directory(MINGW)
lib = ctypes.CDLL(DLL)
print(" OK")
uart_buf = bytearray()
gpio_events = []
def on_write_pin(pin, value):
gpio_events.append((pin, value))
print(f" [GPIO] pin={pin} value={value}")
def on_dir_pin(pin, dir_):
pass
def on_uart_tx(uart_id, byte_val):
uart_buf.append(byte_val)
if byte_val == ord('\n'):
line = uart_buf.decode("utf-8", errors="replace").rstrip()
uart_buf.clear()
print(f" [UART] {line}")
cb_write = _WRITE_PIN(on_write_pin)
cb_dir = _DIR_PIN(on_dir_pin)
cb_i2c = _I2C_EVENT(lambda *a: 0)
cb_spi = _SPI_EVENT(lambda *a: 0)
cb_uart = _UART_TX(on_uart_tx)
cb_rmt = _RMT_EVENT(lambda *a: None)
cbs = _CallbacksT(
picsimlab_write_pin = cb_write,
picsimlab_dir_pin = cb_dir,
picsimlab_i2c_event = cb_i2c,
picsimlab_spi_event = cb_spi,
picsimlab_uart_tx_event = cb_uart,
pinmap = ctypes.cast(_PINMAP, ctypes.c_void_p).value,
picsimlab_rmt_event = cb_rmt,
)
_keep_alive = (cbs, cb_write, cb_dir, cb_i2c, cb_spi, cb_uart, cb_rmt)
print("=== Registering callbacks ===")
lib.qemu_picsimlab_register_callbacks(ctypes.byref(cbs))
fw_bytes = FW.encode()
args = [b"qemu", b"-M", b"esp32-picsimlab", b"-nographic",
b"-drive", b"file=" + fw_bytes + b",if=mtd,format=raw"]
argc = len(args)
argv = (ctypes.c_char_p * argc)(*args)
print("=== Calling qemu_init ===")
lib.qemu_init(argc, argv, None)
print("=== qemu_init returned, starting main_loop thread ===")
t = threading.Thread(target=lib.qemu_main_loop, daemon=True, name="qemu-test")
t.start()
print("=== Waiting 20s for output ===")
time.sleep(20)
print(f"\n=== Results ===")
print(f"UART bytes: {len(uart_buf)} buffered, output so far:")
print(f"GPIO events: {len(gpio_events)}")
for ev in gpio_events[:20]:
print(f" pin={ev[0]} value={ev[1]}")
print(f"Thread alive: {t.is_alive()}")