0. 简介
对弈ros/ros2的bag而言,真的是非常好用一个产物,通过bag我们可以保证数据的多次的重复的复现,直至发现问题。而对弈ros的bag的二次开发也一直是大家关注的重点。下面我们将从ROS1开始,带领大家了解一下ROS2的内容。
1. ROS1 BAG
对于ROS1的bag,网上有比较多的例子了,比如说下面的https://zhuanlan.zhihu.com/p/150290102
#include <rosbag/bag.h>
#include <rosbag/view.h>
#include <ros/ros.h>
#include <novatel_msgs/INSPVAX.h>
#include <vector>
#include <boost/foreach.hpp>
#define foreach BOOST_FOREACH
int main(){
rosbag::Bag bag;
bag.open("/home/liuxiao/bagfiles/novatel.bag", rosbag::bagmode::Read);
std::vector<std::string> topics;
topics.push_back(std::string("/novatel_data/inspvax"));
rosbag::View view(bag, rosbag::TopicQuery(topics));
foreach(rosbag::MessageInstance const m, view)
{
novatel_msgs::INSPVAX::ConstPtr s = m.instantiate<novatel_msgs::INSPVAX>();
if (s != NULL)
std::cout << s->latitude << std::endl;
}
bag.close();
}
这些都是比较简单的例子,比较详细的内容可以参考ROS WIKI。
rosbag C++ API工作的前提是使用 “查询 (queries)”创建一个或多个包的 “views(视图)”。查询是一个抽象的类,它定义了一个函数来过滤是否包括来自连接的消息。这个函数可以访问topic_name、数据类型、md5sum、消息定义以及连接头。此外,每个查询可以为它包括的时间范围指定一个开始和结束时间。
多个查询可以被添加到一个视图中,包括来自不同包的查询。然后,该视图提供一个跨包的迭代器接口,根据时间排序。
上面一段文字比较详细的介绍了ROS1当中比较重要的两个内容,分别是:
- rosbag::Bag - 在磁盘上序列化到/从一个bag文件。
- rosbag::View - 指定一个bag文件的视图,以允许在特间范围内查询特定连接的消息。
2. ROS2 BAG
对于ROS2而言,我们知道ROS2包的形式和ROS1不太一样,但rosbag2也不只是提供了ros2 bag命令行工具。它还提供了一个C++ API,用于从你自己的源代码中读取和写入一个包。这允许你订阅一个主题,并在对该数据进行任何其他处理的同时,将收到的数据保存到一个包中。
首先需要安装了rosbag2软件包
sudo apt install ros-foxy-rosbag2
然后创建的ros2_ws目录。导航到ros2_ws/src目录并创建一个新的包
ros2 pkg create --build-type ament_cmake bag_recorder_nodes --dependencies example_interfaces rclcpp rosbag2_cpp std_msgs
你的终端将返回一条信息,验证你的包bag_recorder_nodes及其所有必要的文件和文件夹的创建。—dependencies参数将自动在package.xml和CMakeLists.txt中添加必要的依赖关系行。在这种情况下,该包将使用rosbag2_cpp包以及rclcpp包。本教程的后面部分也需要对example_interfaces包的依赖。
然后就是创建新文件simple_bag_recorder.cpp
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <rosbag2_cpp/typesupport_helpers.hpp>
#include <rosbag2_cpp/writer.hpp>
#include <rosbag2_cpp/writers/sequential_writer.hpp>
#include <rosbag2_storage/serialized_bag_message.hpp>
using std::placeholders::_1;
class SimpleBagRecorder : public rclcpp::Node
{
public:
SimpleBagRecorder()
: Node("simple_bag_recorder")
{
const rosbag2_cpp::StorageOptions storage_options({"my_bag", "sqlite3"});//设置可选的参数,包括bag文件名和数据库类型
const rosbag2_cpp::ConverterOptions converter_options(
{rmw_get_serialization_format(),
rmw_get_serialization_format()});//设置可选的参数,包括序列化格式和压缩格式
writer_ = std::make_unique<rosbag2_cpp::writers::SequentialWriter>();//创建SequentialWriter对象,并制定序列化格式
writer_->open(storage_options, converter_options);//打开writer,当中就包含了打开数据库的操作和设置序列化格式的操作
writer_->create_topic(
{"chatter",
"std_msgs/msg/String",
rmw_get_serialization_format(),
""});//创建topic
subscription_ = create_subscription<std_msgs::msg::String>(
"chatter", 10, std::bind(&SimpleBagRecorder::topic_callback, this, _1));//创建订阅者
}
private:
void topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const//设置回调函数
{
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();//创建SerializedBagMessage对象,并设置bag信息的类型
bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,//创建rcutils_uint8_array_t对象
[this](rcutils_uint8_array_t *msg) {//设置析构函数
auto fini_return = rcutils_uint8_array_fini(msg);//释放内存
delete msg;//删除对象
if (fini_return != RCUTILS_RET_OK) {//判断是否释放成功
RCLCPP_ERROR(get_logger(),
"Failed to destroy serialized message %s", rcutils_get_error_string().str);//打印错误信息
}
});//将bag_message中的serialized_data设置为rcutils_uint8_array_t类型
*bag_message->serialized_data = msg->release_rcl_serialized_message();//然后将msg信息中的rcl_serialized_message赋值给bag_message中的serialized_data
bag_message->topic_name = "chatter";//设置topic名称
if (rcutils_system_time_now(&bag_message->time_stamp) != RCUTILS_RET_OK) {//设置时间戳
RCLCPP_ERROR(get_logger(), "Error getting current time: %s",
rcutils_get_error_string().str);//打印错误信息
}
writer_->write(bag_message);//写入数据
}
rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr subscription_;//创建订阅者
std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer_;//创建SequentialWriter对象
};
这里面只是最简单的订阅并发布写入bag当中的操作,当然也有一些比较高级的操作,这里作者一一列举
void read_and_print_bag(std::string uri)
{
std::unique_ptr<rosbag2_cpp::readers::SequentialReader> reader_impl =
std::make_unique<rosbag2_cpp::readers::SequentialReader>(); // 设置ros2bag的读取器,并制定序列化格式
const rosbag2_cpp::StorageOptions storage_options({uri, "sqlite3"});// 设置ros2bag的存储格式
const rosbag2_cpp::ConverterOptions converter_options(
{rmw_get_serialization_format(),
rmw_get_serialization_format()});// 设置ros2bag的转换格式
reader_impl->open(storage_options, converter_options);// 打开ros2bag
rosbag2_cpp::Reader reader(std::move(reader_impl));// 读取ros2bag
auto serializer = rclcpp::Serialization<example_interfaces::msg::Int32>();// 设置序列化格式
while (reader.has_next())// 读取ros2bag的数据,并判断是否有下一个数据
{
auto message = reader.read_next();// 读取下一个数据
example_interfaces::msg::Int32 data;//设置输出的格式类型
rcutils_uint8_array_t raw_data = *message->serialized_data;//将bag包中的数据转换为rcutils_uint8_array_t类型
auto serialized_message = rclcpp::SerializedMessage(raw_data);//然后将raw_data数据转成数据格式
serializer.deserialize_message(&serialized_message, &data);//并将数据格式输出成data的信息,以供显示
// The data is owned by the original shared pointer in the SerializedBagMessage object, not the
// SerializedMessage object
std::cout << "Topic: " << message->topic_name << "\tData: " << data.data << "\tTime stamp: " << message->time_stamp << '\n';
}
}
或者像这样,可以暂停数据包
// 消息反序列化
void deserialize_message_by_topic(auto bag_message, std::string topic)
{
if (topic == "/camera/color/image_raw") //
{
// 使用deserialize_message函数将bag_message反序列化为sensor_msgs::msg::Image类型的消息
sensor_msgs::msg::Image deserialized_message =
this->deserialize_message<sensor_msgs::msg::Image>(bag_message);
// file_parser_util.save(deserialized_message);
}
/*
if (topic == "/thermal_raw"){
flir_lepton_msgs::msg::TemperatureRaw deserialized_message =
this->deserialize_message<flir_lepton_msgs::msg::TemperatureRaw>(bag_message);
file_parser_util.save(deserialized_message);
}
*/
if (topic == "/camera/depth/color/points")
{
// 使用deserialize_message函数将bag_message反序列化为sensor_msgs::msg::PointCloud2类型的消息,并将其存储到pc2变量中
sensor_msgs::msg::PointCloud2 pc2 =
this->deserialize_message<sensor_msgs::msg::PointCloud2>(bag_message);
// file_parser_util.save(pc2);
// RCLCPP_INFO_STREAM(this->get_logger(), "MESSAGE");
}
if (topic == "/livox/lidar")
{
sensor_msgs::msg::PointCloud2 pc2 =
this->deserialize_message<sensor_msgs::msg::PointCloud2>(bag_message);
// file_parser_util.save(pc2);
// RCLCPP_INFO_STREAM(this->get_logger(), "MESSAGE");
}
}
或者可以通过这样的方法来完成数据包的录制:
class BagRecorder : public rclcpp::Node
{
public:
BagRecorder()
: Node("bag_recorder")
{
const rosbag2_cpp::StorageOptions storage_options({make_then_get_dir(), "sqlite3"});
const rosbag2_cpp::ConverterOptions converter_options(
{rmw_get_serialization_format(),
rmw_get_serialization_format()});
writer_ = std::make_unique<rosbag2_cpp::writers::SequentialWriter>();
writer_->open(storage_options, converter_options);
writer_->create_topic(
{BUTTONS,
"std_msgs/msg/String",
rmw_get_serialization_format(),
""});
buttons_sub_ = create_subscription<std_msgs::msg::String>(
BUTTONS, 10, std::bind(&BagRecorder::buttons_topic_callback, this, _1));
writer_->create_topic(
{DISPLAY,
"interfaces/msg/DisplayData",
rmw_get_serialization_format(),
""});
display_sub_ = create_subscription<interfaces::msg::DisplayData>(
DISPLAY, 10, std::bind(&BagRecorder::display_topic_callback, this, _1));
writer_->create_topic(
{FUSION_IMU,
"interfaces/msg/FusionImu",
rmw_get_serialization_format(),
""});
fusion_imu_sub_ = create_subscription<interfaces::msg::FusionImu>(
FUSION_IMU, 10, std::bind(&BagRecorder::fusion_imu_topic_callback, this, _1));
writer_->create_topic(
{RAW_IMU,
"interfaces/msg/RawImu",
rmw_get_serialization_format(),
""});
raw_imu_sub_ = create_subscription<interfaces::msg::RawImu>(
RAW_IMU, 10, std::bind(&BagRecorder::raw_imu_topic_callback, this, _1));
writer_->create_topic(
{MOTOR,
"interfaces/msg/MotorData",
rmw_get_serialization_format(),
""});
motor_sub_ = create_subscription<interfaces::msg::MotorData>(
MOTOR, 10, std::bind(&BagRecorder::motor_topic_callback, this, _1));
}
private:
void buttons_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
{
callback_writer_helper(msg, BUTTONS);
}
void display_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
{
callback_writer_helper(msg, DISPLAY);
}
void fusion_imu_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
{
callback_writer_helper(msg, FUSION_IMU);
}
void raw_imu_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
{
callback_writer_helper(msg, RAW_IMU);
}
void motor_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
{
callback_writer_helper(msg, MOTOR);
}
void callback_writer_helper(std::shared_ptr<rclcpp::SerializedMessage> msg, std::string topic_name) const
{
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[this](rcutils_uint8_array_t *msg)
{
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK)
{
RCLCPP_ERROR(get_logger(),
"Failed to destroy serialized message %s", rcutils_get_error_string().str);
}
});
*bag_message->serialized_data = msg->release_rcl_serialized_message();
bag_message->topic_name = topic_name;
if (rcutils_system_time_now(&bag_message->time_stamp) != RCUTILS_RET_OK)
{
RCLCPP_ERROR(get_logger(), "Error getting current time: %s",
rcutils_get_error_string().str);
}
std::cout << "message: " << topic_name << std::endl;
writer_->write(bag_message);
}
std::string make_then_get_dir(void)
{
namespace fs = std::filesystem;
using namespace std;
time_t now = time(0);
tm *ltm = localtime(&now);
string year = to_string(1900 + ltm->tm_year);
string month = to_string(1 + ltm->tm_mon);
string day = to_string(ltm->tm_mday);
string date = get_month_name(1 + ltm->tm_mon) + "-" + day + "-" + year;
string ttime = to_string(ltm->tm_hour) + "-" + to_string(ltm->tm_min) + "-" + to_string(ltm->tm_sec);
string folder = "my_bag/" + date + "/" + ttime;
fs::create_directories(folder);
cout << "path: " << folder << endl;
return folder;
}
std::string get_month_name(int month_num)
{
std::cout << "month_num: " << month_num << std::endl;
switch (month_num)
{
case 1:
return "Jan";
case 2:
return "Feb";
case 3:
return "March";
case 4:
return "Apr";
case 5:
return "May";
case 6:
return "Jun";
case 7:
return "Jul";
case 8:
return "Aug";
case 9:
return "Sep";
case 10:
return "Oct";
case 11:
return "Nov";
case 12:
return "Dec";
default: // Optional
return "Error";
}
}
rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr buttons_sub_;
rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr display_sub_;
rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr fusion_imu_sub_;
rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr raw_imu_sub_;
rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr motor_sub_;
std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer_;
};
如果各位觉得这些仍然讲的比较粗糙,也可以从示例开始学起,这里我们推荐两个代码包,分别是:合并多个bag包、可视化简单图像订阅者和查看器
以及图像和PCD文件读取。
#include <rcpputils/filesystem_helper.hpp>
#include <optional>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/serialization.hpp>
#include <rosbag2_cpp/reader.hpp>
#include <rosbag2_cpp/readers/sequential_reader.hpp>
#include <rosbag2_cpp/writer.hpp>
#include <rosbag2_cpp/writers/sequential_writer.hpp>
#include <algorithm>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
struct BagMergerOptions
{
std::vector<rcpputils::fs::path> inputs;
std::optional<rcpputils::fs::path> output{std::nullopt};
uint max_bagfile_duration{0};
uint max_bagfile_size{0};
};
using ReaderPtr = std::shared_ptr<rosbag2_cpp::Reader>;
using NextMessage = std::optional<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>;
struct ReaderWithNext
{
ReaderPtr reader;
NextMessage next_message;
};
using ReaderStore = std::vector<ReaderWithNext>;
BagMergerOptions get_options(int argc, char ** argv)
{
BagMergerOptions options;
// There must be at least 4 arguments:
// program name, -o, output destination input bag 1
if (argc < 4) {
std::cerr << "Usage: " << argv[0] << " -o <output bag> <input bag...> " <<
"[-b <max_bagfile_size> -d <max_bagfile_duration>" << std::endl;
return options;
}
std::string output_flag = "-o";
std::string duration_flag = "-d";
std::string size_flag = "-b";
for (int ii = 1; ii < argc; ++ii) {
if (
(output_flag == argv[ii] || duration_flag == argv[ii] || size_flag == argv[ii]) &&
ii + 1 == argc)
{
std::cerr << "Missing argument to flag " << argv[ii] << std::endl;
return options;
} else if (output_flag == argv[ii]) {
options.output = rcpputils::fs::path(argv[ii + 1]);
ii += 1;
} else if (duration_flag == argv[ii]) {
options.max_bagfile_duration = std::stoi(argv[ii + 1]);
ii += 1;
} else if (size_flag == argv[ii]) {
options.max_bagfile_size = std::stoi(argv[ii + 1]);
ii += 1;
} else {
options.inputs.push_back(rcpputils::fs::path(argv[ii]));
}
}
return options;
}
ReaderStore make_readers(const std::vector<rcpputils::fs::path> & input_names)
{
ReaderStore result;
for (const auto & input_name : input_names) {
std::unique_ptr<rosbag2_cpp::readers::SequentialReader> reader_impl =
std::make_unique<rosbag2_cpp::readers::SequentialReader>();
const rosbag2_cpp::StorageOptions storage_options({input_name.string(), "sqlite3"});
const rosbag2_cpp::ConverterOptions converter_options(
{rmw_get_serialization_format(), rmw_get_serialization_format()});
reader_impl->open(storage_options, converter_options);
std::shared_ptr<rosbag2_cpp::Reader> reader =
std::make_shared<rosbag2_cpp::Reader>(std::move(reader_impl));
if (reader->has_next()) {
result.push_back({reader, reader->read_next()});
} else {
result.push_back({reader, std::nullopt});
}
}
return result;
}
std::unique_ptr<rosbag2_cpp::Writer> make_writer(
const rcpputils::fs::path & output_name, BagMergerOptions options)
{
const rosbag2_cpp::StorageOptions storage_options(
{output_name.string(), "sqlite3", options.max_bagfile_size, options.max_bagfile_duration});
const rosbag2_cpp::ConverterOptions converter_options(
{rmw_get_serialization_format(), rmw_get_serialization_format()});
std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer_impl =
std::make_unique<rosbag2_cpp::writers::SequentialWriter>();
writer_impl->open(storage_options, converter_options);
return std::make_unique<rosbag2_cpp::Writer>(std::move(writer_impl));
}
std::vector<rosbag2_storage::TopicMetadata> combine_input_topics(const ReaderStore & readers)
{
std::vector<rosbag2_storage::TopicMetadata> result;
for (const auto & r : readers) {
auto topic_metadata = r.reader->get_all_topics_and_types();
for (auto && t : topic_metadata) {
auto existing_topic = std::find_if(
result.begin(), result.end(), [&t](auto topic) {return topic.name == t.name;});
if (existing_topic == result.end()) {
result.emplace_back(std::move(t));
}
// Ignore already-listed topics
}
}
return result;
}
void set_output_metadata(
std::unique_ptr<rosbag2_cpp::Writer> & writer,
const std::vector<rosbag2_storage::TopicMetadata> & topics)
{
for (const auto & t : topics) {
writer->create_topic(t);
}
}
uint64_t get_total_message_count(const ReaderStore & readers)
{
uint64_t total = 0;
for (const auto & r : readers) {
total += r.reader->get_metadata().message_count;
}
return total;
}
std::optional<ReaderStore::iterator> get_earliest_reader(ReaderStore & readers)
{
rcutils_time_point_value_t earliest_time = std::numeric_limits<rcutils_time_point_value_t>::max();
ReaderStore::iterator earliest_message = readers.end();
for (ReaderStore::iterator current = readers.begin(); current != readers.end(); ++current) {
if (current->next_message != std::nullopt) {
if (current->next_message.value()->time_stamp < earliest_time) {
earliest_time = current->next_message.value()->time_stamp;
earliest_message = current;
}
}
}
if (earliest_message == readers.end()) {
return std::nullopt;
} else {
return earliest_message;
}
}
std::optional<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> read_next(
ReaderStore & readers)
{
auto reader_with_next = get_earliest_reader(readers);
if (reader_with_next == std::nullopt) {
return std::nullopt;
}
auto result = reader_with_next.value()->next_message.value();
if (reader_with_next.value()->reader->has_next()) {
reader_with_next.value()->next_message = reader_with_next.value()->reader->read_next();
} else {
reader_with_next.value()->next_message = std::nullopt;
}
return result;
}
void write_next_message(
std::unique_ptr<rosbag2_cpp::Writer> & writer,
const std::shared_ptr<rosbag2_storage::SerializedBagMessage> & message)
{
writer->write(message);
}
int main(int argc, char ** argv)
{
auto bag_merger_options = get_options(argc, argv);
rcpputils::fs::path output;
if (bag_merger_options.inputs.size() == 0) {
std::cerr << "Missing input bags\n";
return 1;
}
if (bag_merger_options.output == std::nullopt) {
std::cerr << "Missing output bag name\n";
return 1;
}
output = bag_merger_options.output.value();
// Create a reader for each input bag
ReaderStore readers = make_readers(bag_merger_options.inputs);
if (readers.size() == 0) {
return 1;
}
// Make the output directory absolute
if (!output.is_absolute()) {
auto cwd = rcpputils::fs::current_path();
output = cwd / output;
}
// Create the output directory
if (output.exists()) {
std::cerr << "Output bag directory already exists\n";
return 1;
}
std::cout << "Creating output directory '" << output.string() << "' for destination bag\n";
if (!rcpputils::fs::create_directories(output)) {
std::cerr << "Failed to create destination bag's output directory\n";
return 1;
}
// Create a writer for the output bag
std::unique_ptr<rosbag2_cpp::Writer> writer = make_writer(output, bag_merger_options);
// Combine the input bag topics into one list and use it for the output bag metadata
auto input_topics = combine_input_topics(readers);
set_output_metadata(writer, input_topics);
auto num_messages = get_total_message_count(readers);
std::cout << "Processing " << num_messages << " messages from " << readers.size() <<
" input bags\n";
std::cout << std::unitbuf;
std::cout << " 00%";
uint64_t processed_count = 0;
int processed_fraction = 0;
// Loop over the messages in all bags in time order, writing them to the output bag
while (true) {
auto next_message = read_next(readers);
// Check if we've reached the end of all input bags or not
if (next_message == std::nullopt) {
std::cout << "\b\b\b\b100%\nProcessing complete\n";
break;
}
// Write the message
write_next_message(writer, next_message.value());
processed_count += 1;
processed_fraction =
static_cast<float>(processed_count) / static_cast<float>(num_messages) * 100.0f;
std::cout << "\b\b\b\b\b" << processed_fraction << "%";
}
// Close the bags
return 0;
}
3. 参考链接
https://github.com/osrf/bag_merger/blob/main/src/merge_bags.cpp
https://www.ncnynl.com/archives/202110/4623.html
https://docs.ros.org/en/rolling/Tutorials/Advanced/Recording-A-Bag-From-Your-Own-Node-CPP.html
评论(0)
您还未登录,请登录后发表或查看评论