
Self Driving and ROS 2 - Learn by Doing! Map & Localization: Nav2 (セクション7-2/11)
Nav2 map_serverはROS 2でマップを管理し、オキュパンシーグリッドを提供する重要なツールです。
QoS(Quality of Service)設定はノード間の信頼性ある通信を確保するために重要です。
Pythonスクリプトを使って異なるQoS設定を実験し、パブリッシャーとサブスクライバー間の互換性の問題を特定できます。
Udemyの「Self Driving and ROS 2 - Learn by Doing! Map & Localization」コースでは、ROS 2におけるマップ表現とQoS(Quality of Service)設定の詳細について学びます。このセクションでは、レッスン93から96にかけて、マップの管理とノード間の信頼性のある通信を確保する方法についての貴重な洞察が得られます。
レッスン93: Nav2 Map Server
Nav2 map_serverはROS 2における重要なツールで、マップのホスティングと管理を担当します。これは、経路計画や障害物回避などのタスクに必要なオキュパンシーグリッドを読み込み、提供します。map_serverは以下のインターフェースを提供します:
ファイルインターフェース: ファイルシステムからマップを読み込み、それをROS 2オキュパンシーグリッドメッセージに変換します。
ROS 2トピックインターフェース: オキュパンシーグリッドメッセージを`/map`トピックに公開します。
サービスインターフェース: `/map`や`/load_map`といったサービスを提供し、他のノードがマップをリクエストしたり、別のマップをロードしたりできるようにします。
ライフサイクルノード: 様々な状態(未構成、非アクティブ、アクティブ)で動作し、その機能を効率的に管理します。
レッスン94: Map Serverのセットアップ
このレッスンでは、新しいROS 2パッケージ`bumperbot_mapping`を作成し、マップを管理します。手順は以下の通りです:
1. パッケージの作成:
cd src
ros2 pkg create --build-type ament_cmake bumperbot_mapping
cd ../
2. マップファイルのコピー: 提供されたマップファイル(`map.yaml`と`map.pgm`)を新しく作成したパッケージにコピーします。
3. CMakeListsの設定: CMakeの設定で、マップディレクトリが正しくインストールされるようにします:
cmake_minimum_required(VERSION 3.8)
project(bumperbot_mapping)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
install(
DIRECTORY maps
DESTINATION share/${PROJECT_NAME}
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()
4. ランチファイルの設定: map_serverを起動し、マップを読み込むランチファイルを作成します:
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
use_sim_time_arg = DeclareLaunchArgument('use_sim_time', default_value='true')
map_name_arg = DeclareLaunchArgument("map_name", default_value="small_house")
map_name = LaunchConfiguration('map_name')
use_sim_time = LaunchConfiguration('use_sim_time')
map_path = PathJoinSubstitution([
get_package_share_directory('bumperbot_mapping'),
'maps',
map_name,
"map.yaml"
])
nav2_map_server = Node(
package='nav2_map_server',
executable='map_server',
name='map_server',
output='screen',
parameters=[{'use_sim_time': use_sim_time, 'yaml_filename': map_path}]
)
return LaunchDescription([map_name_arg, use_sim_time_arg, nav2_map_server])
5. ビルドと起動:
colcon build
source install/setup.bash
ros2 launch bumperbot_localization global_localization.launch.py
レッスン95: Quality of Service
QoS設定は、ROS 2におけるノード間の信頼性のある通信を確保するために重要です。主なQoSポリシーには以下が含まれます:
信頼性: 通信がベストエフォート(best-effort)か、信頼性のある(reliable)ものかを決定します。
耐久性: データが後から参加するサブスクライバーのために保存されるかどうかを指定します。
履歴: メッセージの保存方法を制御します。
デッドライン: メッセージが配信されるべき最大時間を設定します。
活発性: パブリッシャーが「生きている」とみなされるかどうかを定義します。
QoS設定は、ノード、パブリッシャー、およびサブスクライバーに対して個別に設定でき、ロボティックアプリケーションの特定のニーズに合わせて調整することができます。
レッスン96: マルチQoSパブリッシャー
異なるQoS設定を実験し、その影響を観察するためのシンプルなQoSパブリッシャーとサブスクライバーのPythonスクリプトを作成します。
パブリッシャースクリプト:
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy
from std_msgs.msg import String
class SimpleQoSPublisher(Node):
def __init__(self):
super().__init__("simple_qos_publisher")
self.qos_profile = QoSProfile(depth=10)
self.declare_parameter("reliability", "system_default")
self.declare_parameter("durability", "system_default")
reliability = self.get_parameter("reliability").get_parameter_value().string_value
durability = self.get_parameter("durability").get_parameter_value().string_value
if reliability == "best_effort":
self.qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT
self.get_logger().info("Reliability set to best effort")
elif reliability == "reliable":
self.qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
self.get_logger().info("Reliability set to reliable")
elif reliability == "system_default":
self.qos_profile.reliability = QoSReliabilityPolicy.SYSTEM_DEFAULT
self.get_logger().info("Reliability set to system default")
else:
self.get_logger().error("Unknown reliability setting: %s" % reliability)
return
if durability == "volatile":
self.qos_profile.durability = QoSDurabilityPolicy.VOLATILE
self.get_logger().info("Durability set to volatile")
elif durability == "transient_local":
self.qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
self.get_logger().info("Durability set to transient local")
elif durability == "system_default":
self.qos_profile.durability = QoSDurabilityPolicy.SYSTEM_DEFAULT
self.get_logger().info("Durability set to system default")
else:
self.get_logger().error("Unknown durability setting: %s" % durability)
return
self.pub_ = self.create_publisher(String, "chatter", self.qos_profile)
self.counter_ = 0
self.timer_ = self.create_timer(1.0, self.timerCallback)
def timerCallback(self):
msg = String()
msg.data = "Hello ROS 2 - counter: %d" % self.counter_
self.pub_.publish(msg)
self.counter_ += 1
def main():
rclpy.init()
simple_publisher = SimpleQoSPublisher()
rclpy.spin(simple_publisher)
simple_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
サブスクライバースクリプト:
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy
from std_msgs.msg import String
class SimpleQoSSubscriber(Node):
def __init__(self):
super().__init__("simple_qos_subscriber")
self.qos_profile = QoSProfile(depth=10)
self.declare_parameter("reliability", "system_default")
self.declare
_parameter("durability", "system_default")
reliability = self.get_parameter("reliability").get_parameter_value().string_value
durability = self.get_parameter("durability").get_parameter_value().string_value
if reliability == "best_effort":
self.qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT
self.get_logger().info("Reliability set to best effort")
elif reliability == "reliable":
self.qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
self.get_logger().info("Reliability set to reliable")
elif reliability == "system_default":
self.qos_profile.reliability = QoSReliabilityPolicy.SYSTEM_DEFAULT
self.get_logger().info("Reliability set to system default")
else:
self.get_logger().error("Unknown reliability setting: %s" % reliability)
return
if durability == "volatile":
self.qos_profile.durability = QoSDurabilityPolicy.VOLATILE
self.get_logger().info("Durability set to volatile")
elif durability == "transient_local":
self.qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
self.get_logger().info("Durability set to transient local")
elif durability == "system_default":
self.qos_profile.durability = QoSDurabilityPolicy.SYSTEM_DEFAULT
self.get_logger().info("Durability set to system default")
else:
self.get_logger().error("Unknown durability setting: %s" % durability)
return
self.sub_ = self.create_subscription(String, "chatter", self.msgCallback, self.qos_profile)
def msgCallback(self, msg):
self.get_logger().info("I heard: %s" % msg.data)
def main():
rclpy.init()
simple_subscriber = SimpleQoSSubscriber()
rclpy.spin(simple_subscriber)
simple_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
異なるQoS設定を実験することで、これらのポリシーが通信の信頼性や耐久性にどのように影響するかを理解し、パブリッシャーとサブスクライバー間の互換性の問題を特定することができます。
結論
ROS 2におけるマップ表現とQoS設定を理解することは、堅牢で効率的なロボティックアプリケーションを開発するために不可欠です。Nav2 map_serverはマップ管理のための重要なツールを提供し、QoS設定はノード間の信頼性のある通信を確保します。これらの概念をマスターすることで、複雑な環境をナビゲートし、様々な通信の課題に対処できる高度なシステムを構築することができます。