如何启动多个线程并且每个线程处理不同的文件?

How to launch multiple threads and each thread working on different files?

我有一个单线程应用程序,它通过调用 send_new_file

向其他服务器发送文件
void send_new_file_command::start_sending_file()
{
    m_thread = thread(&send_new_file_command::execute_file, this);
}

void send_new_file_command::execute_file()
{
    for (auto it = files_need_to_send.begin(); it != files_need_to_send.end() && !is_complete(); ++it)
    {
        {
            std::unique_lock<spinning_lock> guard(lock_obj);
            m_current_file = *it;
        }
        // send a file.
        // I want to call this in parallel
        send_new_file(*it);
    }
}

有什么方法可以让多个线程每个线程发送一个文件。例如,假设我们有 4 个线程,线程 1、2、3、4 将并行发送不同的文件。我想并行调用send_new_file

我正在使用 std::thread。我正在查看有关如何在 C++ 中执行此操作的线程示例,但对如何在此处划分每个线程的文件数并确保每个线程在文件子集上工作感到困惑。

  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i)
    threads.push_back(std::thread(send_new_file(*it)));

我的背景是 Java 所以有点混淆如何使用 std::thread 在 C++ 中做到这一点。

第一种方法

第一个简单的解决方案:

  • 您的 class 包含要处理的文件向量
  • 只有一个线程通过函数execute_file()
  • 管理这个vector
  • 此函数根据需要创建尽可能多的线程,每个线程处理一个文件
  • 最后,所有的线程都加入了(强制)

代码看起来像这样:

struct send_new_file_command {
    vector<string> files_need_to_send;
public:
    send_new_file_command(vector<string> f) : files_need_to_send(f) {}
    void execute_file();
};
void send_new_file_command::execute_file()
{
    vector<thread> exec;
    for(auto it = files_need_to_send.begin(); it != files_need_to_send.end(); ++it)
    {
        exec.push_back(thread(send_new_file, *it));
    }
    for(auto &e : exec)
        e.join();
}

可以使用以下代码测试代码:

void send_new_file(string x) { // simulator 
    for(int i = 0; i<10; i++) {
        cout << x << endl;
        this_thread::sleep_for(chrono::milliseconds(500));
    }
}
int main() {
    vector<string>vs{"a", "b", "c", "d"};
    send_new_file_command sfc(vs);
    sfc.execute_file();
    return 0;
}

这个解决方案非常简单。它有两个主要缺点:

  • 它可能会启动比您的硬件管理更多的线程。所以他们中只有少数人真正运行并发。
  • 线程专用于文件。 F如果是短文件,线程再次空闲,则不会被重用。

其他解决方案

还有很多其他解决方案。例如:

  • 这个的一个变体,将启动固定数量的线程,一旦准备就绪,每个线程都会查找要为下一个项目处理的文件向量。然后你需要引入强锁定。

  • 您可以考虑使用 futures,而不是使用原始线程,启动 std::async(std::launch::async, send_new_file, *it);

性能方面的最佳方法:

  1. 使用 std::atomic<int>
  2. 声明一个计数器变量
  3. 在向量、数组等中创建线程
  4. 为每个线程调用 join

线程的main函数然后访问并递增共享计数器并将结果保存在循环中的局部变量中:

std::atomic<int> counter = 0;
for(int j = 0;j<4;j++)
{
    threads.push_back(std::thread([&](){
        for(int i; (i = counter++) < size;)//the counter variable must be atomic!
        {
            do_work(i);
        }
    }));
}

for(int j = 0;j<4;j++)
    threads[i].join();

这是一种使用作品 queue 的相当简单的方法。您可以将代码片段连接成一个 self-contained 程序。我们将使用以下标准库 headers.

#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

首先,我们定义一个函数,它接受一个文件名并将它发送到它应该去的任何地方。我将通过简单地将其写入 /dev/null.

来模拟它
void
send_file(const std::string& filename)
{
  std::ifstream istr {};
  std::ofstream ostr {};
  std::string line {};
  istr.exceptions(std::ifstream::badbit);
  ostr.exceptions(std::ofstream::badbit);
  istr.open(filename);
  ostr.open("/dev/null");
  while (std::getline(istr, line))
    ostr << line << '\n';
}

接下来,我们定义一个函数,它接受一个指向 std::vector 仍需要发送的文件的指针和另一个指向应该保护该向量的 std::mutex 的指针。我使用指针而不是引用,因为这允许我稍后创建更简单的 std::threads。如果你不喜欢,你不需要这样做。

int
send_files(std::vector<std::string> *const files_p, std::mutex *const mutex_p)
{
  auto count = 0;
  while (true)
    {
      std::string next {};
      {
        const std::unique_lock<std::mutex> lck {*mutex_p};
        if (files_p->empty())  // nothing left to do
          return count;
        next = std::move(files_p->back());
        files_p->pop_back();
      }
      send_file(next);
      count += 1;
    }
}

重要的是我们在执行发送文件的实际工作时不持有锁。否则,我们将完全扼杀并发性。我也很小心,在持有锁的同时不要分配任何内存。通常,当 queue 发生变化时,您会看到 std::lists 用作工作 queues 和 std::condition_variables 以发出信号。前段时间我在 another answer 中发布了显示此内容的代码。 然而,在这个简单的例子中,queue 只会被删除,所以 std::vector 是一个完美的选择。

最后,我们使用我们在一个简单的程序中所拥有的,该程序为每个硬件并发单元创建一个线程,并要求这些线程发送命令行参数中指定的所有文件。请注意,如所写,这将以相反的顺序处理列表。但是,如果这对您来说是个问题,那么改变是微不足道的。

int
main(int argc, char * * argv)
{
  const auto nthreads = std::thread::hardware_concurrency();
  std::mutex mutex {};
  std::vector<std::thread> threads {};
  std::vector<std::string> files {};
  files.reserve(argc - 1);
  for (auto i = 1; i < argc; ++i)
    files.push_back(argv[i]);
  threads.reserve(nthreads);
  for (auto t = 0U; t < nthreads; ++t)
    threads.emplace_back(send_files, &files, &mutex);
  for (auto t = 0U; t < nthreads; ++t)
    threads[t].join();
}