""" EspQemuManager — backend service for ESP32/ESP32-S3/ESP32-C3 emulation via QEMU. Architecture ------------ Each ESP32 instance gets: - qemu-system-xtensa (ESP32/ESP32-S3) or qemu-system-riscv32 (ESP32-C3) process - UART0 → TCP socket on a dynamic port → user serial I/O - GPIO chardev → TCP socket on a dynamic port → GPIO pin protocol - Firmware loaded as a flash image (-drive if=mtd) GPIO protocol (chardev socket) ------------------------------- QEMU → backend : "GPIO <0|1>\\n" backend → QEMU : "SET <0|1>\\n" Board types and QEMU machines ------------------------------ 'esp32' → qemu-system-xtensa -M esp32 'esp32-s3' → qemu-system-xtensa -M esp32s3 'esp32-c3' → qemu-system-riscv32 -M esp32c3 """ import asyncio import base64 import logging import os import socket import tempfile from typing import Callable, Awaitable logger = logging.getLogger(__name__) # ── QEMU binary paths (configurable via env) ────────────────────────────────── QEMU_XTENSA = os.environ.get('QEMU_ESP32_BINARY', 'qemu-system-xtensa') QEMU_RISCV32 = os.environ.get('QEMU_RISCV32_BINARY', 'qemu-system-riscv32') # ── Machine names per board type ────────────────────────────────────────────── _MACHINE: dict[str, tuple[str, str]] = { 'esp32': (QEMU_XTENSA, 'esp32'), 'esp32-s3': (QEMU_XTENSA, 'esp32s3'), 'esp32-c3': (QEMU_RISCV32, 'esp32c3'), } EventCallback = Callable[[str, dict], Awaitable[None]] def _find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('127.0.0.1', 0)) return s.getsockname()[1] class EspInstance: """State for one running ESP32 board.""" def __init__(self, client_id: str, board_type: str, callback: EventCallback): self.client_id = client_id self.board_type = board_type # 'esp32' | 'esp32-s3' | 'esp32-c3' self.callback = callback # Runtime state self.process: asyncio.subprocess.Process | None = None self.firmware_path: str | None = None # temp file, deleted on stop self.serial_port: int = 0 # UART0 TCP port self.gpio_port: int = 0 # GPIO chardev TCP port self._serial_writer: asyncio.StreamWriter | None = None self._gpio_writer: asyncio.StreamWriter | None = None self._tasks: list[asyncio.Task] = [] self.running: bool = False async def emit(self, event_type: str, data: dict) -> None: try: await self.callback(event_type, data) except Exception as e: logger.error('emit(%s): %s', event_type, e) class EspQemuManager: def __init__(self): self._instances: dict[str, EspInstance] = {} # ── Public API ───────────────────────────────────────────────────────────── def start_instance(self, client_id: str, board_type: str, callback: EventCallback, firmware_b64: str | None = None) -> None: if client_id in self._instances: logger.warning('start_instance: %s already running', client_id) return if board_type not in _MACHINE: logger.error('Unknown ESP32 board type: %s', board_type) return inst = EspInstance(client_id, board_type, callback) self._instances[client_id] = inst asyncio.create_task(self._boot(inst, firmware_b64)) def stop_instance(self, client_id: str) -> None: inst = self._instances.pop(client_id, None) if inst: asyncio.create_task(self._shutdown(inst)) def load_firmware(self, client_id: str, firmware_b64: str) -> None: """Hot-reload firmware into a running instance (stop + restart).""" inst = self._instances.get(client_id) if not inst: logger.warning('load_firmware: no instance %s', client_id) return board_type = inst.board_type callback = inst.callback self.stop_instance(client_id) # Re-start with new firmware after brief delay for cleanup async def _restart() -> None: await asyncio.sleep(0.5) self.start_instance(client_id, board_type, callback, firmware_b64) asyncio.create_task(_restart()) def set_pin_state(self, client_id: str, pin: int | str, state: int) -> None: """Drive a GPIO pin from outside (e.g. connected Arduino output).""" inst = self._instances.get(client_id) if inst and inst._gpio_writer: asyncio.create_task(self._send_gpio(inst, int(pin), bool(state))) async def send_serial_bytes(self, client_id: str, data: bytes) -> None: inst = self._instances.get(client_id) if inst and inst._serial_writer: inst._serial_writer.write(data) try: await inst._serial_writer.drain() except Exception as e: logger.warning('send_serial_bytes drain: %s', e) # ── Boot sequence ────────────────────────────────────────────────────────── async def _boot(self, inst: EspInstance, firmware_b64: str | None) -> None: # Write firmware to temp file if provided firmware_path: str | None = None if firmware_b64: try: firmware_bytes = base64.b64decode(firmware_b64) tmp = tempfile.NamedTemporaryFile(suffix='.bin', delete=False) tmp.write(firmware_bytes) tmp.close() firmware_path = tmp.name inst.firmware_path = firmware_path except Exception as e: await inst.emit('error', {'message': f'Failed to decode firmware: {e}'}) self._instances.pop(inst.client_id, None) return qemu_bin, machine = _MACHINE[inst.board_type] # Allocate TCP port for UART0 serial inst.serial_port = _find_free_port() # Build QEMU command # Note: Espressif QEMU v9.x uses server=on,wait=off syntax # GPIO chardev (lcgamboa fork) is not available in the Espressif pre-built binary; # serial I/O via TCP is fully functional. cmd = [ qemu_bin, '-nographic', '-machine', machine, # UART0 → TCP (serial I/O) '-serial', f'tcp:127.0.0.1:{inst.serial_port},server=on,wait=off', ] if firmware_path: cmd += ['-drive', f'file={firmware_path},if=mtd,format=raw'] logger.info('Launching ESP32 QEMU for %s: %s', inst.client_id, ' '.join(cmd)) try: inst.process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL, ) except FileNotFoundError: await inst.emit('error', {'message': f'{qemu_bin} not found in PATH'}) self._instances.pop(inst.client_id, None) return inst.running = True await inst.emit('system', {'event': 'booting'}) # Give QEMU a moment to open its TCP socket await asyncio.sleep(1.0) inst._tasks.append(asyncio.create_task(self._connect_serial(inst))) inst._tasks.append(asyncio.create_task(self._watch_stderr(inst))) # ── Serial (UART0) ───────────────────────────────────────────────────────── async def _connect_serial(self, inst: EspInstance) -> None: for attempt in range(10): try: reader, writer = await asyncio.open_connection('127.0.0.1', inst.serial_port) inst._serial_writer = writer logger.info('%s: serial connected on port %d', inst.client_id, inst.serial_port) await inst.emit('system', {'event': 'booted'}) await self._read_serial(inst, reader) return except (ConnectionRefusedError, OSError): await asyncio.sleep(1.0 * (attempt + 1)) await inst.emit('error', {'message': 'Could not connect to QEMU serial port'}) async def _read_serial(self, inst: EspInstance, reader: asyncio.StreamReader) -> None: buf = bytearray() while inst.running: try: chunk = await asyncio.wait_for(reader.read(256), timeout=0.1) if not chunk: break buf.extend(chunk) text = buf.decode('utf-8', errors='replace') buf.clear() await inst.emit('serial_output', {'data': text}) except asyncio.TimeoutError: continue except Exception as e: logger.warning('%s serial read: %s', inst.client_id, e) break # ── GPIO chardev ─────────────────────────────────────────────────────────── async def _connect_gpio(self, inst: EspInstance) -> None: for attempt in range(10): try: reader, writer = await asyncio.open_connection('127.0.0.1', inst.gpio_port) inst._gpio_writer = writer logger.info('%s: GPIO chardev connected on port %d', inst.client_id, inst.gpio_port) await self._read_gpio(inst, reader) return except (ConnectionRefusedError, OSError): await asyncio.sleep(1.0 * (attempt + 1)) logger.warning('%s: GPIO chardev connection failed', inst.client_id) async def _read_gpio(self, inst: EspInstance, reader: asyncio.StreamReader) -> None: """Parse "GPIO \n" lines from the firmware GPIO bridge.""" linebuf = b'' while inst.running: try: chunk = await asyncio.wait_for(reader.read(256), timeout=0.1) if not chunk: break linebuf += chunk while b'\n' in linebuf: line, linebuf = linebuf.split(b'\n', 1) await self._handle_gpio_line(inst, line.decode('ascii', 'ignore').strip()) except asyncio.TimeoutError: continue except Exception as e: logger.warning('%s GPIO read: %s', inst.client_id, e) break async def _handle_gpio_line(self, inst: EspInstance, line: str) -> None: # Expected: "GPIO <0|1>" parts = line.split() if len(parts) == 3 and parts[0] == 'GPIO': try: pin = int(parts[1]) state = int(parts[2]) await inst.emit('gpio_change', {'pin': pin, 'state': state}) except ValueError: pass async def _send_gpio(self, inst: EspInstance, pin: int, state: bool) -> None: if inst._gpio_writer: msg = f'SET {pin} {1 if state else 0}\n'.encode() inst._gpio_writer.write(msg) try: await inst._gpio_writer.drain() except Exception as e: logger.warning('%s GPIO send: %s', inst.client_id, e) # ── QEMU stderr watcher ──────────────────────────────────────────────────── async def _watch_stderr(self, inst: EspInstance) -> None: if not inst.process or not inst.process.stderr: return try: async for line in inst.process.stderr: text = line.decode('utf-8', errors='replace').rstrip() if text: logger.debug('QEMU[%s] %s', inst.client_id, text) except Exception: pass logger.info('QEMU[%s] process exited', inst.client_id) inst.running = False await inst.emit('system', {'event': 'exited'}) # ── Shutdown ─────────────────────────────────────────────────────────────── async def _shutdown(self, inst: EspInstance) -> None: inst.running = False for task in inst._tasks: task.cancel() inst._tasks.clear() for writer_attr in ('_gpio_writer', '_serial_writer'): writer: asyncio.StreamWriter | None = getattr(inst, writer_attr) if writer: try: writer.close() except Exception: pass setattr(inst, writer_attr, None) if inst.process: try: inst.process.terminate() await asyncio.wait_for(inst.process.wait(), timeout=5.0) except Exception: try: inst.process.kill() except Exception: pass inst.process = None # Delete temp firmware file if inst.firmware_path and os.path.exists(inst.firmware_path): try: os.unlink(inst.firmware_path) except Exception: pass inst.firmware_path = None logger.info('EspInstance %s shut down', inst.client_id) esp_qemu_manager = EspQemuManager()