feat: implement unified odometry interface with nav_msgs/Odometry; add odometry handling and integration tests

master
a2nr 2026-03-17 19:59:01 +07:00
parent f4f808e42c
commit 0f62045260
11 changed files with 467 additions and 11 deletions

View File

@ -53,3 +53,52 @@ pixi run as5600-node # start AS5600 encoder node
`build-gpio`, `build-pca9685`, and `build-as5600` automatically run `setup-dep` which installs system libraries (`libgpiod-dev`, `liblttng-ust-dev`, `i2c-tools`) via `apt`.
See [docs/installation.md](docs/installation.md) for full setup and prerequisites.
---
## Raspberry Pi GPIO Pinout
Pin yang terpakai di project ini ditandai dengan `<<<`.
```
Raspberry Pi 40-Pin Header (BCM)
Pin# NAME NAME Pin#
──────────────────────────── ● ● ────────────────────────────
01 3.3V Power ○ ● 5V Power 02
03 GPIO02 (SDA1) <<< ○ ● 5V Power 04
05 GPIO03 (SCL1) <<< ○ ○ Ground 06
07 GPIO04 ○ ○ GPIO14 08
09 Ground ○ ○ GPIO15 10
11 GPIO17 ⚠ <<< ○ ○ GPIO18 12
13 GPIO27 ⚠ <<< ○ ○ Ground 14
15 GPIO22 ⚠ <<< ○ ○ GPIO23 <<< 16
17 3.3V Power ○ ○ GPIO24 18
19 GPIO10 ○ ○ Ground 20
21 GPIO09 ○ ○ GPIO25 22
23 GPIO11 ○ ○ GPIO08 24
25 Ground ○ ○ GPIO07 26
27 ID_SD ○ ○ ID_SC 28
29 GPIO05 <<< ○ ○ Ground 30
31 GPIO06 <<< ○ ○ GPIO12 32
33 GPIO13 <<< ○ ○ Ground 34
35 GPIO19 ○ ○ GPIO16 36
37 GPIO26 ○ ○ GPIO20 38
39 Ground ○ ○ GPIO21 40
```
### Keterangan Pin Terpakai
| GPIO | Pin# | Fungsi | Node | File |
|------|------|--------|------|------|
| 2 | 03 | I2C SDA bus 1 | `pca9685_node` + `as5600_node` (encoder 0) | `src/pca9685_node/src/pca9685_node.cpp:17` / `src/as5600_node/src/as5600_node.cpp:19` |
| 3 | 05 | I2C SCL bus 1 | `pca9685_node` + `as5600_node` (encoder 0) | (sama) |
| 17 | 11 | Digital output / I2C SDA bus 3 | `gpio_node` output[0] / `as5600_node` (encoder 1) | `src/gpio_node/src/gpio_node.cpp:12` / `/boot/config.txt` |
| 27 | 13 | Digital output / I2C SCL bus 3 | `gpio_node` output[1] / `as5600_node` (encoder 1) | (sama) |
| 22 | 15 | Digital output / I2C SDA bus 4 | `gpio_node` output[2] / `as5600_node` (encoder 2) | (sama) |
| 23 | 16 | I2C SCL bus 4 | `as5600_node` (encoder 2) | `/boot/config.txt` |
| 5 | 29 | Digital input | `gpio_node` input[0] | `src/gpio_node/src/gpio_node.cpp:13` |
| 6 | 31 | Digital input | `gpio_node` input[1] | (sama) |
| 13 | 33 | Digital input | `gpio_node` input[2] | (sama) |
> **⚠ WARNING — GPIO 17, 27, 22 duplikasi:** Dipakai `gpio_node` (digital output) sekaligus `as5600_node` (I2C bit-bang bus 3 & 4). Kalau keduanya jalan bareng, ganti `output_pins` di `gpio_node` ke pin lain.

View File

@ -7,6 +7,7 @@ Every new `ament_python` package under `src/` must follow this structure:
```
src/<package_name>/
├── package.xml
├── readme.md
├── setup.py
├── setup.cfg
├── resource/

View File

@ -3,7 +3,7 @@
> **Project**: Blockly ROS2 Robot Controller (Kiwi Wheel AMR)
> **ROS2 Distro**: Jazzy
> **Last Updated**: 2026-03-16
> **Current Focus**: Task #4 — AS5600 Magnetic Encoder (I2C)
> **Current Focus**: Task #5 — Unified Odometry Interface
Dokumentasi lengkap dapat dilihat di [DOCUMENTATION.md](DOCUMENTATION.md).
@ -257,3 +257,96 @@ ros2 run as5600_node as5600_node --ros-args \
- [x] Handler `encoder_read` berfungsi di dummy mode (test passes)
- [x] Blockly block `encoderRead` muncul di toolbox, generate valid JS code
- [ ] End-to-end: Blockly block → executor (real) → cache `/encoder/state` → return angle
## 5 Enhancement: Unified Odometry Interface — nav_msgs/Odometry : [ ]
Interface odometry menggunakan standar ROS2 `nav_msgs/Odometry` agar kompatibel dengan ekosistem ROS2. Setiap jenis sensor odometry (encoder, IMU, optical) publish ke topic terpisah `odometry_<type>/odom` menggunakan message type yang sama.
**Motivasi**: Modularitas — tambah sensor baru cukup buat node yang publish `nav_msgs/Odometry` ke `odometry_<type>/odom`. Dari Blockly, user pilih source dan field yang mau dibaca.
**Arsitektur**:
```
as5600_node (encoder) → odometry_encoder/odom (nav_msgs/Odometry)
future: imu_node → odometry_imu/odom (nav_msgs/Odometry)
future: optical_node → odometry_optical/odom (nav_msgs/Odometry)
```
**Satuan**: Mengikuti REP-103 kecuali jarak menggunakan **centimeter (cm)** karena rentang pergerakan robot kecil. Angular menggunakan radian.
**Blocker**: Implementasi sensor baru (IMU, optical) menunggu desain mekanik final.
### Implementasi
#### A. Standard Interface — `nav_msgs/Odometry`
Tidak menggunakan custom message. `nav_msgs/Odometry` sudah tersedia di RoboStack (`ros-jazzy-desktop` dan `ros-jazzy-ros-base`). Message fields yang digunakan:
- `pose.pose.position.x/y` — posisi robot (cm)
- `pose.pose.orientation` — quaternion dari heading (2D: z=sin(θ/2), w=cos(θ/2))
- `twist.twist.linear.x/y` — kecepatan robot (cm/s)
- `twist.twist.angular.z` — kecepatan angular (rad/s)
- `header.frame_id` = `"odom"`, `child_frame_id` = `"base_link"`
#### B. AS5600 Node — Kiwi Wheel Kinematics (backward compatible)
`as5600_node` menghitung robot-level odometry langsung dari 3 encoder menggunakan kiwi wheel forward kinematics, lalu publish ke **dua** topic:
- `/encoder/state` (`EncoderRead`) — legacy per-wheel angle, backward compat
- `odometry_encoder/odom` (`nav_msgs/Odometry`) — robot-level pose + twist
**Kiwi Wheel Forward Kinematics** (3 roda @ 120°):
```
Constraint: ωᵢ = (1/r)(-vx·sin(αᵢ) + vy·cos(αᵢ) + L·ωz)
Forward kinematics (3 roda → robot velocity):
vx = (r/n) · Σ(-ωᵢ·sin(αᵢ)) [cm/s]
vy = (r/n) · Σ( ωᵢ·cos(αᵢ)) [cm/s]
ωz = r/(n·L) · Σ(ωᵢ) [rad/s]
Pose integration (Euler):
x += (vx·cos(θ) - vy·sin(θ))·dt [cm]
y += (vx·sin(θ) + vy·cos(θ))·dt [cm]
θ += ωz·dt [rad]
```
**Parameter kiwi wheel** (configurable via `--ros-args -p`):
| Parameter | Type | Default | Keterangan |
|---|---|---|---|
| `wheel_radius` | double | 5.0 | Radius roda (cm) — HARUS di-tune |
| `wheel_distance` | double | 15.0 | Jarak center-to-wheel (cm) — HARUS di-tune |
| `wheel_angles` | double[] | [0, 2π/3, 4π/3] | Sudut posisi roda (rad) |
#### C. Handler — `odometry.py`
```python
@handler("odometry_read")
def handle_odometry_read(params, hardware):
source = params.get("source", "encoder") # "encoder" | "imu" | "optical"
field = params["field"] # "x" | "y" | "heading" | "vx" | "vy" | "omega_z"
```
Lazy-create subscriber per source ke `odometry_<source>/odom`. Cache robot-level fields dengan quaternion → yaw conversion.
#### D. Blockly Block — `odometryRead.js`
```
┌───────────────────────────────────────────────────────────┐
│ Odometry Read source: [Encoder ▾] field: [X (cm) ▾] │
└───────────────────────────────────────────────────────────┘
```
- **source**: `FieldDropdown``Encoder` (extensible: `IMU`, `Optical`)
- **field**: `FieldDropdown``X (cm)`, `Y (cm)`, `Heading (rad)`, `Vel X (cm/s)`, `Vel Y (cm/s)`, `Angular Vel (rad/s)`
- Returns: `Number`, Category: `Robot`, Command: `odometry_read`
#### E. Backward Compatibility
- `EncoderRead.msg`, `encoder.py`, `encoderRead.js` tetap ada dan berfungsi
- Existing test `test_block_encoder.py` tetap pass
- Saved workspace yang pakai `encoderRead` block tetap bisa di-load
#### F. Future Phases (blocked on mekanik)
- Sensor nodes baru (`imu_node`, `optical_node`) publish `nav_msgs/Odometry` ke `odometry_<type>/odom`
- Update `odometryRead.js` dropdown source untuk sensor baru
- Handler `odometry.py` auto-subscribe ke topic baru via `_SOURCE_TOPICS` dict
### Definition Of Done
- [x] Interface menggunakan `nav_msgs/Odometry` (bukan custom message)
- [x] `as5600_node` publish ke `odometry_encoder/odom` dengan kiwi wheel kinematics
- [x] Parameter `wheel_radius`, `wheel_distance`, `wheel_angles` configurable
- [ ] `pixi run build-as5600` berhasil — as5600_node compile dengan nav_msgs dependency
- [x] Handler `odometry_read` berfungsi di dummy mode (test passes)
- [x] Blockly block `odometryRead` dengan source + field dropdowns
- [ ] End-to-end: Blockly → executor (real) → cache `odometry_encoder/odom` → return value
- [x] Backward compat: `encoderRead` block dan `encoder_read` handler tetap berfungsi
- [x] Integration test `test_block_odometry.py` passes di dummy mode

View File

@ -9,6 +9,8 @@ endif()
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(blockly_interfaces REQUIRED)
find_package(nav_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
add_executable(as5600_node
src/main.cpp
@ -23,6 +25,8 @@ target_include_directories(as5600_node PUBLIC
ament_target_dependencies(as5600_node
rclcpp
blockly_interfaces
nav_msgs
geometry_msgs
)
install(TARGETS as5600_node

View File

@ -1,12 +1,14 @@
#ifndef AS5600_NODE__AS5600_NODE_HPP_
#define AS5600_NODE__AS5600_NODE_HPP_
#include <cmath>
#include <string>
#include <vector>
#include <rclcpp/rclcpp.hpp>
#include "blockly_interfaces/msg/encoder_read.hpp"
#include <nav_msgs/msg/odometry.hpp>
class As5600Node : public rclcpp::Node
{
@ -17,6 +19,7 @@ public:
private:
void open_i2c_buses(const std::vector<std::string> & device_paths);
uint16_t read_raw_angle(int fd);
uint8_t read_status(int fd);
void timer_callback();
std::vector<int> i2c_fds_;
@ -28,8 +31,29 @@ private:
static constexpr uint8_t REG_RAW_ANGLE_H = 0x0C;
static constexpr uint8_t REG_STATUS = 0x0B;
// STATUS register bit masks
static constexpr uint8_t STATUS_MD = 0x20; // Magnet Detected
// Legacy publisher (EncoderRead on /encoder/state)
rclcpp::Publisher<blockly_interfaces::msg::EncoderRead>::SharedPtr encoder_pub_;
// Odometry publisher (nav_msgs/Odometry on odometry_encoder/odom)
rclcpp::Publisher<nav_msgs::msg::Odometry>::SharedPtr odom_pub_;
rclcpp::TimerBase::SharedPtr read_timer_;
// Velocity computation state
std::vector<double> prev_angle_rad_;
bool first_reading_;
double dt_;
// Kiwi wheel kinematics parameters (units: cm, rad)
double wheel_radius_; // cm
double wheel_distance_; // cm (center-to-wheel)
std::vector<double> wheel_angles_; // rad (position angle of each wheel)
// Pose integration state (units: cm, rad)
double pose_x_;
double pose_y_;
double pose_theta_;
};
#endif // AS5600_NODE__AS5600_NODE_HPP_

View File

@ -11,6 +11,8 @@
<depend>rclcpp</depend>
<depend>blockly_interfaces</depend>
<depend>nav_msgs</depend>
<depend>geometry_msgs</depend>
<export>
<build_type>ament_cmake</build_type>

View File

@ -9,22 +9,42 @@
#include <stdexcept>
As5600Node::As5600Node()
: Node("as5600_node")
: Node("as5600_node"),
first_reading_(true),
pose_x_(0.0),
pose_y_(0.0),
pose_theta_(0.0)
{
// Declare ROS2 parameters
this->declare_parameter("i2c_devices", std::vector<std::string>{"/dev/i2c-1"});
this->declare_parameter("publish_rate", 10.0);
// Kiwi wheel geometry (units: cm, rad)
this->declare_parameter("wheel_radius", 5.0); // cm — MUST tune to actual hardware
this->declare_parameter("wheel_distance", 15.0); // cm — MUST tune to actual hardware
this->declare_parameter("wheel_angles", // rad — wheel position angles
std::vector<double>{0.0, 2.094395102393195, 4.188790204786391}); // 0°, 120°, 240°
auto device_paths = this->get_parameter("i2c_devices").as_string_array();
double rate = this->get_parameter("publish_rate").as_double();
wheel_radius_ = this->get_parameter("wheel_radius").as_double();
wheel_distance_ = this->get_parameter("wheel_distance").as_double();
wheel_angles_ = this->get_parameter("wheel_angles").as_double_array();
dt_ = 1.0 / rate;
// Open all I2C buses
open_i2c_buses(device_paths);
// Create publisher for /encoder/state
// Initialize velocity computation state
prev_angle_rad_.resize(i2c_fds_.size(), 0.0);
// Create legacy publisher for /encoder/state (backward compat)
encoder_pub_ = this->create_publisher<blockly_interfaces::msg::EncoderRead>(
"/encoder/state", 10);
// Create odometry publisher on odometry_encoder/odom (nav_msgs/Odometry)
odom_pub_ = this->create_publisher<nav_msgs::msg::Odometry>(
"odometry_encoder/odom", 10);
// Create timer for periodic reads
auto period = std::chrono::duration<double>(1.0 / rate);
read_timer_ = this->create_wall_timer(
@ -33,8 +53,8 @@ As5600Node::As5600Node()
RCLCPP_INFO(
this->get_logger(),
"As5600Node ready — %zu encoder(s), publish_rate=%.1f Hz",
i2c_fds_.size(), rate);
"As5600Node ready — %zu encoder(s), rate=%.1f Hz, r=%.1f cm, L=%.1f cm",
i2c_fds_.size(), rate, wheel_radius_, wheel_distance_);
}
As5600Node::~As5600Node()
@ -91,16 +111,97 @@ uint16_t As5600Node::read_raw_angle(int fd)
return ((buf[0] & 0x0F) << 8) | buf[1];
}
uint8_t As5600Node::read_status(int fd)
{
uint8_t reg = REG_STATUS;
if (write(fd, &reg, 1) != 1) {
RCLCPP_ERROR(this->get_logger(), "I2C status register select failed");
return 0;
}
uint8_t status = 0;
if (read(fd, &status, 1) != 1) {
RCLCPP_ERROR(this->get_logger(), "I2C status read failed");
return 0;
}
return status;
}
void As5600Node::timer_callback()
{
// 1. Read all encoder angles and compute wheel angular velocities
size_t n = std::min(i2c_fds_.size(), wheel_angles_.size());
std::vector<double> wheel_velocities(n, 0.0);
for (size_t i = 0; i < i2c_fds_.size(); ++i) {
uint16_t raw = read_raw_angle(i2c_fds_[i]);
float angle = static_cast<float>(raw) * 360.0f / 4096.0f;
double angle_rad = static_cast<double>(raw) * 2.0 * M_PI / 4096.0;
blockly_interfaces::msg::EncoderRead msg;
msg.encoder_id = static_cast<uint8_t>(i);
msg.raw_angle = raw;
msg.angle = angle;
encoder_pub_->publish(msg);
// Compute wheel angular velocity (handle 0↔2π wraparound)
if (!first_reading_ && i < n) {
double delta = angle_rad - prev_angle_rad_[i];
if (delta > M_PI) delta -= 2.0 * M_PI;
if (delta < -M_PI) delta += 2.0 * M_PI;
wheel_velocities[i] = delta / dt_;
}
prev_angle_rad_[i] = angle_rad;
// Legacy: publish EncoderRead on /encoder/state (backward compat)
blockly_interfaces::msg::EncoderRead enc_msg;
enc_msg.encoder_id = static_cast<uint8_t>(i);
enc_msg.raw_angle = raw;
enc_msg.angle = static_cast<float>(raw) * 360.0f / 4096.0f;
encoder_pub_->publish(enc_msg);
}
if (first_reading_) {
first_reading_ = false;
return;
}
// 2. Kiwi wheel forward kinematics → robot velocity (cm/s, rad/s)
// Constraint: ωᵢ = (1/r)(-vx·sin(αᵢ) + vy·cos(αᵢ) + L·ωz)
// Forward: [vx, vy, ωz] = J⁻¹ · [ω₀, ω₁, ω₂]
double vx = 0.0, vy = 0.0, wz = 0.0;
for (size_t i = 0; i < n; ++i) {
vx += -wheel_velocities[i] * sin(wheel_angles_[i]);
vy += wheel_velocities[i] * cos(wheel_angles_[i]);
wz += wheel_velocities[i];
}
double dn = static_cast<double>(n);
vx *= wheel_radius_ / dn; // cm/s
vy *= wheel_radius_ / dn; // cm/s
wz *= wheel_radius_ / (dn * wheel_distance_); // rad/s
// 3. Integrate pose in world frame (Euler integration)
pose_x_ += (vx * cos(pose_theta_) - vy * sin(pose_theta_)) * dt_;
pose_y_ += (vx * sin(pose_theta_) + vy * cos(pose_theta_)) * dt_;
pose_theta_ += wz * dt_;
// 4. Publish nav_msgs/Odometry on odometry_encoder/odom
nav_msgs::msg::Odometry odom;
odom.header.stamp = this->now();
odom.header.frame_id = "odom";
odom.child_frame_id = "base_link";
// Position (cm)
odom.pose.pose.position.x = pose_x_;
odom.pose.pose.position.y = pose_y_;
odom.pose.pose.position.z = 0.0;
// Orientation — quaternion from yaw (2D: q = [0, 0, sin(θ/2), cos(θ/2)])
odom.pose.pose.orientation.x = 0.0;
odom.pose.pose.orientation.y = 0.0;
odom.pose.pose.orientation.z = sin(pose_theta_ / 2.0);
odom.pose.pose.orientation.w = cos(pose_theta_ / 2.0);
// Twist — body frame velocity (cm/s, rad/s)
odom.twist.twist.linear.x = vx;
odom.twist.twist.linear.y = vy;
odom.twist.twist.linear.z = 0.0;
odom.twist.twist.angular.x = 0.0;
odom.twist.twist.angular.y = 0.0;
odom.twist.twist.angular.z = wz;
odom_pub_->publish(odom);
}

View File

@ -15,4 +15,5 @@ const BLOCK_FILES = [
'delay.js',
'pwmWrite.js',
'encoderRead.js',
'odometryRead.js',
];

View File

@ -0,0 +1,38 @@
BlockRegistry.register({
name: 'odometryRead',
category: 'Robot',
categoryColor: '#5b80a5',
color: '#00897B',
tooltip: 'Read robot odometry — position (cm), velocity (cm/s), or heading (rad)',
definition: {
init: function () {
this.appendDummyInput()
.appendField('Odometry Read source:')
.appendField(new Blockly.FieldDropdown([
['Encoder', 'encoder']
]), 'SOURCE')
.appendField('field:')
.appendField(new Blockly.FieldDropdown([
['X (cm)', 'x'],
['Y (cm)', 'y'],
['Heading (rad)', 'heading'],
['Vel X (cm/s)', 'vx'],
['Vel Y (cm/s)', 'vy'],
['Angular Vel (rad/s)', 'omega_z']
]), 'FIELD');
this.setOutput(true, 'Number');
this.setColour('#00897B');
this.setTooltip('Read robot odometry — position (cm), velocity (cm/s), or heading (rad)');
}
},
generator: function (block) {
var source = block.getFieldValue('SOURCE');
var field = block.getFieldValue('FIELD');
var code =
'parseFloat((await executeAction(\'odometry_read\', { source: \'' + source + '\', field: \'' + field + '\' })).message)';
return [code, Blockly.JavaScript.ORDER_AWAIT];
}
});

View File

@ -0,0 +1,87 @@
"""Odometry handler — read cached robot-level pose/velocity from nav_msgs/Odometry.
Real mode communication:
odometry_read subscribe nav_msgs/Odometry from odometry_encoder/odom as5600_node
Future: odometry_imu/odom, odometry_optical/odom, etc.
Units: cm for position, cm/s for velocity, rad for heading, rad/s for angular velocity.
"""
import math
import threading
from . import handler
from .hardware import Hardware
# Topic mapping: source name → topic
_SOURCE_TOPICS = {
"encoder": "odometry_encoder/odom",
}
def _quaternion_to_yaw(q):
"""Convert quaternion to yaw angle (2D heading)."""
siny = 2.0 * (q.w * q.z + q.x * q.y)
cosy = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
return math.atan2(siny, cosy)
def _get_odometry_subscriber(hardware: Hardware, source: str):
"""Lazy-create a subscriber for a specific odometry source.
Each source gets its own subscription and cache entry.
"""
cache_attr = "_odometry_cache"
lock_attr = "_odometry_lock"
subs_attr = "_odometry_subs"
if not hasattr(hardware.node, cache_attr):
hardware.node._odometry_cache = {}
hardware.node._odometry_lock = threading.Lock()
hardware.node._odometry_subs = {}
if source not in hardware.node._odometry_subs:
from nav_msgs.msg import Odometry
topic = _SOURCE_TOPICS.get(source, f"odometry_{source}/odom")
def _odom_cb(msg: Odometry):
yaw = _quaternion_to_yaw(msg.pose.pose.orientation)
with hardware.node._odometry_lock:
hardware.node._odometry_cache[source] = {
"x": msg.pose.pose.position.x,
"y": msg.pose.pose.position.y,
"heading": yaw,
"vx": msg.twist.twist.linear.x,
"vy": msg.twist.twist.linear.y,
"omega_z": msg.twist.twist.angular.z,
}
hardware.node._odometry_subs[source] = (
hardware.node.create_subscription(Odometry, topic, _odom_cb, 10)
)
return hardware.node._odometry_cache
@handler("odometry_read")
def handle_odometry_read(
params: dict[str, str], hardware: Hardware
) -> tuple[bool, str]:
source = params.get("source", "encoder")
field = params["field"]
hardware.log(f"odometry_read(source={source}, field={field})")
if hardware.is_real():
cache = _get_odometry_subscriber(hardware, source)
with hardware.node._odometry_lock:
data = cache.get(source)
if data is None:
return (True, "0.0")
value = data.get(field, 0.0)
return (True, str(value))
# Dummy mode — return 0.0
return (True, "0.0")

View File

@ -0,0 +1,56 @@
"""Integration tests for odometry_read handler (nav_msgs/Odometry interface)."""
def test_odometry_read_x(exe_action):
result = exe_action("odometry_read", source="encoder", field="x")
assert result.result.success is True
value = float(result.result.message)
assert value == 0.0 # Dummy mode returns 0.0
def test_odometry_read_y(exe_action):
result = exe_action("odometry_read", source="encoder", field="y")
assert result.result.success is True
assert float(result.result.message) == 0.0
def test_odometry_read_heading(exe_action):
result = exe_action("odometry_read", source="encoder", field="heading")
assert result.result.success is True
assert float(result.result.message) == 0.0
def test_odometry_read_vx(exe_action):
result = exe_action("odometry_read", source="encoder", field="vx")
assert result.result.success is True
assert float(result.result.message) == 0.0
def test_odometry_read_vy(exe_action):
result = exe_action("odometry_read", source="encoder", field="vy")
assert result.result.success is True
assert float(result.result.message) == 0.0
def test_odometry_read_omega_z(exe_action):
result = exe_action("odometry_read", source="encoder", field="omega_z")
assert result.result.success is True
assert float(result.result.message) == 0.0
def test_odometry_read_default_source(exe_action):
"""When source param is omitted, defaults to encoder."""
result = exe_action("odometry_read", field="x")
assert result.result.success is True
assert float(result.result.message) == 0.0
def test_odometry_read_sends_feedback(exe_action):
result = exe_action("odometry_read", source="encoder", field="x")
assert len(result.feedbacks) > 0
assert result.feedbacks[0].status == "executing"
def test_odometry_read_missing_field_fails(exe_action):
result = exe_action("odometry_read", source="encoder")
assert result.result.success is False