0. 简介

我们在开发ROS2程序时会发现,当面对只有一个node节点时,程序的调用是线性的,这个时候就会有两种解决方式,一种就是使用rclcpp_components来完成对子节点的注册,并完成类似ROS1中Nodelets的操作。另外一种就是使用执行器和回调组完成多线程的创建。

1. ROS2中多线程—callback_group

相较于ROS1中使用MultiThreadedSpinner完成多线程调用而言,ROS2在程序中自带了callback_group。callback_group为所有的callback分了组别,分别为:

  • MutuallyExclusive;互斥,即这个组别中每时刻只允许1个线程,一个callback在执行时,其他只能等待
  • Reentrant;可重入,这个组别中每时刻允许多个线程,一个Callback在执行时,其他callback可开启新的线程

这样也以为这我们可以有效地对ROS2中的callback程序进行控制。在ROS2的node中默认组别是MutuallyExclusive类型,即便使用了multiThreadedExecutor,也依然默认MutuallyExclusive类型,所以我们可以按照我们的需求来进行设置。

Node::create_subscription(
  const std::string & topic_name,  //topic名称
  const rclcpp::QoS & qos,
  CallbackT && callback,
  const SubscriptionOptionsWithAllocator<AllocatorT> & options,  //选项
  typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
    typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>::SharedPtr
  msg_mem_strat)
//-------->
this->create_subscription<std_msgs::msg::Int32>("int1",qos,
            std::bind(&PathSearcherNode::int1Sub, this,
            std::placeholders::_1),
            rclcpp::SubscriptionOptions());

值得一提的是当我们不依赖ros时,可以选择thread完成多线程的创立。具体的操作可以从众多的SLAM项目中总结出来。

2. 多线程的大致流程

在这里插入图片描述
上面的图片展示了多线程的整体流程,回调组会在程序创立时初始化。

// 声明回调组
rclcpp::CallbackGroup::SharedPtr callback_group_service_;
// 实例化回调组,类型为:互斥的
callback_group_service_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

在后面会实例化回调组,通过服务端完成对回调组的调用,从而告诉ROS2的执行器,当你要调用回调函数处理请求时,请把它放到单独线程的回调组中。

// 声明占位符
using std::placeholders::_1;
using std::placeholders::_2;
// 声明服务端
// 声明一个服务端
rclcpp::Service<village_interfaces::srv::SellNovel>::SharedPtr server_;
// 实例化卖二手书的服务
server_ = this->create_service<village_interfaces::srv::SellNovel>("sell_novel",
                            std::bind(&SingleDogNode::sell_book_callback,this,_1,_2),
                            rmw_qos_profile_services_default,
                            callback_group_service_);

最后一步就是将单线程执行器换为多线程执行器

auto node = std::make_shared<SingleDogNode>("wang2");
/* 运行节点,并检测退出信号*/
rclcpp::executors::MultiThreadedExecutor exector;
exector.add_node(node);
exector.spin();

3. 示例程序

这个程序使用了Reentrant模式,完成了当同时收到多个/int1和/int2,开启的线程数会受电脑内核和初始化MultiThreadedExecutor时设置的线程数决定。

#include <rclcpp/rclcpp.hpp> 
#include <std_msgs/msg/int32.hpp>
using namespace std::chrono_literals;
std::string getTime(){
    time_t tick = (time_t)(rclcpp::Clock().now().seconds());
    struct tm tm;
    char s[100];
    tm = *localtime(&tick);
    strftime(s, sizeof(s), "%Y-%m-%d %H:%M:%S", &tm);
    return std::string(s);
}
std::string string_thread_id()
{
  auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
  return std::to_string(hashed);
}
class MyNode: public rclcpp::Node{
public:
    MyNode(const rclcpp::NodeOptions & options);
    ~MyNode(){}
    void int1Sub(std_msgs::msg::Int32::SharedPtr msg);
    void int2Sub(std_msgs::msg::Int32::SharedPtr msg);

private:
    rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr int_sub1_;
    rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr int_sub2_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub1_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub2_;
}
MyNode::MyNode(const rclcpp::NodeOptions & options):
    rclcpp::Node("my_node",options){
    //同一个组别,类型设置为Reentrant
    callback_group_sub_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);
    auto sub_opt = rclcpp::SubscriptionOptions();
    sub_opt.callback_group = callback_group_sub_;

    int_sub1_ = nh_->create_subscription<std_msgs::msg::Int32>("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub_opt);
    int_sub2_ = nh_->create_subscription<std_msgs::msg::Int32>("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub_opt);
}
void MyNode::int1Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int1 sub time:%s\n",getTime().c_str());
    printf("thread1 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int1 sub time:%s\n",getTime().c_str());
}
void MyNode::int2Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int2 sub time:%s\n",getTime().c_str());
    printf("thread2 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int2 sub time:%s\n",getTime().c_str());
}
int main(int argc, char **argv) {
    rclcpp::init(argc, argv);
    rclcpp::executors::MultiThreadedExecutor executor(rclcpp::executor::ExecutorArgs(),5,true);

    auto node = std::make_shared<MyNode>(rclcpp::NodeOptions());
    executor.add_node(node);
    printf("threads   %d\n",executor.get_number_of_threads());
    executor.spin();
    rclcpp::shutdown();
    return EXIT_SUCCESS;
}

4. 参考链接

https://blog.csdn.net/weixin_44229819/article/details/122509623

https://blog.csdn.net/weixin_39536630/article/details/110514759

https://blog.csdn.net/Linuxhus/article/details/115030132