通信接口

在ROS系统中,无论话题还是服务,或者我们后续将要学习的动作,都会用到一个重要的概念——通信接口

通信并不是一个人自言自语,而是两个甚至更多个人,你来我往的交流,交流的内容是什么呢?为了让大家都好理解,我们可以给传递的数据定义一个标准的结构,这就是通信接口。

接口的定义

接口的概念在各个领域随处可见,无论是硬件结构还是软件开发,都有广泛的应用。

image-20220528001307706

比如生活中最为常见的插头和插座,两者必须匹配才能使用,电脑和手机上的USB接口也是,什么Micro-USB、TypeC等等,都是关于接口的具体定义。

image-20220528001318495

软件开发中,接口的使用就更多了,比如我们在编写程序时,使用的函数和函数的输入输出也称之为接口,每一次调用函数的时候,就像是把主程序和调用函数通过这个接口连接到一起,系统才能正常工作。

更为形象的是图形化编程中使用的程序模块,每一个模块都有固定的结构和形状,只有两个模块相互匹配,才能在一起工作,这就很好的讲代码形象化了。

所以什么是接口,它是一种相互关系,只有彼此匹配,才能建立连接。

image-20220528001346095

回到ROS的通信系统,它的主要目的就是传输数据,那就得让大家高效的建立连接,并且准确包装和解析传输的数据内容,话题、服务等机制也就诞生了,他们传输的数据,都要符合通信接口的标准定义

比如摄像头驱动发布的图像话题,由每个像素点的R、G、B三原色值组成,控制机器人运动的速度指令,由线速度和角速度组成,进行机器人配置的服务,有配置的参数和反馈的结果组成等等,类似这些常用的定义,在ROS系统中都有提供,我们也可以自己开发。

这些接口看上去像是给我们加了一些约束,但却是ROS系统的精髓所在。举个例子,我们使用相机驱动节点的时候,完全不用关注它是如何驱动相机的,只要一句话运行,我们就可以知道发布出来的图像数据是什么样的了,直接开始我们的应用开发;类似的,键盘控制我们也可以安装一个ROS包,如何实现的呢?不用关心,反正它发布出来的肯定是线速度和角速度。

ROS通信接口

接口可以让程序之间的依赖降低,便于我们使用别人的代码,也方便别人使用我们的代码,这就是ROS的核心目标,减少重复造轮子。

image-20220528001533911

ROS有三种常用的通信机制,分别是话题、服务、动作,通过每一种通信种定义的接口,各种节点才能有机的联系到一起。

语言无关

为了保证每一个节点可以使用不同语言编程,ROS将这些接口的设计做成了和语言无关的,比如这里看到的int32表示32位的整型数,int64表示64位的整型数,bool表示布尔值,还可以定义数组、结构体,这些定义在编译过程中,会自动生成对应到C++、Python等语言里的数据结构。

image-20220528001633925

  • 话题通信接口的定义使用的是.msg文件,由于是单向传输,只需要描述传输的每一帧数据是什么就行,比如在这个定义里,会传输两个32位的整型数,x、y,我们可以用来传输二维坐标的数值。

  • 服务通信接口的定义使用的是.srv文件,包含请求和应答两部分定义,通过中间的“---”区分,比如之前我们学习的加法求和功能,请求数据是两个64位整型数a和b,应答是求和的结果sum。

  • 动作是另外一种通信机制,用来描述机器人的一个运动过程,使用.action文件定义,比如我们让小海龟转90度,一边转一边周期反馈当前的状态,此时接口的定义分成了三个部分,分别是动作的目标,比如是开始运动,运动的结果,最终旋转的90度是否完成,还有一个周期反馈,比如每隔1s反馈一下当前转到第10度、20度还是30度了,让我们知道运动的进度。

标准接口

大家可能好奇ROS系统到底给我们定义了哪些接口呢?我们可以在ROS安装路径中的share文件夹中找到,涵盖众多标准定义,大家可以打开几个看看。

image-20220528001914859

image-20220528001943541

案例一:服务接口的定义与使用

了解了通信接口的概念,接下来我们再从代码实现的角度,研究下如何定义以及使用一个接口。

在之前服务概念讲解的课程中,我们编写了这样一个例程,我们再来回顾下。

image-20220528002304850

有三个节点,第一个驱动相机发布图像话题,第二个是机器视觉识别节点,封装了一个服务的服务端对象,提供目标识别位置的查询服务,第三个节点在需要目标位置的时候,就可以发送请求,收到位置进行使用了。

image-20220527235132847

接口定义

在这个例程中,我们使用GetObjectPosition.srv定义了服务通信的接口:

learning_interface/srv/GetObjectPosition.srv

bool get      # 获取目标位置的指令
---
int32 x       # 目标的X坐标
int32 y       # 目标的Y坐标

定义中有两个部分,上边是获取目标位置的指令,get为true的话,就表示我们需要一次位置,服务端就会反馈这个x、y坐标了。

完成定义后,还需要在功能包的CMakeLists.txt中配置编译选项,让编译器在编译过程中,根据接口定义,自动生成不同语言的代码:

...

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "srv/GetObjectPosition.srv"
)

...

功能包的package.xml文件中也需要添加代码生成的功能依赖:


 ...

 <build_depend>rosidl_default_generators</build_depend>
 <exec_depend>rosidl_default_runtime</exec_depend>
 <member_of_group>rosidl_interface_packages</member_of_group>

 ...

程序调用

我们在代码中再来重点看下接口的使用方法。

客户端接口调用

learning_service/service_object_client.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: 古月居(www.guyuehome.com)
@说明: ROS2服务示例-请求目标识别,等待目标位置应答
"""

import rclpy                                            # ROS2 Python接口库
from rclpy.node   import Node                           # ROS2 节点类
from learning_interface.srv import GetObjectPosition    # 自定义的服务接口

class objectClient(Node):
    def __init__(self, name):
        super().__init__(name)                          # ROS2节点父类初始化
        self.client = self.create_client(GetObjectPosition, 'get_target_position')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.request = GetObjectPosition.Request()

    def send_request(self):
        self.request.get = True
        self.future = self.client.call_async(self.request)

def main(args=None):
    rclpy.init(args=args)                             # ROS2 Python接口初始化
    node = objectClient("service_object_client")      # 创建ROS2节点对象并进行初始化
    node.send_request()

    while rclpy.ok():
        rclpy.spin_once(node)

        if node.future.done():
            try:
                response = node.future.result()
            except Exception as e:
                node.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                node.get_logger().info(
                    'Result of object position:\n x: %d y: %d' %
                    (response.x, response.y))
            break
    node.destroy_node()                              # 销毁节点对象
    rclpy.shutdown()                                 # 关闭ROS2 Python接口

服务端接口调用

learning_service/service_object_server.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: 古月居(www.guyuehome.com)
@说明: ROS2服务示例-提供目标识别服务
"""

import rclpy                                           # ROS2 Python接口库
from rclpy.node import Node                            # ROS2 节点类
from sensor_msgs.msg import Image                      # 图像消息类型
import numpy as np                                     # Python数值计算库
from cv_bridge import CvBridge                         # ROS与OpenCV图像转换类
import cv2                                             # Opencv图像处理库
from learning_interface.srv import GetObjectPosition   # 自定义的服务接口

lower_red = np.array([0, 90, 128])     # 红色的HSV阈值下限
upper_red = np.array([180, 255, 255])  # 红色的HSV阈值上限

class ImageSubscriber(Node):
    def __init__(self, name):
        super().__init__(name)                              # ROS2节点父类初始化
        self.sub = self.create_subscription(
            Image, 'image_raw', self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
        self.cv_bridge = CvBridge()                         # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换

        self.srv = self.create_service(GetObjectPosition,   # 创建服务器对象(接口类型、服务名、服务器回调函数)
                                       'get_target_position',
                                       self.object_position_callback)    
        self.objectX = 0
        self.objectY = 0                              

    def object_detect(self, image):
        hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)      # 图像从BGR颜色模型转换为HSV模型
        mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 图像二值化
        contours, hierarchy = cv2.findContours(
            mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)   # 图像中轮廓检测

        for cnt in contours:                                  # 去除一些轮廓面积太小的噪声
            if cnt.shape[0] < 150:
                continue

            (x, y, w, h) = cv2.boundingRect(cnt)              # 得到苹果所在轮廓的左上角xy像素坐标及轮廓范围的宽和高
            cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)# 将苹果的轮廓勾勒出来
            cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,
                       (0, 255, 0), -1)                       # 将苹果的图像中心点画出来

            self.objectX = int(x+w/2)
            self.objectY = int(y+h/2)

        cv2.imshow("object", image)                            # 使用OpenCV显示处理后的图像效果
        cv2.waitKey(50)

    def listener_callback(self, data):
        self.get_logger().info('Receiving video frame')        # 输出日志信息,提示已进入回调函数
        image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8')     # 将ROS的图像消息转化成OpenCV图像
        self.object_detect(image)                              # 苹果检测

    def object_position_callback(self, request, response):     # 创建回调函数,执行收到请求后对数据的处理
        if request.get == True:
            response.x = self.objectX                          # 目标物体的XY坐标
            response.y = self.objectY
            self.get_logger().info('Object position\nx: %d y: %d' %
                                   (response.x, response.y))   # 输出日志信息,提示已经反馈
        else:
            response.x = 0
            response.y = 0
            self.get_logger().info('Invalid command')          # 输出日志信息,提示已经反馈
        return response


def main(args=None):                                 # ROS2节点主入口main函数
    rclpy.init(args=args)                            # ROS2 Python接口初始化
    node = ImageSubscriber("service_object_server")  # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                 # 循环等待ROS2退出
    node.destroy_node()                              # 销毁节点对象
    rclpy.shutdown()                                 # 关闭ROS2 Python接口

案例二:话题接口的定义与使用

话题通信接口的定义也是类似的,继续从之前的机器视觉案例中来衍生,我们想把服务换成话题,周期发布目标识别的位置,不管有没有人需要。

image-20220528003434007

运行效果

现在我们会运行三个节点:

  • 第一个节点,将驱动相机并发布图像话题,此时的话题数据使用的是ROS中标准定义的Image图像消息;
  • 第二个节点,会运行视觉识别功能,识别目标的位置,这个位置我们希望封装成话题消息,发布出去,谁需要使用谁就来订阅;
  • 第三个节点,订阅位置话题,打印到终端中。

启动三个终端,分别运行以上节点:


$ ros2 run usb_cam usb_cam_node_exe
$ ros2 run learning_topic interface_object_pub
$ ros2 run learning_topic interface_object_sub

image-20220528003829553

接口定义

在这个例程中,我们使用ObjectPosition.msg定义了服务通信的接口:

learning_interface/msg/ObjectPosition.msg

int32 x      # 表示目标的X坐标
int32 y      # 表示目标的Y坐标

话题消息的内容是一个位置,我们使用x、y坐标值进行描述。

完成定义后,还需要在功能包的CMakeLists.txt中配置编译选项,让编译器在编译过程中,根据接口定义,自动生成不同语言的代码:

...

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/ObjectPosition.msg"
)

...

程序调用

我们在代码中再来重点看下接口的使用方法。

发布者接口调用

learning_topic/interface_object_pub.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: 古月居(www.guyuehome.com)
@说明: ROS2接口示例-发布目标位置
"""

import rclpy                                       # ROS2 Python接口库
from rclpy.node import Node                        # ROS2 节点类
from sensor_msgs.msg import Image                  # 图像消息类型
from cv_bridge import CvBridge                     # ROS与OpenCV图像转换类
import cv2                                         # Opencv图像处理库
import numpy as np                                 # Python数值计算库
from learning_interface.msg import ObjectPosition  # 自定义的目标位置消息

lower_red = np.array([0, 90, 128])                 # 红色的HSV阈值下限
upper_red = np.array([180, 255, 255])              # 红色的HSV阈值上限

"""
创建一个订阅者节点
"""
class ImageSubscriber(Node):

    def __init__(self, name):
        super().__init__(name)                                  # ROS2节点父类初始化
        self.sub = self.create_subscription(
            Image, 'image_raw', self.listener_callback, 10)     # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
        self.pub = self.create_publisher(
            ObjectPosition, "object_position", 10)              # 创建发布者对象(消息类型、话题名、队列长度)
        self.cv_bridge = CvBridge()                             # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换

        self.objectX = 0
        self.objectY = 0   

    def object_detect(self, image):      
        hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)        # 图像从BGR颜色模型转换为HSV模型
        mask_red = cv2.inRange(hsv_img, lower_red, upper_red)   # 图像二值化
        contours, hierarchy = cv2.findContours(
            mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)     # 图像中轮廓检测
        for cnt in contours:                                    # 去除一些轮廓面积太小的噪声
            if cnt.shape[0] < 150:
                continue

            (x, y, w, h) = cv2.boundingRect(cnt)                # 得到苹果所在轮廓的左上角xy像素坐标及轮廓范围的宽和高

            cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)  # 将苹果的轮廓勾勒出来
            cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,      # 将苹果的图像中心点画出来
                       (0, 255, 0), -1)   

            self.objectX = int(x+w/2)
            self.objectY = int(y+h/2)

        cv2.imshow("object", image)                             # 使用OpenCV显示处理后的图像效果
        cv2.waitKey(50)

    def listener_callback(self, data):
        self.get_logger().info('Receiving video frame')         # 输出日志信息,提示已进入回调函数
        image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8')      # 将ROS的图像消息转化成OpenCV图像
        position = ObjectPosition()
        self.object_detect(image)                               # 苹果检测
        position.x, position.y = int(self.objectX), int(self.objectY)
        self.pub.publish(position)                              # 发布目标位置

def main(args=None):                                        # ROS2节点主入口main函数
    rclpy.init(args=args)                                   # ROS2 Python接口初始化
    node = ImageSubscriber("topic_webcam_sub")              # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                        # 循环等待ROS2退出
    node.destroy_node()                                     # 销毁节点对象
    rclpy.shutdown()                                        # 关闭ROS2 Python接口

订阅者接口调用

learning_topic/interface_object_sub.py


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: 古月居(www.guyuehome.com)
@说明: ROS2接口示例-订阅目标位置
"""

import rclpy                                       # ROS2 Python接口库
from rclpy.node   import Node                      # ROS2 节点类
from std_msgs.msg import String                    # 字符串消息类型
from learning_interface.msg import ObjectPosition  # 自定义的目标位置消息

"""
创建一个订阅者节点
"""
class SubscriberNode(Node):
    def __init__(self, name):
        super().__init__(name)                                                # ROS2节点父类初始化
        self.sub = self.create_subscription(\
            ObjectPosition, "/object_position", self.listener_callback, 10)   # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度

    def listener_callback(self, msg):                                         # 创建回调函数,执行收到话题消息后对数据的处理
        self.get_logger().info('Target Position: "(%d, %d)"' % (msg.x, msg.y))# 输出日志信息,提示订阅收到的话题消息


def main(args=None):                                 # ROS2节点主入口main函数
    rclpy.init(args=args)                            # ROS2 Python接口初始化
    node = SubscriberNode("interface_position_sub")  # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                 # 循环等待ROS2退出
    node.destroy_node()                              # 销毁节点对象
    rclpy.shutdown()                                 # 关闭ROS2 Python接口

接口命令行操作

接口命令的常用操作如下:

$ ros2 interface list                    # 查看系统接口列表
$ ros2 interface show <interface_name>   # 查看某个接口的详细定义
$ ros2 interface package <package_name>  # 查看某个功能包中的接口定义

image-20220528004610983

image-20220528004525471

image-20220528004538865

参考链接

https://docs.ros.org/en/humble/Concepts/About-ROS-Interfaces.html

https://docs.ros.org/en/humble/Tutorials/Single-Package-Define-And-Use-Interface.html

本系列视频公开课:https://class.guyuehome.com/detail/p_628f4288e4b01c509ab5bc7a/6