前边学习了如何创建广播器,接下来我们来试试如何创建监听器,从而通过代码获取任意两个坐标系之间的关系。

一、编写监听器节点

依然使用之前创建的learning_tf2_py功能包,在其中的learning_tf2_py/learning_tf2_py文件夹中创建如下监听器节点的代码文件turtle_tf2_listener.py,内容如下:
import math

from geometry_msgs.msg import Twist

import rclpy
from rclpy.node import Node

from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener

from turtlesim.srv import Spawn

class FrameListener(Node):
    def __init__(self):
        super().__init__('turtle_tf2_frame_listener')

        # Declare and acquire `target_frame` parameter
        self.declare_parameter('target_frame', 'turtle1')
        self.target_frame = self.get_parameter(
            'target_frame').get_parameter_value().string_value

        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        # Create a client to spawn a turtle
        self.spawner = self.create_client(Spawn, 'spawn')
        # Boolean values to store the information
        # if the service for spawning turtle is available
        self.turtle_spawning_service_ready = False
        # if the turtle was successfully spawned
        self.turtle_spawned = False

        # Create turtle2 velocity publisher
        self.publisher = self.create_publisher(Twist, 'turtle2/cmd_vel', 1)

        # Call on_timer function every second
        self.timer = self.create_timer(1.0, self.on_timer)

    def on_timer(self):
        # Store frame names in variables that will be used to
        # compute transformations
        from_frame_rel = self.target_frame
        to_frame_rel = 'turtle2'

        if self.turtle_spawning_service_ready:
            if self.turtle_spawned:
                # Look up for the transformation between target_frame and turtle2 frames
                # and send velocity commands for turtle2 to reach target_frame
                try:
                    now = rclpy.time.Time()
                    trans = self.tf_buffer.lookup_transform(
                        to_frame_rel,
                        from_frame_rel,
                        now)
                except TransformException as ex:
                    self.get_logger().info(
                        f'Could not transform {to_frame_rel} to {from_frame_rel}: {ex}')
                    return

                msg = Twist()
                scale_rotation_rate = 1.0
                msg.angular.z = scale_rotation_rate * math.atan2(
                    trans.transform.translation.y,
                    trans.transform.translation.x)

                scale_forward_speed = 0.5
                msg.linear.x = scale_forward_speed * math.sqrt(
                    trans.transform.translation.x ** 2 +
                    trans.transform.translation.y ** 2)

                self.publisher.publish(msg)
            else:
                if self.result.done():
                    self.get_logger().info(
                        f'Successfully spawned {self.result.result().name}')
                    self.turtle_spawned = True
                else:
                    self.get_logger().info('Spawn is not finished')
        else:
            if self.spawner.service_is_ready():
                # Initialize request with turtle name and coordinates
                # Note that x, y and theta are defined as floats in turtlesim/srv/Spawn
                request = Spawn.Request()
                request.name = 'turtle2'
                request.x = float(4)
                request.y = float(2)
                request.theta = float(0)
                # Call request
                self.result = self.spawner.call_async(request)
                self.turtle_spawning_service_ready = True
            else:
                # Check if the service is ready
                self.get_logger().info('Service is not ready')

def main():
    rclpy.init()
    node = FrameListener()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass

    rclpy.shutdown()

1.1 代码解析

我们来分析下代码。
from tf2_ros.transform_listener import TransformListener
头文件包含了TransformListener相关的实现。
self.tf_listener = TransformListener(self.tf_buffer, self)
这段代码创建了一个TransformListener对象,创建成功后,就开始接收并存储10秒钟之内的tf2坐标信息。
now = rclpy.time.Time()
trans = self.tf_buffer.lookup_transform(
    to_frame_rel,
    from_frame_rel,
    now)
这段代码是关键了,通过lookup_transform方法查询两个坐标系之间的位姿关系,输入的参数如下:
(1)目标坐标系
(2)原坐标系
(3)查询哪一时刻的坐标关系,tf2::TimePoint()表示最新时刻的。
代码实现中也使用了异常捕捉,如果没有成功获取tf的话,就会提示报错信息。

1.2 修改setup.py文件

打开setup.py文件,添加程序的入口配置:
    entry_points={
        'console_scripts': [
                              ......
            'turtle_tf2_listener = learning_tf2_py.turtle_tf2_listener:main',
        ],

二、编译与运行

打开在前边已经创建的turtle_tf2_demo.launch.py文件,添加如下内容:
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        ...,
        DeclareLaunchArgument(
            'target_frame', default_value='turtle1',
            description='Target frame name.'
        ),
        Node(
            package='learning_tf2_py',
            executable='turtle_tf2_broadcaster',
            name='broadcaster2',
            parameters=[
                {'turtlename': 'turtle2'}
            ]
        ),
        Node(
            package='learning_tf2_py',
            executable='turtle_tf2_listener',
            name='listener',
            parameters=[
                {'target_frame': LaunchConfiguration('target_frame')}
            ]
        ),
    ])
从Launch文件中可以看到,启动一个广播器广播world和turtle2坐标系关系,然后启动一个监听器监听其动态的位姿变换。
现在就可以启动例程试试了。
ros2 launch learning_tf2_py turtle_tf2_demo.launch.py
然后还是在一个新的终端中启动键盘控制节点:
ros2 run turtlesim turtle_teleop_key

三、查看运行效果

在键盘控制节点中,控制小海龟运动,就可以看到类似之前例程一样的效果,第二只海龟在跟随第一只海龟运动了。这背后的原理就是通过坐标变换来实现的,根据代码实现,大家可以思考一下,其中的数学原理是什么样的呢?
如果没想到,可以在纸上画一下三个坐标系之间的关系,然后从turtle1到turtle2坐标系连一根“辅助线”,有没有想起高中学过的向量(是不是已经还给老师了),箭头表示方向,长度表示距离,通过一个给定时间,距离除以时间,是不是就可以得到turtle2跟随的速度指令了,这就是以上代码实现的底层原理。