0. 简介
我们在开发ROS2程序时会发现,当面对只有一个node节点时,程序的调用是线性的,这个时候就会有两种解决方式,一种就是使用rclcpp_components来完成对子节点的注册,并完成类似ROS1中Nodelets的操作。另外一种就是使用执行器和回调组完成多线程的创建。
1. ROS2中多线程—callback_group
相较于ROS1中使用MultiThreadedSpinner完成多线程调用而言,ROS2在程序中自带了callback_group。callback_group为所有的callback分了组别,分别为:
MutuallyExclusive
;互斥,即这个组别中每时刻只允许1个线程,一个callback在执行时,其他只能等待Reentrant
;可重入,这个组别中每时刻允许多个线程,一个Callback在执行时,其他callback可开启新的线程
这样也以为这我们可以有效地对ROS2中的callback程序进行控制。在ROS2的node中默认组别是MutuallyExclusive类型,即便使用了multiThreadedExecutor,也依然默认MutuallyExclusive类型,所以我们可以按照我们的需求来进行设置。
Node::create_subscription(
const std::string & topic_name, //topic名称
const rclcpp::QoS & qos,
CallbackT && callback,
const SubscriptionOptionsWithAllocator<AllocatorT> & options, //选项
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>::SharedPtr
msg_mem_strat)
//-------->
this->create_subscription<std_msgs::msg::Int32>("int1",qos,
std::bind(&PathSearcherNode::int1Sub, this,
std::placeholders::_1),
rclcpp::SubscriptionOptions());
值得一提的是当我们不依赖ros时,可以选择thread完成多线程的创立。具体的操作可以从众多的SLAM项目中总结出来。
2. 多线程的大致流程
上面的图片展示了多线程的整体流程,回调组会在程序创立时初始化。
// 声明回调组
rclcpp::CallbackGroup::SharedPtr callback_group_service_;
// 实例化回调组,类型为:互斥的
callback_group_service_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
在后面会实例化回调组,通过服务端完成对回调组的调用,从而告诉ROS2的执行器,当你要调用回调函数处理请求时,请把它放到单独线程的回调组中。
// 声明占位符
using std::placeholders::_1;
using std::placeholders::_2;
// 声明服务端
// 声明一个服务端
rclcpp::Service<village_interfaces::srv::SellNovel>::SharedPtr server_;
// 实例化卖二手书的服务
server_ = this->create_service<village_interfaces::srv::SellNovel>("sell_novel",
std::bind(&SingleDogNode::sell_book_callback,this,_1,_2),
rmw_qos_profile_services_default,
callback_group_service_);
最后一步就是将单线程执行器换为多线程执行器
auto node = std::make_shared<SingleDogNode>("wang2");
/* 运行节点,并检测退出信号*/
rclcpp::executors::MultiThreadedExecutor exector;
exector.add_node(node);
exector.spin();
3. 示例程序
这个程序使用了Reentrant
模式,完成了当同时收到多个/int1和/int2,开启的线程数会受电脑内核和初始化MultiThreadedExecutor时设置的线程数决定。
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int32.hpp>
using namespace std::chrono_literals;
std::string getTime(){
time_t tick = (time_t)(rclcpp::Clock().now().seconds());
struct tm tm;
char s[100];
tm = *localtime(&tick);
strftime(s, sizeof(s), "%Y-%m-%d %H:%M:%S", &tm);
return std::string(s);
}
std::string string_thread_id()
{
auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
return std::to_string(hashed);
}
class MyNode: public rclcpp::Node{
public:
MyNode(const rclcpp::NodeOptions & options);
~MyNode(){}
void int1Sub(std_msgs::msg::Int32::SharedPtr msg);
void int2Sub(std_msgs::msg::Int32::SharedPtr msg);
private:
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr int_sub1_;
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr int_sub2_;
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub1_;
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub2_;
}
MyNode::MyNode(const rclcpp::NodeOptions & options):
rclcpp::Node("my_node",options){
//同一个组别,类型设置为Reentrant
callback_group_sub_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);
auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_sub_;
int_sub1_ = nh_->create_subscription<std_msgs::msg::Int32>("int1",10,
std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
sub_opt);
int_sub2_ = nh_->create_subscription<std_msgs::msg::Int32>("int2",10,
std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
sub_opt);
}
void MyNode::int1Sub(std_msgs::msg::Int32::SharedPtr msg){
printf("int1 sub time:%s\n",getTime().c_str());
printf("thread1 id:%s\n",string_thread_id().c_str());
rclcpp::sleep_for(10s);
printf("int1 sub time:%s\n",getTime().c_str());
}
void MyNode::int2Sub(std_msgs::msg::Int32::SharedPtr msg){
printf("int2 sub time:%s\n",getTime().c_str());
printf("thread2 id:%s\n",string_thread_id().c_str());
rclcpp::sleep_for(10s);
printf("int2 sub time:%s\n",getTime().c_str());
}
int main(int argc, char **argv) {
rclcpp::init(argc, argv);
rclcpp::executors::MultiThreadedExecutor executor(rclcpp::executor::ExecutorArgs(),5,true);
auto node = std::make_shared<MyNode>(rclcpp::NodeOptions());
executor.add_node(node);
printf("threads %d\n",executor.get_number_of_threads());
executor.spin();
rclcpp::shutdown();
return EXIT_SUCCESS;
}
4. 参考链接
https://blog.csdn.net/weixin_44229819/article/details/122509623
https://blog.csdn.net/weixin_39536630/article/details/110514759
评论(2)
您还未登录,请登录后发表或查看评论