0. 前言

我们都知道ROS2比ROS1好很多,就比如说:

  1. 去中心化master,ROS和ROS2中间件不同之处在于,ROS2取消了master节点。在去中心化后,各个节点之间可以通过DDS的节点相互发现,各个节点都是平等的,且可以1对1、1对n、n对n进行互相通信。

  2. 不造通信的轮子,通信直接更换为DDS进行实现。通信采用采用DDS通信,使得ROS2的是实行、可靠性和连续性上都有了增强。

同时ROS2中也拥有了很多新的特性:

  • 可用Python编写的Launch文件
  • 多机器人协同通信支持
  • 支持安全加密通信
  • 同一个进程支持多个节点
  • 使用ament进行包管理
  • 支持Qos服务质量
  • 支持节点生命周期管理
  • 高效的进程间通信

我们发现这些特性都是基于通信模式从TCP/IP到DDS模式的转变,因此本文想从DDS出发,来带大家理解ROS2的DDS到底做了什么。

1. 多机通信DDS

DDS是一个被很多公司实现的工业标准,比如RTI的实现Connext和eProsima的实现Fast RTPS。ROS2 支持多种实现方式。在选择DDS实现的时候要考虑很多方面:法律上要考虑协议,技术上要考虑是否支持跨平台。不同的公司也许会为了适应不同的场景提出不止一种的DDS实现方式。比如RTI为了不同的目标就有很多种他们的Connext的变种。

DDS基于Domain ID在一个物理网络内切分为若干逻辑网络。在同一域(domain)中的ROS 2节点可以被自由发现并通信,在不同域中则不能互通。所有的ROS 2节点默认使用domain ID 0。为避免消息混淆,同网络内运行ROS 2的不同组的设备应该使用不同的domain ID。ROS_DOMAIN_ID有两种(short version / long version)。正常使用推荐short version,在[0, 101]之间进行选择即可。long version则可以在[0, 232]之间进行选择

每个ROS节点在DDS中被称为参与者(participant)。对于在计算机上运行的每个ROS 2进程,都会创建一个DDS“参与者”。由于每个DDS参与者会占用计算机上的两个端口,因此在一台计算机上运行120多个ROS 2进程就可能会溢出到其他域ID或临时端口。

在这里插入图片描述
所以我们我们需要控制节点的数目或者通过一个进程控制多个节点的方法来避免溢出。

设置domain id就是在两台虚拟机中分别执行下面的指令(最好写到bashrc当中,当然不固定除外)

echo "export ROS_DOMAIN_ID=1">> ~/.bashrc
#export ROS_DOMAIN_ID=1

2. 中间件RMW

为了能够在ROS2中使用一个DDS实现,需要一个ROS中间件(RMW软件包),这个包需要利用DDS程序提供的API和工具实现ROS中间件的接口。为了在ROS2中使用一个DDS实现,有大量的工作需要做。但是为了防止ROS2的代码过于绑定某种DDS程序必须支持至少几种DDS程序

C++和Python节点都支持环境变量 RMW_IMPLEMENTATION,该变量允许用户在运行ROS 2应用程序时选择要使用的RMW实现。
下面是常用的ROS2的RMW中间件。
在这里插入图片描述
我们也针对性的展示了换不同的RMW的操作
编译

  • FastDDS:默认已经编译好了
  • cycloneDDS: 默认已经编译好了
  • RTI connext
sudo apt install rti-connext-dds-5.3.1
#执行colcon build 前做
export RTI_LICENSE_FILE=/opt/rti.com/rti_connext_dds-5.3.1/rti_license.dat
source /opt/rti.com/rti_connext_dds-5.3.1/setenv_ros2rti.bash

执行

  • FastDDS:默认使用
  • cycloneDDS
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
#然后执行ros2 run 等命令
#或者
RMW_IMPLEMENTATION=rmw_cyclonedds_cpp ros2 run ...
  • RTI connext
export RTI_LICENSE_FILE=/opt/rti.com/rti_connext_dds-5.3.1/rti_license.dat
source /opt/rti.com/rti_connext_dds-5.3.1/setenv_ros2rti.bash
export RMW_IMPLEMENTATION=rmw_connext_cpp
#然后执行ros2 run 等命令
#或者
RMW_IMPLEMENTATION=rmw_connext_cpp ros2 run ...

3. 安全加密SROS

加密安全这部分正好作者zhangrelay的文章中提到了,也没有过多的介绍,而我们如何对我们的通讯进行加密,这一块在ROS中都有所体现,正好这次是DDS通讯漫谈,也把这一块拿出来一起讨论,网络消息安全性对多机器人系统非常重要。

从源代码编译时候,需要为fastrpts 打开 security 的支持

colcon build --symlink-install --cmake-args -DSECURITY=ON

创建 key store

ros2 security create_keystore demo_keys

为talker 和 listener两个node 创建key 和 certificate

ros2 security create_key demo_keys /talker
ros2 security create_key demo_keys /listener

有可能你会遇到如下错误

lxml.etree.XMLSchemaParseError: attribute use (unknown), attribute 'ref': The QName value '{http://www.w3.org/XML/1998/namespace}base' does not resolve to a(n) attribute declaration., line 34

修正这个问题如下
修改sros2 的代码

cd ros2_ws/src/ros2/sros2/sros2/sros2/policy/schemas
wget http://www.w3.org/2001/03/xml.xsd  # 下载 xml.xsd
vim policy.xsd
#修改如下
@@ -4,7 +4,7 @@
   xmlns:xml="http://www.w3.org/XML/1998/namespace"
   elementFormDefault="qualified" attributeFormDefault="unqualified">
   <xs:import namespace="http://www.w3.org/XML/1998/namespace"
-               schemaLocation="http://www.w3.org/2001/03/xml.xsd" />
+               schemaLocation="xml.xsd" />

执行demo

export ROS_SECURITY_ROOT_DIRECTORY=~/sros2_demo/demo_keys
export ROS_SECURITY_ENABLE=true
export ROS_SECURITY_STRATEGY=Enforce

每个ros2 run 的终端都要配置上面的参数
一个终端执行

ros2 run demo_nodes_cpp talker

另一个终端执行

ros2 run demo_nodes_py listener

这样就形成了加密通话。

4. 服务质量 QoS

ROS1通过TCP实现底层数据的传输,TCP比较可靠,但是如果对于网络情况不好的使用场景,经常发生丢包的现象,而不可靠的UDP反而是一个比较好的选择,从而保持消息不会中断。ROS2作为一个应用于DDS分布系统的产品,如何进行可靠的数据传输是一个关键点,而QoS就是对数据如何进行底层的传输进行配置的。QoS Profile提供了对以下QoS策略的设置:
(1)历史记录(History)
保留近期记录(Keep last):缓存最多N条记录,可通过队列长度选项来配置。

保留所有记录(Keep all):缓存所有记录,但受限于底层中间件可配置的最大资源。

(2)深度(Depth)
队列深度(Size of the queue):只能与Keep last配合使用。

(3)可靠性(Reliability)
尽力的(Best effort):尝试传输数据但不保证成功传输(当网络不稳定时可能丢失数据)。

可靠的(Reliable):反复重传以保证数据成功传输。

(4)持续性(Durability)
局部瞬态(Transient local):发布器为晚连接(late-joining)的订阅器保留数据。

易变态(Volatile):不保留任何数据。

以上每个策略都有系统默认值。这个默认值就是底层中间件的默认值,由DDS供应商工具(如XML配置文件)定义。DDS本身提供了许多可配置的策略。这些策略与ROS1的特征相似。ROS2提供了如下的默认配置:

  • 发布者和订阅者的默认QoS配置(Default QoS settings for publishers and subscribers)
  • 服务(Services)
  • 传感器数据(Sensor data)
  • 参数服务器(Parameters)
  • 系统默认(System default)

与ROS1的比较,QOS有以下好处:
ROS2的 History和Depth结合起来类似于ROS的队列大小功能
ROS2的Reliability取Best-effort类似于Ros1的UDPROS(仅 roscpp包含此功能),取 Reliable类似于ROS1的TCPROS
ROS2的Durability和队列深度为1的 Depth结合起来类似于ROS 1中的latching订阅器

c++

#include <iostream>
#include <functional>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);

  auto node = std::make_shared<rclcpp::Node>("qos_callback_listener");
  rclcpp::SubscriptionOptions options;

  rclcpp::QoS qos(0);
  qos.reliable();
  qos.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);

  options.use_default_callbacks = false;
  options.event_callbacks.incompatible_qos_callback = [](rclcpp::QOSOfferedIncompatibleQoSInfo & event) {
    std::cout << "Custom Incompatible Callback!" << std::endl;
  };

  auto subscriber = node->create_subscription<std_msgs::msg::String>(
    "/string data",
    qos,
    [](const std_msgs::msg::String::SharedPtr) {
      std::cout << "Heard a message" << std::endl;
    },
    options);

  rclcpp::spin(node);
  return 0;
}

Python

import rclpy
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy
from rclpy.qos import QoSProfile
from rclpy.qos_event import SubscriptionEventCallbacks

from std_msgs.msg import String


class MinimalSubscriber(Node):

    def __init__(self):
        #创建qos对象
        qos = QoSProfile(depth=10)
        #设置不同的qos策略
        qos.durability = DurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL

        custom_callback = lambda event: print("Custom Incompatible Callback!")
        callbacks = SubscriptionEventCallbacks()
        callbacks.incompatible_qos = custom_callback

        super().__init__('minimal_subscriber')
        #qos策略传入
        self.subscription = self.create_subscription(
            String,
            'string data',
            self.listener_callback,
            qos,
            event_callbacks=callbacks)

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

5. DDS调优

这一部分,ROS2的官网也有提及,这里就简略的总结一下官网的内容吧。
DDS调优应将这样一条建议作为起点;这些调优适用于特定的系统和环境,但调优可能会因多种因素而异。在调试时,可能需要增大或减小与消息大小、网络拓扑等因素相关的值。

1. 跨供应商调优(适用于各供应商DDS实现)

问题:当某些IP片段(fragments)被丢弃时,通过有损网络连接(通常是WiFi)发送数据会出现问题,可能会导致接收端的内核缓冲区变满。

解决方案一: 使用“尽力而为(best-effort)”的QoS设置而不是“可靠(reliable)”的设置。“尽力而为”设置会减少网络流量,因为DDS实现不必承担可靠通信的开销,在可靠通信情况下,发布者要求对发送给订阅者的消息进行确认,并且必须重新发送未正确接收的数据样本。但是,如果IP片段的内核缓冲区已满,则症状仍然相同(阻塞30 秒)。该解决方案应该可以在一定程度上改善问题,而无需调整参数。

解决方案二: 减小ipfrag_time参数的值。
net.ipv4.ipfrag_time / /proc/sys/net/ipv4/ipfrag_time参数 (默认值为30s) : 将IP片段保留在内存中的时间,单位为秒。

例如,通过运行以下命令将该参数值减小到3秒:

sudo sysctl net.ipv4.ipfrag_time=3

减小此参数的值也会减少没有接收到片段的时间窗口。该参数是用于所有正在进入的片段的全局参数,因此需要针对每个具体环境考虑减小此参数值的可行性。

解决方案三: 增大ipfrag_high_thresh参数的值。

net.ipv4.ipfrag_high_thresh / /proc/sys/net/ipv4/ipfrag_high_thresh参数(默认值为262144字节):用于重组IP片段的最大内存。

例如,通过运行以下命令将此参数值增加到128MB:

sudo sysctl net.ipv4.ipfrag_high_thresh=134217728 # (128MB)

显著地增大此参数的值是为了确保缓冲区永远不会完全被填满。但是,假设每个UDP数据包都缺少一个片段,该参数值可能必须非常大才能保存ipfrag_time参数设置的时间窗口内接收到的所有数据。

2. Fast RTPS调优

问题: 通过WiFi连接时,Fast RTPS会用大量数据或快速发布的数据淹没(floods)网络。

解决方案: 请参阅前面“跨供应商调优”中的解决方案。

3. Cyclone DDS调优

问题: 即使使用“可靠(reliable)”设置并通过有线网络传输,但Cyclone DDS仍无法可靠地传送大型消息。

解决方案: 增大Cyclone使用的最大Linux内核接收缓冲区大小和最小套接字接收缓冲区大小。
进行以下调整以解决9MB大小的消息传送:

通过运行以下命令设置最大接收缓冲区大小rmem_max

sudo sysctl -w net.core.rmem_max=2147483647

或者通过编辑/etc/sysctl.d/10-cyclone-max.conf文件以包含下面一行内容来永久设置该参数:

net.core.rmem_max=2147483647

接下来,为了设置Cyclone请求的最小套接字接收缓冲区大小,请编写一个配置文件供Cyclone在启动时使用,该配置文件内容应该如下所示:

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config
https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/master/etc/cyclonedds.xsd">
    <Domain id="any">
        <Internal>
            <MinimumSocketReceiveBufferSize>10MB</MinimumSocketReceiveBufferSize>
        </Internal>
    </Domain>
</CycloneDDS>

然后,每当要运行一个节点时,请设置以下环境变量(记得要用xml格式的配置文件的绝对路径替换absolute/path/to/config_file.xml):

CYCLONEDDS_URI=file:///absolute/path/to/config_file.xml

6. 同步与异步调用

这一块内容相对而言是属于python语法上的问题,其本质原因是在于程序产生死锁(发送请求直到收到响应这段时间完全阻塞调用call的线程,这个线程基本上在调用时属于独占了。调用儿断时间不确定,但是调用完成立马就返回了)。主要存在两种情况,均是没有跳出子程序(call()send_request())就执行spin操作。官网中采用了thread来避免这个问题,同样我们可以使用call_async()来避免这种问题。这篇博文举得已经较为详细,这里不再详写。

7. 参考文献

https://blog.csdn.net/ZhangRelay/article/details/101596837
https://zhuanlan.zhihu.com/p/378752082
https://blog.csdn.net/wsc820508/article/details/81284852
https://blog.csdn.net/qq_38649880/article/details/105908598
https://blog.csdn.net/sph123s/article/details/108223669
https://blog.csdn.net/DDS_CSIT/article/details/105155446
https://zhuanlan.zhihu.com/p/379592378