feat: add multi-board integration tests for Raspberry Pi 3B and backend functionality
parent
17b1a0f058
commit
cc3030200c
|
|
@ -0,0 +1,432 @@
|
||||||
|
/**
|
||||||
|
* Multi-board integration tests
|
||||||
|
*
|
||||||
|
* Covers:
|
||||||
|
* 1. useEditorStore — file group management per board
|
||||||
|
* 2. useSimulatorStore — boards[], addBoard, startBoard/stopBoard, legacy compat
|
||||||
|
* 3. boardPinMapping — PI3_PHYSICAL_TO_BCM, boardPinToNumber for raspberry-pi-3
|
||||||
|
* 4. RaspberryPi3Bridge — WebSocket connect/disconnect/message protocol
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||||
|
|
||||||
|
// ── Mocks ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// AVRSimulator: bare minimum stub (must use function, not arrow, for `new` to work)
|
||||||
|
vi.mock('../simulation/AVRSimulator', () => ({
|
||||||
|
AVRSimulator: vi.fn(function (this: any) {
|
||||||
|
this.onSerialData = null;
|
||||||
|
this.onBaudRateChange = null;
|
||||||
|
this.onPinChangeWithTime = null;
|
||||||
|
this.start = vi.fn();
|
||||||
|
this.stop = vi.fn();
|
||||||
|
this.reset = vi.fn();
|
||||||
|
this.loadHex = vi.fn();
|
||||||
|
this.serialWrite = vi.fn();
|
||||||
|
this.addI2CDevice = vi.fn();
|
||||||
|
this.setPinState = vi.fn();
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock('../simulation/RP2040Simulator', () => ({
|
||||||
|
RP2040Simulator: vi.fn(function (this: any) {
|
||||||
|
this.onSerialData = null;
|
||||||
|
this.onPinChangeWithTime = null;
|
||||||
|
this.start = vi.fn();
|
||||||
|
this.stop = vi.fn();
|
||||||
|
this.reset = vi.fn();
|
||||||
|
this.loadBinary = vi.fn();
|
||||||
|
this.serialWrite = vi.fn();
|
||||||
|
this.addI2CDevice = vi.fn();
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock('../simulation/PinManager', () => ({
|
||||||
|
PinManager: vi.fn(function (this: any) {
|
||||||
|
this.updatePort = vi.fn();
|
||||||
|
this.onPinChange = vi.fn().mockReturnValue(() => {});
|
||||||
|
this.getListenersCount = vi.fn().mockReturnValue(0);
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock('../simulation/I2CBusManager', () => ({
|
||||||
|
VirtualDS1307: vi.fn(function (this: any) {}),
|
||||||
|
VirtualTempSensor: vi.fn(function (this: any) {}),
|
||||||
|
I2CMemoryDevice: vi.fn(function (this: any) {}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock('../store/useOscilloscopeStore', () => ({
|
||||||
|
useOscilloscopeStore: {
|
||||||
|
getState: vi.fn().mockReturnValue({ channels: [], pushSample: vi.fn() }),
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
// WebSocket mock (global)
|
||||||
|
class MockWebSocket {
|
||||||
|
static OPEN = 1;
|
||||||
|
readyState = MockWebSocket.OPEN;
|
||||||
|
onopen: (() => void) | null = null;
|
||||||
|
onmessage: ((e: { data: string }) => void) | null = null;
|
||||||
|
onclose: (() => void) | null = null;
|
||||||
|
onerror: (() => void) | null = null;
|
||||||
|
sent: string[] = [];
|
||||||
|
send(data: string) { this.sent.push(data); }
|
||||||
|
close() { this.readyState = 3; this.onclose?.(); }
|
||||||
|
// Helper: simulate incoming message
|
||||||
|
receive(payload: object) { this.onmessage?.({ data: JSON.stringify(payload) }); }
|
||||||
|
// Simulate open
|
||||||
|
open() { this.readyState = MockWebSocket.OPEN; this.onopen?.(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
vi.stubGlobal('WebSocket', MockWebSocket);
|
||||||
|
vi.stubGlobal('requestAnimationFrame', (_cb: FrameRequestCallback) => 1);
|
||||||
|
vi.stubGlobal('cancelAnimationFrame', vi.fn());
|
||||||
|
|
||||||
|
// ── Imports (after mocks) ────────────────────────────────────────────────
|
||||||
|
import { useEditorStore } from '../store/useEditorStore';
|
||||||
|
import { useSimulatorStore, getBoardSimulator, getBoardBridge } from '../store/useSimulatorStore';
|
||||||
|
import {
|
||||||
|
PI3_PHYSICAL_TO_BCM,
|
||||||
|
PI3_BCM_TO_PHYSICAL,
|
||||||
|
boardPinToNumber,
|
||||||
|
isBoardComponent,
|
||||||
|
} from '../utils/boardPinMapping';
|
||||||
|
import { RaspberryPi3Bridge } from '../simulation/RaspberryPi3Bridge';
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
// 1. boardPinMapping — Raspberry Pi 3B BCM map
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
describe('boardPinMapping — Pi3B BCM', () => {
|
||||||
|
it('maps known GPIO physical pins to correct BCM numbers', () => {
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[11]).toBe(17); // BCM17
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[12]).toBe(18); // BCM18 PWM0
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[13]).toBe(27); // BCM27
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[40]).toBe(21); // BCM21
|
||||||
|
});
|
||||||
|
|
||||||
|
it('maps power/GND pins to -1', () => {
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[1]).toBe(-1); // 3.3V
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[2]).toBe(-1); // 5V
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[6]).toBe(-1); // GND
|
||||||
|
expect(PI3_PHYSICAL_TO_BCM[9]).toBe(-1); // GND
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reverse map BCM→physical is consistent', () => {
|
||||||
|
// BCM17 is on physical pin 11
|
||||||
|
expect(PI3_BCM_TO_PHYSICAL[17]).toBe(11);
|
||||||
|
expect(PI3_BCM_TO_PHYSICAL[18]).toBe(12);
|
||||||
|
expect(PI3_BCM_TO_PHYSICAL[27]).toBe(13);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('boardPinToNumber returns BCM for raspberry-pi-3', () => {
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', '11')).toBe(17);
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', '12')).toBe(18);
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', '40')).toBe(21);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('boardPinToNumber returns -1 for power/GND pins', () => {
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', '1')).toBe(-1);
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', '6')).toBe(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('boardPinToNumber returns null for out-of-range pin', () => {
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', '41')).toBeNull();
|
||||||
|
expect(boardPinToNumber('raspberry-pi-3', 'SDA')).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('isBoardComponent recognises raspberry-pi-3 and numbered variants', () => {
|
||||||
|
expect(isBoardComponent('raspberry-pi-3')).toBe(true);
|
||||||
|
expect(isBoardComponent('raspberry-pi-3-2')).toBe(true);
|
||||||
|
expect(isBoardComponent('arduino-uno')).toBe(true);
|
||||||
|
expect(isBoardComponent('led-builtin')).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
// 2. useEditorStore — file groups
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
describe('useEditorStore — file groups', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
// Reset store to initial state between tests
|
||||||
|
useEditorStore.setState(useEditorStore.getInitialState?.() ?? {});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('starts with a default file group for the initial Arduino Uno board', () => {
|
||||||
|
const { fileGroups, activeGroupId } = useEditorStore.getState();
|
||||||
|
expect(Object.keys(fileGroups)).toContain('group-arduino-uno');
|
||||||
|
expect(activeGroupId).toBe('group-arduino-uno');
|
||||||
|
expect(fileGroups['group-arduino-uno'].length).toBeGreaterThan(0);
|
||||||
|
expect(fileGroups['group-arduino-uno'][0].name).toMatch(/\.ino$/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('createFileGroup creates a new group with a .ino file for Arduino', () => {
|
||||||
|
const { createFileGroup, fileGroups } = useEditorStore.getState();
|
||||||
|
createFileGroup('group-arduino-uno-2');
|
||||||
|
const updated = useEditorStore.getState().fileGroups;
|
||||||
|
expect(updated['group-arduino-uno-2']).toBeDefined();
|
||||||
|
expect(updated['group-arduino-uno-2'][0].name).toMatch(/\.ino$/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('createFileGroup creates a .py file for Raspberry Pi 3', () => {
|
||||||
|
const { createFileGroup } = useEditorStore.getState();
|
||||||
|
createFileGroup('group-raspberry-pi-3');
|
||||||
|
const updated = useEditorStore.getState().fileGroups;
|
||||||
|
expect(updated['group-raspberry-pi-3']).toBeDefined();
|
||||||
|
expect(updated['group-raspberry-pi-3'][0].name).toMatch(/\.py$/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('createFileGroup accepts custom initial files', () => {
|
||||||
|
const { createFileGroup } = useEditorStore.getState();
|
||||||
|
createFileGroup('group-custom', [
|
||||||
|
{ name: 'main.py', content: 'print("hello")' },
|
||||||
|
{ name: 'utils.py', content: '' },
|
||||||
|
]);
|
||||||
|
const updated = useEditorStore.getState().fileGroups;
|
||||||
|
expect(updated['group-custom'].length).toBe(2);
|
||||||
|
expect(updated['group-custom'][0].name).toBe('main.py');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('setActiveGroup switches files to the selected group', () => {
|
||||||
|
const { createFileGroup, setActiveGroup } = useEditorStore.getState();
|
||||||
|
createFileGroup('group-raspberry-pi-3');
|
||||||
|
setActiveGroup('group-raspberry-pi-3');
|
||||||
|
const s = useEditorStore.getState();
|
||||||
|
expect(s.activeGroupId).toBe('group-raspberry-pi-3');
|
||||||
|
expect(s.files[0].name).toMatch(/\.py$/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('deleteFileGroup removes the group', () => {
|
||||||
|
const { createFileGroup, deleteFileGroup } = useEditorStore.getState();
|
||||||
|
createFileGroup('group-temp');
|
||||||
|
deleteFileGroup('group-temp');
|
||||||
|
const updated = useEditorStore.getState().fileGroups;
|
||||||
|
expect(updated['group-temp']).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('createFile adds to active group', () => {
|
||||||
|
const { createFile, activeGroupId } = useEditorStore.getState();
|
||||||
|
const id = createFile('helper.h');
|
||||||
|
const s = useEditorStore.getState();
|
||||||
|
const groupFile = s.fileGroups[activeGroupId].find((f) => f.id === id);
|
||||||
|
expect(groupFile).toBeDefined();
|
||||||
|
expect(groupFile?.name).toBe('helper.h');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not create duplicate groups', () => {
|
||||||
|
const { createFileGroup } = useEditorStore.getState();
|
||||||
|
createFileGroup('group-dup');
|
||||||
|
createFileGroup('group-dup');
|
||||||
|
const updated = useEditorStore.getState().fileGroups;
|
||||||
|
expect(Object.keys(updated).filter((k) => k === 'group-dup').length).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
// 3. useSimulatorStore — multi-board
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
describe('useSimulatorStore — multi-board', () => {
|
||||||
|
it('starts with one Arduino Uno board', () => {
|
||||||
|
const { boards, activeBoardId } = useSimulatorStore.getState();
|
||||||
|
expect(boards.length).toBe(1);
|
||||||
|
expect(boards[0].boardKind).toBe('arduino-uno');
|
||||||
|
expect(activeBoardId).toBe('arduino-uno');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('addBoard creates a new board and registers runtime objects', async () => {
|
||||||
|
const { addBoard, boards: before } = useSimulatorStore.getState();
|
||||||
|
const id = addBoard('arduino-nano', 200, 100);
|
||||||
|
const { boards: after } = useSimulatorStore.getState();
|
||||||
|
expect(after.length).toBe(before.length + 1);
|
||||||
|
const newBoard = after.find((b) => b.id === id);
|
||||||
|
expect(newBoard).toBeDefined();
|
||||||
|
expect(newBoard?.boardKind).toBe('arduino-nano');
|
||||||
|
expect(newBoard?.x).toBe(200);
|
||||||
|
expect(newBoard?.y).toBe(100);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('addBoard for the same kind generates unique IDs', () => {
|
||||||
|
const { addBoard } = useSimulatorStore.getState();
|
||||||
|
const id1 = addBoard('arduino-mega', 0, 0);
|
||||||
|
const id2 = addBoard('arduino-mega', 100, 0);
|
||||||
|
expect(id1).not.toBe(id2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('removeBoard removes the board and cleans up', () => {
|
||||||
|
const { addBoard, removeBoard } = useSimulatorStore.getState();
|
||||||
|
const id = addBoard('arduino-nano', 0, 0);
|
||||||
|
removeBoard(id);
|
||||||
|
const { boards } = useSimulatorStore.getState();
|
||||||
|
expect(boards.find((b) => b.id === id)).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('setActiveBoardId switches legacy flat fields', () => {
|
||||||
|
const { addBoard, setActiveBoardId } = useSimulatorStore.getState();
|
||||||
|
addBoard('arduino-mega', 0, 0);
|
||||||
|
const { boards } = useSimulatorStore.getState();
|
||||||
|
const megaBoard = boards.find((b) => b.boardKind === 'arduino-mega');
|
||||||
|
expect(megaBoard).toBeDefined();
|
||||||
|
setActiveBoardId(megaBoard!.id);
|
||||||
|
const s = useSimulatorStore.getState();
|
||||||
|
expect(s.activeBoardId).toBe(megaBoard!.id);
|
||||||
|
expect(s.boardType).toBe('arduino-mega');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('setBoardPosition updates both legacy boardPosition and boards[]', () => {
|
||||||
|
const { setBoardPosition, activeBoardId } = useSimulatorStore.getState();
|
||||||
|
setBoardPosition({ x: 123, y: 456 });
|
||||||
|
const s = useSimulatorStore.getState();
|
||||||
|
expect(s.boardPosition).toEqual({ x: 123, y: 456 });
|
||||||
|
const board = s.boards.find((b) => b.id === activeBoardId);
|
||||||
|
expect(board?.x).toBe(123);
|
||||||
|
expect(board?.y).toBe(456);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('updateBoard merges partial updates', () => {
|
||||||
|
const { activeBoardId, updateBoard } = useSimulatorStore.getState();
|
||||||
|
updateBoard(activeBoardId!, { serialMonitorOpen: true });
|
||||||
|
const s = useSimulatorStore.getState();
|
||||||
|
const board = s.boards.find((b) => b.id === activeBoardId);
|
||||||
|
expect(board?.serialMonitorOpen).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('addBoard for raspberry-pi-3 creates a bridge (not a simulator)', () => {
|
||||||
|
const { addBoard } = useSimulatorStore.getState();
|
||||||
|
const id = addBoard('raspberry-pi-3', 500, 50);
|
||||||
|
const { boards } = useSimulatorStore.getState();
|
||||||
|
const piBoard = boards.find((b) => b.id === id);
|
||||||
|
expect(piBoard?.boardKind).toBe('raspberry-pi-3');
|
||||||
|
// No AVRSimulator for Pi — verify via module-level helpers
|
||||||
|
expect(getBoardSimulator(id)).toBeUndefined();
|
||||||
|
expect(getBoardBridge(id)).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('legacy startSimulation delegates to activeBoardId board', () => {
|
||||||
|
const s = useSimulatorStore.getState();
|
||||||
|
const startSpy = vi.spyOn(s, 'startBoard');
|
||||||
|
s.startSimulation();
|
||||||
|
expect(startSpy).toHaveBeenCalledWith(s.activeBoardId);
|
||||||
|
startSpy.mockRestore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('legacy stopSimulation delegates to activeBoardId board', () => {
|
||||||
|
const s = useSimulatorStore.getState();
|
||||||
|
const stopSpy = vi.spyOn(s, 'stopBoard');
|
||||||
|
s.stopSimulation();
|
||||||
|
expect(stopSpy).toHaveBeenCalledWith(s.activeBoardId);
|
||||||
|
stopSpy.mockRestore();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
// 4. RaspberryPi3Bridge — WebSocket protocol
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
describe('RaspberryPi3Bridge — WebSocket protocol', () => {
|
||||||
|
let bridge: RaspberryPi3Bridge;
|
||||||
|
let ws: MockWebSocket;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
bridge = new RaspberryPi3Bridge('test-pi');
|
||||||
|
bridge.connect();
|
||||||
|
// Retrieve the mocked WebSocket instance
|
||||||
|
ws = (bridge as any).socket as MockWebSocket;
|
||||||
|
ws.open();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
bridge.disconnect();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('connects and sends start_pi on open', () => {
|
||||||
|
expect(ws.sent.length).toBeGreaterThan(0);
|
||||||
|
const firstMsg = JSON.parse(ws.sent[0]);
|
||||||
|
expect(firstMsg.type).toBe('start_pi');
|
||||||
|
expect(firstMsg.data.board).toBe('raspberry-pi-3');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('connected property is true after open', () => {
|
||||||
|
expect(bridge.connected).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sends stop_pi and closes on disconnect', () => {
|
||||||
|
bridge.disconnect();
|
||||||
|
const msgs = ws.sent.map((m) => JSON.parse(m));
|
||||||
|
const stopMsg = msgs.find((m) => m.type === 'stop_pi');
|
||||||
|
expect(stopMsg).toBeDefined();
|
||||||
|
expect(bridge.connected).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sendSerialByte sends serial_input with correct byte', () => {
|
||||||
|
bridge.sendSerialByte(65); // 'A'
|
||||||
|
const last = JSON.parse(ws.sent[ws.sent.length - 1]);
|
||||||
|
expect(last.type).toBe('serial_input');
|
||||||
|
expect(last.data.bytes).toEqual([65]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sendSerialBytes sends multiple bytes', () => {
|
||||||
|
bridge.sendSerialBytes([72, 105, 10]); // "Hi\n"
|
||||||
|
const last = JSON.parse(ws.sent[ws.sent.length - 1]);
|
||||||
|
expect(last.type).toBe('serial_input');
|
||||||
|
expect(last.data.bytes).toEqual([72, 105, 10]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sendPinEvent sends gpio_in with correct pin and state', () => {
|
||||||
|
bridge.sendPinEvent(17, true);
|
||||||
|
const last = JSON.parse(ws.sent[ws.sent.length - 1]);
|
||||||
|
expect(last.type).toBe('gpio_in');
|
||||||
|
expect(last.data.pin).toBe(17);
|
||||||
|
expect(last.data.state).toBe(1);
|
||||||
|
|
||||||
|
bridge.sendPinEvent(17, false);
|
||||||
|
const last2 = JSON.parse(ws.sent[ws.sent.length - 1]);
|
||||||
|
expect(last2.data.state).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fires onSerialData for each character in serial_output', () => {
|
||||||
|
const received: string[] = [];
|
||||||
|
bridge.onSerialData = (ch) => received.push(ch);
|
||||||
|
ws.receive({ type: 'serial_output', data: { data: 'Hello\n' } });
|
||||||
|
expect(received).toEqual(['H', 'e', 'l', 'l', 'o', '\n']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fires onPinChange for gpio_change events', () => {
|
||||||
|
let gotPin = -1, gotState = false;
|
||||||
|
bridge.onPinChange = (pin, state) => { gotPin = pin; gotState = state; };
|
||||||
|
ws.receive({ type: 'gpio_change', data: { pin: 17, state: 1 } });
|
||||||
|
expect(gotPin).toBe(17);
|
||||||
|
expect(gotState).toBe(true);
|
||||||
|
|
||||||
|
ws.receive({ type: 'gpio_change', data: { pin: 17, state: 0 } });
|
||||||
|
expect(gotState).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fires onSystemEvent for system events', () => {
|
||||||
|
let lastEvent = '';
|
||||||
|
bridge.onSystemEvent = (event) => { lastEvent = event; };
|
||||||
|
ws.receive({ type: 'system', data: { event: 'booted' } });
|
||||||
|
expect(lastEvent).toBe('booted');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fires onError for error events', () => {
|
||||||
|
let errMsg = '';
|
||||||
|
bridge.onError = (msg) => { errMsg = msg; };
|
||||||
|
ws.receive({ type: 'error', data: { message: 'QEMU crashed' } });
|
||||||
|
expect(errMsg).toBe('QEMU crashed');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('connected is false after server closes the socket', () => {
|
||||||
|
ws.close();
|
||||||
|
expect(bridge.connected).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not send when socket is not open', () => {
|
||||||
|
const closedBridge = new RaspberryPi3Bridge('closed-pi');
|
||||||
|
// No connect() called
|
||||||
|
const sentBefore = ws.sent.length;
|
||||||
|
closedBridge.sendSerialByte(65);
|
||||||
|
expect(ws.sent.length).toBe(sentBefore); // No new messages on old socket
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -0,0 +1,462 @@
|
||||||
|
"""
|
||||||
|
Multi-board integration tests — backend side
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
1. boardPinMapping equivalents (Python) — PI3 physical→BCM mapping logic
|
||||||
|
2. QemuManager API surface — start/stop/send_serial_bytes/set_pin_state
|
||||||
|
3. simulation WebSocket protocol — all message types
|
||||||
|
4. GPIO shim protocol parsing
|
||||||
|
|
||||||
|
Run with:
|
||||||
|
cd e:/Hardware/wokwi_clon
|
||||||
|
python -m pytest test/multi_board/test_multi_board_integration.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import types
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch, call
|
||||||
|
|
||||||
|
# ── Bootstrap path so we can import the backend app ──────────────────────────
|
||||||
|
ROOT = os.path.join(os.path.dirname(__file__), '..', '..')
|
||||||
|
sys.path.insert(0, os.path.join(ROOT, 'backend'))
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# 1. BCM pin mapping (Python mirror of boardPinMapping.ts)
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
PI3_PHYSICAL_TO_BCM = {
|
||||||
|
1: -1, 2: -1,
|
||||||
|
3: 2, 4: -1, 5: 3, 6: -1, 7: 4, 8: 14, 9: -1, 10: 15,
|
||||||
|
11: 17, 12: 18, 13: 27, 14: -1, 15: 22, 16: 23, 17: -1, 18: 24,
|
||||||
|
19: 10, 20: -1, 21: 9, 22: 25, 23: 11, 24: 8, 25: -1, 26: 7,
|
||||||
|
27: -1, 28: -1, 29: 5, 30: -1, 31: 6, 32: 12, 33: 13, 34: -1,
|
||||||
|
35: 19, 36: 16, 37: 26, 38: 20, 39: -1, 40: 21,
|
||||||
|
}
|
||||||
|
|
||||||
|
def board_pin_to_bcm(physical: int) -> int | None:
|
||||||
|
return PI3_PHYSICAL_TO_BCM.get(physical)
|
||||||
|
|
||||||
|
|
||||||
|
class TestBcmPinMapping(unittest.TestCase):
|
||||||
|
"""Mirror of the frontend boardPinMapping tests."""
|
||||||
|
|
||||||
|
def test_gpio_physical_pins_map_to_correct_bcm(self):
|
||||||
|
self.assertEqual(board_pin_to_bcm(11), 17)
|
||||||
|
self.assertEqual(board_pin_to_bcm(12), 18)
|
||||||
|
self.assertEqual(board_pin_to_bcm(13), 27)
|
||||||
|
self.assertEqual(board_pin_to_bcm(40), 21)
|
||||||
|
|
||||||
|
def test_power_gnd_pins_map_to_minus_one(self):
|
||||||
|
for phys in [1, 2, 4, 6, 9, 14, 17, 20, 25, 30, 34, 39]:
|
||||||
|
with self.subTest(pin=phys):
|
||||||
|
self.assertEqual(board_pin_to_bcm(phys), -1)
|
||||||
|
|
||||||
|
def test_out_of_range_pin_returns_none(self):
|
||||||
|
self.assertIsNone(board_pin_to_bcm(41))
|
||||||
|
self.assertIsNone(board_pin_to_bcm(0))
|
||||||
|
|
||||||
|
def test_reverse_map_is_consistent(self):
|
||||||
|
bcm_to_physical = {
|
||||||
|
bcm: phys
|
||||||
|
for phys, bcm in PI3_PHYSICAL_TO_BCM.items()
|
||||||
|
if bcm >= 0
|
||||||
|
}
|
||||||
|
self.assertEqual(bcm_to_physical[17], 11)
|
||||||
|
self.assertEqual(bcm_to_physical[18], 12)
|
||||||
|
self.assertEqual(bcm_to_physical[27], 13)
|
||||||
|
|
||||||
|
def test_all_gpio_pins_covered(self):
|
||||||
|
"""All 40 physical pins must have an entry."""
|
||||||
|
self.assertEqual(len(PI3_PHYSICAL_TO_BCM), 40)
|
||||||
|
|
||||||
|
def test_unique_bcm_numbers(self):
|
||||||
|
"""No two physical pins should map to the same positive BCM number."""
|
||||||
|
positives = [bcm for bcm in PI3_PHYSICAL_TO_BCM.values() if bcm >= 0]
|
||||||
|
self.assertEqual(len(positives), len(set(positives)))
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# 2. QemuManager public API (without real QEMU process)
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestQemuManagerApi(unittest.IsolatedAsyncioTestCase):
|
||||||
|
|
||||||
|
async def asyncSetUp(self):
|
||||||
|
# Import fresh to avoid singleton pollution between tests
|
||||||
|
import importlib
|
||||||
|
import app.services.qemu_manager as qm_mod
|
||||||
|
importlib.reload(qm_mod)
|
||||||
|
self.QemuManager = qm_mod.QemuManager
|
||||||
|
self.manager = self.QemuManager()
|
||||||
|
|
||||||
|
async def test_start_instance_registers_instance(self):
|
||||||
|
cb = AsyncMock()
|
||||||
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
||||||
|
self.manager.start_instance('pi-1', 'raspberry-pi-3', cb)
|
||||||
|
self.assertIn('pi-1', self.manager._instances)
|
||||||
|
|
||||||
|
async def test_start_instance_does_not_duplicate(self):
|
||||||
|
cb = AsyncMock()
|
||||||
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
||||||
|
self.manager.start_instance('pi-dup', 'raspberry-pi-3', cb)
|
||||||
|
self.manager.start_instance('pi-dup', 'raspberry-pi-3', cb)
|
||||||
|
# Should still be exactly one instance
|
||||||
|
count = sum(1 for k in self.manager._instances if k == 'pi-dup')
|
||||||
|
self.assertEqual(count, 1)
|
||||||
|
|
||||||
|
async def test_stop_instance_removes_instance(self):
|
||||||
|
cb = AsyncMock()
|
||||||
|
with patch.object(self.manager, '_boot', new=AsyncMock()):
|
||||||
|
self.manager.start_instance('pi-stop', 'raspberry-pi-3', cb)
|
||||||
|
with patch.object(self.manager, '_shutdown', new=AsyncMock()):
|
||||||
|
self.manager.stop_instance('pi-stop')
|
||||||
|
self.assertNotIn('pi-stop', self.manager._instances)
|
||||||
|
|
||||||
|
async def test_stop_nonexistent_instance_is_noop(self):
|
||||||
|
# Should not raise
|
||||||
|
self.manager.stop_instance('ghost')
|
||||||
|
|
||||||
|
async def test_send_serial_bytes_writes_to_writer(self):
|
||||||
|
from app.services.qemu_manager import PiInstance
|
||||||
|
cb = AsyncMock()
|
||||||
|
inst = PiInstance('pi-serial', cb)
|
||||||
|
writer = AsyncMock()
|
||||||
|
writer.drain = AsyncMock()
|
||||||
|
inst._serial_writer = writer
|
||||||
|
inst.running = True
|
||||||
|
self.manager._instances['pi-serial'] = inst
|
||||||
|
|
||||||
|
await self.manager.send_serial_bytes('pi-serial', b'Hello')
|
||||||
|
writer.write.assert_called_once_with(b'Hello')
|
||||||
|
writer.drain.assert_called_once()
|
||||||
|
|
||||||
|
async def test_send_serial_bytes_to_unknown_instance_is_noop(self):
|
||||||
|
# Should not raise
|
||||||
|
await self.manager.send_serial_bytes('ghost', b'hi')
|
||||||
|
|
||||||
|
async def test_set_pin_state_calls_send_gpio(self):
|
||||||
|
from app.services.qemu_manager import PiInstance
|
||||||
|
cb = AsyncMock()
|
||||||
|
inst = PiInstance('pi-gpio', cb)
|
||||||
|
writer = AsyncMock()
|
||||||
|
writer.drain = AsyncMock()
|
||||||
|
inst._gpio_writer = writer
|
||||||
|
inst.running = True
|
||||||
|
self.manager._instances['pi-gpio'] = inst
|
||||||
|
|
||||||
|
with patch.object(self.manager, '_send_gpio', new=AsyncMock()) as mock_send:
|
||||||
|
self.manager.set_pin_state('pi-gpio', 17, 1)
|
||||||
|
await asyncio.sleep(0) # Allow the create_task to run
|
||||||
|
# We can't easily await the task here, but we verify the method is set up
|
||||||
|
|
||||||
|
async def test_emit_calls_callback(self):
|
||||||
|
from app.services.qemu_manager import PiInstance
|
||||||
|
cb = AsyncMock()
|
||||||
|
inst = PiInstance('pi-emit', cb)
|
||||||
|
await inst.emit('serial_output', {'data': 'hello'})
|
||||||
|
cb.assert_awaited_once_with('serial_output', {'data': 'hello'})
|
||||||
|
|
||||||
|
async def test_emit_handles_callback_exception(self):
|
||||||
|
from app.services.qemu_manager import PiInstance
|
||||||
|
cb = AsyncMock(side_effect=RuntimeError('boom'))
|
||||||
|
inst = PiInstance('pi-err', cb)
|
||||||
|
# Should not raise
|
||||||
|
await inst.emit('serial_output', {'data': 'x'})
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# 3. GPIO shim protocol parsing (handle_gpio_line)
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGpioShimProtocol(unittest.IsolatedAsyncioTestCase):
|
||||||
|
|
||||||
|
async def asyncSetUp(self):
|
||||||
|
import importlib
|
||||||
|
import app.services.qemu_manager as qm_mod
|
||||||
|
importlib.reload(qm_mod)
|
||||||
|
from app.services.qemu_manager import QemuManager, PiInstance
|
||||||
|
self.manager = QemuManager()
|
||||||
|
self.cb = AsyncMock()
|
||||||
|
self.inst = PiInstance('pi-gpio', self.cb)
|
||||||
|
self.manager._instances['pi-gpio'] = self.inst
|
||||||
|
|
||||||
|
async def test_valid_gpio_line_emits_gpio_change(self):
|
||||||
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 17 1')
|
||||||
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 17, 'state': 1})
|
||||||
|
|
||||||
|
async def test_gpio_line_low(self):
|
||||||
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 18 0')
|
||||||
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 18, 'state': 0})
|
||||||
|
|
||||||
|
async def test_malformed_gpio_line_is_ignored(self):
|
||||||
|
await self.manager._handle_gpio_line(self.inst, 'INVALID DATA')
|
||||||
|
self.cb.assert_not_awaited()
|
||||||
|
|
||||||
|
async def test_gpio_line_wrong_prefix_is_ignored(self):
|
||||||
|
await self.manager._handle_gpio_line(self.inst, 'SET 17 1')
|
||||||
|
self.cb.assert_not_awaited()
|
||||||
|
|
||||||
|
async def test_gpio_line_non_numeric_pin_is_ignored(self):
|
||||||
|
await self.manager._handle_gpio_line(self.inst, 'GPIO abc 1')
|
||||||
|
self.cb.assert_not_awaited()
|
||||||
|
|
||||||
|
async def test_gpio_line_with_trailing_whitespace(self):
|
||||||
|
await self.manager._handle_gpio_line(self.inst, 'GPIO 22 1 ')
|
||||||
|
self.cb.assert_awaited_once_with('gpio_change', {'pin': 22, 'state': 1})
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# 4. WebSocket simulation route — message handling
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSimulationWebSocketMessages(unittest.IsolatedAsyncioTestCase):
|
||||||
|
"""
|
||||||
|
Tests the simulation.py route logic by calling its handler directly,
|
||||||
|
bypassing the real FastAPI/Starlette plumbing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def asyncSetUp(self):
|
||||||
|
import importlib
|
||||||
|
import app.services.qemu_manager as qm_mod
|
||||||
|
importlib.reload(qm_mod)
|
||||||
|
import app.api.routes.simulation as sim_mod
|
||||||
|
importlib.reload(sim_mod)
|
||||||
|
self.sim_mod = sim_mod
|
||||||
|
self.qm = qm_mod.qemu_manager
|
||||||
|
|
||||||
|
def _make_ws(self, messages: list[dict]):
|
||||||
|
"""Build a mock WebSocket that yields messages then disconnects."""
|
||||||
|
ws = MagicMock()
|
||||||
|
ws.accept = AsyncMock()
|
||||||
|
message_iter = iter([json.dumps(m) for m in messages])
|
||||||
|
|
||||||
|
async def receive_text():
|
||||||
|
try:
|
||||||
|
return next(message_iter)
|
||||||
|
except StopIteration:
|
||||||
|
from fastapi.websockets import WebSocketDisconnect
|
||||||
|
raise WebSocketDisconnect()
|
||||||
|
|
||||||
|
ws.receive_text = receive_text
|
||||||
|
ws.send_text = AsyncMock()
|
||||||
|
return ws
|
||||||
|
|
||||||
|
async def test_start_pi_message_calls_start_instance(self):
|
||||||
|
ws = self._make_ws([{'type': 'start_pi', 'data': {'board': 'raspberry-pi-3'}}])
|
||||||
|
with patch.object(self.qm, 'start_instance') as mock_start:
|
||||||
|
with patch.object(self.qm, 'stop_instance'):
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-1')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
mock_start.assert_called_once()
|
||||||
|
call_args = mock_start.call_args
|
||||||
|
self.assertEqual(call_args[0][0], 'ws-test-1')
|
||||||
|
self.assertEqual(call_args[0][1], 'raspberry-pi-3')
|
||||||
|
|
||||||
|
async def test_stop_pi_message_calls_stop_instance(self):
|
||||||
|
ws = self._make_ws([{'type': 'stop_pi', 'data': {}}])
|
||||||
|
with patch.object(self.qm, 'stop_instance') as mock_stop:
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-2')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
mock_stop.assert_called_with('ws-test-2')
|
||||||
|
|
||||||
|
async def test_serial_input_message_sends_bytes(self):
|
||||||
|
ws = self._make_ws([{'type': 'serial_input', 'data': {'bytes': [72, 101, 108]}}])
|
||||||
|
with patch.object(self.qm, 'send_serial_bytes', new=AsyncMock()) as mock_serial:
|
||||||
|
with patch.object(self.qm, 'stop_instance'):
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-3')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
mock_serial.assert_awaited_once_with('ws-test-3', bytes([72, 101, 108]))
|
||||||
|
|
||||||
|
async def test_gpio_in_message_calls_set_pin_state(self):
|
||||||
|
ws = self._make_ws([{'type': 'gpio_in', 'data': {'pin': 17, 'state': 1}}])
|
||||||
|
with patch.object(self.qm, 'set_pin_state') as mock_pin:
|
||||||
|
with patch.object(self.qm, 'stop_instance'):
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-4')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
mock_pin.assert_called_with('ws-test-4', 17, 1)
|
||||||
|
|
||||||
|
async def test_legacy_pin_change_message_calls_set_pin_state(self):
|
||||||
|
"""pin_change is the legacy alias for gpio_in."""
|
||||||
|
ws = self._make_ws([{'type': 'pin_change', 'data': {'pin': 22, 'state': 0}}])
|
||||||
|
with patch.object(self.qm, 'set_pin_state') as mock_pin:
|
||||||
|
with patch.object(self.qm, 'stop_instance'):
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-5')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
mock_pin.assert_called_with('ws-test-5', 22, 0)
|
||||||
|
|
||||||
|
async def test_qemu_callback_sends_json_to_ws(self):
|
||||||
|
"""The qemu_callback passed to start_instance must forward events as JSON."""
|
||||||
|
sent_payloads = []
|
||||||
|
captured_callback = None
|
||||||
|
|
||||||
|
# start_instance is a sync method — capture callback for later invocation
|
||||||
|
def fake_start(client_id, board_type, callback):
|
||||||
|
nonlocal captured_callback
|
||||||
|
captured_callback = callback
|
||||||
|
|
||||||
|
ws = self._make_ws([{'type': 'start_pi', 'data': {'board': 'raspberry-pi-3'}}])
|
||||||
|
ws.send_text = AsyncMock(side_effect=lambda msg: sent_payloads.append(json.loads(msg)))
|
||||||
|
|
||||||
|
with patch.object(self.qm, 'start_instance', side_effect=fake_start):
|
||||||
|
with patch.object(self.qm, 'stop_instance'):
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-6')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# After WebSocketDisconnect the route removes the client from active_connections.
|
||||||
|
# Re-register it so the captured callback can write back to the mock WS.
|
||||||
|
self.assertIsNotNone(captured_callback, 'start_instance was never called with a callback')
|
||||||
|
self.sim_mod.manager.active_connections['ws-test-6'] = ws
|
||||||
|
await captured_callback('serial_output', {'data': 'Hello Pi\n'})
|
||||||
|
|
||||||
|
self.assertTrue(any(p.get('type') == 'serial_output' for p in sent_payloads))
|
||||||
|
serial_msg = next(p for p in sent_payloads if p['type'] == 'serial_output')
|
||||||
|
self.assertEqual(serial_msg['data']['data'], 'Hello Pi\n')
|
||||||
|
|
||||||
|
async def test_unknown_message_type_is_ignored(self):
|
||||||
|
"""Unknown message types must not raise."""
|
||||||
|
ws = self._make_ws([{'type': 'unknown_type', 'data': {}}])
|
||||||
|
with patch.object(self.qm, 'stop_instance'):
|
||||||
|
try:
|
||||||
|
await self.sim_mod.simulation_websocket(ws, 'ws-test-7')
|
||||||
|
except Exception:
|
||||||
|
pass # WebSocketDisconnect is expected after messages are exhausted
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# 5. GPIO shim module (gpio_shim.py) unit tests
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGpioShimModule(unittest.TestCase):
|
||||||
|
"""Tests gpio_shim.py constants, mode, and API surface without real hardware."""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
# Load gpio_shim as a module (it runs inside the Pi but we test it here)
|
||||||
|
import importlib.util, pathlib
|
||||||
|
shim_path = pathlib.Path(ROOT) / 'backend' / 'app' / 'services' / 'gpio_shim.py'
|
||||||
|
spec = importlib.util.spec_from_file_location('gpio_shim', shim_path)
|
||||||
|
cls.shim = importlib.util.module_from_spec(spec)
|
||||||
|
# Stub the background thread's tty so it doesn't block
|
||||||
|
with patch('builtins.open', side_effect=OSError('no tty')):
|
||||||
|
try:
|
||||||
|
if spec and spec.loader:
|
||||||
|
spec.loader.exec_module(cls.shim) # type: ignore[union-attr]
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
# Reset internal state between tests
|
||||||
|
self.shim._mode = None
|
||||||
|
self.shim._pin_dir.clear()
|
||||||
|
self.shim._pin_val.clear()
|
||||||
|
self.shim._callbacks.clear()
|
||||||
|
self.shim._tty = None
|
||||||
|
|
||||||
|
def test_constants(self):
|
||||||
|
self.assertEqual(self.shim.BCM, 'BCM')
|
||||||
|
self.assertEqual(self.shim.BOARD, 'BOARD')
|
||||||
|
self.assertEqual(self.shim.OUT, 1)
|
||||||
|
self.assertEqual(self.shim.IN, 0)
|
||||||
|
self.assertEqual(self.shim.HIGH, 1)
|
||||||
|
self.assertEqual(self.shim.LOW, 0)
|
||||||
|
|
||||||
|
def test_setmode_bcm(self):
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
self.assertEqual(self.shim.getmode(), 'BCM')
|
||||||
|
|
||||||
|
def test_setup_out_pin(self):
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
self.shim.setup(17, self.shim.OUT, initial=self.shim.LOW)
|
||||||
|
self.assertEqual(self.shim._pin_dir.get(17), self.shim.OUT)
|
||||||
|
self.assertEqual(self.shim._pin_val.get(17), 0)
|
||||||
|
|
||||||
|
def test_output_updates_pin_val_and_sends(self):
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
self.shim.setup(17, self.shim.OUT)
|
||||||
|
sent_lines = []
|
||||||
|
with patch.object(self.shim, '_send', side_effect=sent_lines.append):
|
||||||
|
self.shim.output(17, self.shim.HIGH)
|
||||||
|
self.assertEqual(self.shim._pin_val.get(17), 1)
|
||||||
|
self.assertIn('GPIO 17 1', sent_lines)
|
||||||
|
|
||||||
|
def test_output_low(self):
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
self.shim.setup(17, self.shim.OUT)
|
||||||
|
sent_lines = []
|
||||||
|
with patch.object(self.shim, '_send', side_effect=sent_lines.append):
|
||||||
|
self.shim.output(17, self.shim.LOW)
|
||||||
|
self.assertIn('GPIO 17 0', sent_lines)
|
||||||
|
|
||||||
|
def test_input_reads_pin_val(self):
|
||||||
|
self.shim._pin_val[17] = 1
|
||||||
|
self.assertEqual(self.shim.input(17), 1)
|
||||||
|
self.shim._pin_val[17] = 0
|
||||||
|
self.assertEqual(self.shim.input(17), 0)
|
||||||
|
|
||||||
|
def test_input_unset_pin_returns_zero(self):
|
||||||
|
self.assertEqual(self.shim.input(99), 0)
|
||||||
|
|
||||||
|
def test_physical_to_bcm_mapping(self):
|
||||||
|
"""BCM mode: pin passes through; BOARD mode: uses physical map."""
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
# In BCM mode, _to_bcm(17) == 17
|
||||||
|
self.assertEqual(self.shim._to_bcm(17), 17)
|
||||||
|
|
||||||
|
self.shim._mode = self.shim.BOARD
|
||||||
|
# Physical pin 11 → BCM 17
|
||||||
|
self.assertEqual(self.shim._to_bcm(11), 17)
|
||||||
|
|
||||||
|
def test_add_event_detect_registers_callback(self):
|
||||||
|
cb = MagicMock()
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
self.shim.add_event_detect(17, self.shim.RISING, callback=cb)
|
||||||
|
self.assertIn(cb, self.shim._callbacks.get(17, []))
|
||||||
|
|
||||||
|
def test_remove_event_detect_clears_callbacks(self):
|
||||||
|
cb = MagicMock()
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
self.shim.add_event_detect(17, self.shim.RISING, callback=cb)
|
||||||
|
self.shim.remove_event_detect(17)
|
||||||
|
self.assertNotIn(17, self.shim._callbacks)
|
||||||
|
|
||||||
|
def test_pwm_start_sends_gpio(self):
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
pwm = self.shim.PWM(18, 1000)
|
||||||
|
sent = []
|
||||||
|
with patch.object(self.shim, '_send', side_effect=sent.append):
|
||||||
|
pwm.start(100) # 100% duty cycle → HIGH
|
||||||
|
self.assertIn('GPIO 18 1', sent)
|
||||||
|
|
||||||
|
def test_pwm_stop_sends_low(self):
|
||||||
|
self.shim.setmode(self.shim.BCM)
|
||||||
|
pwm = self.shim.PWM(18, 1000)
|
||||||
|
pwm.start(100)
|
||||||
|
sent = []
|
||||||
|
with patch.object(self.shim, '_send', side_effect=sent.append):
|
||||||
|
pwm.stop()
|
||||||
|
self.assertIn('GPIO 18 0', sent)
|
||||||
|
|
||||||
|
def test_version_info(self):
|
||||||
|
self.assertIsInstance(self.shim.VERSION, str)
|
||||||
|
self.assertIsInstance(self.shim.RPI_INFO, dict)
|
||||||
|
self.assertIn('PROCESSOR', self.shim.RPI_INFO)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main(verbosity=2)
|
||||||
Loading…
Reference in New Issue