feat: refactor hardware handling to consolidate dummy and real implementations; add cancellation support for actions

master
a2nr 2026-03-10 10:10:33 +07:00
parent 3a081a6fd9
commit 5cedcf0b86
14 changed files with 262 additions and 181 deletions

View File

@ -10,16 +10,32 @@ Dokumentasi lengkap dapat dilihat di [DOCUMENTATION.md](DOCUMENTATION.md).
# Aturan pengggunaan dokumen
bab pada dokumen merepresentasikan alur rencana pengembangan.
Potential Enhancements : Feasibility Study
Planned Feature : Backlog
Feature Task : Penjabaran Pekerjaan yang ready untuk dikerjakan. Task harus dijelaskan apa yang akan dikerjakan dan terdapat definition of done nya
## Potential Enhancements
bab ini digunakan untuk Feasibility Study
## Planned Feature
Backlog. Setelah kita pelajari untuk di kerjakan maka kita pindah ke backlog
## Feature Task
penjabaran Pekerjaan yang ready untuk dikerjakan. Task harus dijelaskan apa yang akan dikerjakan dan terdapat definition of done nya
Berikut ini adalah template untuk pembuatan task :
```
## <nomor task> <judul task> : <state: [ ] >
jelaskan permasalah di bab ini
### Definition Of Done
jelaskan apa yang dimaksut untuk menyelesaikan task
```
---
# Potential Enhancements
this list is short by priority
- **Potensial inefective development**: in handlers/hardware use interface.py to all hardware (dummy, ros2, and hardware) class that posibly haavily change.
- **UI bug stop button**: stop button not actualy stop execution. tried with long delay with loop and press stop button, program still continue
- **ROS Feature in generated block blocky**: currently, block blocky only generate action client, and there is sub/pub and other ROS feature need to implement to get/set value to node.
- **Launch files**: `blockly_bringup` package with ROS2 launch files to start all nodes with one command
- **Sensor integration**: Subscriber nodes for sensor data feeding back into Blockly visual feedback
@ -28,34 +44,41 @@ this list is short by priority
- **Simulation**: Integrate with Gazebo/Isaac Sim for testing Kiwi Wheel kinematics before deploying to hardware
- **Block categories**: Future blocks grouped into Robot, Sensors, Navigation categories
# Planned Feature
| Package | Purpose | Status |
|---------|---------|--------|
| `blockly_app` | Desktop Blockly GUI + Action Client | ✅ Done |
| `blockly_executor` | Action Server — command handler registry | ✅ Done |
| `blockly_interfaces` | Custom ROS2 action definitions | ✅ Done |
| `kiwi_controller` | Adaptive control for Kiwi Wheel drive | 📋 Planned |
| **Enhance UI** | Tab Code preview, Export/Import, dark toolbox | ✅ Done |
# Feature Task
## Enhance UI
## 1 Hardware Interface Consolidation [✅]
| # | Task | Status |
|---|------|--------|
| 1 | Fix toolbox text color (dark theme via `Blockly.Theme.defineTheme`) | ✅ Done |
| 2 | Add "Code" tab — realtime generated JS preview | ✅ Done |
| 3 | Export/Import workspace (.json) via toolbar buttons | ✅ Done |
Menambah block baru sebelumnya butuh edit 4 file (interface, dummy, real, handler). Sekarang refactored agar handler menjadi single source of truth — satu file handler berisi logika dummy dan real hardware.
**Files changed:**
- `src/blockly_app/blockly_app/ui/blockly/workspace-init.js` — dark theme definition + code panel change listener
- `src/blockly_app/blockly_app/ui/blockly/core/ui-tabs.js` *(new)*`switchTab()`, `refreshCodePanel()`
- `src/blockly_app/blockly_app/ui/blockly/core/workspace-io.js` *(new)*`exportWorkspace()`, `importWorkspace()` via `await window.pywebview.api`
- `src/blockly_app/blockly_app/ui/index.html` — CSS, HTML tab bar, code panel, toolbar buttons
- `src/blockly_app/blockly_app/app.py``save_workspace()`, `load_workspace()` via zenity subprocess
Perubahan:
- Hapus `interface.py`, `dummy_hardware.py`, `real_hardware.py` (ABC + 3 implementasi)
- Buat `hardware.py` — class `Hardware` generic (mode, node, call_log) yang tidak pernah perlu diubah
- Handler berisi logika dummy (log) dan real (ROS2 publish) — cek via `hardware.is_real()`
- Dependency real hardware di-import lazy (di dalam `if`) untuk menghindari import error di dev machine
### Definition Of Done
- Menambah block baru hanya perlu 1 file handler (+ JS block + manifest)
- `pixi run executor` dan `pixi run executor-hw` tetap berfungsi
- `hardware.call_log` tetap bisa dipakai untuk testing
- Semua test pass (`pixi run test`)
## 2 Fix Stop Button — Client-Side Cancellation [✅]
Stop button tidak benar-benar menghentikan eksekusi. Root cause: saat `await executeAction()` blocking (misal delay 10 detik), flag `stopRequested` tidak dicek. Di Python, `_wait_for_future()` tidak punya mekanisme cancel. Di executor, `_cancel_callback()` tidak bisa jalan karena single-threaded `rclpy.spin()`.
Solusi: client-side cancellation — cancel di sisi JS dan Python, executor handler tetap jalan sampai selesai tapi hasilnya di-discard.
Perubahan yang diperlukan:
- `bridge.js`: tambah `cancelCurrentAction()` + `Promise.race` di `executeAction()` agar bisa di-cancel segera
- `debug-engine.js`: wire `stopExecution()``cancelCurrentAction()`
- `app.py`: tambah `cancel_action()` bridge method + cancel-aware `_wait_for_future()` dengan `threading.Event`
### Definition Of Done
- Klik Stop saat delay 10 detik → UI respond dalam < 500ms
- Console tampilkan "Program stopped by user"
- Block highlighting di-clear setelah stop
- Tombol Run/Stop state benar setelah stop
- Stop berfungsi di mode Run dan Debug
- Stop saat block instan (misal led_on) tidak crash
- Semua test existing pass (`pixi run test`)
**Note — Export/Import implementation:**
`pywebview.create_file_dialog` dan `QFileDialog` via `QTimer.singleShot` keduanya gagal dari background thread (deadlock / no event loop).
Solusi final: `tkinter.filedialog` — interpreter Tcl/Tk terpisah dari Qt, tidak ada konflik thread, sudah tersedia di pixi environment tanpa dependency tambahan.

View File

@ -71,17 +71,26 @@ const BLOCK_FILES = [
Create a new file in `handlers/` or add to an existing one. Use the `@handler` decorator — auto-discovery handles the rest.
Each handler contains **both dummy and real hardware logic**. Use `hardware.is_real()` to branch, and `hardware.log()` to record actions for testing.
```python
# src/blockly_executor/blockly_executor/handlers/movement.py
from . import handler
from ..hardware import Hardware
@handler("move_forward")
def handle_move_forward(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_move_forward(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
speed = int(params["speed"])
duration_ms = int(params["duration_ms"])
if not (0 <= speed <= 100):
raise ValueError(f"speed must be 0-100, got: {speed}")
hardware.move(direction="forward", speed=speed, duration_ms=duration_ms)
hardware.log(f"move_forward(speed={speed}, duration_ms={duration_ms})")
if hardware.is_real():
# Real hardware — lazy import, publish to ROS2 topic
# TODO: hardware.node.create_publisher(...) etc.
hardware.node.get_logger().info(f"Moving forward at {speed} for {duration_ms}ms")
return (True, f"Moved forward at speed {speed} for {duration_ms}ms")
```
@ -467,7 +476,8 @@ ExecutorNode receives goal, calls HandlerRegistry.execute(command, params)
@handler("my_command") function(params, hardware)
→ hardware.set_led(pin, True) ← DummyHardware or RealHardware
→ hardware.log(...) ← always logged for testing
→ if hardware.is_real(): ... ← real hardware (ROS2 publish/GPIO)
Returns (True, "LED on pin 3 turned ON")
@ -521,11 +531,17 @@ BlockRegistry.register({
**Python** ([handlers/gpio.py](../blockly_executor/blockly_executor/handlers/gpio.py)):
```python
from . import handler
from ..hardware import Hardware
@handler("led_on") # ← must match JS name
def handle_led_on(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_led_on(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
pin = int(params["pin"]) # params values are always strings — cast as needed
hardware.set_led(pin, True)
hardware.log(f"set_led(pin={pin}, state=True)") # always log for testing
if hardware.is_real():
# TODO: publish to ROS2 topic for Pi hardware
hardware.node.get_logger().info(f"LED on pin {pin} ON")
return (True, f"LED on pin {pin} turned ON") # (success: bool, message: str)
```
@ -567,9 +583,13 @@ BlockRegistry.register({
**Python** ([handlers/gpio.py](../blockly_executor/blockly_executor/handlers/gpio.py)):
```python
@handler("led_off")
def handle_led_off(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_led_off(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
pin = int(params["pin"])
hardware.set_led(pin, False)
hardware.log(f"set_led(pin={pin}, state=False)")
if hardware.is_real():
hardware.node.get_logger().info(f"LED on pin {pin} OFF")
return (True, f"LED on pin {pin} turned OFF")
```
@ -615,9 +635,10 @@ BlockRegistry.register({
```python
import time
from . import handler
from ..hardware import Hardware
@handler("delay")
def handle_delay(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_delay(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
duration_ms = int(params["duration_ms"])
time.sleep(duration_ms / 1000.0)
return (True, f"Delayed {duration_ms}ms")
@ -668,11 +689,18 @@ Corresponding Python handler:
```python
# src/blockly_executor/blockly_executor/handlers/MY_FILE.py
from . import handler
from ..hardware import Hardware
@handler("MY_COMMAND")
def handle_my_command(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_my_command(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
param1 = params["param1"] # always str — cast as needed (int, float, etc.)
# ... do something with hardware ...
hardware.log(f"my_command(param1={param1})")
if hardware.is_real():
# Real hardware logic — ROS2 publish, GPIO, etc.
# Use lazy imports here for dependencies not available in dev
hardware.node.get_logger().info(f"my_command: {param1}")
return (True, f"Done: {param1}")
```
@ -733,9 +761,16 @@ BlockRegistry.register({
Python handler:
```python
@handler("read_distance")
def handle_read_distance(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_read_distance(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
sensor_id = params["sensor_id"]
distance = hardware.read_distance(sensor_id) # returns float in cm
hardware.log(f"read_distance(sensor_id={sensor_id})")
if hardware.is_real():
# TODO: subscribe to sensor topic, return actual reading
distance = 0.0 # placeholder
else:
distance = 42.0 # dummy test value
return (True, str(distance)) # message becomes the expression value in JS
```
@ -790,10 +825,15 @@ BlockRegistry.register({
Python handler:
```python
@handler("move_to")
def handle_move_to(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_move_to(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
x = float(params["x"])
y = float(params["y"])
hardware.move_to(x, y)
hardware.log(f"move_to(x={x}, y={y})")
if hardware.is_real():
# TODO: publish to cmd_vel or navigation topic
hardware.node.get_logger().info(f"Moving to ({x}, {y})")
return (True, f"Moved to ({x}, {y})")
```
@ -934,8 +974,10 @@ All field values retrieved via `block.getFieldValue('FIELD_NAME')` are **strings
3. Create src/blockly_executor/…/handlers/<name>.py (or add to existing file)
└─ from . import handler
└─ from ..hardware import Hardware
└─ @handler("name")
└─ def handle_name(params, hardware): ... → return (bool, str)
└─ def handle_name(params, hardware: Hardware): ... → return (bool, str)
└─ hardware.log(...) for testing + hardware.is_real() for real logic
4. Test pixi run executor (Terminal 1)
pixi run app (Terminal 2) — drag block, click Run

View File

@ -64,7 +64,12 @@ def _native_open_dialog() -> str:
return path or ""
def _wait_for_future(future, timeout_sec: float = 30.0):
class _CancelledError(Exception):
"""Raised when a wait is cancelled via cancel_event."""
pass
def _wait_for_future(future, timeout_sec: float = 30.0, cancel_event=None):
"""
Wait for an rclpy Future to complete without calling spin.
@ -72,10 +77,13 @@ def _wait_for_future(future, timeout_sec: float = 30.0):
will be resolved by that thread's spin loop. This function simply
polls future.done() with a small sleep to avoid busy-waiting.
If cancel_event is provided and gets set, raises _CancelledError.
Raises TimeoutError if the future doesn't complete within timeout.
"""
deadline = time.monotonic() + timeout_sec
while not future.done():
if cancel_event is not None and cancel_event.is_set():
raise _CancelledError("Cancelled by user")
if time.monotonic() > deadline:
raise TimeoutError(
f"Future did not complete within {timeout_sec}s"
@ -98,6 +106,7 @@ class BlocklyAPI:
def __init__(self, ros_node: Node, action_client: ActionClient) -> None:
self._node = ros_node
self._client = action_client
self._cancel_event = threading.Event()
def execute_action(
self,
@ -144,13 +153,19 @@ class BlocklyAPI:
"message": "Action server not available (timeout 5s)",
}
# Clear cancel state for this new action
self._cancel_event.clear()
# Send goal and wait for result
try:
send_future = self._client.send_goal_async(goal)
# Wait for goal acceptance — the background spin thread
# processes callbacks that resolve this future.
goal_handle = _wait_for_future(send_future, timeout_sec=10.0)
goal_handle = _wait_for_future(
send_future, timeout_sec=10.0,
cancel_event=self._cancel_event,
)
if not goal_handle.accepted:
return {
@ -160,7 +175,10 @@ class BlocklyAPI:
# Wait for result
result_future = goal_handle.get_result_async()
wrapped = _wait_for_future(result_future, timeout_sec=30.0)
wrapped = _wait_for_future(
result_future, timeout_sec=30.0,
cancel_event=self._cancel_event,
)
result = wrapped.result
return {
@ -168,6 +186,13 @@ class BlocklyAPI:
"message": result.message,
}
except _CancelledError:
logger.info("Action cancelled by user")
return {
"success": False,
"message": "Cancelled by user",
}
except TimeoutError as e:
logger.error(f"Timeout: {e}")
return {
@ -182,6 +207,17 @@ class BlocklyAPI:
"message": f"Error: {str(e)}",
}
def cancel_action(self) -> dict:
"""
Cancel the currently running action.
Called from JS via cancelCurrentAction() in bridge.js.
Sets the cancel event so _wait_for_future() returns early.
"""
logger.info("cancel_action: cancel requested from JS")
self._cancel_event.set()
return {"success": True, "message": "Cancel requested"}
def save_workspace(self, json_string: str) -> dict:
"""
Open a native OS "Save As" dialog via zenity/kdialog subprocess and

View File

@ -8,6 +8,24 @@
* Depends on: consoleLog (defined in index.html inline script)
*/
// ─── Cancellation Support ──────────────────────────────────────────────────
let _cancelResolve = null;
/**
* Cancel the currently running executeAction() call.
* Resolves the Promise.race immediately so the JS side unblocks,
* and tells Python to stop waiting for the ROS2 result.
*/
function cancelCurrentAction() {
if (_cancelResolve) {
_cancelResolve({ success: false, message: 'Cancelled by user' });
_cancelResolve = null;
}
if (window.pywebview && window.pywebview.api) {
window.pywebview.api.cancel_action();
}
}
/**
* Execute a ROS2 action via the pywebview bridge.
*
@ -16,6 +34,11 @@
* @returns {Promise<{success: boolean, message: string}>}
*/
async function executeAction(command, params) {
// If stop was already requested, throw immediately to break out of loops
if (typeof debugState !== 'undefined' && debugState.stopRequested) {
throw new Error('STOP_EXECUTION');
}
const keys = Object.keys(params);
const values = Object.values(params).map(String);
@ -27,8 +50,15 @@ async function executeAction(command, params) {
let result;
if (window.pywebview && window.pywebview.api) {
// Running inside pywebview — call real Python bridge
result = await window.pywebview.api.execute_action(command, keys, values);
// Running inside pywebview — call real Python bridge with cancellation support
const cancelPromise = new Promise((resolve) => {
_cancelResolve = resolve;
});
result = await Promise.race([
window.pywebview.api.execute_action(command, keys, values),
cancelPromise,
]);
_cancelResolve = null;
} else {
// Fallback for browser dev: simulate a successful response
consoleLog(

View File

@ -226,6 +226,9 @@ function stopExecution() {
debugState.stopRequested = true;
debugState.isPaused = false;
// Cancel any in-flight executeAction() call (JS + Python side)
cancelCurrentAction();
// If paused, resolve to allow the throw to propagate
if (debugResolve) {
debugResolve();

View File

@ -6,8 +6,7 @@ from rclpy.action.server import ServerGoalHandle
from rclpy.node import Node
from .handlers import HandlerRegistry
from .hardware.dummy_hardware import DummyHardware
from .hardware.real_hardware import RealHardware
from .hardware import Hardware
from .utils import parse_params
# Import will work after colcon build of interfaces package.
@ -27,14 +26,18 @@ class ExecutorNode(Node):
def __init__(self) -> None:
super().__init__("blockly_executor_node")
hardware_mode = self.declare_parameter("hardware_mode", "dummy").value
use_real = self.declare_parameter("use_real_hardware", False).value
if use_real:
hardware = RealHardware(self)
self.get_logger().info("Using RealHardware (ROS2 topics/services to Pi)")
else:
hardware = DummyHardware()
self.get_logger().info("Using DummyHardware (no real hardware)")
# Backward compat: use_real_hardware=true overrides hardware_mode
if hardware_mode == "dummy" and use_real:
hardware_mode = "real"
hardware = Hardware(
mode=hardware_mode,
node=self if hardware_mode == "real" else None,
)
self.get_logger().info(f"Using Hardware(mode='{hardware_mode}')")
self._registry = HandlerRegistry(hardware)

View File

@ -22,7 +22,7 @@ import pkgutil
from pathlib import Path
from typing import Callable
from ..hardware.interface import HardwareInterface
from ..hardware import Hardware
# Global list that collects (command_name, function) pairs during import.
# Each @handler("name") call appends here. HandlerRegistry.__init__
@ -60,7 +60,7 @@ class HandlerRegistry:
the hardware interface to each registered handler.
"""
def __init__(self, hardware: HardwareInterface) -> None:
def __init__(self, hardware: Hardware) -> None:
self._hardware = hardware
self._handlers: dict[str, Callable[[dict[str, str]], tuple[bool, str]]] = {}

View File

@ -1,25 +1,46 @@
"""GPIO command handlers — LED on/off and future GPIO operations."""
"""GPIO command handlers — LED on/off and digital output.
Each handler contains both dummy (logging) and real (ROS2/GPIO) behavior.
When adding a new GPIO block, just add a new @handler function here.
"""
from . import handler
from ..hardware import Hardware
@handler("led_on")
def handle_led_on(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_led_on(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
pin = int(params["pin"])
hardware.set_led(pin, True)
hardware.log(f"set_led(pin={pin}, state=True)")
if hardware.is_real():
# TODO: publish to ROS2 topic for Pi hardware node
hardware.node.get_logger().info(f"LED on pin {pin} ON")
return (True, f"LED on pin {pin} turned ON")
@handler("led_off")
def handle_led_off(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_led_off(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
pin = int(params["pin"])
hardware.set_led(pin, False)
hardware.log(f"set_led(pin={pin}, state=False)")
if hardware.is_real():
# TODO: publish to ROS2 topic for Pi hardware node
hardware.node.get_logger().info(f"LED on pin {pin} OFF")
return (True, f"LED on pin {pin} turned OFF")
@handler("digital_out")
def handle_digital_out(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_digital_out(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
gpio = int(params["gpio"])
state = bool(params["state"])
hardware.set_digital_out(gpio, state)
state_str = "HIGH" if state else "LOW"
hardware.log(f"set_digital_out(gpio={gpio}, state={state})")
if hardware.is_real():
# TODO: publish to ROS2 topic for Pi hardware node
hardware.node.get_logger().info(f"GPIO pin {gpio} set to {state_str}")
return (True, f"GPIO pin {gpio} set to {state_str}")

View File

@ -3,10 +3,11 @@
import time
from . import handler
from ..hardware import Hardware
@handler("delay")
def handle_delay(params: dict[str, str], hardware) -> tuple[bool, str]:
def handle_delay(params: dict[str, str], hardware: Hardware) -> tuple[bool, str]:
duration_ms = int(params["duration_ms"])
time.sleep(duration_ms / 1000.0)
return (True, f"Delayed {duration_ms}ms")

View File

@ -1,7 +1,5 @@
"""Hardware abstraction layer for the executor."""
"""Hardware context for the executor."""
from .interface import HardwareInterface
from .dummy_hardware import DummyHardware
from .real_hardware import RealHardware
from .hardware import Hardware
__all__ = ["HardwareInterface", "DummyHardware", "RealHardware"]
__all__ = ["Hardware"]

View File

@ -1,27 +0,0 @@
"""Dummy hardware implementation for development and testing."""
from .interface import HardwareInterface
class DummyHardware(HardwareInterface):
"""
In-memory hardware implementation that logs all operations.
LED states are stored in a dict and every command is recorded
in call_log for inspection during tests.
"""
def __init__(self) -> None:
self.led_states: dict[int, bool] = {}
self.call_log: list[str] = []
def set_led(self, pin: int, state: bool) -> None:
self.led_states[pin] = state
self.call_log.append(f"set_led(pin={pin}, state={state})")
def is_ready(self) -> bool:
self.call_log.append("is_ready()")
return True
def set_digital_out(self, gpio: int, state: bool) -> None:
self.call_log.append(f"set_digital_out(gpio={gpio}, state={state})")

View File

@ -0,0 +1,31 @@
"""Hardware context — provides mode, ROS2 node, and call_log to handlers."""
from __future__ import annotations
class Hardware:
"""
Hardware context passed to every handler.
Handlers check hardware.is_real() to decide between dummy (logging)
and real (ROS2 publish/GPIO) behavior. This class never needs
modification when adding new blocks all logic lives in handlers.
Attributes:
mode: "dummy" for testing/dev, "real" for actual hardware.
node: ROS2 Node instance (only available when mode="real").
call_log: List of logged actions for testing/debugging.
"""
def __init__(self, mode: str = "dummy", node=None) -> None:
self.mode = mode
self.node = node
self.call_log: list[str] = []
def is_real(self) -> bool:
"""Check if running in real hardware mode."""
return self.mode == "real"
def log(self, message: str) -> None:
"""Log a hardware action for testing/debugging."""
self.call_log.append(message)

View File

@ -1,45 +0,0 @@
"""Abstract base class for hardware interfaces."""
from abc import ABC, abstractmethod
class HardwareInterface(ABC):
"""
Abstract hardware interface that all hardware implementations must extend.
Subclasses must implement all abstract methods. Attempting to instantiate
this class directly or a subclass that doesn't implement all methods
will raise TypeError.
"""
@abstractmethod
def set_led(self, pin: int, state: bool) -> None:
"""
Set LED state on the given pin.
Args:
pin: GPIO pin number.
state: True to turn on, False to turn off.
"""
...
@abstractmethod
def is_ready(self) -> bool:
"""
Check if the hardware interface is ready for commands.
Returns:
True if hardware is initialized and ready.
"""
...
@abstractmethod
def set_digital_out(self, gpio: int, state: bool) -> None:
"""
Set a GPIO pin to HIGH or LOW.
Args:
gpio: GPIO pin number.
state: True for HIGH, False for LOW.
"""
...

View File

@ -1,35 +0,0 @@
"""Real hardware implementation — communicates with hardware nodes via ROS2."""
from rclpy.node import Node
from .interface import HardwareInterface
class RealHardware(HardwareInterface):
"""Hardware interface that talks to ROS2 nodes running on the Raspberry Pi.
Each hardware operation publishes to a topic or calls a service on the
Pi-side hardware nodes. The executor does NOT run on the Pi directly.
Args:
node: The ROS2 node to create publishers/clients on.
"""
def __init__(self, node: Node) -> None:
self._node = node
self._logger = node.get_logger()
# TODO: create publishers/service clients for Pi hardware nodes
# e.g. self._led_pub = node.create_publisher(...)
self._logger.info("RealHardware initialized (stub — publishers TBD)")
def set_led(self, pin: int, state: bool) -> None:
# TODO: publish to Pi hardware node
self._logger.info(f"RealHardware.set_led(pin={pin}, state={state}) — stub")
def is_ready(self) -> bool:
# TODO: check if Pi hardware nodes are reachable
return True
def set_digital_out(self, gpio: int, state: bool) -> None:
# TODO: publish to Pi hardware node
self._logger.info(f"RealHardware.set_digital_out(gpio={gpio}, state={state}) — stub")