diff --git a/backend/app/services/esp32_worker.py b/backend/app/services/esp32_worker.py index 795864c..7268100 100644 --- a/backend/app/services/esp32_worker.py +++ b/backend/app/services/esp32_worker.py @@ -192,6 +192,7 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) except Exception as exc: _emit({'type': 'error', 'message': f'Cannot load DLL: {exc}'}) os._exit(1) + lib.qemu_picsimlab_get_internals.restype = ctypes.c_void_p # ── 3. Write firmware to a temp file ────────────────────────────────────── try: @@ -231,6 +232,37 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) # ESP32 signal indices: 72-79 = LEDC HS ch 0-7, 80-87 = LEDC LS ch 0-7 _ledc_gpio_map: dict[int, int] = {} + _out_sel_dumped = [False] # one-time diagnostic dump flag + + def _refresh_ledc_gpio_map() -> None: + """Scan gpio_out_sel[40] registers and update _ledc_gpio_map. + + Called eagerly from the 0x5000 LEDC duty callback on cache miss, + and periodically from the LEDC polling thread. + """ + try: + out_sel_ptr = lib.qemu_picsimlab_get_internals(2) + if not out_sel_ptr: + _log('LEDC gpio_out_sel: internals(2) returned NULL') + return + out_sel = (ctypes.c_uint32 * 40).from_address(out_sel_ptr) + # One-time dump of ALL gpio_out_sel values for diagnostics + if not _out_sel_dumped[0]: + _out_sel_dumped[0] = True + non_default = {pin: int(out_sel[pin]) for pin in range(40) + if int(out_sel[pin]) != 256 and int(out_sel[pin]) != 0} + _log(f'LEDC gpio_out_sel dump (non-default): {non_default}') + _log(f'LEDC gpio_out_sel ALL: {[int(out_sel[i]) for i in range(40)]}') + for gpio_pin in range(40): + signal = int(out_sel[gpio_pin]) & 0xFF + if 72 <= signal <= 87: + ledc_ch = signal - 72 + if _ledc_gpio_map.get(ledc_ch) != gpio_pin: + _ledc_gpio_map[ledc_ch] = gpio_pin + _log(f'LEDC map: ch{ledc_ch} -> GPIO{gpio_pin} (signal={signal})') + except Exception as e: + _log(f'LEDC gpio_out_sel scan error: {e}') + # Sensor state: gpio_pin → {type, properties..., saw_low, responding} _sensors: dict[int, dict] = {} _sensors_lock = threading.Lock() @@ -424,7 +456,11 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) if marker == 0x5000: # LEDC duty change (from esp32_ledc.c) ledc_ch = (direction >> 8) & 0x0F intensity = direction & 0xFF # 0-100 percentage + _log(f'0x5000 callback: direction=0x{direction:04X} ch={ledc_ch} intensity={intensity} map={dict(_ledc_gpio_map)}') gpio = _ledc_gpio_map.get(ledc_ch, -1) + if gpio == -1: + _refresh_ledc_gpio_map() + gpio = _ledc_gpio_map.get(ledc_ch, -1) _emit({'type': 'ledc_update', 'channel': ledc_ch, 'duty': intensity, 'duty_pct': intensity, @@ -584,7 +620,6 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) # ── 7. LEDC polling thread (100 ms interval) ────────────────────────────── def _ledc_poll_thread() -> None: - lib.qemu_picsimlab_get_internals.restype = ctypes.c_void_p # Track last-emitted duty to avoid flooding identical updates _last_duty = [0.0] * 16 _diag_count = [0] @@ -599,17 +634,7 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) # duty[] is float[16] in QEMU (percentage 0-100) arr = (ctypes.c_float * 16).from_address(ptr) # Refresh LEDC→GPIO mapping from gpio_out_sel[40] registers - out_sel_ptr = lib.qemu_picsimlab_get_internals(2) - if out_sel_ptr: - out_sel = (ctypes.c_uint32 * 40).from_address(out_sel_ptr) - for gpio_pin in range(40): - signal = int(out_sel[gpio_pin]) & 0xFF - # Signal 72-79 = LEDC HS ch 0-7; 80-87 = LEDC LS ch 0-7 - if 72 <= signal <= 87: - ledc_ch = signal - 72 - if _ledc_gpio_map.get(ledc_ch) != gpio_pin: - _ledc_gpio_map[ledc_ch] = gpio_pin - _log(f'LEDC map: ch{ledc_ch} → GPIO{gpio_pin} (signal={signal})') + _refresh_ledc_gpio_map() # Log once when nonzero duties first appear if not _first_nonzero_logged[0]: nonzero = {ch: round(float(arr[ch]), 2) for ch in range(16) @@ -618,6 +643,31 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) _log(f'LEDC first nonzero at poll #{_diag_count[0]}: ' f'duties={nonzero} gpio_map={dict(_ledc_gpio_map)}') _first_nonzero_logged[0] = True + # Periodic diagnostic dump every 50 polls (~5s) + if _diag_count[0] % 50 == 0: + all_duties = {ch: round(float(arr[ch]), 4) for ch in range(16) + if float(arr[ch]) != 0.0} + # Also read LEDC channel conf (internals(4)) and timer freq (internals(5)) + diag_parts = [f'duties={all_duties}', f'gpio_map={dict(_ledc_gpio_map)}'] + try: + conf_ptr = lib.qemu_picsimlab_get_internals(4) + if conf_ptr: + conf_arr = (ctypes.c_uint32 * 16).from_address(conf_ptr) + nonzero_conf = {ch: hex(int(conf_arr[ch])) for ch in range(16) + if int(conf_arr[ch]) != 0} + diag_parts.append(f'ch_conf={nonzero_conf}') + except Exception: + pass + try: + freq_ptr = lib.qemu_picsimlab_get_internals(5) + if freq_ptr: + freq_arr = (ctypes.c_uint32 * 8).from_address(freq_ptr) + nonzero_freq = {t: int(freq_arr[t]) for t in range(8) + if int(freq_arr[t]) != 0} + diag_parts.append(f'timer_freq={nonzero_freq}') + except Exception: + pass + _log(f'LEDC poll #{_diag_count[0]}: {" | ".join(diag_parts)}') for ch in range(16): duty_pct = float(arr[ch]) if abs(duty_pct - _last_duty[ch]) < 0.01: @@ -654,7 +704,10 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker) elif c == 'set_adc': raw_v = int(int(cmd['millivolts']) * 4095 / 3300) - lib.qemu_picsimlab_set_apin(int(cmd['channel']), max(0, min(4095, raw_v))) + ch = int(cmd['channel']) + clamped = max(0, min(4095, raw_v)) + _log(f'set_adc: ch={ch} mv={cmd["millivolts"]} raw={clamped}') + lib.qemu_picsimlab_set_apin(ch, clamped) elif c == 'set_adc_raw': lib.qemu_picsimlab_set_apin(int(cmd['channel']), diff --git a/frontend/public/components-metadata.json b/frontend/public/components-metadata.json index ed817b2..c57dc6e 100644 --- a/frontend/public/components-metadata.json +++ b/frontend/public/components-metadata.json @@ -1,6 +1,6 @@ { "version": "1.0.0", - "generatedAt": "2026-03-23T13:04:13.804Z", + "generatedAt": "2026-03-24T02:30:06.881Z", "components": [ { "id": "arduino-mega", diff --git a/frontend/src/__tests__/esp32-servo-pot.test.ts b/frontend/src/__tests__/esp32-servo-pot.test.ts index 27c6aee..d180291 100644 --- a/frontend/src/__tests__/esp32-servo-pot.test.ts +++ b/frontend/src/__tests__/esp32-servo-pot.test.ts @@ -729,3 +729,94 @@ describe('LEDC polling — data format', () => { expect(emitted[1]).toEqual({ ch: 0, duty: 12.0 }); }); }); + +// ───────────────────────────────────────────────────────────────────────────── +// 14. PinManager.broadcastPwm — gpio=-1 fallback +// ───────────────────────────────────────────────────────────────────────────── + +describe('PinManager.broadcastPwm fallback', () => { + // Use a simple inline PinManager-like class to test broadcastPwm logic + // (The real PinManager can't be imported here due to vi.mock overrides) + + class SimplePinManager { + private pwmListeners = new Map void>>(); + private pwmValues = new Map(); + + onPwmChange(pin: number, cb: (pin: number, duty: number) => void): () => void { + if (!this.pwmListeners.has(pin)) this.pwmListeners.set(pin, new Set()); + this.pwmListeners.get(pin)!.add(cb); + return () => { this.pwmListeners.get(pin)?.delete(cb); }; + } + + updatePwm(pin: number, dutyCycle: number): void { + this.pwmValues.set(pin, dutyCycle); + this.pwmListeners.get(pin)?.forEach(cb => cb(pin, dutyCycle)); + } + + broadcastPwm(dutyCycle: number): void { + this.pwmListeners.forEach((callbacks, pin) => { + this.pwmValues.set(pin, dutyCycle); + callbacks.forEach(cb => cb(pin, dutyCycle)); + }); + } + } + + it('broadcastPwm dispatches to all registered PWM listeners', () => { + const pm = new SimplePinManager(); + const received: { pin: number; duty: number }[] = []; + + pm.onPwmChange(13, (pin, duty) => received.push({ pin, duty })); + pm.onPwmChange(5, (pin, duty) => received.push({ pin, duty })); + + pm.broadcastPwm(0.075); // 7.5% servo duty + + expect(received).toHaveLength(2); + expect(received).toContainEqual({ pin: 13, duty: 0.075 }); + expect(received).toContainEqual({ pin: 5, duty: 0.075 }); + }); + + it('broadcastPwm does nothing when no listeners registered', () => { + const pm = new SimplePinManager(); + // Should not throw + pm.broadcastPwm(0.075); + }); + + it('servo filters broadcastPwm by duty range (0.01-0.20)', () => { + const MIN_DC = 544 / 20000; // 0.0272 + const MAX_DC = 2400 / 20000; // 0.12 + let servoAngle = -1; + + const servoCallback = (_pin: number, dutyCycle: number) => { + if (dutyCycle < 0.01 || dutyCycle > 0.20) return; + const angle = Math.round(((dutyCycle - MIN_DC) / (MAX_DC - MIN_DC)) * 180); + servoAngle = Math.max(0, Math.min(180, angle)); + }; + + servoCallback(13, 0.075); + expect(servoAngle).toBeGreaterThanOrEqual(88); + expect(servoAngle).toBeLessThanOrEqual(95); + + const prevAngle = servoAngle; + servoCallback(13, 0.50); // 50% — not a servo signal + expect(servoAngle).toBe(prevAngle); + }); + + it('onLedcUpdate with gpio=-1 should use broadcastPwm (integration logic)', () => { + const pm = new SimplePinManager(); + const received: { pin: number; duty: number }[] = []; + + pm.onPwmChange(13, (pin, duty) => received.push({ pin, duty })); + + const update = { channel: 0, duty: 7.36, duty_pct: 7.36, gpio: -1 }; + const dutyCycle = update.duty_pct / 100; + + if (update.gpio >= 0) { + pm.updatePwm(update.gpio, dutyCycle); + } else { + pm.broadcastPwm(dutyCycle); + } + + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ pin: 13, duty: 0.0736 }); + }); +}); diff --git a/frontend/src/simulation/PinManager.ts b/frontend/src/simulation/PinManager.ts index 3e17022..c8c37c8 100644 --- a/frontend/src/simulation/PinManager.ts +++ b/frontend/src/simulation/PinManager.ts @@ -130,6 +130,18 @@ export class PinManager { } } + /** + * Broadcast PWM duty to ALL registered PWM listeners. + * Used when the LEDC channel→GPIO mapping is unknown (gpio=-1). + * Components filter by duty range (e.g., servo accepts 0.01-0.20). + */ + broadcastPwm(dutyCycle: number): void { + this.pwmListeners.forEach((callbacks, pin) => { + this.pwmValues.set(pin, dutyCycle); + callbacks.forEach(cb => cb(pin, dutyCycle)); + }); + } + getPwmValue(pin: number): number { return this.pwmValues.get(pin) ?? 0; } diff --git a/frontend/src/simulation/parts/ComplexParts.ts b/frontend/src/simulation/parts/ComplexParts.ts index 414fccd..e01e491 100644 --- a/frontend/src/simulation/parts/ComplexParts.ts +++ b/frontend/src/simulation/parts/ComplexParts.ts @@ -71,9 +71,12 @@ PartSimulationRegistry.register('potentiometer', { const isESP32 = typeof (simulator as any).setAdcVoltage === 'function'; const refVoltage = (isRP2040 || isESP32) ? 3.3 : 5.0; + console.log(`[Pot] attached: pin=${pin} isESP32=${isESP32} refV=${refVoltage}`); + const onInput = () => { const raw = parseInt((element as any).value || '0', 10); const volts = (raw / 1023.0) * refVoltage; + console.log(`[Pot] onInput: raw=${raw} volts=${volts.toFixed(3)} pin=${pin}`); setAdcVoltage(simulator, pin, volts); }; @@ -335,11 +338,14 @@ PartSimulationRegistry.register('servo', { // 544µs = 2.72%, 2400µs = 12.0% const MIN_DC = MIN_PULSE_US / 20000; // 0.0272 const MAX_DC = MAX_PULSE_US / 20000; // 0.12 + console.log(`[Servo:ESP32] registering onPwmChange on pin=${pinSIG}`); const unsubscribe = pinManager.onPwmChange(pinSIG, (_pin, dutyCycle) => { + console.log(`[Servo:ESP32] onPwmChange pin=${_pin} dutyCycle=${dutyCycle.toFixed(4)}`); if (dutyCycle < 0.01 || dutyCycle > 0.20) return; // ignore out-of-range const angle = Math.round( ((dutyCycle - MIN_DC) / (MAX_DC - MIN_DC)) * 180 ); + console.log(`[Servo:ESP32] angle=${angle}`); el.angle = Math.max(0, Math.min(180, angle)); }); return () => { unsubscribe(); }; diff --git a/frontend/src/store/useSimulatorStore.ts b/frontend/src/store/useSimulatorStore.ts index f9cfa51..30dc70c 100644 --- a/frontend/src/store/useSimulatorStore.ts +++ b/frontend/src/store/useSimulatorStore.ts @@ -112,6 +112,23 @@ class Esp32BridgeShim { } } +// ── Shared LEDC update handler (used by addBoard, setBoardType, initSimulator) ─ +function makeLedcUpdateHandler(boardId: string) { + return (update: { channel: number; duty_pct: number; gpio?: number }) => { + const boardPm = pinManagerMap.get(boardId); + if (!boardPm) return; + const dutyCycle = update.duty_pct / 100; + if (update.gpio !== undefined && update.gpio >= 0) { + boardPm.updatePwm(update.gpio, dutyCycle); + } else { + // gpio unknown (QEMU doesn't expose gpio_out_sel for LEDC): + // broadcast to ALL PWM listeners. Components filter by duty range + // (servo accepts 0.01–0.20, LEDs use 0–1.0). + boardPm.broadcastPwm(dutyCycle); + } + }; +} + // ── Runtime Maps (outside Zustand — not serialisable) ───────────────────── const simulatorMap = new Map(); const pinManagerMap = new Map(); @@ -390,18 +407,7 @@ export const useSimulatorStore = create((set, get) => { return { boards, ...(isActive ? { running: false } : {}) }; }); }; - bridge.onLedcUpdate = (update) => { - // Route LEDC duty cycles to PinManager as PWM (0.0–1.0). - // If gpio is known (from GPIO out_sel sync), use the actual GPIO pin; - // otherwise fall back to the LEDC channel number. - const boardPm = pinManagerMap.get(id); - if (boardPm) { - const targetPin = (update.gpio !== undefined && update.gpio >= 0) - ? update.gpio - : update.channel; - boardPm.updatePwm(targetPin, update.duty_pct / 100); - } - }; + bridge.onLedcUpdate = makeLedcUpdateHandler(id); bridge.onWs2812Update = (channel, pixels) => { // Forward WS2812 pixel data to any DOM element with id=`ws2812-{id}-{channel}` // (set by NeoPixel components rendered in SimulatorCanvas). @@ -767,15 +773,7 @@ export const useSimulatorStore = create((set, get) => { return { boards, ...(isActive ? { running: false } : {}) }; }); }; - bridge.onLedcUpdate = (update) => { - const boardPm = pinManagerMap.get(boardId); - if (boardPm && typeof boardPm.updatePwm === 'function') { - const targetPin = (update.gpio !== undefined && update.gpio >= 0) - ? update.gpio - : update.channel; - boardPm.updatePwm(targetPin, update.duty_pct / 100); - } - }; + bridge.onLedcUpdate = makeLedcUpdateHandler(boardId); bridge.onWs2812Update = (channel, pixels) => { const eventTarget = document.getElementById(`ws2812-${boardId}-${channel}`); if (eventTarget) { @@ -863,15 +861,7 @@ export const useSimulatorStore = create((set, get) => { return { boards, ...(isActive ? { running: false } : {}) }; }); }; - bridge.onLedcUpdate = (update) => { - const boardPm = pinManagerMap.get(boardId); - if (boardPm && typeof boardPm.updatePwm === 'function') { - const targetPin = (update.gpio !== undefined && update.gpio >= 0) - ? update.gpio - : update.channel; - boardPm.updatePwm(targetPin, update.duty_pct / 100); - } - }; + bridge.onLedcUpdate = makeLedcUpdateHandler(boardId); esp32BridgeMap.set(boardId, bridge); const shim = new Esp32BridgeShim(bridge, pm); shim.onSerialData = serialCallback; diff --git a/test/esp32/test_ledc_gpio_race.py b/test/esp32/test_ledc_gpio_race.py new file mode 100644 index 0000000..3a8e34b --- /dev/null +++ b/test/esp32/test_ledc_gpio_race.py @@ -0,0 +1,231 @@ +""" +test_ledc_gpio_race.py + +Tests for the LEDC GPIO mapping race condition fix in esp32_worker.py. + +The core issue: when firmware calls ledcWrite(), the 0x5000 sync callback +fires immediately but the LEDC polling thread (100ms interval) hasn't yet +scanned gpio_out_sel to build _ledc_gpio_map. The fix adds an eager scan +inside the 0x5000 handler on cache miss. + +These tests exercise the scan logic and the cache-miss-triggered refresh +as pure functions, without loading QEMU. +""" + + + +# --------------------------------------------------------------------------- +# Replicate the scan logic from esp32_worker._refresh_ledc_gpio_map() +# so we can test it in isolation. +# --------------------------------------------------------------------------- + +def scan_out_sel(out_sel: list[int]) -> dict[int, int]: + """Pure-Python equivalent of the gpio_out_sel scan in the worker. + + Returns a dict mapping LEDC channel -> GPIO pin. + Signal 72-79 = LEDC HS ch 0-7; 80-87 = LEDC LS ch 0-7. + """ + ledc_gpio_map: dict[int, int] = {} + for gpio_pin in range(len(out_sel)): + signal = out_sel[gpio_pin] & 0xFF + if 72 <= signal <= 87: + ledc_ch = signal - 72 + ledc_gpio_map[ledc_ch] = gpio_pin + return ledc_gpio_map + + +def simulate_0x5000_handler( + direction: int, + ledc_gpio_map: dict[int, int], + out_sel: list[int] | None = None, +) -> dict: + """Simulate the 0x5000 LEDC duty callback with the eager-scan fix. + + If the map has no entry for the channel and out_sel is provided, + it performs an eager scan (like the real worker does). + + Returns the event dict that would be emitted. + """ + ledc_ch = (direction >> 8) & 0x0F + intensity = direction & 0xFF + gpio = ledc_gpio_map.get(ledc_ch, -1) + if gpio == -1 and out_sel is not None: + # Eager scan (same as _refresh_ledc_gpio_map) + refreshed = scan_out_sel(out_sel) + ledc_gpio_map.update(refreshed) + gpio = ledc_gpio_map.get(ledc_ch, -1) + return { + 'type': 'ledc_update', + 'channel': ledc_ch, + 'duty': intensity, + 'duty_pct': intensity, + 'gpio': gpio, + } + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestScanOutSel: + """Test the gpio_out_sel scanning logic.""" + + def test_detects_ledc_hs_ch0_on_gpio13(self): + out_sel = [256] * 40 # 256 = no function assigned + out_sel[13] = 72 # LEDC HS ch0 -> GPIO 13 + result = scan_out_sel(out_sel) + assert result[0] == 13 + assert len(result) == 1 + + def test_detects_ledc_ls_ch0_on_gpio2(self): + out_sel = [256] * 40 + out_sel[2] = 80 # LEDC LS ch0 (signal 80 = ch 8) + result = scan_out_sel(out_sel) + assert result[8] == 2 + + def test_detects_multiple_channels(self): + out_sel = [256] * 40 + out_sel[13] = 72 # HS ch0 -> GPIO 13 + out_sel[12] = 73 # HS ch1 -> GPIO 12 + out_sel[14] = 80 # LS ch0 -> GPIO 14 + result = scan_out_sel(out_sel) + assert result[0] == 13 + assert result[1] == 12 + assert result[8] == 14 + assert len(result) == 3 + + def test_ignores_non_ledc_signals(self): + out_sel = [256] * 40 + out_sel[5] = 71 # signal 71 = not LEDC + out_sel[6] = 88 # signal 88 = not LEDC + out_sel[7] = 0 # signal 0 = GPIO simple output + result = scan_out_sel(out_sel) + assert len(result) == 0 + + def test_masks_to_low_byte(self): + out_sel = [256] * 40 + # High bytes should be masked off; 0x0148 & 0xFF = 72 = LEDC HS ch0 + out_sel[13] = 0x0148 + result = scan_out_sel(out_sel) + assert result[0] == 13 + + +class TestEagerScanOn0x5000: + """Test the 0x5000 handler with eager scan on cache miss.""" + + def _make_direction(self, ledc_ch: int, intensity: int) -> int: + """Build a 0x5000-marker direction value.""" + return 0x5000 | ((ledc_ch & 0x0F) << 8) | (intensity & 0xFF) + + def test_eager_scan_populates_map_on_first_ledc_write(self): + """When _ledc_gpio_map is empty, the handler should scan gpio_out_sel + and emit the correct gpio pin.""" + ledc_gpio_map: dict[int, int] = {} + out_sel = [256] * 40 + out_sel[13] = 72 # LEDC HS ch0 -> GPIO 13 + + direction = self._make_direction(ledc_ch=0, intensity=5) + event = simulate_0x5000_handler(direction, ledc_gpio_map, out_sel) + + assert event['gpio'] == 13 + assert event['channel'] == 0 + assert event['duty'] == 5 + # Map should now be populated for future calls + assert ledc_gpio_map[0] == 13 + + def test_no_scan_when_map_already_populated(self): + """When the map already has the channel, no scan should occur.""" + ledc_gpio_map = {0: 13} + # Pass out_sel=None to prove the scan is never reached + direction = self._make_direction(ledc_ch=0, intensity=7) + event = simulate_0x5000_handler(direction, ledc_gpio_map, out_sel=None) + + assert event['gpio'] == 13 + assert event['channel'] == 0 + + def test_scan_only_on_cache_miss(self): + """First call triggers scan (miss), second call skips it (hit).""" + ledc_gpio_map: dict[int, int] = {} + out_sel = [256] * 40 + out_sel[13] = 72 + + # First call: cache miss -> scan + dir1 = self._make_direction(ledc_ch=0, intensity=5) + ev1 = simulate_0x5000_handler(dir1, ledc_gpio_map, out_sel) + assert ev1['gpio'] == 13 + + # Second call: cache hit -> no scan needed (pass None to prove) + dir2 = self._make_direction(ledc_ch=0, intensity=10) + ev2 = simulate_0x5000_handler(dir2, ledc_gpio_map, out_sel=None) + assert ev2['gpio'] == 13 + assert ev2['duty'] == 10 + + def test_multiple_channels_mapped_correctly(self): + """Multiple LEDC channels resolve to correct GPIO pins.""" + ledc_gpio_map: dict[int, int] = {} + out_sel = [256] * 40 + out_sel[13] = 72 # ch0 -> GPIO 13 + out_sel[12] = 73 # ch1 -> GPIO 12 + + dir0 = self._make_direction(ledc_ch=0, intensity=5) + ev0 = simulate_0x5000_handler(dir0, ledc_gpio_map, out_sel) + assert ev0['gpio'] == 13 + + dir1 = self._make_direction(ledc_ch=1, intensity=10) + ev1 = simulate_0x5000_handler(dir1, ledc_gpio_map, out_sel=None) + assert ev1['gpio'] == 12 + + def test_fallback_when_no_mapping_exists(self): + """When gpio_out_sel has no LEDC signals, gpio=-1 is emitted.""" + ledc_gpio_map: dict[int, int] = {} + out_sel = [256] * 40 # No LEDC signals anywhere + + direction = self._make_direction(ledc_ch=0, intensity=5) + event = simulate_0x5000_handler(direction, ledc_gpio_map, out_sel) + + assert event['gpio'] == -1 + + def test_servo_angle_from_ledc_duty(self): + """Verify that a correct LEDC duty maps to the expected servo angle. + + This is the end-to-end path: LEDC duty -> duty_pct/100 -> servo angle. + For a servo at 50Hz with 544-2400us pulse range: + 7.5% duty = 1500us pulse -> ~93 degrees (close to 90) + """ + duty_pct = 7.5 + duty_fraction = duty_pct / 100 # 0.075 + + MIN_DC = 544 / 20000 # 0.0272 + MAX_DC = 2400 / 20000 # 0.12 + angle = round(((duty_fraction - MIN_DC) / (MAX_DC - MIN_DC)) * 180) + + assert 88 <= angle <= 95 + + +class TestDirectionEncoding: + """Verify the 0x5000 marker direction encoding/decoding.""" + + def test_encode_decode_roundtrip(self): + ledc_ch = 3 + intensity = 42 + direction = 0x5000 | ((ledc_ch & 0x0F) << 8) | (intensity & 0xFF) + + decoded_marker = direction & 0xF000 + decoded_ch = (direction >> 8) & 0x0F + decoded_intensity = direction & 0xFF + + assert decoded_marker == 0x5000 + assert decoded_ch == ledc_ch + assert decoded_intensity == intensity + + def test_channel_range_0_to_15(self): + for ch in range(16): + direction = 0x5000 | ((ch & 0x0F) << 8) | 50 + decoded_ch = (direction >> 8) & 0x0F + assert decoded_ch == ch + + def test_intensity_range_0_to_100(self): + for intensity in [0, 1, 50, 99, 100]: + direction = 0x5000 | (0 << 8) | (intensity & 0xFF) + decoded_intensity = direction & 0xFF + assert decoded_intensity == intensity