0. 简介

对弈ros/ros2的bag而言,真的是非常好用一个产物,通过bag我们可以保证数据的多次的重复的复现,直至发现问题。而对弈ros的bag的二次开发也一直是大家关注的重点。下面我们将从ROS1开始,带领大家了解一下ROS2的内容。

1. ROS1 BAG

对于ROS1的bag,网上有比较多的例子了,比如说下面的https://zhuanlan.zhihu.com/p/150290102

#include <rosbag/bag.h>
#include <rosbag/view.h>
#include <ros/ros.h>
#include <novatel_msgs/INSPVAX.h>
#include <vector>
#include <boost/foreach.hpp>
#define foreach BOOST_FOREACH

int main(){
    rosbag::Bag bag;
    bag.open("/home/liuxiao/bagfiles/novatel.bag", rosbag::bagmode::Read);

    std::vector<std::string> topics;
    topics.push_back(std::string("/novatel_data/inspvax"));

    rosbag::View view(bag, rosbag::TopicQuery(topics));

    foreach(rosbag::MessageInstance const m, view)
    {
        novatel_msgs::INSPVAX::ConstPtr s = m.instantiate<novatel_msgs::INSPVAX>();
        if (s != NULL)
            std::cout << s->latitude << std::endl;
    }
    bag.close();
}

这些都是比较简单的例子,比较详细的内容可以参考ROS WIKI

rosbag C++ API工作的前提是使用 “查询 (queries)”创建一个或多个包的 “views(视图)”。查询是一个抽象的类,它定义了一个函数来过滤是否包括来自连接的消息。这个函数可以访问topic_name、数据类型、md5sum、消息定义以及连接头。此外,每个查询可以为它包括的时间范围指定一个开始和结束时间。

多个查询可以被添加到一个视图中,包括来自不同包的查询。然后,该视图提供一个跨包的迭代器接口,根据时间排序。

上面一段文字比较详细的介绍了ROS1当中比较重要的两个内容,分别是:

  • rosbag::Bag - 在磁盘上序列化到/从一个bag文件。
  • rosbag::View - 指定一个bag文件的视图,以允许在特间范围内查询特定连接的消息。

2. ROS2 BAG

对于ROS2而言,我们知道ROS2包的形式和ROS1不太一样,但rosbag2也不只是提供了ros2 bag命令行工具。它还提供了一个C++ API,用于从你自己的源代码中读取和写入一个包。这允许你订阅一个主题,并在对该数据进行任何其他处理的同时,将收到的数据保存到一个包中。
首先需要安装了rosbag2软件包

sudo apt install ros-foxy-rosbag2

然后创建的ros2_ws目录。导航到ros2_ws/src目录并创建一个新的包

ros2 pkg create --build-type ament_cmake bag_recorder_nodes --dependencies example_interfaces rclcpp rosbag2_cpp std_msgs

你的终端将返回一条信息,验证你的包bag_recorder_nodes及其所有必要的文件和文件夹的创建。—dependencies参数将自动在package.xml和CMakeLists.txt中添加必要的依赖关系行。在这种情况下,该包将使用rosbag2_cpp包以及rclcpp包。本教程的后面部分也需要对example_interfaces包的依赖。

然后就是创建新文件simple_bag_recorder.cpp

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

#include <rosbag2_cpp/typesupport_helpers.hpp>
#include <rosbag2_cpp/writer.hpp>
#include <rosbag2_cpp/writers/sequential_writer.hpp>
#include <rosbag2_storage/serialized_bag_message.hpp>

using std::placeholders::_1;

class SimpleBagRecorder : public rclcpp::Node
{
public:
  SimpleBagRecorder()
  : Node("simple_bag_recorder")
  {
    const rosbag2_cpp::StorageOptions storage_options({"my_bag", "sqlite3"});//设置可选的参数,包括bag文件名和数据库类型
    const rosbag2_cpp::ConverterOptions converter_options(
      {rmw_get_serialization_format(),
       rmw_get_serialization_format()});//设置可选的参数,包括序列化格式和压缩格式
    writer_ = std::make_unique<rosbag2_cpp::writers::SequentialWriter>();//创建SequentialWriter对象,并制定序列化格式

    writer_->open(storage_options, converter_options);//打开writer,当中就包含了打开数据库的操作和设置序列化格式的操作

    writer_->create_topic(
      {"chatter",
       "std_msgs/msg/String",
       rmw_get_serialization_format(),
       ""});//创建topic

    subscription_ = create_subscription<std_msgs::msg::String>(
      "chatter", 10, std::bind(&SimpleBagRecorder::topic_callback, this, _1));//创建订阅者
  }

private:
  void topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const//设置回调函数
  {
    auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();//创建SerializedBagMessage对象,并设置bag信息的类型

    bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
      new rcutils_uint8_array_t,//创建rcutils_uint8_array_t对象
      [this](rcutils_uint8_array_t *msg) {//设置析构函数
        auto fini_return = rcutils_uint8_array_fini(msg);//释放内存
        delete msg;//删除对象
        if (fini_return != RCUTILS_RET_OK) {//判断是否释放成功
          RCLCPP_ERROR(get_logger(),
            "Failed to destroy serialized message %s",     rcutils_get_error_string().str);//打印错误信息
        }
      });//将bag_message中的serialized_data设置为rcutils_uint8_array_t类型
    *bag_message->serialized_data = msg->release_rcl_serialized_message();//然后将msg信息中的rcl_serialized_message赋值给bag_message中的serialized_data

    bag_message->topic_name = "chatter";//设置topic名称
    if (rcutils_system_time_now(&bag_message->time_stamp) != RCUTILS_RET_OK) {//设置时间戳
      RCLCPP_ERROR(get_logger(), "Error getting current time: %s",
        rcutils_get_error_string().str);//打印错误信息
    }

    writer_->write(bag_message);//写入数据
  }

  rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr subscription_;//创建订阅者
  std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer_;//创建SequentialWriter对象
};

这里面只是最简单的订阅并发布写入bag当中的操作,当然也有一些比较高级的操作,这里作者一一列举

void read_and_print_bag(std::string uri)
{
    std::unique_ptr<rosbag2_cpp::readers::SequentialReader> reader_impl =
        std::make_unique<rosbag2_cpp::readers::SequentialReader>(); // 设置ros2bag的读取器,并制定序列化格式
    const rosbag2_cpp::StorageOptions storage_options({uri, "sqlite3"});// 设置ros2bag的存储格式
    const rosbag2_cpp::ConverterOptions converter_options(
        {rmw_get_serialization_format(),
         rmw_get_serialization_format()});// 设置ros2bag的转换格式
    reader_impl->open(storage_options, converter_options);// 打开ros2bag
    rosbag2_cpp::Reader reader(std::move(reader_impl));// 读取ros2bag

    auto serializer = rclcpp::Serialization<example_interfaces::msg::Int32>();// 设置序列化格式
    while (reader.has_next())// 读取ros2bag的数据,并判断是否有下一个数据
    {
        auto message = reader.read_next();// 读取下一个数据
        example_interfaces::msg::Int32 data;//设置输出的格式类型

        rcutils_uint8_array_t raw_data = *message->serialized_data;//将bag包中的数据转换为rcutils_uint8_array_t类型
        auto serialized_message = rclcpp::SerializedMessage(raw_data);//然后将raw_data数据转成数据格式
        serializer.deserialize_message(&serialized_message, &data);//并将数据格式输出成data的信息,以供显示
        // The data is owned by the original shared pointer in the SerializedBagMessage object, not the
        // SerializedMessage object

        std::cout << "Topic: " << message->topic_name << "\tData: " << data.data << "\tTime stamp: " << message->time_stamp << '\n';
    }
}

或者像这样,可以暂停数据包

// 消息反序列化
void deserialize_message_by_topic(auto bag_message, std::string topic)
{

    if (topic == "/camera/color/image_raw") //
    {
        // 使用deserialize_message函数将bag_message反序列化为sensor_msgs::msg::Image类型的消息
        sensor_msgs::msg::Image deserialized_message =
            this->deserialize_message<sensor_msgs::msg::Image>(bag_message);
        // file_parser_util.save(deserialized_message);
    }

    /*
    if (topic == "/thermal_raw"){
        flir_lepton_msgs::msg::TemperatureRaw deserialized_message =
            this->deserialize_message<flir_lepton_msgs::msg::TemperatureRaw>(bag_message);
        file_parser_util.save(deserialized_message);
    }
    */
    if (topic == "/camera/depth/color/points")
    {
        // 使用deserialize_message函数将bag_message反序列化为sensor_msgs::msg::PointCloud2类型的消息,并将其存储到pc2变量中
        sensor_msgs::msg::PointCloud2 pc2 =
            this->deserialize_message<sensor_msgs::msg::PointCloud2>(bag_message);
        // file_parser_util.save(pc2);
        // RCLCPP_INFO_STREAM(this->get_logger(), "MESSAGE");
    }
    if (topic == "/livox/lidar")
    {
        sensor_msgs::msg::PointCloud2 pc2 =
            this->deserialize_message<sensor_msgs::msg::PointCloud2>(bag_message);
        // file_parser_util.save(pc2);
        // RCLCPP_INFO_STREAM(this->get_logger(), "MESSAGE");
    }
}

或者可以通过这样的方法来完成数据包的录制:

class BagRecorder : public rclcpp::Node
{
public:
  BagRecorder()
      : Node("bag_recorder")
  {
    const rosbag2_cpp::StorageOptions storage_options({make_then_get_dir(), "sqlite3"});
    const rosbag2_cpp::ConverterOptions converter_options(
        {rmw_get_serialization_format(),
         rmw_get_serialization_format()});
    writer_ = std::make_unique<rosbag2_cpp::writers::SequentialWriter>();

    writer_->open(storage_options, converter_options);

    writer_->create_topic(
        {BUTTONS,
         "std_msgs/msg/String",
         rmw_get_serialization_format(),
         ""});
    buttons_sub_ = create_subscription<std_msgs::msg::String>(
        BUTTONS, 10, std::bind(&BagRecorder::buttons_topic_callback, this, _1));

    writer_->create_topic(
        {DISPLAY,
         "interfaces/msg/DisplayData",
         rmw_get_serialization_format(),
         ""});
    display_sub_ = create_subscription<interfaces::msg::DisplayData>(
        DISPLAY, 10, std::bind(&BagRecorder::display_topic_callback, this, _1));

    writer_->create_topic(
        {FUSION_IMU,
         "interfaces/msg/FusionImu",
         rmw_get_serialization_format(),
         ""});
    fusion_imu_sub_ = create_subscription<interfaces::msg::FusionImu>(
        FUSION_IMU, 10, std::bind(&BagRecorder::fusion_imu_topic_callback, this, _1));

    writer_->create_topic(
        {RAW_IMU,
         "interfaces/msg/RawImu",
         rmw_get_serialization_format(),
         ""});
    raw_imu_sub_ = create_subscription<interfaces::msg::RawImu>(
        RAW_IMU, 10, std::bind(&BagRecorder::raw_imu_topic_callback, this, _1));

    writer_->create_topic(
        {MOTOR,
         "interfaces/msg/MotorData",
         rmw_get_serialization_format(),
         ""});
    motor_sub_ = create_subscription<interfaces::msg::MotorData>(
        MOTOR, 10, std::bind(&BagRecorder::motor_topic_callback, this, _1));
  }

private:
  void buttons_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
  {
    callback_writer_helper(msg, BUTTONS);
  }
  void display_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
  {
    callback_writer_helper(msg, DISPLAY);
  }
  void fusion_imu_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
  {
    callback_writer_helper(msg, FUSION_IMU);
  }
  void raw_imu_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
  {
    callback_writer_helper(msg, RAW_IMU);
  }
  void motor_topic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg) const
  {
    callback_writer_helper(msg, MOTOR);
  }

  void callback_writer_helper(std::shared_ptr<rclcpp::SerializedMessage> msg, std::string topic_name) const
  {
    auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();

    bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
        new rcutils_uint8_array_t,
        [this](rcutils_uint8_array_t *msg)
        {
          auto fini_return = rcutils_uint8_array_fini(msg);
          delete msg;
          if (fini_return != RCUTILS_RET_OK)
          {
            RCLCPP_ERROR(get_logger(),
                         "Failed to destroy serialized message %s", rcutils_get_error_string().str);
          }
        });
    *bag_message->serialized_data = msg->release_rcl_serialized_message();

    bag_message->topic_name = topic_name;
    if (rcutils_system_time_now(&bag_message->time_stamp) != RCUTILS_RET_OK)
    {
      RCLCPP_ERROR(get_logger(), "Error getting current time: %s",
                   rcutils_get_error_string().str);
    }
    std::cout << "message: " << topic_name << std::endl;
    writer_->write(bag_message);
  }

  std::string make_then_get_dir(void)
  {
    namespace fs = std::filesystem;
    using namespace std;

    time_t now = time(0);
    tm *ltm = localtime(&now);
    string year = to_string(1900 + ltm->tm_year);
    string month = to_string(1 + ltm->tm_mon);
    string day = to_string(ltm->tm_mday);
    string date = get_month_name(1 + ltm->tm_mon) + "-" + day + "-" + year;
    string ttime = to_string(ltm->tm_hour) + "-" + to_string(ltm->tm_min) + "-" + to_string(ltm->tm_sec);
    string folder = "my_bag/" + date + "/" + ttime;
    fs::create_directories(folder);
    cout << "path: " << folder << endl;
    return folder;
  }

  std::string get_month_name(int month_num)
  {
    std::cout << "month_num: " << month_num << std::endl;
    switch (month_num)
    {
    case 1:
      return "Jan";
    case 2:
      return "Feb";
    case 3:
      return "March";
    case 4:
      return "Apr";
    case 5:
      return "May";
    case 6:
      return "Jun";
    case 7:
      return "Jul";
    case 8:
      return "Aug";
    case 9:
      return "Sep";
    case 10:
      return "Oct";
    case 11:
      return "Nov";
    case 12:
      return "Dec";
    default: // Optional
      return "Error";
    }
  }

  rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr buttons_sub_;
  rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr display_sub_;
  rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr fusion_imu_sub_;
  rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr raw_imu_sub_;
  rclcpp::Subscription<rclcpp::SerializedMessage>::SharedPtr motor_sub_;
  std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer_;
};

如果各位觉得这些仍然讲的比较粗糙,也可以从示例开始学起,这里我们推荐两个代码包,分别是:合并多个bag包可视化简单图像订阅者和查看器
以及图像和PCD文件读取

#include <rcpputils/filesystem_helper.hpp>

#include <optional>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/serialization.hpp>
#include <rosbag2_cpp/reader.hpp>
#include <rosbag2_cpp/readers/sequential_reader.hpp>
#include <rosbag2_cpp/writer.hpp>
#include <rosbag2_cpp/writers/sequential_writer.hpp>

#include <algorithm>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct BagMergerOptions
{
  std::vector<rcpputils::fs::path> inputs;
  std::optional<rcpputils::fs::path> output{std::nullopt};
  uint max_bagfile_duration{0};
  uint max_bagfile_size{0};
};

using ReaderPtr = std::shared_ptr<rosbag2_cpp::Reader>;
using NextMessage = std::optional<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>;
struct ReaderWithNext
{
  ReaderPtr reader;
  NextMessage next_message;
};
using ReaderStore = std::vector<ReaderWithNext>;

BagMergerOptions get_options(int argc, char ** argv)
{
  BagMergerOptions options;

  // There must be at least 4 arguments:
  // program name, -o, output destination input bag 1
  if (argc < 4) {
    std::cerr << "Usage: " << argv[0] << " -o <output bag> <input bag...> " <<
      "[-b <max_bagfile_size> -d <max_bagfile_duration>" << std::endl;
    return options;
  }

  std::string output_flag = "-o";
  std::string duration_flag = "-d";
  std::string size_flag = "-b";

  for (int ii = 1; ii < argc; ++ii) {
    if (
      (output_flag == argv[ii] || duration_flag == argv[ii] || size_flag == argv[ii]) &&
      ii + 1 == argc)
    {
      std::cerr << "Missing argument to flag " << argv[ii] << std::endl;
      return options;
    } else if (output_flag == argv[ii]) {
      options.output = rcpputils::fs::path(argv[ii + 1]);
      ii += 1;
    } else if (duration_flag == argv[ii]) {
      options.max_bagfile_duration = std::stoi(argv[ii + 1]);
      ii += 1;
    } else if (size_flag == argv[ii]) {
      options.max_bagfile_size = std::stoi(argv[ii + 1]);
      ii += 1;
    } else {
      options.inputs.push_back(rcpputils::fs::path(argv[ii]));
    }
  }
  return options;
}

ReaderStore make_readers(const std::vector<rcpputils::fs::path> & input_names)
{
  ReaderStore result;

  for (const auto & input_name : input_names) {
    std::unique_ptr<rosbag2_cpp::readers::SequentialReader> reader_impl =
      std::make_unique<rosbag2_cpp::readers::SequentialReader>();
    const rosbag2_cpp::StorageOptions storage_options({input_name.string(), "sqlite3"});
    const rosbag2_cpp::ConverterOptions converter_options(
      {rmw_get_serialization_format(), rmw_get_serialization_format()});
    reader_impl->open(storage_options, converter_options);

    std::shared_ptr<rosbag2_cpp::Reader> reader =
      std::make_shared<rosbag2_cpp::Reader>(std::move(reader_impl));
    if (reader->has_next()) {
      result.push_back({reader, reader->read_next()});
    } else {
      result.push_back({reader, std::nullopt});
    }
  }

  return result;
}

std::unique_ptr<rosbag2_cpp::Writer> make_writer(
  const rcpputils::fs::path & output_name, BagMergerOptions options)
{
  const rosbag2_cpp::StorageOptions storage_options(
    {output_name.string(), "sqlite3", options.max_bagfile_size, options.max_bagfile_duration});
  const rosbag2_cpp::ConverterOptions converter_options(
    {rmw_get_serialization_format(), rmw_get_serialization_format()});
  std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer_impl =
    std::make_unique<rosbag2_cpp::writers::SequentialWriter>();
  writer_impl->open(storage_options, converter_options);

  return std::make_unique<rosbag2_cpp::Writer>(std::move(writer_impl));
}

std::vector<rosbag2_storage::TopicMetadata> combine_input_topics(const ReaderStore & readers)
{
  std::vector<rosbag2_storage::TopicMetadata> result;

  for (const auto & r : readers) {
    auto topic_metadata = r.reader->get_all_topics_and_types();
    for (auto && t : topic_metadata) {
      auto existing_topic = std::find_if(
        result.begin(), result.end(), [&t](auto topic) {return topic.name == t.name;});
      if (existing_topic == result.end()) {
        result.emplace_back(std::move(t));
      }
      // Ignore already-listed topics
    }
  }
  return result;
}

void set_output_metadata(
  std::unique_ptr<rosbag2_cpp::Writer> & writer,
  const std::vector<rosbag2_storage::TopicMetadata> & topics)
{
  for (const auto & t : topics) {
    writer->create_topic(t);
  }
}

uint64_t get_total_message_count(const ReaderStore & readers)
{
  uint64_t total = 0;
  for (const auto & r : readers) {
    total += r.reader->get_metadata().message_count;
  }
  return total;
}

std::optional<ReaderStore::iterator> get_earliest_reader(ReaderStore & readers)
{
  rcutils_time_point_value_t earliest_time = std::numeric_limits<rcutils_time_point_value_t>::max();
  ReaderStore::iterator earliest_message = readers.end();

  for (ReaderStore::iterator current = readers.begin(); current != readers.end(); ++current) {
    if (current->next_message != std::nullopt) {
      if (current->next_message.value()->time_stamp < earliest_time) {
        earliest_time = current->next_message.value()->time_stamp;
        earliest_message = current;
      }
    }
  }

  if (earliest_message == readers.end()) {
    return std::nullopt;
  } else {
    return earliest_message;
  }
}

std::optional<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> read_next(
  ReaderStore & readers)
{
  auto reader_with_next = get_earliest_reader(readers);
  if (reader_with_next == std::nullopt) {
    return std::nullopt;
  }

  auto result = reader_with_next.value()->next_message.value();
  if (reader_with_next.value()->reader->has_next()) {
    reader_with_next.value()->next_message = reader_with_next.value()->reader->read_next();
  } else {
    reader_with_next.value()->next_message = std::nullopt;
  }
  return result;
}

void write_next_message(
  std::unique_ptr<rosbag2_cpp::Writer> & writer,
  const std::shared_ptr<rosbag2_storage::SerializedBagMessage> & message)
{
  writer->write(message);
}

int main(int argc, char ** argv)
{
  auto bag_merger_options = get_options(argc, argv);
  rcpputils::fs::path output;

  if (bag_merger_options.inputs.size() == 0) {
    std::cerr << "Missing input bags\n";
    return 1;
  }
  if (bag_merger_options.output == std::nullopt) {
    std::cerr << "Missing output bag name\n";
    return 1;
  }
  output = bag_merger_options.output.value();

  // Create a reader for each input bag
  ReaderStore readers = make_readers(bag_merger_options.inputs);
  if (readers.size() == 0) {
    return 1;
  }
  // Make the output directory absolute
  if (!output.is_absolute()) {
    auto cwd = rcpputils::fs::current_path();
    output = cwd / output;
  }
  // Create the output directory
  if (output.exists()) {
    std::cerr << "Output bag directory already exists\n";
    return 1;
  }
  std::cout << "Creating output directory '" << output.string() << "' for destination bag\n";
  if (!rcpputils::fs::create_directories(output)) {
    std::cerr << "Failed to create destination bag's output directory\n";
    return 1;
  }
  // Create a writer for the output bag
  std::unique_ptr<rosbag2_cpp::Writer> writer = make_writer(output, bag_merger_options);

  // Combine the input bag topics into one list and use it for the output bag metadata
  auto input_topics = combine_input_topics(readers);
  set_output_metadata(writer, input_topics);

  auto num_messages = get_total_message_count(readers);
  std::cout << "Processing " << num_messages << " messages from " << readers.size() <<
    " input bags\n";
  std::cout << std::unitbuf;
  std::cout << "   00%";
  uint64_t processed_count = 0;
  int processed_fraction = 0;
  // Loop over the messages in all bags in time order, writing them to the output bag
  while (true) {
    auto next_message = read_next(readers);
    // Check if we've reached the end of all input bags or not
    if (next_message == std::nullopt) {
      std::cout << "\b\b\b\b100%\nProcessing complete\n";
      break;
    }
    // Write the message
    write_next_message(writer, next_message.value());
    processed_count += 1;
    processed_fraction =
      static_cast<float>(processed_count) / static_cast<float>(num_messages) * 100.0f;
    std::cout << "\b\b\b\b\b" << processed_fraction << "%";
  }

  // Close the bags

  return 0;
}

3. 参考链接

https://github.com/osrf/bag_merger/blob/main/src/merge_bags.cpp

https://www.ncnynl.com/archives/202110/4623.html

https://docs.ros.org/en/rolling/Tutorials/Advanced/Recording-A-Bag-From-Your-Own-Node-CPP.html