feat: update GpioNode to use gpiod v2.x API for batch line requests and improved GPIO handling

master
a2nr 2026-03-10 21:55:00 +07:00
parent 9f7ac848d8
commit 0bdd3040e0
1 changed files with 50 additions and 37 deletions

View File

@ -11,6 +11,8 @@ Alur komunikasi:
Jika gpiod tidak tersedia (misalnya di x86 dev machine), node tetap berjalan
dalam simulation mode hanya logging tanpa akses hardware. Ini memungkinkan
pengembangan dan testing tanpa Pi.
Menggunakan gpiod v2.x API (request_lines / LineSettings / Value).
"""
import rclpy
@ -23,6 +25,7 @@ from blockly_interfaces.msg import GpioWrite, GpioRead
# berjalan dalam simulation mode.
try:
import gpiod
from gpiod.line import Direction, Value
HAS_GPIOD = True
except ImportError:
HAS_GPIOD = False
@ -40,10 +43,10 @@ class GpioNode(Node):
self.declare_parameter("input_publish_rate", 10.0) # Hz
self.declare_parameter("gpio_chip", "/dev/gpiochip0") # gpiod v2.x butuh path lengkap
self._output_pins = (
self._output_pins = list(
self.get_parameter("output_pins").get_parameter_value().integer_array_value
)
self._input_pins = (
self._input_pins = list(
self.get_parameter("input_pins").get_parameter_value().integer_array_value
)
rate = (
@ -53,9 +56,10 @@ class GpioNode(Node):
self.get_parameter("gpio_chip").get_parameter_value().string_value
)
self._chip = None
self._output_lines: dict[int, object] = {}
self._input_lines: dict[int, object] = {}
# gpiod v2.x: request_lines() mengembalikan LineRequest object,
# bukan per-line object seperti v1.x.
self._output_request = None
self._input_request = None
# Instance flag — True hanya jika gpiod tersedia DAN chip berhasil dibuka.
# Berbeda dari module-level HAS_GPIOD yang hanya cek import.
@ -93,30 +97,41 @@ class GpioNode(Node):
self._read_timer = self.create_timer(period, self._read_callback)
self.get_logger().info(
f"GpioNode ready — outputs={list(self._output_pins)}, "
f"inputs={list(self._input_pins)}, "
f"GpioNode ready — outputs={self._output_pins}, "
f"inputs={self._input_pins}, "
f"gpiod={'yes' if self._has_hardware else 'no (simulation)'}"
)
def _setup_gpio(self, chip_name: str) -> None:
"""Konfigurasi GPIO lines via gpiod.
"""Konfigurasi GPIO lines via gpiod v2.x API.
Setiap pin di-request dengan consumer name "gpio_node" agar mudah
diidentifikasi via `gpioinfo` jika ada konflik.
gpiod v2.x menggunakan request_lines() untuk merequest semua pin
sekaligus, berbeda dari v1.x yang per-line. LineSettings menentukan
direction (input/output) untuk setiap pin.
"""
self._chip = gpiod.Chip(chip_name)
# Request output lines sebagai satu batch
if self._output_pins:
out_config = {
pin: gpiod.LineSettings(direction=Direction.OUTPUT)
for pin in self._output_pins
}
self._output_request = gpiod.request_lines(
chip_name, consumer="gpio_node", config=out_config
)
for pin in self._output_pins:
self.get_logger().info(f"Configured pin {pin} as OUTPUT")
for pin in self._output_pins:
line = self._chip.get_line(pin)
line.request(consumer="gpio_node", type=gpiod.LINE_REQ_DIR_OUT)
self._output_lines[pin] = line
self.get_logger().info(f"Configured pin {pin} as OUTPUT")
for pin in self._input_pins:
line = self._chip.get_line(pin)
line.request(consumer="gpio_node", type=gpiod.LINE_REQ_DIR_IN)
self._input_lines[pin] = line
self.get_logger().info(f"Configured pin {pin} as INPUT")
# Request input lines sebagai satu batch
if self._input_pins:
in_config = {
pin: gpiod.LineSettings(direction=Direction.INPUT)
for pin in self._input_pins
}
self._input_request = gpiod.request_lines(
chip_name, consumer="gpio_node", config=in_config
)
for pin in self._input_pins:
self.get_logger().info(f"Configured pin {pin} as INPUT")
def _write_callback(self, msg: GpioWrite) -> None:
"""Handle perintah digital output dari executor.
@ -129,14 +144,15 @@ class GpioNode(Node):
state = msg.state
state_str = "HIGH" if state else "LOW"
if pin not in self._output_lines and self._has_hardware:
if pin not in self._output_pins and self._has_hardware:
self.get_logger().warn(
f"Pin {pin} not configured as output — ignoring write"
)
return
if self._has_hardware:
self._output_lines[pin].set_value(1 if state else 0)
if self._has_hardware and self._output_request:
value = Value.ACTIVE if state else Value.INACTIVE
self._output_request.set_value(pin, value)
self.get_logger().info(f"GPIO write: pin={pin} state={state_str}")
@ -147,12 +163,12 @@ class GpioNode(Node):
Setiap pin dipublish sebagai message terpisah agar subscriber
bisa memfilter berdasarkan pin number.
"""
for pin, line in self._input_lines.items():
for pin in self._input_pins:
msg = GpioRead()
msg.pin = pin
if self._has_hardware:
msg.state = bool(line.get_value())
if self._has_hardware and self._input_request:
msg.state = self._input_request.get_value(pin) == Value.ACTIVE
else:
msg.state = False # simulation: selalu LOW
@ -161,17 +177,14 @@ class GpioNode(Node):
def destroy_node(self) -> None:
"""Release GPIO lines saat shutdown.
Penting untuk melepaskan line agar pin bisa digunakan kembali
oleh proses lain tanpa reboot. Jika tidak di-release, gpiod
akan menandai pin sebagai "busy".
gpiod v2.x: release() dipanggil pada LineRequest object,
bukan per-line seperti v1.x.
"""
if self._has_hardware:
for line in self._output_lines.values():
line.release()
for line in self._input_lines.values():
line.release()
if self._chip is not None:
self._chip.close()
if self._output_request:
self._output_request.release()
if self._input_request:
self._input_request.release()
self.get_logger().info("GPIO lines released")
super().destroy_node()