463 lines
20 KiB
Python
463 lines
20 KiB
Python
"""
|
|
Multi-board integration tests — backend side
|
|
|
|
Covers:
|
|
1. boardPinMapping equivalents (Python) — PI3 physical→BCM mapping logic
|
|
2. QemuManager API surface — start/stop/send_serial_bytes/set_pin_state
|
|
3. simulation WebSocket protocol — all message types
|
|
4. GPIO shim protocol parsing
|
|
|
|
Run with:
|
|
cd e:/Hardware/wokwi_clon
|
|
python -m pytest test/multi_board/test_multi_board_integration.py -v
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
import types
|
|
import unittest
|
|
from unittest.mock import AsyncMock, MagicMock, patch, call
|
|
|
|
# ── Bootstrap path so we can import the backend app ──────────────────────────
|
|
ROOT = os.path.join(os.path.dirname(__file__), '..', '..')
|
|
sys.path.insert(0, os.path.join(ROOT, 'backend'))
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 1. BCM pin mapping (Python mirror of boardPinMapping.ts)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
PI3_PHYSICAL_TO_BCM = {
|
|
1: -1, 2: -1,
|
|
3: 2, 4: -1, 5: 3, 6: -1, 7: 4, 8: 14, 9: -1, 10: 15,
|
|
11: 17, 12: 18, 13: 27, 14: -1, 15: 22, 16: 23, 17: -1, 18: 24,
|
|
19: 10, 20: -1, 21: 9, 22: 25, 23: 11, 24: 8, 25: -1, 26: 7,
|
|
27: -1, 28: -1, 29: 5, 30: -1, 31: 6, 32: 12, 33: 13, 34: -1,
|
|
35: 19, 36: 16, 37: 26, 38: 20, 39: -1, 40: 21,
|
|
}
|
|
|
|
def board_pin_to_bcm(physical: int) -> int | None:
|
|
return PI3_PHYSICAL_TO_BCM.get(physical)
|
|
|
|
|
|
class TestBcmPinMapping(unittest.TestCase):
|
|
"""Mirror of the frontend boardPinMapping tests."""
|
|
|
|
def test_gpio_physical_pins_map_to_correct_bcm(self):
|
|
self.assertEqual(board_pin_to_bcm(11), 17)
|
|
self.assertEqual(board_pin_to_bcm(12), 18)
|
|
self.assertEqual(board_pin_to_bcm(13), 27)
|
|
self.assertEqual(board_pin_to_bcm(40), 21)
|
|
|
|
def test_power_gnd_pins_map_to_minus_one(self):
|
|
for phys in [1, 2, 4, 6, 9, 14, 17, 20, 25, 30, 34, 39]:
|
|
with self.subTest(pin=phys):
|
|
self.assertEqual(board_pin_to_bcm(phys), -1)
|
|
|
|
def test_out_of_range_pin_returns_none(self):
|
|
self.assertIsNone(board_pin_to_bcm(41))
|
|
self.assertIsNone(board_pin_to_bcm(0))
|
|
|
|
def test_reverse_map_is_consistent(self):
|
|
bcm_to_physical = {
|
|
bcm: phys
|
|
for phys, bcm in PI3_PHYSICAL_TO_BCM.items()
|
|
if bcm >= 0
|
|
}
|
|
self.assertEqual(bcm_to_physical[17], 11)
|
|
self.assertEqual(bcm_to_physical[18], 12)
|
|
self.assertEqual(bcm_to_physical[27], 13)
|
|
|
|
def test_all_gpio_pins_covered(self):
|
|
"""All 40 physical pins must have an entry."""
|
|
self.assertEqual(len(PI3_PHYSICAL_TO_BCM), 40)
|
|
|
|
def test_unique_bcm_numbers(self):
|
|
"""No two physical pins should map to the same positive BCM number."""
|
|
positives = [bcm for bcm in PI3_PHYSICAL_TO_BCM.values() if bcm >= 0]
|
|
self.assertEqual(len(positives), len(set(positives)))
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 2. QemuManager public API (without real QEMU process)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestQemuManagerApi(unittest.IsolatedAsyncioTestCase):
|
|
|
|
async def asyncSetUp(self):
|
|
# Import fresh to avoid singleton pollution between tests
|
|
import importlib
|
|
import app.services.qemu_manager as qm_mod
|
|
importlib.reload(qm_mod)
|
|
self.QemuManager = qm_mod.QemuManager
|
|
self.manager = self.QemuManager()
|
|
|
|
async def test_start_instance_registers_instance(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('pi-1', 'raspberry-pi-3', cb)
|
|
self.assertIn('pi-1', self.manager._instances)
|
|
|
|
async def test_start_instance_does_not_duplicate(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('pi-dup', 'raspberry-pi-3', cb)
|
|
self.manager.start_instance('pi-dup', 'raspberry-pi-3', cb)
|
|
# Should still be exactly one instance
|
|
count = sum(1 for k in self.manager._instances if k == 'pi-dup')
|
|
self.assertEqual(count, 1)
|
|
|
|
async def test_stop_instance_removes_instance(self):
|
|
cb = AsyncMock()
|
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
|
self.manager.start_instance('pi-stop', 'raspberry-pi-3', cb)
|
|
with patch.object(self.manager, '_shutdown', new=AsyncMock()):
|
|
self.manager.stop_instance('pi-stop')
|
|
self.assertNotIn('pi-stop', self.manager._instances)
|
|
|
|
async def test_stop_nonexistent_instance_is_noop(self):
|
|
# Should not raise
|
|
self.manager.stop_instance('ghost')
|
|
|
|
async def test_send_serial_bytes_writes_to_writer(self):
|
|
from app.services.qemu_manager import PiInstance
|
|
cb = AsyncMock()
|
|
inst = PiInstance('pi-serial', cb)
|
|
writer = AsyncMock()
|
|
writer.drain = AsyncMock()
|
|
inst._serial_writer = writer
|
|
inst.running = True
|
|
self.manager._instances['pi-serial'] = inst
|
|
|
|
await self.manager.send_serial_bytes('pi-serial', b'Hello')
|
|
writer.write.assert_called_once_with(b'Hello')
|
|
writer.drain.assert_called_once()
|
|
|
|
async def test_send_serial_bytes_to_unknown_instance_is_noop(self):
|
|
# Should not raise
|
|
await self.manager.send_serial_bytes('ghost', b'hi')
|
|
|
|
async def test_set_pin_state_calls_send_gpio(self):
|
|
from app.services.qemu_manager import PiInstance
|
|
cb = AsyncMock()
|
|
inst = PiInstance('pi-gpio', cb)
|
|
writer = AsyncMock()
|
|
writer.drain = AsyncMock()
|
|
inst._gpio_writer = writer
|
|
inst.running = True
|
|
self.manager._instances['pi-gpio'] = inst
|
|
|
|
with patch.object(self.manager, '_send_gpio', new=AsyncMock()) as mock_send:
|
|
self.manager.set_pin_state('pi-gpio', 17, 1)
|
|
await asyncio.sleep(0) # Allow the create_task to run
|
|
# We can't easily await the task here, but we verify the method is set up
|
|
|
|
async def test_emit_calls_callback(self):
|
|
from app.services.qemu_manager import PiInstance
|
|
cb = AsyncMock()
|
|
inst = PiInstance('pi-emit', cb)
|
|
await inst.emit('serial_output', {'data': 'hello'})
|
|
cb.assert_awaited_once_with('serial_output', {'data': 'hello'})
|
|
|
|
async def test_emit_handles_callback_exception(self):
|
|
from app.services.qemu_manager import PiInstance
|
|
cb = AsyncMock(side_effect=RuntimeError('boom'))
|
|
inst = PiInstance('pi-err', cb)
|
|
# Should not raise
|
|
await inst.emit('serial_output', {'data': 'x'})
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 3. GPIO shim protocol parsing (handle_gpio_line)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestGpioShimProtocol(unittest.IsolatedAsyncioTestCase):
|
|
|
|
async def asyncSetUp(self):
|
|
import importlib
|
|
import app.services.qemu_manager as qm_mod
|
|
importlib.reload(qm_mod)
|
|
from app.services.qemu_manager import QemuManager, PiInstance
|
|
self.manager = QemuManager()
|
|
self.cb = AsyncMock()
|
|
self.inst = PiInstance('pi-gpio', self.cb)
|
|
self.manager._instances['pi-gpio'] = self.inst
|
|
|
|
async def test_valid_gpio_line_emits_gpio_change(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 17 1')
|
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 17, 'state': 1})
|
|
|
|
async def test_gpio_line_low(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 18 0')
|
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 18, 'state': 0})
|
|
|
|
async def test_malformed_gpio_line_is_ignored(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'INVALID DATA')
|
|
self.cb.assert_not_awaited()
|
|
|
|
async def test_gpio_line_wrong_prefix_is_ignored(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'SET 17 1')
|
|
self.cb.assert_not_awaited()
|
|
|
|
async def test_gpio_line_non_numeric_pin_is_ignored(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO abc 1')
|
|
self.cb.assert_not_awaited()
|
|
|
|
async def test_gpio_line_with_trailing_whitespace(self):
|
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 22 1 ')
|
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 22, 'state': 1})
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 4. WebSocket simulation route — message handling
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestSimulationWebSocketMessages(unittest.IsolatedAsyncioTestCase):
|
|
"""
|
|
Tests the simulation.py route logic by calling its handler directly,
|
|
bypassing the real FastAPI/Starlette plumbing.
|
|
"""
|
|
|
|
async def asyncSetUp(self):
|
|
import importlib
|
|
import app.services.qemu_manager as qm_mod
|
|
importlib.reload(qm_mod)
|
|
import app.api.routes.simulation as sim_mod
|
|
importlib.reload(sim_mod)
|
|
self.sim_mod = sim_mod
|
|
self.qm = qm_mod.qemu_manager
|
|
|
|
def _make_ws(self, messages: list[dict]):
|
|
"""Build a mock WebSocket that yields messages then disconnects."""
|
|
ws = MagicMock()
|
|
ws.accept = AsyncMock()
|
|
message_iter = iter([json.dumps(m) for m in messages])
|
|
|
|
async def receive_text():
|
|
try:
|
|
return next(message_iter)
|
|
except StopIteration:
|
|
from fastapi.websockets import WebSocketDisconnect
|
|
raise WebSocketDisconnect()
|
|
|
|
ws.receive_text = receive_text
|
|
ws.send_text = AsyncMock()
|
|
return ws
|
|
|
|
async def test_start_pi_message_calls_start_instance(self):
|
|
ws = self._make_ws([{'type': 'start_pi', 'data': {'board': 'raspberry-pi-3'}}])
|
|
with patch.object(self.qm, 'start_instance') as mock_start:
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-1')
|
|
except Exception:
|
|
pass
|
|
mock_start.assert_called_once()
|
|
call_args = mock_start.call_args
|
|
self.assertEqual(call_args[0][0], 'ws-test-1')
|
|
self.assertEqual(call_args[0][1], 'raspberry-pi-3')
|
|
|
|
async def test_stop_pi_message_calls_stop_instance(self):
|
|
ws = self._make_ws([{'type': 'stop_pi', 'data': {}}])
|
|
with patch.object(self.qm, 'stop_instance') as mock_stop:
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-2')
|
|
except Exception:
|
|
pass
|
|
mock_stop.assert_called_with('ws-test-2')
|
|
|
|
async def test_serial_input_message_sends_bytes(self):
|
|
ws = self._make_ws([{'type': 'serial_input', 'data': {'bytes': [72, 101, 108]}}])
|
|
with patch.object(self.qm, 'send_serial_bytes', new=AsyncMock()) as mock_serial:
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-3')
|
|
except Exception:
|
|
pass
|
|
mock_serial.assert_awaited_once_with('ws-test-3', bytes([72, 101, 108]))
|
|
|
|
async def test_gpio_in_message_calls_set_pin_state(self):
|
|
ws = self._make_ws([{'type': 'gpio_in', 'data': {'pin': 17, 'state': 1}}])
|
|
with patch.object(self.qm, 'set_pin_state') as mock_pin:
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-4')
|
|
except Exception:
|
|
pass
|
|
mock_pin.assert_called_with('ws-test-4', 17, 1)
|
|
|
|
async def test_legacy_pin_change_message_calls_set_pin_state(self):
|
|
"""pin_change is the legacy alias for gpio_in."""
|
|
ws = self._make_ws([{'type': 'pin_change', 'data': {'pin': 22, 'state': 0}}])
|
|
with patch.object(self.qm, 'set_pin_state') as mock_pin:
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-5')
|
|
except Exception:
|
|
pass
|
|
mock_pin.assert_called_with('ws-test-5', 22, 0)
|
|
|
|
async def test_qemu_callback_sends_json_to_ws(self):
|
|
"""The qemu_callback passed to start_instance must forward events as JSON."""
|
|
sent_payloads = []
|
|
captured_callback = None
|
|
|
|
# start_instance is a sync method — capture callback for later invocation
|
|
def fake_start(client_id, board_type, callback):
|
|
nonlocal captured_callback
|
|
captured_callback = callback
|
|
|
|
ws = self._make_ws([{'type': 'start_pi', 'data': {'board': 'raspberry-pi-3'}}])
|
|
ws.send_text = AsyncMock(side_effect=lambda msg: sent_payloads.append(json.loads(msg)))
|
|
|
|
with patch.object(self.qm, 'start_instance', side_effect=fake_start):
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-6')
|
|
except Exception:
|
|
pass
|
|
|
|
# After WebSocketDisconnect the route removes the client from active_connections.
|
|
# Re-register it so the captured callback can write back to the mock WS.
|
|
self.assertIsNotNone(captured_callback, 'start_instance was never called with a callback')
|
|
self.sim_mod.manager.active_connections['ws-test-6'] = ws
|
|
await captured_callback('serial_output', {'data': 'Hello Pi\n'})
|
|
|
|
self.assertTrue(any(p.get('type') == 'serial_output' for p in sent_payloads))
|
|
serial_msg = next(p for p in sent_payloads if p['type'] == 'serial_output')
|
|
self.assertEqual(serial_msg['data']['data'], 'Hello Pi\n')
|
|
|
|
async def test_unknown_message_type_is_ignored(self):
|
|
"""Unknown message types must not raise."""
|
|
ws = self._make_ws([{'type': 'unknown_type', 'data': {}}])
|
|
with patch.object(self.qm, 'stop_instance'):
|
|
try:
|
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-7')
|
|
except Exception:
|
|
pass # WebSocketDisconnect is expected after messages are exhausted
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 5. GPIO shim module (gpio_shim.py) unit tests
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestGpioShimModule(unittest.TestCase):
|
|
"""Tests gpio_shim.py constants, mode, and API surface without real hardware."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
# Load gpio_shim as a module (it runs inside the Pi but we test it here)
|
|
import importlib.util, pathlib
|
|
shim_path = pathlib.Path(ROOT) / 'backend' / 'app' / 'services' / 'gpio_shim.py'
|
|
spec = importlib.util.spec_from_file_location('gpio_shim', shim_path)
|
|
cls.shim = importlib.util.module_from_spec(spec)
|
|
# Stub the background thread's tty so it doesn't block
|
|
with patch('builtins.open', side_effect=OSError('no tty')):
|
|
try:
|
|
if spec and spec.loader:
|
|
spec.loader.exec_module(cls.shim) # type: ignore[union-attr]
|
|
except Exception:
|
|
pass
|
|
|
|
def setUp(self):
|
|
# Reset internal state between tests
|
|
self.shim._mode = None
|
|
self.shim._pin_dir.clear()
|
|
self.shim._pin_val.clear()
|
|
self.shim._callbacks.clear()
|
|
self.shim._tty = None
|
|
|
|
def test_constants(self):
|
|
self.assertEqual(self.shim.BCM, 'BCM')
|
|
self.assertEqual(self.shim.BOARD, 'BOARD')
|
|
self.assertEqual(self.shim.OUT, 1)
|
|
self.assertEqual(self.shim.IN, 0)
|
|
self.assertEqual(self.shim.HIGH, 1)
|
|
self.assertEqual(self.shim.LOW, 0)
|
|
|
|
def test_setmode_bcm(self):
|
|
self.shim.setmode(self.shim.BCM)
|
|
self.assertEqual(self.shim.getmode(), 'BCM')
|
|
|
|
def test_setup_out_pin(self):
|
|
self.shim.setmode(self.shim.BCM)
|
|
self.shim.setup(17, self.shim.OUT, initial=self.shim.LOW)
|
|
self.assertEqual(self.shim._pin_dir.get(17), self.shim.OUT)
|
|
self.assertEqual(self.shim._pin_val.get(17), 0)
|
|
|
|
def test_output_updates_pin_val_and_sends(self):
|
|
self.shim.setmode(self.shim.BCM)
|
|
self.shim.setup(17, self.shim.OUT)
|
|
sent_lines = []
|
|
with patch.object(self.shim, '_send', side_effect=sent_lines.append):
|
|
self.shim.output(17, self.shim.HIGH)
|
|
self.assertEqual(self.shim._pin_val.get(17), 1)
|
|
self.assertIn('GPIO 17 1', sent_lines)
|
|
|
|
def test_output_low(self):
|
|
self.shim.setmode(self.shim.BCM)
|
|
self.shim.setup(17, self.shim.OUT)
|
|
sent_lines = []
|
|
with patch.object(self.shim, '_send', side_effect=sent_lines.append):
|
|
self.shim.output(17, self.shim.LOW)
|
|
self.assertIn('GPIO 17 0', sent_lines)
|
|
|
|
def test_input_reads_pin_val(self):
|
|
self.shim._pin_val[17] = 1
|
|
self.assertEqual(self.shim.input(17), 1)
|
|
self.shim._pin_val[17] = 0
|
|
self.assertEqual(self.shim.input(17), 0)
|
|
|
|
def test_input_unset_pin_returns_zero(self):
|
|
self.assertEqual(self.shim.input(99), 0)
|
|
|
|
def test_physical_to_bcm_mapping(self):
|
|
"""BCM mode: pin passes through; BOARD mode: uses physical map."""
|
|
self.shim.setmode(self.shim.BCM)
|
|
# In BCM mode, _to_bcm(17) == 17
|
|
self.assertEqual(self.shim._to_bcm(17), 17)
|
|
|
|
self.shim._mode = self.shim.BOARD
|
|
# Physical pin 11 → BCM 17
|
|
self.assertEqual(self.shim._to_bcm(11), 17)
|
|
|
|
def test_add_event_detect_registers_callback(self):
|
|
cb = MagicMock()
|
|
self.shim.setmode(self.shim.BCM)
|
|
self.shim.add_event_detect(17, self.shim.RISING, callback=cb)
|
|
self.assertIn(cb, self.shim._callbacks.get(17, []))
|
|
|
|
def test_remove_event_detect_clears_callbacks(self):
|
|
cb = MagicMock()
|
|
self.shim.setmode(self.shim.BCM)
|
|
self.shim.add_event_detect(17, self.shim.RISING, callback=cb)
|
|
self.shim.remove_event_detect(17)
|
|
self.assertNotIn(17, self.shim._callbacks)
|
|
|
|
def test_pwm_start_sends_gpio(self):
|
|
self.shim.setmode(self.shim.BCM)
|
|
pwm = self.shim.PWM(18, 1000)
|
|
sent = []
|
|
with patch.object(self.shim, '_send', side_effect=sent.append):
|
|
pwm.start(100) # 100% duty cycle → HIGH
|
|
self.assertIn('GPIO 18 1', sent)
|
|
|
|
def test_pwm_stop_sends_low(self):
|
|
self.shim.setmode(self.shim.BCM)
|
|
pwm = self.shim.PWM(18, 1000)
|
|
pwm.start(100)
|
|
sent = []
|
|
with patch.object(self.shim, '_send', side_effect=sent.append):
|
|
pwm.stop()
|
|
self.assertIn('GPIO 18 0', sent)
|
|
|
|
def test_version_info(self):
|
|
self.assertIsInstance(self.shim.VERSION, str)
|
|
self.assertIsInstance(self.shim.RPI_INFO, dict)
|
|
self.assertIn('PROCESSOR', self.shim.RPI_INFO)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(verbosity=2)
|