54 KiB
Blockly ROS2 Robot Controller — Technical Documentation
Comprehensive technical documentation for the Blockly ROS2 Robot Controller project. This document covers architecture, setup, file details, custom block creation, integration flow, testing, and troubleshooting.
Table of Contents
- 1. Project Overview
- 2. System Architecture
- 3. Directory Structure
- 4. Installation
- 5. Running the Project
- 6. Detailed File Reference
- 7. Creating Custom Blocks in Blockly
- 8. Blockly–ROS2 Integration Flow
- 9. Testing
- 10. Troubleshooting & Known Issues
1. Project Overview
The Blockly ROS2 Robot Controller is a visual programming system that allows users to control an AMR (Autonomous Mobile Robot) by assembling drag-and-drop blocks in a desktop application. The system bridges three technologies:
| Technology | Role |
|---|---|
| Blockly | Visual programming editor AND runtime executor |
| pywebview | Desktop window hosting the Blockly UI, with a JS↔Python bridge |
| rclpy (ROS2 Jazzy) | Action Client/Server for robot command execution |
| Pixi | Environment manager with RoboStack for isolated ROS2 dependencies |
Key design principle: Blockly is not just an editor — it is the program executor. When Blockly encounters a robot action block, it calls Python via the pywebview bridge, waits for the ROS2 Action to complete, then continues to the next block. This means native Blockly constructs like loops, conditionals, and variables work naturally without any ROS2-side implementation.
2. System Architecture
2.1 High-Level Architecture Diagram
┌────────────────────────────────────────────────────────────────┐
│ Desktop Application │
│ (pywebview) │
│ │
│ ┌──────────────────────┐ │
│ │ Blockly UI │ HTML/JS │
│ │ │ │
│ │ • User assembles │ Blockly is the EXECUTOR — │
│ │ blocks visually │ not just an editor. │
│ │ • if/else, loops, │ │
│ │ variables (native)│ When encountering a robot │
│ │ • Block highlighting│ action block, Blockly calls │
│ │ during execution │ Python and WAITS for result. │
│ │ │ │
│ │ [Run] [Stop] │ │
│ └──────────┬───────────┘ │
│ │ JS ↔ Python bridge (pywebview API) │
│ │ • execute_action(command, keys, values) │
│ │ ← return: {success, message} │
│ ┌──────────▼───────────┐ │
│ │ BlocklyAPI │ Python / rclpy │
│ │ (blockly_app/app.py)│ │
│ │ │ Runs in pywebview thread; │
│ │ • Receives command │ polls futures resolved by │
│ │ from Blockly │ background spin thread. │
│ │ • Sends Action Goal │ │
│ │ • Waits for Result │ │
│ │ • Returns to JS │ │
│ └──────────┬───────────┘ │
└─────────────┼──────────────────────────────────────────────────┘
│ ROS2 Action — BlocklyAction.action
│ (one action per call)
┌─────────────▼──────────────────────────────────────────────────┐
│ Executor Node │
│ (Action Server) │
│ │
│ Handles ONE action at a time. Has no concept of │
│ "program" — sequencing is controlled entirely by Blockly. │
│ │
│ ┌─────────────────────┐ │
│ │ HandlerRegistry │ Extensible command map │
│ │ • led_on │ │
│ │ • led_off │ │
│ │ • delay │ │
│ └──────────┬──────────┘ │
│ ┌──────────▼──────────┐ │
│ │ Hardware Interface │ Abstraction layer │
│ │ • DummyHardware │ for dev & test │
│ │ • GpioHardware │ for Raspberry Pi │
│ └─────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
2.2 Threading Model
The application uses a carefully designed threading model to avoid rclpy's "Executor is already spinning" error:
┌─────────────────────────────────────────────────────┐
│ Process: app.py │
│ │
│ Main Thread (pywebview) │
│ ├── webview.start() ← blocks here │
│ ├── BlocklyAPI.execute_action() called from JS │
│ │ └── _wait_for_future() ← polls future.done() │
│ │ (does NOT call spin) │
│ │ │
│ Background Thread (daemon) │
│ └── MultiThreadedExecutor.spin() │
│ └── processes action client callbacks │
│ (goal response, result, feedback) │
└─────────────────────────────────────────────────────┘
Why MultiThreadedExecutor in the app but NOT in the executor node:
-
App (client side): Uses
MultiThreadedExecutorbecause the background spin thread must process action client callbacks while the main thread pollsfuture.done(). A single-threaded executor would work too, butMultiThreadedExecutorensures callbacks are processed promptly. -
Executor Node (server side): Uses simple
rclpy.spin(node)with the default single-threaded executor. UsingMultiThreadedExecutorwithReentrantCallbackGroupon the server side causes action result delivery failures withrmw_fastrtps_cpp— the client receives default-constructed results (success=False, message='') instead of the actual values.
2.3 ROS2 Interface Contract
Defined in BlocklyAction.action:
# GOAL — one instruction to execute
string command # e.g. "led_on", "delay", "move_forward"
string[] param_keys # e.g. ["pin"]
string[] param_values # e.g. ["3"]
---
# RESULT — sent after action completes or fails
bool success
string message # success message or informative error description
---
# FEEDBACK — sent during execution
string status # "executing" | "done" | "error"
This interface is generic by design — adding new commands never requires modifying the .action file. The command + param_keys/param_values pattern supports any instruction with any parameters.
3. Directory Structure
amr-ros-k4/ # ROS2 Workspace root
│
├── pixi.toml # Environment & task definitions
├── pixi.lock # Locked dependency versions
├── DOCUMENTATION.md # This file
├── PROJECT_MANAGEMENT.md # Project tracking & guides
│
└── src/ # All ROS2 packages
├── blockly_interfaces/ # ROS2 custom interface package (ament_cmake)
│ ├── package.xml
│ ├── CMakeLists.txt
│ └── action/
│ └── BlocklyAction.action # Single action for all instructions
│
├── blockly_executor/ # ROS2 Executor Node package (ament_python)
│ ├── package.xml
│ ├── setup.py
│ ├── setup.cfg
│ ├── resource/blockly_executor # Ament index marker
│ ├── blockly_executor/ # Python module
│ │ ├── __init__.py
│ │ ├── executor_node.py # ROS2 Action Server (thin wrapper)
│ │ ├── handlers/ # @handler decorator + auto-discovery
│ │ │ ├── __init__.py # HandlerRegistry, @handler, auto-discover
│ │ │ ├── gpio.py # @handler("led_on"), @handler("led_off")
│ │ │ └── timing.py # @handler("delay")
│ │ ├── utils.py # parse_params and helpers
│ │ └── hardware/
│ │ ├── __init__.py
│ │ ├── interface.py # HardwareInterface abstract class
│ │ ├── dummy_hardware.py # In-memory impl for dev & test
│ │ ├── real_hardware.py # ROS2 topics/services to Pi nodes
│ │ └── gpio_hardware.py # Direct RPi.GPIO (legacy)
│ └── test/ # Integration test suite
│ ├── __init__.py
│ ├── conftest.py # Shared fixtures: ros_context, exe_action
│ ├── test_block_led_on.py
│ ├── test_block_led_off.py
│ └── test_block_delay.py
│
└── blockly_app/ # pywebview desktop application (ament_python)
├── package.xml
├── setup.py
├── setup.cfg
├── resource/blockly_app # Ament index marker
└── blockly_app/ # Python module
├── __init__.py
├── app.py # Entry point: pywebview + Action Client
└── ui/ # Frontend assets
├── index.html # Main UI with toolbar & workspace
├── vendor/ # Local Blockly JS files (no CDN)
│ ├── blockly.min.js
│ ├── blocks_compressed.js
│ ├── javascript_compressed.js
│ └── en.js
└── blockly/ # Modular block system
├── core/ # Shared infrastructure
│ ├── registry.js # BlockRegistry — auto-register + toolbox
│ ├── breakpoints.js # Debug breakpoint management
│ ├── bridge.js # executeAction — pywebview bridge
│ ├── debug-engine.js # Run, debug, step, stop logic
│ ├── ui-controls.js # Button states and callbacks
│ ├── ui-tabs.js # switchTab(), refreshCodePanel()
│ └── workspace-io.js # exportWorkspace(), importWorkspace()
├── blocks/ # One file per block (auto-discovered)
│ ├── manifest.js # BLOCK_FILES array
│ ├── led_on.js
│ ├── led_off.js
│ └── delay.js
├── loader.js # Dynamic script loader from manifest
└── workspace-init.js # Auto-toolbox + workspace setup
4. Installation
4.1 Prerequisites
| Requirement | Version | Notes |
|---|---|---|
| OS | Ubuntu 22.04+ or Raspberry Pi OS (64-bit) | linux-64 or linux-aarch64 |
| Pixi | Latest | Package manager — install guide |
| Node.js | ≥18 | Only needed once for setup-ui task |
No system-level ROS2 installation is required. Pixi installs ROS2 Jazzy from the RoboStack channel in an isolated environment.
4.2 Step-by-Step Setup
# 1. Clone the repository
git clone <repository-url>
cd amr-ros-k4
# 2. Install all dependencies (ROS2, Python, Qt, etc.)
pixi install
# 3. Build the ROS2 custom interfaces (required once)
pixi run build-interfaces
# 4. Build all packages
pixi run build
# 5. Verify ROS2 is working
pixi run python -c "import rclpy; print('rclpy OK')"
4.3 Building the Blockly Vendor Files
Blockly is loaded from local files (no CDN) to support offline operation on robots in the field:
# Download Blockly and copy to vendor/ (requires internet, run once)
pixi run setup-ui
This runs npm install blockly and copies the built files to src/blockly_app/blockly_app/ui/vendor/.
5. Running the Project
5.1 Running the Desktop Application
Requires two terminals:
# Terminal 1: Start the Executor Node (Action Server)
pixi run executor
# Terminal 2: Start the desktop application (Action Client + UI)
pixi run app
The app window opens with the Blockly workspace. Drag blocks from the toolbox, connect them, and press Run.
5.2 Running the Executor Node Standalone
The executor has two hardware modes controlled by the ROS2 parameter use_real_hardware:
# Dummy mode (default) — in-memory hardware, no real GPIO/motor access
pixi run executor
# Real hardware mode — communicates with hardware nodes on Raspberry Pi via ROS2 topics/services
pixi run executor-hw
The executor does NOT run on the Raspberry Pi directly. In real hardware mode, RealHardware creates ROS2 publishers/service clients that talk to hardware nodes running on the Pi.
The executor logs all received goals and their results to the terminal.
5.3 Running the Test Suite
The executor must be running in a separate terminal before starting tests:
# Terminal 1: Start executor
pixi run executor
# Terminal 2: Run all tests
pixi run test
# Run a specific test file
pixi run test -- src/blockly_executor/test/test_block_led_on.py -v
# Run a single test function
pixi run test -- src/blockly_executor/test/test_block_led_on.py::test_block_led_on_returns_success -v
If the executor is not running, tests are skipped with an informative message rather than failing with a cryptic timeout error.
6. Detailed File Reference
6.1 Application Layer — blockly_app
blockly_app/app.py — Application Entry Point
Purpose: Combines pywebview (desktop UI) with a ROS2 Action Client. This is the bridge between the JavaScript Blockly runtime and the ROS2 ecosystem.
Key components:
| Component | Description |
|---|---|
_native_save_dialog() / _native_open_dialog() |
Opens native OS file dialogs via tkinter.filedialog. Uses Tcl/Tk interpreter separate from Qt — no thread conflict. Available in pixi environment without extra dependencies. |
_wait_for_future(future, timeout_sec) |
Polls future.done() without calling rclpy.spin(). Used because the node is already being spun by a background thread. |
BlocklyAPI |
Python class exposed to JavaScript via pywebview. Its methods are callable as window.pywebview.api.<method>(). |
BlocklyAPI.execute_action() |
Sends a ROS2 Action Goal and blocks until the result arrives. Returns {success: bool, message: str} to JavaScript. |
BlocklyAPI.save_workspace(json_string) |
Opens native "Save As" dialog via tkinter, writes workspace JSON to chosen file. Returns {success, path} directly to JS. |
BlocklyAPI.load_workspace() |
Opens native "Open" dialog via tkinter, reads and validates JSON, returns {success, data, path} to JS. |
main() |
Initializes rclpy, creates the Action Client, starts the background spin thread with MultiThreadedExecutor, creates the pywebview window, and handles cleanup on exit. |
Threading design: The main() function starts MultiThreadedExecutor.spin() in a daemon thread. When JavaScript calls execute_action(), the method uses _wait_for_future() to poll for completion — it never calls rclpy.spin_until_future_complete(), which would conflict with the background spin.
blockly_app/ui/index.html — Main UI
Purpose: The single HTML page that hosts the Blockly workspace, toolbar buttons, and console panel.
Structure:
- Toolbar: Run, Step Over, Step Into, Stop buttons, and Debug Mode toggle
- Blockly Workspace: The drag-and-drop canvas
- Console Panel: Scrollable log output showing execution progress
- Script loading: Loads Blockly vendor files, then core infrastructure, then blocks via auto-loader
Script loading order (fixed — never needs changing when adding blocks):
vendor/blockly.min.js+ other Blockly libsblockly/core/registry.js→breakpoints.js→bridge.js→debug-engine.js→ui-controls.js→ui-tabs.js→workspace-io.jsblockly/blocks/manifest.js+blockly/loader.jsblockly/workspace-init.js- Inline script:
loadAllBlocks().then(() => initWorkspace())
blockly_app/ui/blockly/core/registry.js — Block Registry
Purpose: Central registration system for custom blocks. Each block file calls BlockRegistry.register() to self-register its visual definition, code generator, and toolbox metadata.
| Method | Description |
|---|---|
BlockRegistry.register(config) |
Register a block with name, category, color, definition, and generator |
BlockRegistry.getBlocks() |
Get all registered blocks |
BlockRegistry.getToolboxJSON() |
Build Blockly toolbox JSON from registered blocks + built-in categories |
blockly_app/ui/blockly/core/ui-tabs.js — Tab Switching
Purpose: Manages switching between the Blocks editor tab and the Code preview tab.
| Function | Description |
|---|---|
switchTab(tab) |
Shows/hides #blockly-area and #code-panel. Calls Blockly.svgResize() when returning to Blocks tab (canvas dimensions are zeroed while hidden). |
refreshCodePanel() |
Regenerates JS code from workspace via javascript.javascriptGenerator.workspaceToCode() and displays in #code-output. |
The Code tab updates automatically on every workspace change (via a change listener in workspace-init.js).
blockly_app/ui/blockly/core/workspace-io.js — Workspace Export/Import
Purpose: Exports workspace to JSON (Save As dialog) and imports from JSON (Open dialog) via the Python bridge.
| Function | Description |
|---|---|
exportWorkspace() |
Serializes workspace with Blockly.serialization.workspaces.save(), calls window.pywebview.api.save_workspace(json), logs result path. |
importWorkspace() |
Calls window.pywebview.api.load_workspace(), clears workspace, loads returned JSON with Blockly.serialization.workspaces.load(). |
Both functions use async/await — they return after the file dialog closes and the file has been read/written.
blockly_app/ui/blockly/core/bridge.js — pywebview Bridge
Purpose: Provides executeAction(command, params) which calls Python via the pywebview JS-to-Python bridge. Falls back to a mock when running outside pywebview (browser dev).
blockly_app/ui/blockly/core/debug-engine.js — Debug Engine
Purpose: Implements Run, Debug, Step Over, Step Into, and Stop functionality.
| Function | Description |
|---|---|
runProgram() |
Non-debug execution: wraps generated code in async function and eval()s it |
runDebug() |
Debug execution: wraps executeAction to check breakpoints and add delays |
stepOver() |
Resumes from pause, executes current block, pauses at next block |
stepInto() |
Resumes from pause, pauses at very next highlightBlock call |
stopExecution() |
Sets stopRequested flag, resolves any pending pause Promise |
highlightBlock(blockId) |
Highlights the currently executing block in the workspace |
blockly_app/ui/blockly/blocks/manifest.js — Block Manifest
Purpose: Lists all block files to auto-load. This is the only file you edit when adding a new block (besides creating the block file itself).
const BLOCK_FILES = ['led_on.js', 'led_off.js', 'delay.js'];
blockly_app/ui/blockly/blocks/led_on.js — Example Block
Purpose: Self-contained block definition. Contains both the visual appearance AND the code generator.
Each block file calls BlockRegistry.register() with all metadata, so the toolbox is automatically generated.
6.2 Executor Layer — blockly_executor
blockly_executor/executor_node.py — ROS2 Action Server
Purpose: Thin ROS2 wrapper that receives BlocklyAction goals, delegates to HandlerRegistry, and returns results.
| Component | Description |
|---|---|
ExecutorNode.__init__() |
Creates the Action Server on topic execute_blockly_action. Reads ROS2 parameter use_real_hardware (bool, default False) to select DummyHardware or RealHardware. |
_goal_callback() |
Always returns GoalResponse.ACCEPT |
_execute_callback(goal_handle) |
Publishes "executing" feedback, calls HandlerRegistry.execute(), catches exceptions, always calls goal_handle.succeed() |
main() |
Entry point: rclpy.init() → ExecutorNode() → rclpy.spin(node) |
Important design decision: The execute callback always calls goal_handle.succeed() regardless of whether the command succeeded or failed. The result.success and result.message fields communicate command-level outcome. Using goal_handle.abort() causes result delivery failures with rmw_fastrtps_cpp.
blockly_executor/handlers/ — Decorator-Based Command Handlers
Purpose: Maps command names to handler functions using @handler decorator and auto-discovery. Mirrors the JS frontend's BlockRegistry.register() pattern.
handlers/
├── __init__.py # @handler decorator, auto-discovery, HandlerRegistry
├── gpio.py # @handler("led_on"), @handler("led_off")
└── timing.py # @handler("delay")
@handler decorator — each handler is a plain function:
from . import handler
@handler("led_on")
def handle_led_on(params: dict[str, str], hardware) -> tuple[bool, str]:
pin = int(params["pin"])
hardware.set_led(pin, True)
return (True, f"LED on pin {pin} turned ON")
Auto-discovery: On HandlerRegistry.__init__, all .py files in handlers/ are imported automatically. The @handler decorator collects (command, function) pairs, and the registry binds hardware to each function. No manual imports or module lists needed.
HandlerRegistry (handlers/__init__.py):
| Method | Description |
|---|---|
HandlerRegistry.__init__(hardware) |
Auto-discovers handler modules, binds hardware to all @handler functions |
execute(command, params) |
Looks up handler by name, returns (False, "Unknown command: ...") if not found |
Adding a new handler: Create handlers/<name>.py, use @handler("command"). That's it — no other files to edit.
blockly_executor/utils.py — Utility Functions
| Function | Description |
|---|---|
parse_params(keys, values) |
Converts two parallel arrays into a dict. Raises ValueError if lengths differ. |
blockly_executor/hardware/interface.py — Hardware Abstract Class
| Method | Description |
|---|---|
set_led(pin, state) |
Abstract. Set LED on/off at given GPIO pin. |
is_ready() |
Abstract. Check if hardware is initialized. |
blockly_executor/hardware/dummy_hardware.py — Test/Dev Hardware
In-memory implementation for development and testing. No ROS2 communication, no real hardware.
| Attribute/Method | Description |
|---|---|
led_states: dict[int, bool] |
In-memory LED state storage |
call_log: list[str] |
Log of all method calls for test inspection |
blockly_executor/hardware/real_hardware.py — Real Hardware via ROS2
Communicates with hardware nodes running on the Raspberry Pi via ROS2 topics/services. Requires a Node reference to create publishers and service clients. The executor does NOT run on the Pi — it sends commands over the ROS2 network.
| Parameter | Description |
|---|---|
node: Node |
ROS2 node used to create publishers/clients for Pi hardware nodes |
6.3 ROS2 Interfaces — blockly_interfaces
blockly_interfaces/action/BlocklyAction.action
The single ROS2 action interface used for all commands. See Section 2.3 for the full definition.
Built by pixi run build-interfaces using colcon. The generated Python module is importable as:
from blockly_interfaces.action import BlocklyAction
6.4 Test Suite
Tests are located at src/blockly_executor/test/.
test/conftest.py — Shared Test Fixtures
See Section 9.2 for detailed explanation.
test/test_block_led_on.py
Tests for the led_on command: happy path (success with valid pin), feedback verification, missing parameter failure, and error message content.
test/test_block_led_off.py
Tests for the led_off command with equivalent coverage to led_on.
test/test_block_delay.py
Tests for the delay command including timing verification (±100ms tolerance).
6.5 Configuration Files
pixi.toml — Environment & Task Definitions
| Section | Purpose |
|---|---|
[workspace] |
Project name, version, channels (conda-forge, robostack-jazzy), platforms |
[dependencies] |
Shared deps: python >=3.11, ros-jazzy-base, ros-jazzy-rclpy, pytest, colcon-common-extensions |
[target.linux-64.dependencies] |
Desktop-only: nodejs, pyqtwebengine, qtpy |
[target.linux-64.pypi-dependencies] |
pywebview (PyPI only, not on conda-forge) |
[tasks] |
Shortcut commands — see below |
Task definitions:
| Task | Command | Depends On |
|---|---|---|
build-interfaces |
colcon build --symlink-install --packages-select blockly_interfaces |
— |
build-executor |
colcon build --symlink-install --packages-select blockly_executor |
build-interfaces |
build-app |
colcon build --symlink-install --packages-select blockly_app |
build-interfaces |
build |
colcon build --symlink-install |
build-interfaces |
executor |
source install/setup.bash && ros2 run blockly_executor executor_node |
build-executor |
executor-hw |
... executor_node --ros-args -p use_real_hardware:=true |
build-executor |
app |
source install/setup.bash && python -m blockly_app.app |
build-app |
test |
source install/setup.bash && pytest src/blockly_executor/test/ -v |
build-interfaces |
setup-ui |
Downloads Blockly via npm and copies to src/blockly_app/blockly_app/ui/vendor/ |
— |
7. Creating Custom Blocks in Blockly
7.1 Overview: Auto-Discovery on Both Sides
Both JS and Python use the same pattern: decorator/register + auto-discovery. Adding a new block:
Step File Action
──── ──── ──────
1. JS src/blockly_app/blockly_app/ui/blockly/blocks/<name>.js Create — BlockRegistry.register({...})
src/blockly_app/blockly_app/ui/blockly/blocks/manifest.js Edit — add filename to BLOCK_FILES array
2. Py src/blockly_executor/blockly_executor/handlers/<name>.py Create — @handler("command") function
Files that do NOT need changes: index.html, conftest.py, executor_node.py, BlocklyAction.action, handlers/__init__.py. Both toolbox and handler registry are auto-generated.
7.2 Step 1 — Create Block File (JS)
Create src/blockly_app/blockly_app/ui/blockly/blocks/move_forward.js:
BlockRegistry.register({
name: 'move_forward',
category: 'Robot',
categoryColor: '#5b80a5',
color: '#7B1FA2',
tooltip: 'Move robot forward with given speed and duration',
definition: {
init: function () {
this.appendDummyInput()
.appendField('Move forward speed')
.appendField(new Blockly.FieldNumber(50, 0, 100, 1), 'SPEED')
.appendField('for')
.appendField(new Blockly.FieldNumber(1000, 0), 'DURATION_MS')
.appendField('ms');
this.setPreviousStatement(true, null);
this.setNextStatement(true, null);
this.setColour('#7B1FA2');
this.setTooltip('Move robot forward with given speed and duration');
}
},
generator: function (block) {
const speed = block.getFieldValue('SPEED');
const durationMs = block.getFieldValue('DURATION_MS');
return (
"highlightBlock('" + block.id + "');\n" +
"await executeAction('move_forward', { speed: '" + speed + "', duration_ms: '" + durationMs + "' });\n"
);
}
});
Then add to manifest.js:
const BLOCK_FILES = [
'led_on.js',
'led_off.js',
'delay.js',
'move_forward.js', // ← add here
];
No index.html changes needed. No toolbox XML editing. The block appears automatically in the "Robot" category.
7.3 Step 2 — Register Handler in Python
Create a new file in handlers/ or add to an existing one. Use the @handler decorator — auto-discovery handles the rest.
# src/blockly_executor/blockly_executor/handlers/movement.py
from . import handler
@handler("move_forward")
def handle_move_forward(params: dict[str, str], hardware) -> tuple[bool, str]:
speed = int(params["speed"])
duration_ms = int(params["duration_ms"])
if not (0 <= speed <= 100):
raise ValueError(f"speed must be 0-100, got: {speed}")
hardware.move(direction="forward", speed=speed, duration_ms=duration_ms)
return (True, f"Moved forward at speed {speed} for {duration_ms}ms")
No imports to update, no registry list to edit. The file is auto-discovered on startup.
Verify immediately (no Blockly UI needed):
# Terminal 1
pixi run executor
# Terminal 2 — send goal manually
pixi run bash -c 'source install/setup.bash && ros2 action send_goal /execute_blockly_action blockly_interfaces/action/BlocklyAction "{command: move_forward, param_keys: [speed, duration_ms], param_values: [50, 1000]}"'
7.4 Step 3 — Write Integration Test (Optional)
Create src/blockly_executor/test/test_block_move_forward.py:
"""Integration test for Blockly instruction: move_forward"""
def test_block_move_forward_returns_success(exe_action):
result = exe_action("move_forward", speed="50", duration_ms="200")
assert result.result.success is True
def test_block_move_forward_sends_executing_feedback(exe_action):
result = exe_action("move_forward", speed="50", duration_ms="200")
assert len(result.feedbacks) > 0
assert result.feedbacks[0].status == "executing"
def test_block_move_forward_missing_speed_returns_failure(exe_action):
result = exe_action("move_forward", duration_ms="200")
assert result.result.success is False
assert "speed" in result.result.message.lower()
7.5 Name Consistency Reference Table
handlers.py blocks/<name>.js blocks/<name>.js
─────────── ──────────────── ─────────────────
"move_forward" == name: 'move_forward' == 'move_forward' in executeAction
params["speed"] == 'SPEED' in FieldNumber == getFieldValue('SPEED')
params["duration_ms"] == 'DURATION_MS' == getFieldValue('DURATION_MS')
8. Blockly–ROS2 Integration Flow
8.1 End-to-End Execution Flow
When the user presses Run, the following sequence occurs:
User presses [Run]
│
▼
① Blockly generates JavaScript code from workspace blocks
javascript.javascriptGenerator.workspaceToCode(workspace)
│
▼
② Generated code is wrapped in async function and eval()'d
(async function() {
highlightBlock('block_abc123');
await executeAction('led_on', { pin: '1' });
highlightBlock('block_def456');
await executeAction('delay', { duration_ms: '500' });
})()
│
▼
③ executeAction() calls Python via pywebview bridge
window.pywebview.api.execute_action("led_on", ["pin"], ["1"])
│
▼
④ BlocklyAPI.execute_action() builds ROS2 Action Goal
goal.command = "led_on"
goal.param_keys = ["pin"]
goal.param_values = ["1"]
│
▼
⑤ Action Client sends goal asynchronously
send_future = client.send_goal_async(goal)
│
▼
⑥ _wait_for_future() polls until goal is accepted
(background spin thread processes the callback)
│
▼
⑦ Executor Node receives goal, publishes "executing" feedback
│
▼
⑧ HandlerRegistry.execute("led_on", {"pin": "1"})
→ hardware.set_led(1, True)
→ returns (True, "LED on pin 1 turned ON")
│
▼
⑨ Executor Node calls goal_handle.succeed(), returns Result
│
▼
⑩ _wait_for_future() receives result, returns to BlocklyAPI
│
▼
⑪ BlocklyAPI returns {success: true, message: "..."} to JavaScript
│
▼
⑫ Blockly continues to next block (await resolves)
8.2 Code Generation Pipeline
Each custom block has a code generator defined in its block file (e.g., blocks/led_on.js) that produces JavaScript code. For example, the led_on block with pin=3 generates:
highlightBlock('block_abc123');
await executeAction('led_on', { pin: '3' });
Native Blockly blocks (loops, conditionals, variables) use Blockly's built-in JavaScript generators.
8.3 pywebview Bridge Mechanism
pywebview exposes Python objects to JavaScript through window.pywebview.api. In app.py:
window = webview.create_window(..., js_api=api)
This makes all public methods of BlocklyAPI callable from JavaScript:
const result = await window.pywebview.api.execute_action("led_on", ["pin"], ["3"]);
// result = { success: true, message: "LED on pin 3 turned ON" }
The call is synchronous from JavaScript's perspective — the await pauses Blockly's execution until Python returns.
8.4 Future Waiting Without Blocking
The _wait_for_future() function is the key to avoiding the "Executor is already spinning" error:
def _wait_for_future(future, timeout_sec=30.0):
deadline = time.monotonic() + timeout_sec
while not future.done():
if time.monotonic() > deadline:
raise TimeoutError(...)
time.sleep(0.01) # 10ms polling
return future.result()
Why this works: The background thread running MultiThreadedExecutor.spin() processes all ROS2 callbacks, including action client responses. When a response arrives, the executor's spin loop invokes the callback which marks the future as done. The _wait_for_future() function simply waits for this to happen.
8.5 Debug Mode Flow
When Debug Mode is enabled:
runDebug()wrapsexecuteActionwith breakpoint checking- Before each action, it checks if
debugState.currentBlockIdis inactiveBreakpoints - If a breakpoint is hit, execution pauses via a
Promisethat only resolves when the user clicks Step Over/Step Into - A 300ms delay is added between blocks for visual feedback
- Stop sets
stopRequested = trueand resolves any pending pause Promise, causing the nextexecuteActioncall to throw'STOP_EXECUTION'
9. Testing
9.1 Testing Philosophy
All tests are integration tests that communicate through the real ROS2 Action interface — not unit tests that call internal functions directly. This provides high confidence because the test exercises the exact same communication path as the real application.
Key architectural decisions:
- Executor runs as a separate process — eliminates race conditions from two threads competing for rclpy's global context
DummyHardwareisolates physical hardware — tests run on any laptop without a Raspberry Pi- Real ROS2 nodes are used during tests — ROS2 code is verified, not just Python logic
- One file per block — all scenarios for
led_onlive intest_block_led_on.py
9.2 conftest.py — Shared Fixtures
test/conftest.py provides two fixtures:
ros_context (session-scoped)
@pytest.fixture(scope="session")
def ros_context():
rclpy.init()
yield
rclpy.shutdown()
Initializes ROS2 exactly once for the entire test session.
exe_action (function-scoped)
Each test gets a clean Node("test_action_client") to prevent state leakage. The fixture:
- Creates an
ActionClienton topicexecute_blockly_action - Waits 5 seconds for the server — skips (not fails) if not found
- Returns a
_send()function that builds a Goal, sends it, collects feedback, and returns the result - Destroys the node after the test
9.3 Test File Structure
Every test file follows this pattern:
# src/blockly_executor/test/test_block_<name>.py
"""Integration test for Blockly instruction: <name>"""
# -- HAPPY PATH --
def test_block_<name>_returns_success(exe_action):
result = exe_action("<name>", param="value")
assert result.result.success is True
def test_block_<name>_sends_executing_feedback(exe_action):
result = exe_action("<name>", param="value")
assert len(result.feedbacks) > 0
assert result.feedbacks[0].status == "executing"
# -- SAD PATH --
def test_block_<name>_missing_<param>_returns_failure(exe_action):
result = exe_action("<name>") # intentionally missing param
assert result.result.success is False
9.4 Adding a New Test File
- Create
src/blockly_executor/test/test_block_<name>.py - Write test functions using
exe_actionfixture - No changes needed to
conftest.pyor any other test file - Run:
pixi run test
9.5 Running Tests
# All tests
pixi run test
# Single file
pixi run test -- src/blockly_executor/test/test_block_led_on.py -v
# Single test
pixi run test -- src/blockly_executor/test/test_block_led_on.py::test_block_led_on_returns_success -v
10. Guide: Adding a New ROS2 Package
Every new ament_python package under src/ must follow this structure:
src/<package_name>/
├── package.xml
├── setup.py
├── setup.cfg
├── resource/
│ └── <package_name> # Empty file — ament index marker
├── <package_name>/ # Python module — same name as package
│ ├── __init__.py
│ └── <your_node>.py
└── test/
├── __init__.py
└── test_<feature>.py
setup.cfg:
[develop]
script_dir=$base/lib/<package_name>
[install]
install_scripts=$base/lib/<package_name>
package.xml:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd"
schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>PACKAGE_NAME</name>
<version>0.1.0</version>
<description>DESCRIPTION</description>
<maintainer email="dev@example.com">developer</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
setup.py:
from setuptools import setup, find_packages
package_name = "PACKAGE_NAME"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
entry_points={
"console_scripts": [
"node_name = PACKAGE_NAME.module:main",
],
},
)
Steps:
mkdir -p src/<package_name>/<package_name>- Create
package.xml,setup.py,setup.cfgfrom templates above touch src/<package_name>/resource/<package_name>- Add
__init__.pyand node files - Add build/run tasks to
pixi.toml colcon build --symlink-install --packages-select <package_name>
11. Blockly Block Types — Templates & Quick Reference
Block Type Overview
| Pattern | Shape | Use Case | Example |
|---|---|---|---|
| Statement block | Top/bottom connectors | Perform an action | led_on, delay, motor_stop |
| Output block | Left plug only | Return a sensor value | read_distance, read_temperature |
| Input block | Input sockets | Accept values from other blocks | move_to(x, y) |
Template A — Statement Block (action command)
BlockRegistry.register({
name: 'led_on', // Must match Python handler name
category: 'Robot',
categoryColor: '#5b80a5',
color: '#4CAF50',
tooltip: 'Turn on LED at the specified GPIO pin',
definition: {
init: function () {
this.appendDummyInput()
.appendField('LED ON pin')
// FieldNumber(default, min, max, step)
.appendField(new Blockly.FieldNumber(1, 0, 40, 1), 'PIN');
this.setPreviousStatement(true, null); // connect above
this.setNextStatement(true, null); // connect below
this.setColour('#4CAF50');
this.setTooltip('Turn on LED at the specified GPIO pin');
}
},
generator: function (block) {
const pin = block.getFieldValue('PIN');
return (
'highlightBlock(\'' + block.id + '\');\n' +
'await executeAction(\'led_on\', { pin: \'' + pin + '\' });\n'
);
}
});
Template B — Output Block (sensor value)
BlockRegistry.register({
name: 'read_distance',
category: 'Sensors',
categoryColor: '#a5745b',
color: '#E91E63',
tooltip: 'Read distance from ultrasonic sensor in cm',
outputType: 'Number',
definition: {
init: function () {
this.appendDummyInput()
.appendField('Distance sensor')
.appendField(new Blockly.FieldDropdown([
['Front', 'front'], ['Left', 'left'], ['Right', 'right'],
]), 'SENSOR_ID');
this.setOutput(true, 'Number'); // left plug, no top/bottom connectors
this.setColour('#E91E63');
this.setTooltip('Read distance from ultrasonic sensor in cm');
}
},
// Output blocks return [code, order] ARRAY, not a string
generator: function (block) {
const sensorId = block.getFieldValue('SENSOR_ID');
const code =
'(await executeAction(\'read_distance\', { sensor_id: \'' + sensorId + '\' })).message';
return [code, javascript.Order.AWAIT];
}
});
Template C — Block with Value Inputs (accepts other blocks)
BlockRegistry.register({
name: 'move_to',
category: 'Navigation',
categoryColor: '#5ba55b',
color: '#00BCD4',
tooltip: 'Move robot to target X and Y coordinates',
definition: {
init: function () {
// appendValueInput creates a socket where output blocks plug in
this.appendValueInput('X').setCheck('Number').appendField('Move to X');
this.appendValueInput('Y').setCheck('Number').appendField('Y');
this.setInputsInline(true);
this.setPreviousStatement(true, null);
this.setNextStatement(true, null);
this.setColour('#00BCD4');
this.setTooltip('Move robot to target X and Y coordinates');
}
},
generator: function (block) {
// valueToCode reads the plugged-in block, returns '0' if empty
const x = javascript.javascriptGenerator.valueToCode(block, 'X', javascript.Order.ATOMIC) || '0';
const y = javascript.javascriptGenerator.valueToCode(block, 'Y', javascript.Order.ATOMIC) || '0';
return (
'highlightBlock(\'' + block.id + '\');\n' +
'await executeAction(\'move_to\', { x: \'' + x + '\', y: \'' + y + '\' });\n'
);
}
});
Quick Reference: Blockly Field Types
| Field Type | Code | Use Case |
|---|---|---|
| Number | new Blockly.FieldNumber(default, min, max, step) |
Pin number, duration, speed |
| Text | new Blockly.FieldTextInput('default') |
Custom labels, names |
| Dropdown | new Blockly.FieldDropdown([['Label','value'], ...]) |
Sensor selection, direction |
| Checkbox | new Blockly.FieldCheckbox('TRUE') |
On/off toggle |
| Color | new Blockly.FieldColour('#ff0000') |
LED color picker |
| Angle | new Blockly.FieldAngle(90) |
Rotation angle with dial |
| Image | new Blockly.FieldImage('url', width, height) |
Icon on block |
Quick Reference: Block Connection Types
| Method | Effect | When to use |
|---|---|---|
setPreviousStatement(true) + setNextStatement(true) |
Stackable block | Action/command blocks |
setOutput(true, type) |
Output plug — returns a value | Sensor/calculation blocks |
appendDummyInput() |
Row with only inline fields | Simple blocks with fixed fields |
appendValueInput('NAME') |
Input socket — accepts output blocks | Blocks that take dynamic values |
appendStatementInput('DO') |
Statement socket — accepts block stack | Container blocks like loops, if-then |
12. Troubleshooting & Known Issues
"Executor is already spinning" in app.py
Symptom: RuntimeError: Executor is already spinning when Blockly calls execute_action().
Cause: Code calls rclpy.spin_until_future_complete() while a background thread is already spinning the same node.
Solution: Use _wait_for_future() which polls future.done() instead of calling spin. The background thread's spin loop resolves the futures.
"Ignoring unexpected goal response" warnings
Symptom: Warning messages about unexpected goal responses.
Cause: Two executor nodes are running simultaneously on the same action topic.
Solution: Ensure only one executor is running:
pkill -f "executor_node"
pixi run executor
Action result always success=False, message=''
Symptom: Executor logs show successful execution, but the client receives default-constructed results.
Cause: Using MultiThreadedExecutor with ReentrantCallbackGroup on the server side causes result delivery failures with rmw_fastrtps_cpp.
Solution: The executor node uses simple rclpy.spin(node) with the default single-threaded executor. Do not add MultiThreadedExecutor or ReentrantCallbackGroup to executor_node.py.
goal_handle.abort() causes empty results
Symptom: When the executor calls goal_handle.abort() for failed commands, the client receives empty result fields.
Solution: Always call goal_handle.succeed(). The result.success field communicates command-level success/failure.
Tests skipped with "Executor Node tidak ditemukan"
Symptom: All tests show SKIPPED with message about executor not found.
Cause: The executor node is not running in a separate terminal.
Solution:
# Terminal 1
pixi run executor
# Terminal 2
pixi run test
Export/Import button has no effect (force close or nothing happens)
Symptom: Clicking Export or Import either force-closes the app or does nothing.
Cause: Qt file dialogs (QFileDialog, pywebview.create_file_dialog) must run on the Qt main thread. pywebview calls Python API methods from a background thread. Attempting to open a Qt dialog from there causes:
pywebview.create_file_dialog→ deadlock viaBlockingQueuedConnection→ force closeQFileDialogviaQTimer.singleShot→ no effect, because non-QThread background threads have no Qt event loop
Solution: Use tkinter.filedialog — tkinter uses its own Tcl/Tk interpreter, completely separate from Qt. filedialog.asksaveasfilename() blocks the calling background thread until the user responds. Already available in the pixi environment (no extra dependency needed).
See _native_save_dialog() in app.py.
pywebview shows "GTK cannot be loaded"
Symptom: Warning about ModuleNotFoundError: No module named 'gi' followed by "Using Qt 5.15".
Impact: This is informational only. pywebview tries GTK first, falls back to Qt (which is installed via pyqtwebengine). The application works correctly with the Qt backend.