如何强制线程轮流连接到字符串?

How do I force threads to take turn concatenating to a string?

基本上我有 2 个文本文件,每个文件都有一堆全都是 1 个字符长的行。一个文件中的每个字符都是一个字母或零,如果字符为零,我需要查看另一个文件以查看应该包含什么。我的目标是启动两个线程,每个线程读取一个单独的文件并将每个字符添加到一个字符串中。

文件 1:

t
0
i
s
0
0
0
t
e
0
t

文件 2:

0
h
0
0
i
s
a
0
0
s
0

所以这个的预期输出应该是'thisisatest'。

我目前能够 运行 这两个线程并让它们各自读取各自的文件,而且我知道我需要使用互斥锁 lock() 和 unlock() 来确保只有一个线程正在添加到字符串中,但我无法弄清楚如何实现它。

mutex m;
int i = 0;
string s = "";

void *readFile(string fileName) {
    ifstream file;

    char a;

    file.open(fileName);
    if(!file) {
        cout << "Failed to open file." << endl;
        exit(1);
    }
    while(file >> a) {
        if(a == '0') {

        } else {
            s += a;
        }
    }
}
int main() {

    thread p1(readFile, "Person1");
    thread p2(readFile, "Person2");

    p1.join();
    p2.join();

    cout << s << endl;

    return 0;
}

我试过将 m.lock() 放在 while() 循环中,并将 m.unlock() 嵌套在 if() 语句中,但它没有用。目前我的代码将只输出没有零的文件 1 和没有零连接的文件 2(没有任何特定的顺序,因为没有办法预测哪个线程先完成)。

我希望程序查看文本文件,检查当前行的字符,如果是字母,则将其连接到字符串 s,如果是零,则暂停该线程,让其他线程线程检查它的行。

您需要确保两个线程 运行 同步,轮流一次读取一行。当读取到 0 时,跳过转弯,否则打印值。

为此你可以使用:

  • 工作线程之间共享的变量,用于跟踪轮次;
  • 一个条件变量,用于通知线程轮换;
  • 使条件变量起作用的互斥体。

这是一个演示轮流方法的工作示例:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

int main() {
    std::mutex mtx;
    std::condition_variable cond;
    int turn = 0;
    auto task = [&](int myturn, int turns) {
        std::unique_lock<std::mutex> lock(mtx);
        while (turn < 9) {
            cond.wait(lock, [&] { return turn % turns == myturn; });
            std::cout << "Task " << myturn << std::endl;
            turn++;
            cond.notify_all();
        }
    };
    std::thread p1(task, 0, 2);
    std::thread p2(task, 1, 2);

    p1.join();
    p2.join();

    std::cout << "Done" << std::endl;
}

输出:

Task 0
Task 1
Task 0
Task 1
Task 0
Task 1
Task 0
Task 1
Task 0
Task 1
Done

考虑到每个字母必须到达的字符串中的索引位置是预先确定的,并且很容易从数据中计算出来。

读取第二个文件的线程:

0
h
0
0
i
s

知道它不负责str[0]str[2]str[3]处的字符,但负责str[1]str[4]str[5].

如果我们添加一个互斥锁和一个条件变量,算法就很简单了。

index = 0
while reading a line from the file succeeds: {
  if the line isn't "0": {
     lock(mutex)
     while length(str) < index: {
        wait(condition, mutex)
     }
     assert(length(str) == index)
     add line[0] to end of str
     unlock(mutex)
     broadcast(condition)
  }
  index++
}

基本上,对于线程需要写入的每个字符,它都知道索引。它等待字符串先达到那么长,其他线程将执行此操作。每当一个线程添加一个字符时,它都会广播条件变量,以唤醒另一个想要在新索引处放置一个字符的线程。

assert 检查永远不会失败,除非数据是错误的(告诉两个或更多线程将一个字符放在同一索引处)。此外,如果所有线程都在同一索引处命中 0 行,当然,这将死锁;每个线程都将等待另一个线程在该索引处放置一个字符。

另一种解决方案是使用称为屏障的同步对象。这个问题对于障碍来说是完美的,因为我们拥有的是一组并行处理一些数据元组的线程。对于每个元组,只有一个线程必须采取行动。

算法是这样的:

// initialization:
init(barrier, 2)  // number of threads


// each thread:
while able to read line from file: {
   if line is not "0":
     append line[0] to str
   wait(barrier)
}

wait(barrier) 所做的是延迟执行,直到有 2 个线程调用它(因为我们将其初始化为 2)。发生这种情况时,将释放所有线程。然后屏障为下一个 wait 重置自身,然后它将再次等待 2 个线程。

因此,执行是序列化的:线程在遍历文件时以锁定步骤执行循环体。读取字符而不是 0 的线程将其添加到字符串中。其他线程不触弦;他们直接进入屏障等待,因此没有数据竞争。