【ROS2】twist_mux パッケージ利用メモ

ROSでは twist_muxパッケージという、複数の速度指令(Twistメッセージ)を統合するためのパッケージが用意されています。 前から存在は知っていたものの、使ったことがなかったので使ってみました。そのときのメモです。

実際のシナリオでは、キーボードからは右旋回、ジョイスティックからは左旋回の指示が同時に発信されることがあり、その場合ロボットはどちらの指示を優先すべきか迷ってしまいます。twist_muxは、このような場合に各入力の優先順位やタイムアウトを考慮して、最も適切な速度指令にまとめてくれます。


動作確認

ミニPC: TRIGKEY Green G5
OS: Ubuntu 22.04.3
ROS: humble

インストール

sudo apt-get install ros-humble-twist-mux



起動ファイル

twist_muxノードを起動するためのファイルを twist_mux.launch.py という名前で用意します。
ファイルの内容:

import sys
from launch import LaunchDescription, LaunchService
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='twist_mux',
            executable='twist_mux',
            name='twist_mux',
            output='screen',
            parameters=[{
                'topics': {
                    'key': {'topic': '/key_cmd_vel', 'timeout': 10.0, 'priority': 10},
                    'joy': {'topic': '/joy_cmd_vel', 'timeout': 2.0, 'priority': 8},
                    'llm': {'topic': '/llm_cmd_vel', 'timeout': 0.5, 'priority': 5}
                },
            }],
            remappings=[
                ('/cmd_vel_out', '/merged_cmd_vel')
            ],
        )
    ])

if __name__ == '__main__':
    ls = LaunchService(argv=sys.argv[1:])
    ls.include_launch_description(generate_launch_description())
    sys.exit(ls.run())

起動方法

python3 twist_mux.launch.py

もしくは

ros2 launch twist_mux.launch.py

動かしたあとの状態


入力のシミュレーション

キーボードからの指示をシミュレート。0.05Hz(=20秒に1回)で併進速度X方向10m/sを指示。 別ターミナルで下記を実行

ros2 topic pub -r 0.05 /key_cmd_vel geometry_msgs/msg/Twist "{linear: {x: 10.0}}"

ジョイスティックからの指示をシミュレート。0.2Hz(=5秒に1回) で併進速度X方向8m/sを指示。 別ターミナルで下記を実行

ros2 topic pub -r 0.2 /joy_cmd_vel geometry_msgs/msg/Twist "{linear: {x: 8.0}}"

LLMからの指示をシミュレート。1Hz=(1秒に1回) で併進速度X方向5m/sを指示。 別ターミナルで下記を実行

ros2 topic pub -r 1 /llm_cmd_vel geometry_msgs/msg/Twist "{linear: {x: 5.0}}"



出力確認

/twist_muxからの出力を確認
別ターミナルで下記を実行

ros2 topic echo /merged_cmd_vel --field linear.x --csv | python3 -c "import sys, datetime; [print(datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3], line, end='') for line in sys.stdin]"

「 --field linear.x」で併進速度のX方向のみに絞り込み。
「--csv」でCSVフォーマットで出力。
パイプを使って時分秒ミリ秒の情報を付与。

出力結果は下記のようになる。「→」以降は追記したコメント

16:18:30.893 5.0
16:18:30.974 10.0
→ /key_cmd_velのタイムアウト10秒間が経過するまで、より低い優先度は出力されない
16:18:41.893 5.0
→ 10秒が経過したので、1秒間隔で出力している /llm_cmd_vel を出力
16:18:42.893 5.0
16:18:43.553 8.0
→ /joy_cmd_vel のタイムアウト2秒間が経過するまで、より低い優先度は出力されない
16:18:45.893 5.0
→ 2秒が経過したので、1秒間隔で出力している /llm_cmd_vel を出力
16:18:46.893 5.0
16:18:47.893 5.0



リンク

twist_mux