Merge pull request #63 from davidmonterocrespo24/website
feat: implement eager scan for LEDC GPIO mapping and add tests for ra…pull/74/head
commit
5e32e238fc
|
|
@ -192,6 +192,7 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
except Exception as exc:
|
||||
_emit({'type': 'error', 'message': f'Cannot load DLL: {exc}'})
|
||||
os._exit(1)
|
||||
lib.qemu_picsimlab_get_internals.restype = ctypes.c_void_p
|
||||
|
||||
# ── 3. Write firmware to a temp file ──────────────────────────────────────
|
||||
try:
|
||||
|
|
@ -231,6 +232,37 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
# ESP32 signal indices: 72-79 = LEDC HS ch 0-7, 80-87 = LEDC LS ch 0-7
|
||||
_ledc_gpio_map: dict[int, int] = {}
|
||||
|
||||
_out_sel_dumped = [False] # one-time diagnostic dump flag
|
||||
|
||||
def _refresh_ledc_gpio_map() -> None:
|
||||
"""Scan gpio_out_sel[40] registers and update _ledc_gpio_map.
|
||||
|
||||
Called eagerly from the 0x5000 LEDC duty callback on cache miss,
|
||||
and periodically from the LEDC polling thread.
|
||||
"""
|
||||
try:
|
||||
out_sel_ptr = lib.qemu_picsimlab_get_internals(2)
|
||||
if not out_sel_ptr:
|
||||
_log('LEDC gpio_out_sel: internals(2) returned NULL')
|
||||
return
|
||||
out_sel = (ctypes.c_uint32 * 40).from_address(out_sel_ptr)
|
||||
# One-time dump of ALL gpio_out_sel values for diagnostics
|
||||
if not _out_sel_dumped[0]:
|
||||
_out_sel_dumped[0] = True
|
||||
non_default = {pin: int(out_sel[pin]) for pin in range(40)
|
||||
if int(out_sel[pin]) != 256 and int(out_sel[pin]) != 0}
|
||||
_log(f'LEDC gpio_out_sel dump (non-default): {non_default}')
|
||||
_log(f'LEDC gpio_out_sel ALL: {[int(out_sel[i]) for i in range(40)]}')
|
||||
for gpio_pin in range(40):
|
||||
signal = int(out_sel[gpio_pin]) & 0xFF
|
||||
if 72 <= signal <= 87:
|
||||
ledc_ch = signal - 72
|
||||
if _ledc_gpio_map.get(ledc_ch) != gpio_pin:
|
||||
_ledc_gpio_map[ledc_ch] = gpio_pin
|
||||
_log(f'LEDC map: ch{ledc_ch} -> GPIO{gpio_pin} (signal={signal})')
|
||||
except Exception as e:
|
||||
_log(f'LEDC gpio_out_sel scan error: {e}')
|
||||
|
||||
# Sensor state: gpio_pin → {type, properties..., saw_low, responding}
|
||||
_sensors: dict[int, dict] = {}
|
||||
_sensors_lock = threading.Lock()
|
||||
|
|
@ -424,6 +456,10 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
if marker == 0x5000: # LEDC duty change (from esp32_ledc.c)
|
||||
ledc_ch = (direction >> 8) & 0x0F
|
||||
intensity = direction & 0xFF # 0-100 percentage
|
||||
_log(f'0x5000 callback: direction=0x{direction:04X} ch={ledc_ch} intensity={intensity} map={dict(_ledc_gpio_map)}')
|
||||
gpio = _ledc_gpio_map.get(ledc_ch, -1)
|
||||
if gpio == -1:
|
||||
_refresh_ledc_gpio_map()
|
||||
gpio = _ledc_gpio_map.get(ledc_ch, -1)
|
||||
_emit({'type': 'ledc_update', 'channel': ledc_ch,
|
||||
'duty': intensity,
|
||||
|
|
@ -584,7 +620,6 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
# ── 7. LEDC polling thread (100 ms interval) ──────────────────────────────
|
||||
|
||||
def _ledc_poll_thread() -> None:
|
||||
lib.qemu_picsimlab_get_internals.restype = ctypes.c_void_p
|
||||
# Track last-emitted duty to avoid flooding identical updates
|
||||
_last_duty = [0.0] * 16
|
||||
_diag_count = [0]
|
||||
|
|
@ -599,17 +634,7 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
# duty[] is float[16] in QEMU (percentage 0-100)
|
||||
arr = (ctypes.c_float * 16).from_address(ptr)
|
||||
# Refresh LEDC→GPIO mapping from gpio_out_sel[40] registers
|
||||
out_sel_ptr = lib.qemu_picsimlab_get_internals(2)
|
||||
if out_sel_ptr:
|
||||
out_sel = (ctypes.c_uint32 * 40).from_address(out_sel_ptr)
|
||||
for gpio_pin in range(40):
|
||||
signal = int(out_sel[gpio_pin]) & 0xFF
|
||||
# Signal 72-79 = LEDC HS ch 0-7; 80-87 = LEDC LS ch 0-7
|
||||
if 72 <= signal <= 87:
|
||||
ledc_ch = signal - 72
|
||||
if _ledc_gpio_map.get(ledc_ch) != gpio_pin:
|
||||
_ledc_gpio_map[ledc_ch] = gpio_pin
|
||||
_log(f'LEDC map: ch{ledc_ch} → GPIO{gpio_pin} (signal={signal})')
|
||||
_refresh_ledc_gpio_map()
|
||||
# Log once when nonzero duties first appear
|
||||
if not _first_nonzero_logged[0]:
|
||||
nonzero = {ch: round(float(arr[ch]), 2) for ch in range(16)
|
||||
|
|
@ -618,6 +643,31 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
_log(f'LEDC first nonzero at poll #{_diag_count[0]}: '
|
||||
f'duties={nonzero} gpio_map={dict(_ledc_gpio_map)}')
|
||||
_first_nonzero_logged[0] = True
|
||||
# Periodic diagnostic dump every 50 polls (~5s)
|
||||
if _diag_count[0] % 50 == 0:
|
||||
all_duties = {ch: round(float(arr[ch]), 4) for ch in range(16)
|
||||
if float(arr[ch]) != 0.0}
|
||||
# Also read LEDC channel conf (internals(4)) and timer freq (internals(5))
|
||||
diag_parts = [f'duties={all_duties}', f'gpio_map={dict(_ledc_gpio_map)}']
|
||||
try:
|
||||
conf_ptr = lib.qemu_picsimlab_get_internals(4)
|
||||
if conf_ptr:
|
||||
conf_arr = (ctypes.c_uint32 * 16).from_address(conf_ptr)
|
||||
nonzero_conf = {ch: hex(int(conf_arr[ch])) for ch in range(16)
|
||||
if int(conf_arr[ch]) != 0}
|
||||
diag_parts.append(f'ch_conf={nonzero_conf}')
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
freq_ptr = lib.qemu_picsimlab_get_internals(5)
|
||||
if freq_ptr:
|
||||
freq_arr = (ctypes.c_uint32 * 8).from_address(freq_ptr)
|
||||
nonzero_freq = {t: int(freq_arr[t]) for t in range(8)
|
||||
if int(freq_arr[t]) != 0}
|
||||
diag_parts.append(f'timer_freq={nonzero_freq}')
|
||||
except Exception:
|
||||
pass
|
||||
_log(f'LEDC poll #{_diag_count[0]}: {" | ".join(diag_parts)}')
|
||||
for ch in range(16):
|
||||
duty_pct = float(arr[ch])
|
||||
if abs(duty_pct - _last_duty[ch]) < 0.01:
|
||||
|
|
@ -654,7 +704,10 @@ def main() -> None: # noqa: C901 (complexity OK for inline worker)
|
|||
|
||||
elif c == 'set_adc':
|
||||
raw_v = int(int(cmd['millivolts']) * 4095 / 3300)
|
||||
lib.qemu_picsimlab_set_apin(int(cmd['channel']), max(0, min(4095, raw_v)))
|
||||
ch = int(cmd['channel'])
|
||||
clamped = max(0, min(4095, raw_v))
|
||||
_log(f'set_adc: ch={ch} mv={cmd["millivolts"]} raw={clamped}')
|
||||
lib.qemu_picsimlab_set_apin(ch, clamped)
|
||||
|
||||
elif c == 'set_adc_raw':
|
||||
lib.qemu_picsimlab_set_apin(int(cmd['channel']),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"version": "1.0.0",
|
||||
"generatedAt": "2026-03-23T13:04:13.804Z",
|
||||
"generatedAt": "2026-03-24T02:30:06.881Z",
|
||||
"components": [
|
||||
{
|
||||
"id": "arduino-mega",
|
||||
|
|
|
|||
|
|
@ -729,3 +729,94 @@ describe('LEDC polling — data format', () => {
|
|||
expect(emitted[1]).toEqual({ ch: 0, duty: 12.0 });
|
||||
});
|
||||
});
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// 14. PinManager.broadcastPwm — gpio=-1 fallback
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
describe('PinManager.broadcastPwm fallback', () => {
|
||||
// Use a simple inline PinManager-like class to test broadcastPwm logic
|
||||
// (The real PinManager can't be imported here due to vi.mock overrides)
|
||||
|
||||
class SimplePinManager {
|
||||
private pwmListeners = new Map<number, Set<(pin: number, duty: number) => void>>();
|
||||
private pwmValues = new Map<number, number>();
|
||||
|
||||
onPwmChange(pin: number, cb: (pin: number, duty: number) => void): () => void {
|
||||
if (!this.pwmListeners.has(pin)) this.pwmListeners.set(pin, new Set());
|
||||
this.pwmListeners.get(pin)!.add(cb);
|
||||
return () => { this.pwmListeners.get(pin)?.delete(cb); };
|
||||
}
|
||||
|
||||
updatePwm(pin: number, dutyCycle: number): void {
|
||||
this.pwmValues.set(pin, dutyCycle);
|
||||
this.pwmListeners.get(pin)?.forEach(cb => cb(pin, dutyCycle));
|
||||
}
|
||||
|
||||
broadcastPwm(dutyCycle: number): void {
|
||||
this.pwmListeners.forEach((callbacks, pin) => {
|
||||
this.pwmValues.set(pin, dutyCycle);
|
||||
callbacks.forEach(cb => cb(pin, dutyCycle));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
it('broadcastPwm dispatches to all registered PWM listeners', () => {
|
||||
const pm = new SimplePinManager();
|
||||
const received: { pin: number; duty: number }[] = [];
|
||||
|
||||
pm.onPwmChange(13, (pin, duty) => received.push({ pin, duty }));
|
||||
pm.onPwmChange(5, (pin, duty) => received.push({ pin, duty }));
|
||||
|
||||
pm.broadcastPwm(0.075); // 7.5% servo duty
|
||||
|
||||
expect(received).toHaveLength(2);
|
||||
expect(received).toContainEqual({ pin: 13, duty: 0.075 });
|
||||
expect(received).toContainEqual({ pin: 5, duty: 0.075 });
|
||||
});
|
||||
|
||||
it('broadcastPwm does nothing when no listeners registered', () => {
|
||||
const pm = new SimplePinManager();
|
||||
// Should not throw
|
||||
pm.broadcastPwm(0.075);
|
||||
});
|
||||
|
||||
it('servo filters broadcastPwm by duty range (0.01-0.20)', () => {
|
||||
const MIN_DC = 544 / 20000; // 0.0272
|
||||
const MAX_DC = 2400 / 20000; // 0.12
|
||||
let servoAngle = -1;
|
||||
|
||||
const servoCallback = (_pin: number, dutyCycle: number) => {
|
||||
if (dutyCycle < 0.01 || dutyCycle > 0.20) return;
|
||||
const angle = Math.round(((dutyCycle - MIN_DC) / (MAX_DC - MIN_DC)) * 180);
|
||||
servoAngle = Math.max(0, Math.min(180, angle));
|
||||
};
|
||||
|
||||
servoCallback(13, 0.075);
|
||||
expect(servoAngle).toBeGreaterThanOrEqual(88);
|
||||
expect(servoAngle).toBeLessThanOrEqual(95);
|
||||
|
||||
const prevAngle = servoAngle;
|
||||
servoCallback(13, 0.50); // 50% — not a servo signal
|
||||
expect(servoAngle).toBe(prevAngle);
|
||||
});
|
||||
|
||||
it('onLedcUpdate with gpio=-1 should use broadcastPwm (integration logic)', () => {
|
||||
const pm = new SimplePinManager();
|
||||
const received: { pin: number; duty: number }[] = [];
|
||||
|
||||
pm.onPwmChange(13, (pin, duty) => received.push({ pin, duty }));
|
||||
|
||||
const update = { channel: 0, duty: 7.36, duty_pct: 7.36, gpio: -1 };
|
||||
const dutyCycle = update.duty_pct / 100;
|
||||
|
||||
if (update.gpio >= 0) {
|
||||
pm.updatePwm(update.gpio, dutyCycle);
|
||||
} else {
|
||||
pm.broadcastPwm(dutyCycle);
|
||||
}
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({ pin: 13, duty: 0.0736 });
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -130,6 +130,18 @@ export class PinManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast PWM duty to ALL registered PWM listeners.
|
||||
* Used when the LEDC channel→GPIO mapping is unknown (gpio=-1).
|
||||
* Components filter by duty range (e.g., servo accepts 0.01-0.20).
|
||||
*/
|
||||
broadcastPwm(dutyCycle: number): void {
|
||||
this.pwmListeners.forEach((callbacks, pin) => {
|
||||
this.pwmValues.set(pin, dutyCycle);
|
||||
callbacks.forEach(cb => cb(pin, dutyCycle));
|
||||
});
|
||||
}
|
||||
|
||||
getPwmValue(pin: number): number {
|
||||
return this.pwmValues.get(pin) ?? 0;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,9 +71,12 @@ PartSimulationRegistry.register('potentiometer', {
|
|||
const isESP32 = typeof (simulator as any).setAdcVoltage === 'function';
|
||||
const refVoltage = (isRP2040 || isESP32) ? 3.3 : 5.0;
|
||||
|
||||
console.log(`[Pot] attached: pin=${pin} isESP32=${isESP32} refV=${refVoltage}`);
|
||||
|
||||
const onInput = () => {
|
||||
const raw = parseInt((element as any).value || '0', 10);
|
||||
const volts = (raw / 1023.0) * refVoltage;
|
||||
console.log(`[Pot] onInput: raw=${raw} volts=${volts.toFixed(3)} pin=${pin}`);
|
||||
setAdcVoltage(simulator, pin, volts);
|
||||
};
|
||||
|
||||
|
|
@ -335,11 +338,14 @@ PartSimulationRegistry.register('servo', {
|
|||
// 544µs = 2.72%, 2400µs = 12.0%
|
||||
const MIN_DC = MIN_PULSE_US / 20000; // 0.0272
|
||||
const MAX_DC = MAX_PULSE_US / 20000; // 0.12
|
||||
console.log(`[Servo:ESP32] registering onPwmChange on pin=${pinSIG}`);
|
||||
const unsubscribe = pinManager.onPwmChange(pinSIG, (_pin, dutyCycle) => {
|
||||
console.log(`[Servo:ESP32] onPwmChange pin=${_pin} dutyCycle=${dutyCycle.toFixed(4)}`);
|
||||
if (dutyCycle < 0.01 || dutyCycle > 0.20) return; // ignore out-of-range
|
||||
const angle = Math.round(
|
||||
((dutyCycle - MIN_DC) / (MAX_DC - MIN_DC)) * 180
|
||||
);
|
||||
console.log(`[Servo:ESP32] angle=${angle}`);
|
||||
el.angle = Math.max(0, Math.min(180, angle));
|
||||
});
|
||||
return () => { unsubscribe(); };
|
||||
|
|
|
|||
|
|
@ -112,6 +112,23 @@ class Esp32BridgeShim {
|
|||
}
|
||||
}
|
||||
|
||||
// ── Shared LEDC update handler (used by addBoard, setBoardType, initSimulator) ─
|
||||
function makeLedcUpdateHandler(boardId: string) {
|
||||
return (update: { channel: number; duty_pct: number; gpio?: number }) => {
|
||||
const boardPm = pinManagerMap.get(boardId);
|
||||
if (!boardPm) return;
|
||||
const dutyCycle = update.duty_pct / 100;
|
||||
if (update.gpio !== undefined && update.gpio >= 0) {
|
||||
boardPm.updatePwm(update.gpio, dutyCycle);
|
||||
} else {
|
||||
// gpio unknown (QEMU doesn't expose gpio_out_sel for LEDC):
|
||||
// broadcast to ALL PWM listeners. Components filter by duty range
|
||||
// (servo accepts 0.01–0.20, LEDs use 0–1.0).
|
||||
boardPm.broadcastPwm(dutyCycle);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// ── Runtime Maps (outside Zustand — not serialisable) ─────────────────────
|
||||
const simulatorMap = new Map<string, AVRSimulator | RP2040Simulator | RiscVSimulator | Esp32C3Simulator | Esp32BridgeShim>();
|
||||
const pinManagerMap = new Map<string, PinManager>();
|
||||
|
|
@ -390,18 +407,7 @@ export const useSimulatorStore = create<SimulatorState>((set, get) => {
|
|||
return { boards, ...(isActive ? { running: false } : {}) };
|
||||
});
|
||||
};
|
||||
bridge.onLedcUpdate = (update) => {
|
||||
// Route LEDC duty cycles to PinManager as PWM (0.0–1.0).
|
||||
// If gpio is known (from GPIO out_sel sync), use the actual GPIO pin;
|
||||
// otherwise fall back to the LEDC channel number.
|
||||
const boardPm = pinManagerMap.get(id);
|
||||
if (boardPm) {
|
||||
const targetPin = (update.gpio !== undefined && update.gpio >= 0)
|
||||
? update.gpio
|
||||
: update.channel;
|
||||
boardPm.updatePwm(targetPin, update.duty_pct / 100);
|
||||
}
|
||||
};
|
||||
bridge.onLedcUpdate = makeLedcUpdateHandler(id);
|
||||
bridge.onWs2812Update = (channel, pixels) => {
|
||||
// Forward WS2812 pixel data to any DOM element with id=`ws2812-{id}-{channel}`
|
||||
// (set by NeoPixel components rendered in SimulatorCanvas).
|
||||
|
|
@ -767,15 +773,7 @@ export const useSimulatorStore = create<SimulatorState>((set, get) => {
|
|||
return { boards, ...(isActive ? { running: false } : {}) };
|
||||
});
|
||||
};
|
||||
bridge.onLedcUpdate = (update) => {
|
||||
const boardPm = pinManagerMap.get(boardId);
|
||||
if (boardPm && typeof boardPm.updatePwm === 'function') {
|
||||
const targetPin = (update.gpio !== undefined && update.gpio >= 0)
|
||||
? update.gpio
|
||||
: update.channel;
|
||||
boardPm.updatePwm(targetPin, update.duty_pct / 100);
|
||||
}
|
||||
};
|
||||
bridge.onLedcUpdate = makeLedcUpdateHandler(boardId);
|
||||
bridge.onWs2812Update = (channel, pixels) => {
|
||||
const eventTarget = document.getElementById(`ws2812-${boardId}-${channel}`);
|
||||
if (eventTarget) {
|
||||
|
|
@ -863,15 +861,7 @@ export const useSimulatorStore = create<SimulatorState>((set, get) => {
|
|||
return { boards, ...(isActive ? { running: false } : {}) };
|
||||
});
|
||||
};
|
||||
bridge.onLedcUpdate = (update) => {
|
||||
const boardPm = pinManagerMap.get(boardId);
|
||||
if (boardPm && typeof boardPm.updatePwm === 'function') {
|
||||
const targetPin = (update.gpio !== undefined && update.gpio >= 0)
|
||||
? update.gpio
|
||||
: update.channel;
|
||||
boardPm.updatePwm(targetPin, update.duty_pct / 100);
|
||||
}
|
||||
};
|
||||
bridge.onLedcUpdate = makeLedcUpdateHandler(boardId);
|
||||
esp32BridgeMap.set(boardId, bridge);
|
||||
const shim = new Esp32BridgeShim(bridge, pm);
|
||||
shim.onSerialData = serialCallback;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,231 @@
|
|||
"""
|
||||
test_ledc_gpio_race.py
|
||||
|
||||
Tests for the LEDC GPIO mapping race condition fix in esp32_worker.py.
|
||||
|
||||
The core issue: when firmware calls ledcWrite(), the 0x5000 sync callback
|
||||
fires immediately but the LEDC polling thread (100ms interval) hasn't yet
|
||||
scanned gpio_out_sel to build _ledc_gpio_map. The fix adds an eager scan
|
||||
inside the 0x5000 handler on cache miss.
|
||||
|
||||
These tests exercise the scan logic and the cache-miss-triggered refresh
|
||||
as pure functions, without loading QEMU.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Replicate the scan logic from esp32_worker._refresh_ledc_gpio_map()
|
||||
# so we can test it in isolation.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def scan_out_sel(out_sel: list[int]) -> dict[int, int]:
|
||||
"""Pure-Python equivalent of the gpio_out_sel scan in the worker.
|
||||
|
||||
Returns a dict mapping LEDC channel -> GPIO pin.
|
||||
Signal 72-79 = LEDC HS ch 0-7; 80-87 = LEDC LS ch 0-7.
|
||||
"""
|
||||
ledc_gpio_map: dict[int, int] = {}
|
||||
for gpio_pin in range(len(out_sel)):
|
||||
signal = out_sel[gpio_pin] & 0xFF
|
||||
if 72 <= signal <= 87:
|
||||
ledc_ch = signal - 72
|
||||
ledc_gpio_map[ledc_ch] = gpio_pin
|
||||
return ledc_gpio_map
|
||||
|
||||
|
||||
def simulate_0x5000_handler(
|
||||
direction: int,
|
||||
ledc_gpio_map: dict[int, int],
|
||||
out_sel: list[int] | None = None,
|
||||
) -> dict:
|
||||
"""Simulate the 0x5000 LEDC duty callback with the eager-scan fix.
|
||||
|
||||
If the map has no entry for the channel and out_sel is provided,
|
||||
it performs an eager scan (like the real worker does).
|
||||
|
||||
Returns the event dict that would be emitted.
|
||||
"""
|
||||
ledc_ch = (direction >> 8) & 0x0F
|
||||
intensity = direction & 0xFF
|
||||
gpio = ledc_gpio_map.get(ledc_ch, -1)
|
||||
if gpio == -1 and out_sel is not None:
|
||||
# Eager scan (same as _refresh_ledc_gpio_map)
|
||||
refreshed = scan_out_sel(out_sel)
|
||||
ledc_gpio_map.update(refreshed)
|
||||
gpio = ledc_gpio_map.get(ledc_ch, -1)
|
||||
return {
|
||||
'type': 'ledc_update',
|
||||
'channel': ledc_ch,
|
||||
'duty': intensity,
|
||||
'duty_pct': intensity,
|
||||
'gpio': gpio,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestScanOutSel:
|
||||
"""Test the gpio_out_sel scanning logic."""
|
||||
|
||||
def test_detects_ledc_hs_ch0_on_gpio13(self):
|
||||
out_sel = [256] * 40 # 256 = no function assigned
|
||||
out_sel[13] = 72 # LEDC HS ch0 -> GPIO 13
|
||||
result = scan_out_sel(out_sel)
|
||||
assert result[0] == 13
|
||||
assert len(result) == 1
|
||||
|
||||
def test_detects_ledc_ls_ch0_on_gpio2(self):
|
||||
out_sel = [256] * 40
|
||||
out_sel[2] = 80 # LEDC LS ch0 (signal 80 = ch 8)
|
||||
result = scan_out_sel(out_sel)
|
||||
assert result[8] == 2
|
||||
|
||||
def test_detects_multiple_channels(self):
|
||||
out_sel = [256] * 40
|
||||
out_sel[13] = 72 # HS ch0 -> GPIO 13
|
||||
out_sel[12] = 73 # HS ch1 -> GPIO 12
|
||||
out_sel[14] = 80 # LS ch0 -> GPIO 14
|
||||
result = scan_out_sel(out_sel)
|
||||
assert result[0] == 13
|
||||
assert result[1] == 12
|
||||
assert result[8] == 14
|
||||
assert len(result) == 3
|
||||
|
||||
def test_ignores_non_ledc_signals(self):
|
||||
out_sel = [256] * 40
|
||||
out_sel[5] = 71 # signal 71 = not LEDC
|
||||
out_sel[6] = 88 # signal 88 = not LEDC
|
||||
out_sel[7] = 0 # signal 0 = GPIO simple output
|
||||
result = scan_out_sel(out_sel)
|
||||
assert len(result) == 0
|
||||
|
||||
def test_masks_to_low_byte(self):
|
||||
out_sel = [256] * 40
|
||||
# High bytes should be masked off; 0x0148 & 0xFF = 72 = LEDC HS ch0
|
||||
out_sel[13] = 0x0148
|
||||
result = scan_out_sel(out_sel)
|
||||
assert result[0] == 13
|
||||
|
||||
|
||||
class TestEagerScanOn0x5000:
|
||||
"""Test the 0x5000 handler with eager scan on cache miss."""
|
||||
|
||||
def _make_direction(self, ledc_ch: int, intensity: int) -> int:
|
||||
"""Build a 0x5000-marker direction value."""
|
||||
return 0x5000 | ((ledc_ch & 0x0F) << 8) | (intensity & 0xFF)
|
||||
|
||||
def test_eager_scan_populates_map_on_first_ledc_write(self):
|
||||
"""When _ledc_gpio_map is empty, the handler should scan gpio_out_sel
|
||||
and emit the correct gpio pin."""
|
||||
ledc_gpio_map: dict[int, int] = {}
|
||||
out_sel = [256] * 40
|
||||
out_sel[13] = 72 # LEDC HS ch0 -> GPIO 13
|
||||
|
||||
direction = self._make_direction(ledc_ch=0, intensity=5)
|
||||
event = simulate_0x5000_handler(direction, ledc_gpio_map, out_sel)
|
||||
|
||||
assert event['gpio'] == 13
|
||||
assert event['channel'] == 0
|
||||
assert event['duty'] == 5
|
||||
# Map should now be populated for future calls
|
||||
assert ledc_gpio_map[0] == 13
|
||||
|
||||
def test_no_scan_when_map_already_populated(self):
|
||||
"""When the map already has the channel, no scan should occur."""
|
||||
ledc_gpio_map = {0: 13}
|
||||
# Pass out_sel=None to prove the scan is never reached
|
||||
direction = self._make_direction(ledc_ch=0, intensity=7)
|
||||
event = simulate_0x5000_handler(direction, ledc_gpio_map, out_sel=None)
|
||||
|
||||
assert event['gpio'] == 13
|
||||
assert event['channel'] == 0
|
||||
|
||||
def test_scan_only_on_cache_miss(self):
|
||||
"""First call triggers scan (miss), second call skips it (hit)."""
|
||||
ledc_gpio_map: dict[int, int] = {}
|
||||
out_sel = [256] * 40
|
||||
out_sel[13] = 72
|
||||
|
||||
# First call: cache miss -> scan
|
||||
dir1 = self._make_direction(ledc_ch=0, intensity=5)
|
||||
ev1 = simulate_0x5000_handler(dir1, ledc_gpio_map, out_sel)
|
||||
assert ev1['gpio'] == 13
|
||||
|
||||
# Second call: cache hit -> no scan needed (pass None to prove)
|
||||
dir2 = self._make_direction(ledc_ch=0, intensity=10)
|
||||
ev2 = simulate_0x5000_handler(dir2, ledc_gpio_map, out_sel=None)
|
||||
assert ev2['gpio'] == 13
|
||||
assert ev2['duty'] == 10
|
||||
|
||||
def test_multiple_channels_mapped_correctly(self):
|
||||
"""Multiple LEDC channels resolve to correct GPIO pins."""
|
||||
ledc_gpio_map: dict[int, int] = {}
|
||||
out_sel = [256] * 40
|
||||
out_sel[13] = 72 # ch0 -> GPIO 13
|
||||
out_sel[12] = 73 # ch1 -> GPIO 12
|
||||
|
||||
dir0 = self._make_direction(ledc_ch=0, intensity=5)
|
||||
ev0 = simulate_0x5000_handler(dir0, ledc_gpio_map, out_sel)
|
||||
assert ev0['gpio'] == 13
|
||||
|
||||
dir1 = self._make_direction(ledc_ch=1, intensity=10)
|
||||
ev1 = simulate_0x5000_handler(dir1, ledc_gpio_map, out_sel=None)
|
||||
assert ev1['gpio'] == 12
|
||||
|
||||
def test_fallback_when_no_mapping_exists(self):
|
||||
"""When gpio_out_sel has no LEDC signals, gpio=-1 is emitted."""
|
||||
ledc_gpio_map: dict[int, int] = {}
|
||||
out_sel = [256] * 40 # No LEDC signals anywhere
|
||||
|
||||
direction = self._make_direction(ledc_ch=0, intensity=5)
|
||||
event = simulate_0x5000_handler(direction, ledc_gpio_map, out_sel)
|
||||
|
||||
assert event['gpio'] == -1
|
||||
|
||||
def test_servo_angle_from_ledc_duty(self):
|
||||
"""Verify that a correct LEDC duty maps to the expected servo angle.
|
||||
|
||||
This is the end-to-end path: LEDC duty -> duty_pct/100 -> servo angle.
|
||||
For a servo at 50Hz with 544-2400us pulse range:
|
||||
7.5% duty = 1500us pulse -> ~93 degrees (close to 90)
|
||||
"""
|
||||
duty_pct = 7.5
|
||||
duty_fraction = duty_pct / 100 # 0.075
|
||||
|
||||
MIN_DC = 544 / 20000 # 0.0272
|
||||
MAX_DC = 2400 / 20000 # 0.12
|
||||
angle = round(((duty_fraction - MIN_DC) / (MAX_DC - MIN_DC)) * 180)
|
||||
|
||||
assert 88 <= angle <= 95
|
||||
|
||||
|
||||
class TestDirectionEncoding:
|
||||
"""Verify the 0x5000 marker direction encoding/decoding."""
|
||||
|
||||
def test_encode_decode_roundtrip(self):
|
||||
ledc_ch = 3
|
||||
intensity = 42
|
||||
direction = 0x5000 | ((ledc_ch & 0x0F) << 8) | (intensity & 0xFF)
|
||||
|
||||
decoded_marker = direction & 0xF000
|
||||
decoded_ch = (direction >> 8) & 0x0F
|
||||
decoded_intensity = direction & 0xFF
|
||||
|
||||
assert decoded_marker == 0x5000
|
||||
assert decoded_ch == ledc_ch
|
||||
assert decoded_intensity == intensity
|
||||
|
||||
def test_channel_range_0_to_15(self):
|
||||
for ch in range(16):
|
||||
direction = 0x5000 | ((ch & 0x0F) << 8) | 50
|
||||
decoded_ch = (direction >> 8) & 0x0F
|
||||
assert decoded_ch == ch
|
||||
|
||||
def test_intensity_range_0_to_100(self):
|
||||
for intensity in [0, 1, 50, 99, 100]:
|
||||
direction = 0x5000 | (0 << 8) | (intensity & 0xFF)
|
||||
decoded_intensity = direction & 0xFF
|
||||
assert decoded_intensity == intensity
|
||||
Loading…
Reference in New Issue