简介:ROS2底层通信机制有主题(Topic)和服务(Service)两种,两种通信机制可以解决大部分问题,但是还有一些问题用这两种机制解决起来比较复杂,以无人驾驶虚拟仿真环境中的应用举例,在十字路口或者丁字路口要实现转弯,因为在路口并没有给出转弯车道线,车辆转弯将没有任何依据,通过主题或者服务可以实现强制指挥小车转弯,但是什么时候结束强制指挥又是一个问题,像这类情况,我们就可以通过ROS2中的动作(Action)通信方式来解决,动作一般用在需要一定时间并且需要实时反馈的应用场景,路口转弯正好是这类场景。

目录

1、问题描述
2、解决方案
3、理解动作(Action)
4、自定义动作
5、创建action的服务端
6、创建action的客户端
7、编译并验证
8、附完整源码
8.1 duckiebot_node.py
8.2 control_node.py


1、问题描述

        虚拟仿真环境下的十字路口和丁字路口如下图所示:

        路口内没有车道线,只有在进入路口时的红线标识,在这种情况下要实现转弯或者直行都比较困难,会随机的受当前姿态和相邻地图块车道线的影响,所以我们需要设计一种控制机制,来实现小车在路口区域内的自动驾驶行为。

2、解决方案

        首先,我们先分析一下车辆行为,应该可分为3类,直行、右转、左转,行驶路线如下图:

        地图块大小默默认是0.585米,白线宽0.048米,黄线宽0.024米,车辆在路口内的驾驶行为大致可计算出来:

行驶方向

行驶距离

转弯角度

直行

0.585米

0

右转

((0.585-0.024-0.048*2)/4+0.048)*2*3.14/4≈0.26米

向右90°

左转

(0.585-(0.585-0.024-0.048*2)/4-0.048)*2*3.14/4≈0.66米

向左90°

      由上表可计算出车辆在路口区域内的线速度、角速度和通行时间的关系,以此为基础,我们利用ROS2的动作(Action)来实现路口的通行。

3、理解动作(Action)

        Action并不是一个全新的机制,而是由底层的三个主题和服务组成:一个任务目标(Goal,服务),一个执行结果(Result,服务),周期数据反馈(Feedback,话题)。action是可抢占式的,由于需要执行一段时间,比如执行过程中你不想跑了,那可以随时发送取消指令,动作终止,如果执行过程中发送一个新的action目标,则会直接中断上一个目标开始执行最新的任务目标。总体上来讲,action是一个客户端/服务器的通信模型,客户端发送一个任务目标,服务器端根据收到的目标执行并周期反馈状态,执行完成后反馈一个执行结果。

        下图用官方的Action示意图:

     以路口转弯为例,客户端发起目标服务请求,以指定速度转弯,服务端接收目标要求并响应,客户端发起结果服务请求,服务端开始执行任务并发布实时反馈主题,客户端通过反馈主题可以了解任务执行状态,任务执行完毕响应结果请求。

        接下来我们来学习具体编码实现。

4、自定义动作

        与服务类似,使用动作,需要先定义动作接口,可以新创建接口功能包,也可在已有功能包里添加动作接口,为保证项目结构的清晰,我们在已有的duckietown_interface接口中添加action。

        在接口目录下添加action目录,:

$ cd ~/ros2_ws/src/duckietown_interface/
$ mkdir action

    创建action文件

$ touch action/Crossing.action

  编辑该文件,内容如下:

float32 speed
string direction
---
bool result
---
float32 countdown

        修改CMakeLists.txt:

        注:如果是新建接口包,需要添加:

find_package(rosidl_default_generators REQUIRED)

        修改package.xml,添加action_msgs依赖:

   如果是新建的接口包,还需要添加以下依赖:

<buildtool_depend>rosidl_default_generators</buildtool_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

编译、配置环境变量并验证:

$ colcon build --packages-select duckietown_interface
$ source install/setup.bash
$ ros2 interface show duckietown_interface/action/Crossing

5、创建action的服务端

        action服务端我们还是在duckiebot_node中实现,添加与更改的内容如下:

import time  #动作是一段时间内完成的,引入time库使用时间相关函数
#引入ActionServer库
from rclpy.action import ActionServer
#引入新建的动作接口
from duckietown_interface.action import Crossing
#引入多线程执行器,将节点支持多线程,否则执行动作时会阻塞主线程
from rclpy.executors import MultiThreadedExecutor
    #__init__函数
    #修改定时器频率,仿真器默认是30Hz,保持一致,方便计算
    self.timer = self.create_timer(1/30, self.timer_callback)
    #定义变量,用来标识动作是否在执行,执行过程中禁用其他车辆控制指令
    self.do_action = False
    #创建动作服务
    self.crossing_server = ActionServer(self,Crossing,'crossing_acton',self.execute_callback)
    
    #修改键盘控制车辆指令回调函数,执行动作期间,禁用车辆控制指令
    def cb_action(self, msg):
        if self.do_action:
            return
        v = msg.twist2d.v       #线速度
        omega = msg.twist2d.omega  #角速度
        self.action[0] = v
        self.action[1] = omega
    
    #动作服务回调函数,所有动作指令在这里实现
    def execute_callback(self, goal_handle):
        #开始执行动作,利用do_action变量禁用键盘控制指令
        self.do_action = True
        #从动作请求中获取速度speed和方向direction参数
        speed = goal_handle.request.speed
        direction = goal_handle.request.direction    
        #计算计算实际执行过程中的相关参数:实际线速度、角速度、执行时间、前置执行时间
        real_speed, omega, need_time, prev_time = self.get_action_args(speed, direction)
        #创建反馈主题数据
        feedback_msg = Crossing.Feedback()
        #反馈主题反馈动作完成度百分比的倒计数
        feedback_msg.countdown = 100.0
        #记录当前时间
        current_t = time.time()
        #记录开始时间
        start_t = current_t
        #执行时间结束前,循环执行
        while current_t-start_t<need_time:
            #在前置时间内,需要直行走过0.07米
            if current_t-start_t<prev_time:
                self.action = np.array([real_speed,0.0])
            else: #剩余的时间内,按计算出的线速度和角速度行驶
                self.action = np.array([real_speed, omega])
            #计算剩余时间占总执行时间的百分比
            feedback_msg.countdown = (1-(current_t-start_t)/need_time)*100
            #通过反馈主题发送给客户端
            goal_handle.publish_feedback(feedback_msg)
            #线程休眠0.1秒,继续循环
            time.sleep(0.1)
            #更新当前时间
            current_t = time.time()
        #动作执行完成后,设置目标完成标识
        goal_handle.succeed()
        #在结果服务中,设置结果响应数据
        result = Crossing.Result()
        result.result = True
        #清除动作执行期间的数据
        self.action = np.array([0.0,0.0])
        #启用键盘控制指令
        self.do_action = False
        return result
    
    #根据指定的动作参数(速度:speed,方向:direction)计算实际执行过程中的相关参数
    def get_action_args(self, speed, direction):
        #计算真实需要输入的线速度(目前只是根据实际现象得知:实际速度=给定速度*0.7,为什么是0.7需要仔细读一下仿真源码)
        real_speed =  speed/0.7
        #预设看到红线时开始执行动作,
        #小车摄像头前倾20°左右,根据小车高度和摄像头视角等参数,
        #计算得到红线在摄像头观察图像下边缘时,车辆距离红线约等于7cm
        #这段距离需要车辆先直行一段时间,这段时间下文定义为前置时间(prev_time)
        dist = 0.07
        #动作期间要转过得角度
        angular = 0
        if direction=='left':    #左转弧线距离0.66米,转角90°
            dist += 0.66
            angular = np.pi/2
        elif direction=='right': #右转弧线距离0.26米,转角-90°
            dist += 0.26
            angular = -np.pi/2
        else:                    #直行距离0.585米,转角为0
            dist += 0.585
        need_time = dist/speed   #根据距离计算动作执行时间
        prev_time = 0.07/speed   #计算前置时间,直行通过0.07米的时间
        #计算转弯时角速度,4/9的关系是根据实践大致计算出来的,具体原理需要研读仿真器源码
        omega = angular/(need_time-prev_time)/4*9
        return real_speed, omega, need_time, prev_time
#修改main函数
def main(args=None):
    rclpy.init(args=args)
    node = DuckiebotNode(name="duckiebot_node")
    #添加支持多线程执行器
    executor = MultiThreadedExecutor()
    rclpy.spin(node=node,executor=executor)
    rclpy.shutdown()

     其中关于ROS2的Action关键代码如下(完整代码参考文章结尾):

#创建action server,指定回调函数
self.crossing_server = ActionServer(self,Crossing,'crossing_acton',self.execute_callback)
    #定义回调函数
    def execute_callback(self, goal_handle):
        #从目标请求服务中获取请求参数
        speed = goal_handle.request.speed
        direction = goal_handle.request.direction
        #创建反馈主题消息
        feedback_msg = Crossing.Feedback()
        #在循环中是新具体action的操作
        while True:
            ......
             #执行一轮,将结果或者完成情况等信息通过反馈主题发送给客户端
            feedback_msg.countdown = (1-(current_t-start_t)/need_time)*100        
            goal_handle.publish_feedback(feedback_msg)
        #动作执行完成后,设置目标完成标识
        goal_handle.succeed()
        #在结果服务中,设置结果响应数据
        result = Crossing.Result()
        result.result = True
        return result

6、创建action的客户端

        action客户端我们在control_node中实现,具体功能为当按下W键,向前行驶一个地图块,按下A键左转90度,按下D键右转90度,速度统一为0.2。

        需要添加修改的代码如下:

from duckietown_interface.action import Crossing #引入新建的action接口
from rclpy.action import ActionClient            #引入ActionClient库
    #创建action客户端,名称需要与服务端保持一致
    self.crossing_action = ActionClient(self, Crossing, 'crossing_acton')
    
    #创建调用函数,输入参数speed和direction
    def send_goal(self, speed, direction):
        #创建目标服务请求数据并赋值
        goal_msg = Crossing.Goal()
        goal_msg.speed = speed
        goal_msg.direction = direction
        #等待服务端上线
        self.crossing_action.wait_for_server()
        #发出异步请求,并设置反馈回调函数
        self._send_goal_future = self.crossing_action.send_goal_async(goal_msg,feedback_callback=self.feedback_callback)
        #设置目标服务响应回调函数
        self._send_goal_future.add_done_callback(self.goal_response_callback)
    
    #反馈回调函数,收到反馈数据,打印完成度
    def feedback_callback(self, feedback_msg):
        countdown = feedback_msg.feedback.countdown
        print('Received feedback: countdown ',countdown,'%')
    
    #目标服务响应回调函数,如果目标服务无响应,则结束动作请求,有响应,则设置结果服务回调函数
    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            print('Goal rejected!')
            return
        print('Goal accepted!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
    
    #结果服务回调函数
    def get_result_callback(self, future):
        result = future.result().result
        print('Result: crossing ', result)
    
    #键盘按键按下事件处理,按下方向键时设定线速度和角速度数据并发布
    def on_press(self, key):
        #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            ......
        elif key == keyboard.Key.tab:
            self.send_cm_request()
        elif isinstance(key, keyboard._xorg.KeyCode):
            if key.char == 'w': #按下W键,向前
                self.send_goal(0.2, 'forward')
            elif key.char == 'a': #按下A键,左转
                self.send_goal(0.2, 'left')
            elif key.char == 'd': #按下D键,右转
                self.send_goal(0.2, 'right')

      客户端关键代码其实就是简介中提到的两服务一主题的回调函数的嵌套使用,按照业务逻辑流程依次调用处理即可。

7、编译并验证

        返回工作空间,编译、设置环境变量、启动duckiebot节点

$ cd ~/ros2_ws
$ colcon build
$ source install/setup.bash
$ ros2 run duckiebot duckiebot_node

       新建终端,设置环境变量,启动control节点

$ source ~/ros2_ws/install/setup.bash
$ ros2 run control control_node

       通过方向键控制小车行驶到红线附近,根据情况,按下W/A/D键测试路口通行功能:

        右转

        左转

   到此为止,我们就完成了通过自定义动作实现在路口转弯的功能。

8、附完整源码

8.1 duckiebot_node.py

#!/usr/bin/env python3
 
import rclpy
from rclpy.node import Node
 
import gym
from pyglet.window import key
from gym_duckietown.envs import DuckietownEnv
 
import cv2
import numpy as np
import time  #动作是一段时间内完成的,引入time库使用时间相关函数
 
from sensor_msgs.msg import Image  #发布图像使用Image消息类型
from duckietown_interface.msg import Twist2DStamped #控制消息类型
from duckietown_interface.srv import ChangeMap
from cv_bridge import CvBridge #opencv和ros图像数据转换工具
 
#引入ActionServer库
from rclpy.action import ActionServer
#引入新建的动作接口
from duckietown_interface.action import Crossing
#引入多线程执行器,将节点支持多线程,否则执行动作时会阻塞主线程
from rclpy.executors import MultiThreadedExecutor
 
class DuckiebotNode(Node):
 
    def __init__(self, name):
        super().__init__(name)
        self.env = DuckietownEnv(seed=1,map_name="udem1",draw_curve=False,draw_bbox=False,domain_rand=False,frame_skip=1,distortion=False,camera_rand=False,dynamics_rand=False)
        self.env.reset()
        #定义图像发布接口
        self.pub_img = self.create_publisher(Image,"duckiebot_node/image",10)
        #修改定时器频率,仿真器默认是30Hz,保持一致,方便计算
        self.timer = self.create_timer(1/30, self.timer_callback)
        #创建图像转换工具
        self.bridge = CvBridge()
        #定义全局动作变量,默认线速度和角速度都是0,车辆停止
        self.action = np.array([0.0,0.0])
        #订阅控制指令话题
        self.sub_action = self.create_subscription(Twist2DStamped, "control_node/action", self.cb_action, 10)
        self.cm_srv = self.create_service(ChangeMap, 'change_map_name', self.cb_change_map)
        
        #定义变量,用来标识动作是否在执行,执行过程中禁用其他车辆控制指令
        self.do_action = False
        #创建动作服务
        self.crossing_server = ActionServer(self,Crossing,'crossing_acton',self.execute_callback)
    
    def timer_callback(self):
        #这里不再生成随机动作指令,直接使用全局动作变量
        obs, reward, done, info = self.env.step(self.action)
        #发布图像数据,obs是rgb编码,转化时指定编码,解码时就有据可查
        self.pub_img.publish(self.bridge.cv2_to_imgmsg(obs, 'rgb8'))
        if done:
            self.env.reset()
    #修改键盘控制车辆指令回调函数,执行动作期间,禁用车辆控制指令
    def cb_action(self, msg):
        if self.do_action:
            return
        v = msg.twist2d.v       #线速度
        omega = msg.twist2d.omega  #角速度
        self.action[0] = v
        self.action[1] = omega
    
    def cb_change_map(self, request, response):
        map_name = request.map_name
        try:
            self.env.close()
            self.env = DuckietownEnv(seed=1,map_name=map_name,draw_curve=False,draw_bbox=False,domain_rand=False,frame_skip=1,distortion=False,camera_rand=False,dynamics_rand=False)
            self.env.reset()
            response.result = True
        except Exception:
            response.result = False
        return response
    
     #动作服务回调函数,所有动作指令在这里实现
    def execute_callback(self, goal_handle):
        #开始执行动作,利用do_action变量禁用键盘控制指令
        self.do_action = True
        #从动作请求中获取速度speed和方向direction参数
        speed = goal_handle.request.speed
        direction = goal_handle.request.direction    
        #计算计算实际执行过程中的相关参数:实际线速度、角速度、执行时间、前置执行时间
        real_speed, omega, need_time, prev_time = self.get_action_args(speed, direction)
        #创建反馈主题数据
        feedback_msg = Crossing.Feedback()
        #反馈主题反馈动作完成度百分比的倒计数
        feedback_msg.countdown = 100.0
        #记录当前时间
        current_t = time.time()
        #记录开始时间
        start_t = current_t
        #执行时间结束前,循环执行
        while current_t-start_t<need_time:
            #在前置时间内,需要直行走过0.07米
            if current_t-start_t<prev_time:
                self.action = np.array([real_speed,0.0])
            else: #剩余的时间内,按计算出的线速度和角速度行驶
                self.action = np.array([real_speed, omega])
            #计算剩余时间占总执行时间的百分比
            feedback_msg.countdown = (1-(current_t-start_t)/need_time)*100
            #通过反馈主题发送给客户端
            goal_handle.publish_feedback(feedback_msg)
            #线程休眠0.1秒,继续循环
            time.sleep(0.1)
            #更新当前时间
            current_t = time.time()
        #动作执行完成后,设置目标完成标识
        goal_handle.succeed()
        #在结果服务中,设置结果响应数据
        result = Crossing.Result()
        result.result = True
        #清除动作执行期间的数据
        self.action = np.array([0.0,0.0])
        #启用键盘控制指令
        self.do_action = False
        return result
    
    #根据指定的动作参数(速度:speed,方向:direction)计算实际执行过程中的相关参数
    def get_action_args(self, speed, direction):
        #计算真实需要输入的线速度(目前只是根据实际现象得知:实际速度=给定速度*0.7,为什么是0.7需要仔细读一下仿真源码)
        real_speed =  speed/0.7
        #预设看到红线时开始执行动作,
        #小车摄像头前倾20°左右,根据小车高度和摄像头视角等参数,
        #计算得到红线在摄像头观察图像下边缘时,车辆距离红线约等于7cm
        #这段距离需要车辆先直行一段时间,这段时间下文定义为前置时间(prev_time)
        dist = 0.07
        #动作期间要转过得角度
        angular = 0
        if direction=='left':    #左转弧线距离0.66米,转角90°
            dist += 0.66
            angular = np.pi/2
        elif direction=='right': #右转弧线距离0.26米,转角-90°
            dist += 0.26
            angular = -np.pi/2
        else:                    #直行距离0.585米,转角为0
            dist += 0.585
        need_time = dist/speed   #根据距离计算动作执行时间
        prev_time = 0.07/speed   #计算前置时间,直行通过0.07米的时间
        #计算转弯时角速度,4/9的关系是根据实践大致计算出来的,具体原理需要研读仿真器源码
        omega = angular/(need_time-prev_time)/4*9
        return real_speed, omega, need_time, prev_time
        
 
def main(args=None):
    rclpy.init(args=args)
    node = DuckiebotNode(name="duckiebot_node")
    executor = MultiThreadedExecutor()
    rclpy.spin(node=node,executor=executor)
    rclpy.shutdown()

8.2 control_node.py

#!/usr/bin/env python3
 
import rclpy
from rclpy.node import Node
 
import cv2
import numpy as np
import random
from pynput import keyboard #引入键盘监听功能库
 
from sensor_msgs.msg import Image 
from duckietown_interface.msg import Twist2DStamped
from cv_bridge import CvBridge
from duckietown_interface.srv import ChangeMap
 
from duckietown_interface.action import Crossing #引入新建的action接口
from rclpy.action import ActionClient            #引入ActionClient库
 
class ControlNode(Node):
    
    def __init__(self,name):
        super().__init__(name)
        #初始化控制消息,设置header.frame_id
        self.action = Twist2DStamped()
        self.action.header.frame_id = name
        #创建控制消息发布接口
        self.pub_action = self.create_publisher(Twist2DStamped, "control_node/action", 10)
        #创建图像消息接收接口(消息类型,话题名称,回调函数,消息队列长度)
        self.sub_img = self.create_subscription(Image, "duckiebot_node/image", self.cb_image, 10)
        #创建图像转换工具
        self.bridge = CvBridge()
        #创建键盘事件监听器,并启动
        self.listener = keyboard.Listener(on_press=self.on_press,on_release=self.on_release)
        self.listener.start()
        
        self.map_list = ['udem1','4way','small_loop','loop_empty']
        self.cm_cli = self.create_client(ChangeMap, 'change_map_name')
        while not self.cm_cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('change map service not available, waiting again...')
        self.request = ChangeMap.Request()
        
        #创建action客户端,名称需要与服务端保持一致
        self.crossing_action = ActionClient(self, Crossing, 'crossing_acton')
    
    #创建调用函数,输入参数speed和direction
    def send_goal(self, speed, direction):
        #创建目标服务请求数据并赋值
        goal_msg = Crossing.Goal()
        goal_msg.speed = speed
        goal_msg.direction = direction
        #等待服务端上线
        self.crossing_action.wait_for_server()
        #发出异步请求,并设置反馈回调函数
        self._send_goal_future = self.crossing_action.send_goal_async(goal_msg,feedback_callback=self.feedback_callback)
        #设置目标服务响应回调函数
        self._send_goal_future.add_done_callback(self.goal_response_callback)
    
    #反馈回调函数,收到反馈数据,打印完成度
    def feedback_callback(self, feedback_msg):
        countdown = feedback_msg.feedback.countdown
        print('Received feedback: countdown ',countdown,'%')
    
    #目标服务响应回调函数,如果目标服务无响应,则结束动作请求,有响应,则设置结果服务回调函数
    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            print('Goal rejected!')
            return
        print('Goal accepted!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
    
    #结果服务回调函数
    def get_result_callback(self, future):
        result = future.result().result
        print('Result: crossing ', result)
    
    #图像处理回调函数
    def cb_image(self,imgmsg):
        #ROS图像消息转化为opencv格式,第二个参数指定图像颜色编码格式
        image = self.bridge.imgmsg_to_cv2(imgmsg, 'bgr8')
        #显示图像
        cv2.imshow("image", image)
        cv2.waitKey(1)       
    
    def send_cm_request(self):
        self.request.map_name = self.map_list[random.randint(0,3)]
        self.future = self.cm_cli.call_async(self.request)
        self.timer = self.create_timer(0.05, self.timer_callback_for_future)
    
    def timer_callback_for_future(self):
        if self.future.done():
            try:
                response = self.future.result()
            except Exception as e:
                self.get_logger().info('Service call failed %r' % (e,))
            else:
                self.get_logger().info('Result of change_map_service is %s' %response.result)
            self.timer.cancel()             
    
    #键盘按键按下事件处理,按下方向键时设定线速度和角速度数据并发布
    def on_press(self, key):
        #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            if key == keyboard.Key.up:      #上:向前
                self.action.twist2d.v = 0.44  #设置线速度
                self.action.twist2d.omega = 0.0  #设置角速度
            elif key == keyboard.Key.down:  #下:向后
                self.action.twist2d.v = -0.44 #设置线速度
                self.action.twist2d.omega = 0.0  #设置角速度
            elif key == keyboard.Key.left:  #左:左转
                self.action.twist2d.v = 0.2   #设置线速度
                self.action.twist2d.omega = 1.0  #设置角速度
            elif key == keyboard.Key.right: #右:右转
                self.action.twist2d.v = 0.2   #设置线速度
                self.action.twist2d.omega = -1.0 #设置角速度
            #设置消息时间数据
            self.action.header.stamp = self.get_clock().now().to_msg()
            #发布消息
            self.pub_action.publish(self.action)
        elif key == keyboard.Key.tab:
            self.send_cm_request()
        elif isinstance(key, keyboard._xorg.KeyCode):
            if key.char == 'w':   #按下W键,向前
                self.send_goal(0.2, 'forward')
            elif key.char == 'a': #按下A键,左转
                self.send_goal(0.2, 'left')
            elif key.char == 'd': #按下D键,右转
                self.send_goal(0.2, 'right')
            
     #键盘按键松开事件处理,松开方向键时设定线速度和角速度为0并发布
    def on_release(self, key):
         #判断是否是方向键,只处理方向键事件
        if key == keyboard.Key.up or key == keyboard.Key.down or key == keyboard.Key.left or key == keyboard.Key.right:
            self.action.twist2d.v = 0.0
            self.action.twist2d.omega = 0.0
            self.action.header.stamp = self.get_clock().now().to_msg()
            self.pub_action.publish(self.action)
    
def main(args=None):
    rclpy.init(args=args)
    node = ControlNode(name="control_node")
    rclpy.spin(node=node)
    rclpy.shutdown()