简介:前几节我们主要介绍了关于ROS2通讯机制—主题(Topic)的发布与订阅机制的使用方法,一般情况下,数据周期发布、数据交互频率高,或者数据生产与消费不是强关联的功能,我们通过主题的发布与订阅机制来实现,比如摄像头图像数据、小车控制指令等;ROS2系统底层通讯机制还有另外一种—服务(Service),服务一般用于数据交互频率比较低或者没有明显周期性质,而且需要实时返回交互结果的功能需求中,比如我们这一节要实现的在线更新地图的功能。

两种通讯机制应用场景区分并不是绝对的,只有相对适合的应用场景。

无论是主题还是服务,在通信过程中都需要用到消息(Message),ROS2系统预定义了丰富的基础消息类型,我们可以通过ros2 interface list 查看,但是基础消息类型在实际应用时并不能完全适用,比如在第四章我们实现了手动控制仿真小车行走的功能,其中主题发布用的消息类型是TwistStamped,这是一个相对复杂的消息类型,一般用于传递三维空间的数据,而我们只需要传递线速度v和角速度omega两个参数,显然是大材小用了,所以我们需要自定义一个消息类型来适配控制指令。
ROS2系统也预定义了 一些常用服务供我们直接调用,使用方法与使用自定义的服务是一样的,所以我们先学习如何自定义消息与服务,然后再了解如何调用。

注:ROS2中消息(Message)、服务(Service)和动作(Action)统一称之为接口(interface)。


1、自定义消息与服务

1.1 创建接口包

进入工作空间

$ cd ~/ros2_ws/src

创建接口包 duckietown_interface,目前消息与服务只支持cmake方式创建,另外在自定义消息中我们需要用到std_msgs中的Header,所以直接指定依赖std_msgs(也可以后续手动配置)

$ ros2 pkg create duckietown_interface --build-type ament_cmake --dependencies std_msgs

在接口包内创建消息和服务目录

$ cd duckietown_interface/
$ mkdir msg
$ mkdir srv

创建完成后的目录结构:

1.2 创建新消息

所有的消息都是.msg类型文件。
在msg目录下新建Twist2D.msg文件,编辑内容如下,表示该消息含有两个float32类型变量v和omega:

float32 v
float32 omega

在msg目录下新建Twist2DStamped.msg文件,编辑内容如下,表示该消息含有两个变量,Header类型的header和Twist2D类型的twist2d:

std_msgs/Header header
Twist2D twist2d

1.3 创建新服务

所有服务都是.srv类型文件。

在srv目录下新建ChangeMap.srv文件,编辑内容如下,表示该服务请求是一个string类型的map_name,回应的是bool类型的result,请求与回应数据之间用“—-”隔开。

string map_name
---
bool result

1.4 修改CMakeLists.txt

修改接口包目录下的CMakeLists.txt文件,增加以下内容,需要注意的是我们新消息依赖于std_msgs,所以需要添加DEPENDENCIES std_msgs这一行,如果还有其他依赖项,也需要添加,否则编译不会出错,但是调用时会报错:

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Twist2D.msg"
  "msg/Twist2DStamped.msg"
  "srv/ChangeMap.srv"
   DEPENDENCIES std_msgs
)

修改后完整文件如下图:

1.5 修改package.xml

修改接口包目录下的package.xml文件,添加3行配置代码:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

如果创建包时没有指定依赖项std_msgs,则需要在这里手动添加

<depend>std_msgs</depend>

修改后完整文件如下图:

1.6 编译并测试新接口

返回工作空间:

$ cd ~/ros2_ws

单独编译新接口:

$ colcon build --packages-select duckietown_interface

配置环境变量:

$ source install/setup.bash

利用ros2 interface package命令查看新接口,当然也可以通过list命令查看新接口是否在列表中:

$ ros2 interface package duckietown_interface

显示消息Twist2D内容:

$ ros2 interface show duckietown_interface/msg/Twist2D

显示消息Twist2DStamped内容:

$ ros2 interface show duckietown_interface/msg/Twist2DStamped

显示服务ChangeMap内容

$ ros2 interface show duckietown_interface/srv/ChangeMap

以上就是新消息与服务的 创建与验证,接下来我们学习如何使用。

2、自定义消息的使用

自定义消息时因为之前的TwistStamped适用于3D数据类型,而我们的需求是传递线速度v和角速度omega两个数据,自定义消息和预定义消息使用是完全一样的,所以直接替换TwistStamped相关配置和代码即可,具体包含以下几部分:

2.1 duckiebot节点修改

2.1.1 修改配置文件

消息与服务都是新创建的,要使用就必须添加相关依赖配置,新建功能包的话可以在创建命令行中直接添加依赖,原有项目需要手动添加,修改duckiebot包内的package.xml文件,添加duckietown_interface接口的依赖配置:

<depend>duckietown_interface</depend>
2.1.2 修改源码

源码中所有TwistStamped相关的部分都需要修改,包括引入、话题订阅和消息解析:

#修改前
from geometry_msgs.msg import TwistStamped #控制消息类型
#修改后
from duckietown_interface.msg import Twist2DStamped #控制消息类型


    #修改前
    self.sub_action = self.create_subscription(TwistStamped, "control_node/action", self.cb_action, 10)
    #修改后
    self.sub_action = self.create_subscription(Twist2DStamped, "control_node/action", self.cb_action, 10)
    #修改前
    def cb_action(self, msg):
        v = msg.twist.linear.x       #线速度
        omega = msg.twist.angular.x  #角速度
        self.action[0] = v
        self.action[1] = omega
    #修改后
    def cb_action(self, msg):
        v = msg.twist2d.v       #线速度
        omega = msg.twist2d.omega  #角速度
        self.action[0] = v
        self.action[1] = omega

2.2 control节点修改

2.2.1 修改配置文件

同样,在package.xml内添加duckietown_interface依赖:

<depend>duckietown_interface</depend>
2.2.2 修改源码文件

同样,修改所有TwistStamped相关部分:

#修改前
from geometry_msgs.msg import TwistStamped
    self.action = TwistStamped()
    self.pub_action = self.create_publisher(TwistStamped, "control_node/action", 10)
    def on_press(self, key):
        #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            if key == keyboard.Key.up:      #上:向前
                self.action.twist.linear.x = 0.44  #设置线速度
                self.action.twist.angular.x = 0.0  #设置角速度
            elif key == keyboard.Key.down:  #下:向后
                self.action.twist.linear.x = -0.44 #设置线速度
                self.action.twist.angular.x = 0.0  #设置角速度
            elif key == keyboard.Key.left:  #左:左转
                self.action.twist.linear.x = 0.2   #设置线速度
                self.action.twist.angular.x = 1.0  #设置角速度
            elif key == keyboard.Key.right: #右:右转
                self.action.twist.linear.x = 0.2   #设置线速度
                self.action.twist.angular.x = -1.0 #设置角速度
            #设置消息时间数据
            self.action.header.stamp = self.get_clock().now().to_msg()
            #发布消息
            self.pub_action.publish(self.action)
    #键盘按键松开事件处理,松开方向键时设定线速度和角速度为0并发布
    def on_release(self, key):
         #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            self.action.twist.linear.x = 0.0
            self.action.twist.angular.x = 0.0
            self.action.header.stamp = self.get_clock().now().to_msg()
            self.pub_action.publish(self.action)
#修改后
from duckietown_interface.msg import Twist2DStamped
    self.action = Twist2DStamped()
    self.pub_action = self.create_publisher(Twist2DStamped, "control_node/action", 10)
    def on_press(self, key):
        #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            if key == keyboard.Key.up:      #上:向前
                self.action.twist2d.v = 0.44  #设置线速度
                self.action.twist2d.omega = 0.0  #设置角速度
            elif key == keyboard.Key.down:  #下:向后
                self.action.twist2d.v = -0.44 #设置线速度
                self.action.twist2d.omega = 0.0  #设置角速度
            elif key == keyboard.Key.left:  #左:左转
                self.action.twist2d.v = 0.2   #设置线速度
                self.action.twist2d.omega = 1.0  #设置角速度
            elif key == keyboard.Key.right: #右:右转
                self.action.twist2d.v = 0.2   #设置线速度
                self.action.twist2d.omega = -1.0 #设置角速度
            #设置消息时间数据
            self.action.header.stamp = self.get_clock().now().to_msg()
            #发布消息
            self.pub_action.publish(self.action)
     #键盘按键松开事件处理,松开方向键时设定线速度和角速度为0并发布
    def on_release(self, key):
         #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            self.action.twist2d.v = 0.0
            self.action.twist2d.omega = 0.0
            self.action.header.stamp = self.get_clock().now().to_msg()
            self.pub_action.publish(self.action)

2.3 编译并测试

返回工作空间
$ cd ~/ros2_ws

编译所有包:

$ colcon build

配置环境变量:

$ source install/setup.bash

运行duckiebot节点

$ ros2 run duckiebot duckiebot_node

新建终端,配置环境变量,并运行control节点

$ source ~/ros2_ws/install/setup.bash
$ ros2 run control control_node

运行起来后,通过方向键可以控制小车运动,证明新消息已经成功应用。

3、自定义服务的使用

上文中提到我们目前的功能中,虚拟仿真环境地图是在源码中写死的,要修改就必须修改源码,很不方便,所以我们需要实现在线修改地图的功能,而这个功能的使用频率明显不会太高,正好用ROS2的服务来实现。

注:自定义服务与预定义服务在使用过程是没有区别的,使用预定义服务,可以忽略上边的内容,直接参考以下说明。

3.1 服务端实现

自定义服务接口的过程中我们可以看到,所谓服务接口,就是定义了一组请求与响应的数据格式,在应用时,客户端发送请求数据,服务端处理后返回响应数据,我们先在duckiebot节点中实现服务端功能,主要包括4部分,配置文件修改、服务的引入、创建以及回调函数的实现。

配置文件在这里就不用再修改了,因为在使用自定义消息是,已经在package.xml中添加了相关依赖配置。

直接在源码中添加功能实现即可:

from duckietown_interface.srv import ChangeMap  #引入自定义的服务
    #在__init__函数中创建服务,指定服务的类型,名称以及回调函数
    self.cm_srv = self.create_service(ChangeMap, 'change_map_name', self.cb_change_map)
    #服务回掉函数的实现
    def cb_change_map(self, request, response):
        map_name = request.map_name   #获取请求数据中的地图名称
        try:
            self.env.close()  #关闭原有仿真环境
            #用新地图重新创建仿真环境
            self.env = DuckietownEnv(seed=1,map_name=map_name,draw_curve=False,draw_bbox=False,domain_rand=False,frame_skip=1,distortion=False,camera_rand=False,dynamics_rand=False)
            #初始化仿真环境
            self.env.reset()
            #设置响应数据
            response.result = True
        except Exception:
            #异常情况设置相响应数据
            response.result = False
        return response

3.2 客户端实现

客户端我们还是在control节点中实现,具体功能是按下Tab键时切换duckiebot节点的地图。客户端的实现要稍微复杂一点,ROS2的服务通讯有同步和异步的区别,同步通讯使用简单,但是可能会因为服务端不响应等问题导致死锁(deadlock),而且还不会报错,导致节点崩溃还不容易定位问题,所以我们直接学习异步通讯的实现,实现方式也有若干种,我这里提供其中一种解决方案。

首先还是修改配置文件,添加依赖项,使用自定义消息时已经添加,跳过。

源码中添加功能实现:

import random  #功能中我们需要一个随机数,引入随机数函数库
#引入自定义的服务ChangeMap
from duckietown_interface.srv import ChangeMap
    #在__init__函数中增加以下内容
    #创建一个地图名称数组,切换时随机抽取
    self.map_list = ['udem1','4way','small_loop','loop_empty']
    #创建client,类型ChangeMap,名称要与服务端保持一致change_map_name
    self.cm_cli = self.create_client(ChangeMap, 'change_map_name')
    #等待服务端启动
    while not self.cm_cli.wait_for_service(timeout_sec=1.0):
        self.get_logger().info('change map service not available, waiting again...')
    #初始化请求数据
    self.request = ChangeMap.Request()
    #创建一个发送异步请求的函数
    def send_cm_request(self):
        #从地图名称数组随机抽取一个元素赋值给请求数据
        self.request.map_name = self.map_list[random.randint(0,3)]
        #发送异步请求call_async,同步请求是call
        self.future = self.cm_cli.call_async(self.request)
        #启动一个定时器,监听异步请求的响应,响应在回调函数timer_callback_for_future中处理
        self.timer = self.create_timer(0.05, self.timer_callback_for_future)

    #响应处理函数
    def timer_callback_for_future(self):
        #请求已经完成则处理,否则继续循环等待
        if self.future.done():
            try:
                #从响应结果中取响应数据
                response = self.future.result()
            except Exception as e:
                #如果发生异常,记录异常错误信息
                self.get_logger().info('Service call failed %r' % (e,))
            else:
                #正常响应,打印响应数据
                self.get_logger().info('Result of change_map_service is %s' %response.result)
            #响应处理完毕,关闭定时器
            self.timer.cancel()    
    #在监听按键按下的函数中添加tab键的监听
    def on_press(self, key):
        #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            ......
        #监听tab键按下事件,调用发送请求函数
        elif key == keyboard.Key.tab:
            self.send_cm_request()

3.3 编译并测试

编译源码,配置环境变量并启动duckiebot节点

$ cd ~/ros2_ws
$ colcon build
$ source install/setup.bash
$ ros2 run duckiebot duckiebot_node

新开一个终端启动control节点

$ source ~/ros2_ws/install/setup.bash
$ ros2 run control control_node

按下tab键,可以观察到地图的切换(偶尔不切换,是因为随机到了同样的数字),终端打印出通讯结果:

4、附完整版源码

4.1 duckiebot_node.py

#!/usr/bin/env python3

import rclpy
from rclpy.node import Node

import gym
from pyglet.window import key
from gym_duckietown.envs import DuckietownEnv

import cv2
import numpy as np

from sensor_msgs.msg import Image  #发布图像使用Image消息类型
from duckietown_interface.msg import Twist2DStamped #控制消息类型
from duckietown_interface.srv import ChangeMap
from cv_bridge import CvBridge #opencv和ros图像数据转换工具

class DuckiebotNode(Node):

    def __init__(self, name):
        super().__init__(name)
        self.env = DuckietownEnv(seed=1,map_name="udem1",draw_curve=False,draw_bbox=False,domain_rand=False,frame_skip=1,distortion=False,camera_rand=False,dynamics_rand=False)
        self.env.reset()
        #定义图像发布接口
        self.pub_img = self.create_publisher(Image,"duckiebot_node/image",10)
        self.timer = self.create_timer(0.05, self.timer_callback)
        #创建图像转换工具
        self.bridge = CvBridge()
        #定义全局动作变量,默认线速度和角速度都是0,车辆停止
        self.action = np.array([0.0,0.0])
        #订阅控制指令话题
        self.sub_action = self.create_subscription(Twist2DStamped, "control_node/action", self.cb_action, 10)
        self.cm_srv = self.create_service(ChangeMap, 'change_map_name', self.cb_change_map)

    def timer_callback(self):
        #这里不再生成随机动作指令,直接使用全局动作变量
        obs, reward, done, info = self.env.step(self.action)
        #发布图像数据,obs是rgb编码,转化时指定编码,解码时就有据可查
        self.pub_img.publish(self.bridge.cv2_to_imgmsg(obs, 'rgb8'))
        if done:
            self.env.reset()
    #控制指令处理函数
    def cb_action(self, msg):
        v = msg.twist2d.v       #线速度
        omega = msg.twist2d.omega  #角速度
        self.action[0] = v
        self.action[1] = omega

    def cb_change_map(self, request, response):
        map_name = request.map_name
        try:
            self.env.close()
            self.env = DuckietownEnv(seed=1,map_name=map_name,draw_curve=False,draw_bbox=False,domain_rand=False,frame_skip=1,distortion=False,camera_rand=False,dynamics_rand=False)
            self.env.reset()
            response.result = True
        except Exception:
            response.result = False
        return response

def main(args=None):
    rclpy.init(args=args)
    node = DuckiebotNode(name="duckiebot_node")
    rclpy.spin(node=node)
    rclpy.shutdown()

4.2 control_node.py

#!/usr/bin/env python3

import rclpy
from rclpy.node import Node

import cv2
import numpy as np
import random
from pynput import keyboard #引入键盘监听功能库

from sensor_msgs.msg import Image 
from duckietown_interface.msg import Twist2DStamped
from cv_bridge import CvBridge
from duckietown_interface.srv import ChangeMap

class ControlNode(Node):

    def __init__(self,name):
        super().__init__(name)
        #初始化控制消息,设置header.frame_id
        self.action = Twist2DStamped()
        self.action.header.frame_id = name
        #创建控制消息发布接口
        self.pub_action = self.create_publisher(Twist2DStamped, "control_node/action", 10)
        #创建图像消息接收接口(消息类型,话题名称,回调函数,消息队列长度)
        self.sub_img = self.create_subscription(Image, "duckiebot_node/image", self.cb_image, 10)
        #创建图像转换工具
        self.bridge = CvBridge()
        #创建键盘事件监听器,并启动
        self.listener = keyboard.Listener(on_press=self.on_press,on_release=self.on_release)
        self.listener.start()

        self.map_list = ['udem1','4way','small_loop','loop_empty']
        self.cm_cli = self.create_client(ChangeMap, 'change_map_name')
        while not self.cm_cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('change map service not available, waiting again...')
        self.request = ChangeMap.Request()

    #图像处理回调函数
    def cb_image(self,imgmsg):
        #ROS图像消息转化为opencv格式,第二个参数指定图像颜色编码格式
        image = self.bridge.imgmsg_to_cv2(imgmsg, 'bgr8')
        #显示图像
        cv2.imshow("image", image)
        cv2.waitKey(1)       

    def send_cm_request(self):
        self.request.map_name = self.map_list[random.randint(0,3)]
        self.future = self.cm_cli.call_async(self.request)
        self.timer = self.create_timer(0.05, self.timer_callback_for_future)

    def timer_callback_for_future(self):
        if self.future.done():
            try:
                response = self.future.result()
            except Exception as e:
                self.get_logger().info('Service call failed %r' % (e,))
            else:
                self.get_logger().info('Result of change_map_service is %s' %response.result)
            self.timer.cancel()             

    #键盘按键按下事件处理,按下方向键时设定线速度和角速度数据并发布
    def on_press(self, key):
        #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            if key == keyboard.Key.up:      #上:向前
                self.action.twist2d.v = 0.44  #设置线速度
                self.action.twist2d.omega = 0.0  #设置角速度
            elif key == keyboard.Key.down:  #下:向后
                self.action.twist2d.v = -0.44 #设置线速度
                self.action.twist2d.omega = 0.0  #设置角速度
            elif key == keyboard.Key.left:  #左:左转
                self.action.twist2d.v = 0.2   #设置线速度
                self.action.twist2d.omega = 1.0  #设置角速度
            elif key == keyboard.Key.right: #右:右转
                self.action.twist2d.v = 0.2   #设置线速度
                self.action.twist2d.omega = -1.0 #设置角速度
            #设置消息时间数据
            self.action.header.stamp = self.get_clock().now().to_msg()
            #发布消息
            self.pub_action.publish(self.action)
        elif key == keyboard.Key.tab:
            self.send_cm_request()

     #键盘按键松开事件处理,松开方向键时设定线速度和角速度为0并发布
    def on_release(self, key):
         #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            self.action.twist2d.v = 0.0
            self.action.twist2d.omega = 0.0
            self.action.header.stamp = self.get_clock().now().to_msg()
            self.pub_action.publish(self.action)

def main(args=None):
    rclpy.init(args=args)
    node = ControlNode(name="control_node")
    rclpy.spin(node=node)
    rclpy.shutdown()

若有收获,就点个赞吧