0. 简介

对于使用C++的开发者来说,这两种用法应该是比较高级也是比较常用的。对于线程池来说,作用主要有以下几点:

  1. 提高响应速度:线程池内部维护一组预先创建好的线程,当需要处理任务时,无需新建线程,直接从池中取出空闲线程,从而大幅减小了线程创建和销毁的开销,提高了系统响应速度。
  2. 提高线程复用性:线程池管理的线程可以反复利用,完成一次任务后,不会被销毁,而是可以继续接收下一个任务。
  3. 提高资源的可管理性:线程池可以控制系统中并发线程的数量,避免大量线程之间的抢占导致CPU过度切换,影响系统性能。此外,也方便对线程进行统一的管理和调度。

而对于内存池来说,主要作用又有以下几点:

  1. 减少内存碎片:内存池通过预先申请一大块内存,并对其进行管理和分配,有效地减少了因反复申请和释放小块内存导致的内存碎片问题(时间过长会出现coredump)。
  2. 提高内存利用率:通过对内存的管理,可以重复利用已经释放的内存块,避免了频繁地向系统申请和释放内存,从而提高内存利用率。
  3. 提高内存申请效率:在内存池中申请和释放内存的操作只需要在内存池内部进行,避免了频繁地进行系统调用,从而提高了内存申请的效率。

1. 线程池使用

这里介绍的线程池和内存池都是Light-City博主开源的,两个项目都是基于Apache Arrow项目的衍生版本。我们将Arrow项目中复杂的核心结构——线程池和内存池,完全剥离出来,形成了这个独立的项目。线程池的项目为:https://github.com/Light-City/light-thread-pool。该部分可以使线程池更方便地作为其他项目的依赖库使用,并提供简单的方式来引入本项目的so库和头文件,以使用线程池功能。只需要调用这些头文件即可,目前官方给到的是使用Bazel来完成编译的。如果不习惯的同学可以使用CMake来完成相同的操作。

这段代码展示了如何使用Apache Arrow库的线程池来并发执行任务。首先,它创建了一个线程池,然后将一个简单的任务(打印”hello world!”)提交给线程池进行异步执行。随后,它等待所有的任务都完成,这是通过调用WaitForIdle方法实现的。最后,它关闭线程池。如果无法创建线程池,程序会返回错误消息并退出。

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include "src/cancel.h"
#include "src/io_util.h"
#include "src/macros.h"
#include "src/thread_pool.h"
using namespace arrow;

int main() {
  // Create a thread pool
  auto threadPool = GetCpuThreadPool();
  if (!threadPool) {
    std::cerr << "Failed to create thread pool" << std::endl;
    return 1;
  }

  // Submit tasks to the thread pool
  threadPool->Spawn([]() { std::cout << "hello world!" << std::endl; });

  // Wait for all tasks to complete
  threadPool->WaitForIdle();

  // Shutdown the thread pool
  threadPool->Shutdown();
}

然后下面这段代码就高级了一些,演示了如何使用Apache Arrow库的线程池并发执行多个任务,并处理由于任务取消导致的错误。首先,它定义了三个模板函数(add、sub、mul),每个函数都将模拟需要2秒钟来计算结果的操作。它还定义了一个名为status_callback的函数,该函数将用于处理任务执行的状态。

在main函数中,它创建了一个StopSource和一个与之关联的StopToken,这将用于发出停止任务的请求。然后,它获取了一个线程池实例,并将变量a和b的值设置为10和20。然后,它创建了一个futures向量,用于保存每个任务的返回值。

然后,它向线程池提交了三个任务(分别对应add、sub、mul函数),并将返回的future对象添加到futures向量中。注意,它在提交任务时传递了StopToken和状态回调函数。在提交所有任务后,它调用RequestStop方法,请求停止所有任务。

接下来,它遍历futures向量并尝试获取每个任务的结果。如果任务被成功执行,它将打印结果。如果任务被取消或发生其他错误,它将捕获异常并打印错误信息。

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include "src/cancel.h"
#include "src/io_util.h"
#include "src/macros.h"
#include "src/thread_pool.h"
using namespace arrow;

template <typename T>
static T add(T x, T y) {
  std::this_thread::sleep_for(std::chrono::seconds(2));
  return x + y;
}
template <typename T>
static T sub(T x, T y) {
  std::this_thread::sleep_for(std::chrono::seconds(2));
  return x - y;
}
template <typename T>
static T mul(T x, T y) {
  std::this_thread::sleep_for(std::chrono::seconds(2));
  return x * y;
}

void status_callback(const Status& status) {
  std::cout << "status_callback: " << status.ToString() << std::endl;
}

int main() {
  arrow::StopSource stop_source;
  arrow::StopToken stop_token = stop_source.token();
  auto threadPool = GetCpuThreadPool();

  int a = 10;
  int b = 20;

  std::vector<std::future<int>> futures;
  // Submit multiple tasks
  auto add_fut = threadPool->Submit(arrow::TaskHints{}, stop_token, status_callback, [a, b]() { return add(a, b); });
  auto sub_fut = threadPool->Submit(stop_token, [a, b]() { return sub(a, b); });
  auto mul_fut = threadPool->Submit(stop_token, [a, b]() { return mul(a, b); });
  futures.push_back(std::move(add_fut));
  futures.push_back(std::move(sub_fut));
  futures.push_back(std::move(mul_fut));
  stop_source.RequestStop();

  // Retrieve the results
  for (auto& future : futures) {
    try {
      int result = future.get();
      std::cout << "Result: " << result << std::endl;
    } catch (const std::exception& e) { /* will throw error. */
      std::cerr << "Exception occurred: " << e.what() << std::endl;
    }
  }

  return 0;
}

具体的使用还没有非常仔细的分析,如果作者出一个API就更好了

2. 内存池使用

内存池和线程池类似,项目地址为:https://github.com/Light-City/light-memory-pool。具体使用也比较简单,大致编写了一个使用Apache Arrow库的内存池进行内存分配和释放。首先,它创建了一个指向默认内存池的指针,并尝试分配14字节的内存。如果分配成功,它会将字符串”Hello, World!”复制到分配的内存中,并打印出来。然后,它尝试释放这些内存。如果分配失败,它将打印一条错误消息。需要注意的是,释放内存的字节数应与分配的字节数一致(这里释放的字节数不一致?)。

#include <iostream>

#include "src/memory_pool.h"

int main() {
  arrow::MemoryPool* pool = arrow::default_memory_pool();

  char* val;
  arrow::Status status = pool->Allocate(14, reinterpret_cast<uint8_t**>(&val));//在内存池中请求分配14个字节的内存,分配的内存地址将被存放在val指针中,同时返回一个状态对象。

  if (status.ok()) {
    std::cout << "Memory allocation successful." << std::endl;
    std::strcpy(val, "Hello, World!");
    std::cout << "Filled content: " << val << std::endl;
    pool->Free(reinterpret_cast<uint8_t*>(val), 4);//释放之前分配的内存
  } else {
    std::cout << "Memory allocation failed." << std::endl;
  }

  return 0;
}

3. 参考链接

https://zhuanlan.zhihu.com/p/610570326

https://github.com/Light-City/light-thread-pool

https://github.com/Light-City/light-memory-pool