179 lines
7.9 KiB
Python
179 lines
7.9 KiB
Python
import json
|
|
import logging
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
|
|
from app.services.qemu_manager import qemu_manager
|
|
from app.services.esp_qemu_manager import esp_qemu_manager
|
|
from app.services.esp32_lib_manager import esp_lib_manager
|
|
|
|
# Router on which this module's WebSocket endpoint is registered.
router = APIRouter()
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConnectionManager:
    """Registry of accepted WebSocket connections, keyed by client id."""

    def __init__(self):
        # client id -> the WebSocket most recently registered under that id.
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept ``websocket`` and register it under ``client_id``.

        Any socket previously stored under the same id is replaced.
        """
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        """Forget the socket stored for ``client_id``; unknown ids are ignored."""
        if client_id in self.active_connections:
            del self.active_connections[client_id]

    async def send(self, client_id: str, message: str):
        """Send ``message`` as a text frame to the socket stored for ``client_id``.

        Does nothing when no socket is registered under that id.
        """
        socket = self.active_connections.get(client_id)
        if not socket:
            return
        await socket.send_text(message)
|
|
|
|
|
|
# Single shared connection registry used by the WebSocket endpoint in this module.
manager = ConnectionManager()
|
|
|
|
|
|
@router.websocket('/ws/{client_id}')
async def simulation_websocket(websocket: WebSocket, client_id: str):
    """Serve one simulation client over a WebSocket.

    The socket is accepted and registered under ``client_id``; then every
    incoming text frame is decoded as a ``{"type": ..., "data": {...}}``
    JSON envelope and routed to the Raspberry Pi or ESP32 emulator managers.
    Events reported by the managers are sent back to the client as
    ``{"type": event_type, "data": data}`` JSON text frames.

    A frame that is not valid JSON, or whose payload has the wrong shape,
    is logged and skipped so that one bad frame does not tear down a
    running simulation.

    Whatever ends the loop (client disconnect, an unexpected error, or the
    handler task being cancelled), the connection is unregistered and all
    emulator instances for ``client_id`` are stopped.
    """
    await manager.connect(websocket, client_id)

    async def qemu_callback(event_type: str, data: dict) -> None:
        """Forward an emulator event to this client as a JSON envelope."""
        payload = json.dumps({'type': event_type, 'data': data})
        await manager.send(client_id, payload)

    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg_type, msg_data = _parse_message(raw)
            except ValueError as exc:
                # json.JSONDecodeError is a ValueError subclass, so this
                # covers both invalid JSON and wrongly-shaped envelopes.
                logger.warning(
                    'Ignoring malformed message from %s: %s', client_id, exc
                )
                continue
            await _dispatch_message(client_id, msg_type, msg_data, qemu_callback)
    except WebSocketDisconnect:
        # Normal end of session; cleanup happens in ``finally``.
        pass
    except Exception as exc:
        # Boundary handler: log with traceback, then fall through to cleanup.
        logger.exception('WebSocket error for %s: %s', client_id, exc)
    finally:
        # ``finally`` also runs on task cancellation (CancelledError is a
        # BaseException), which the ``except`` clauses above do not catch.
        _release_client(client_id)


def _parse_message(raw: str) -> tuple:
    """Decode one text frame into a ``(msg_type, msg_data)`` pair.

    A missing or non-string ``type`` becomes ``''`` (which matches no
    handler); a missing or ``null`` ``data`` becomes an empty dict.

    Raises:
        ValueError: if ``raw`` is not valid JSON, the top-level value is not
            an object, or ``data`` is present but is not an object.
    """
    message = json.loads(raw)
    if not isinstance(message, dict):
        raise ValueError(
            f'expected a JSON object, got {type(message).__name__}'
        )

    msg_type = message.get('type', '')
    if not isinstance(msg_type, str):
        msg_type = ''

    msg_data = message.get('data')
    if msg_data is None:
        msg_data = {}
    elif not isinstance(msg_data, dict):
        raise ValueError(
            f"expected 'data' to be an object, got {type(msg_data).__name__}"
        )
    return msg_type, msg_data


def _release_client(client_id: str) -> None:
    """Unregister the client's socket and stop all of its emulator instances."""
    manager.disconnect(client_id)
    qemu_manager.stop_instance(client_id)
    esp_lib_manager.stop_instance(client_id)
    esp_qemu_manager.stop_instance(client_id)


async def _dispatch_message(client_id: str, msg_type: str, msg_data: dict, callback) -> None:
    """Route one decoded client message to the matching emulator manager.

    ESP32 messages go to ``esp_lib_manager`` when it reports itself
    available and otherwise to ``esp_qemu_manager``. ADC, I2C, SPI,
    UART 1/2 and status messages are only acted on when the library
    manager is available; otherwise they are ignored. Unknown message
    types are ignored.

    Args:
        client_id: Id the emulator instances are keyed by.
        msg_type: Value of the envelope's ``type`` field.
        msg_data: Value of the envelope's ``data`` field.
        callback: Async ``(event_type, data)`` callable handed to the
            managers when an instance is started.
    """
    # Looked up lazily so availability is only queried by branches that need it.
    use_lib = esp_lib_manager.is_available

    # ── Raspberry Pi ─────────────────────────────────────────────
    if msg_type == 'start_pi':
        board = msg_data.get('board', 'raspberry-pi-3')
        qemu_manager.start_instance(client_id, board, callback)

    elif msg_type == 'stop_pi':
        qemu_manager.stop_instance(client_id)

    elif msg_type == 'serial_input':
        raw_bytes = msg_data.get('bytes', [])
        if raw_bytes:
            await qemu_manager.send_serial_bytes(client_id, bytes(raw_bytes))

    elif msg_type in ('gpio_in', 'pin_change'):
        pin = msg_data.get('pin', 0)
        state = msg_data.get('state', 0)
        qemu_manager.set_pin_state(client_id, pin, state)

    # ── ESP32 lifecycle ──────────────────────────────────────────
    elif msg_type == 'start_esp32':
        board = msg_data.get('board', 'esp32')
        firmware_b64 = msg_data.get('firmware_b64')
        if use_lib():
            esp_lib_manager.start_instance(client_id, board, callback, firmware_b64)
        else:
            esp_qemu_manager.start_instance(client_id, board, callback, firmware_b64)

    elif msg_type == 'stop_esp32':
        # Stop on both managers: either one may own the instance.
        esp_lib_manager.stop_instance(client_id)
        esp_qemu_manager.stop_instance(client_id)

    elif msg_type == 'load_firmware':
        firmware_b64 = msg_data.get('firmware_b64', '')
        if firmware_b64:
            if use_lib():
                esp_lib_manager.load_firmware(client_id, firmware_b64)
            else:
                esp_qemu_manager.load_firmware(client_id, firmware_b64)

    # ── ESP32 serial (UART 0 / 1 / 2) ────────────────────────────
    elif msg_type == 'esp32_serial_input':
        raw_bytes = msg_data.get('bytes', [])
        uart_id = int(msg_data.get('uart', 0))
        if raw_bytes:
            if use_lib():
                await esp_lib_manager.send_serial_bytes(
                    client_id, bytes(raw_bytes), uart_id
                )
            else:
                # The QEMU manager is called without a UART id.
                await esp_qemu_manager.send_serial_bytes(
                    client_id, bytes(raw_bytes)
                )

    # ── ESP32 GPIO input (from connected component / button) ─────
    elif msg_type == 'esp32_gpio_in':
        pin = msg_data.get('pin', 0)
        state = msg_data.get('state', 0)
        if use_lib():
            esp_lib_manager.set_pin_state(client_id, pin, state)
        else:
            esp_qemu_manager.set_pin_state(client_id, pin, state)

    # ── ESP32 ADC (analog input from potentiometer, sensor, etc.) ─
    elif msg_type == 'esp32_adc_set':
        # Frontend sends {channel: int, millivolts: int}
        # or {channel: int, raw: int} for direct 12-bit value;
        # 'millivolts' takes precedence when both are present.
        channel = int(msg_data.get('channel', 0))
        if 'millivolts' in msg_data:
            if use_lib():
                esp_lib_manager.set_adc(
                    client_id, channel, int(msg_data['millivolts'])
                )
        elif 'raw' in msg_data:
            if use_lib():
                esp_lib_manager.set_adc_raw(
                    client_id, channel, int(msg_data['raw'])
                )

    # ── ESP32 I2C device simulation ──────────────────────────────
    elif msg_type == 'esp32_i2c_response':
        # Frontend configures what an I2C device at addr returns:
        # {addr: int, response: int}
        addr = int(msg_data.get('addr', 0))
        resp = int(msg_data.get('response', 0))
        if use_lib():
            esp_lib_manager.set_i2c_response(client_id, addr, resp)

    # ── ESP32 SPI device simulation ──────────────────────────────
    elif msg_type == 'esp32_spi_response':
        # {response: int} — byte to return as MISO
        resp = int(msg_data.get('response', 0xFF))
        if use_lib():
            esp_lib_manager.set_spi_response(client_id, resp)

    # ── ESP32 UART 1 / 2 input ───────────────────────────────────
    elif msg_type in ('esp32_uart1_input', 'esp32_uart2_input'):
        uart_id = 1 if msg_type == 'esp32_uart1_input' else 2
        raw_bytes = msg_data.get('bytes', [])
        if raw_bytes and use_lib():
            await esp_lib_manager.send_serial_bytes(
                client_id, bytes(raw_bytes), uart_id=uart_id
            )

    # ── ESP32 status query ───────────────────────────────────────
    elif msg_type == 'esp32_status':
        if use_lib():
            status = esp_lib_manager.get_status(client_id)
            await manager.send(
                client_id,
                json.dumps({'type': 'esp32_status', 'data': status}),
            )
|