为文件写入实现互斥锁

Implementing mutexes for file writes

我正在尝试使用互斥锁来避免对同一线程进行多次写入 inC/Cpp。下面是我的程序流程。我对在哪里包含我的锁定和解锁代码感到困惑。

main() {
    spawn a worker thread
}
worker_thread() {
    read the input file name 
    read some content
    write the content to the given file name
}

我看到的大多数实现似乎都是这样的:

main() {
    pthread_mutex_init(&myMutex;,0);
    *spawn a worker thread*
    pthread_join(thread1, 0);
    pthread_mutex_destroy(&myMutex;);
}
worker_thread() {
    read the input file name 
    read some content
    write the content to the given file name
}

我想要的是这样的:

main() {
    spawn a worker thread
}
worker_thread() {
    read the input file name 
    read some content
    pthread_mutex_init(&myMutex;,0) --> for the given file?
    write the content to the given file name
    pthread_mutex_destroy(&myMutex;);
}

非常感谢任何继续进行的想法。谢谢!

为 iostream 创建一个包装器以确保一次只有一个线程可以写入流是相当容易的。不幸的是,几乎就在您这样做的同时,您 运行 陷入了另一个问题。它确保一次只有一个线程可以插入到流中,因此您可以获得定义的行为。但是,如果您有类似的内容:

线程 1:sync_stream << a << b << c << '\n';
线程 2:sync_stream << x << y << z << '\n';

你想要的是:

abc
xyz

...否则:

xyz
abc

由于它们位于不同的线程中,因此它们之间的顺序可以有所不同,但是一个线程的一行输出应该保持为一行输出。类似于:

abxy
cz

...可能不希望或不可接受。为了防止这种情况,我们确实需要 两个 个单独的 classes。一种是同步流。另一个是让我们将一些(或多或少任意的)插入流作为一个单一的、不可分割的"transaction"。为此,我们可以使用一对 classes:

class transaction {
    std::ostringstream buffer;
public:
    transaction(std::string const &s="") : buffer(s, std::ios::out | std::ios::ate) {}

    template <class T>
    transaction &operator<<(T const &t) {
        buffer << t;
        return *this;
    }

    friend std::ostream &operator<<(std::ostream &os, transaction const &t) {
        return os << t.buffer.str();
    }
};

class sync_stream {
    std::ostream &out;
    std::mutex mutex;
public:
    sync_stream(std::ostream &sink) : out(sink) { }

    void operator<<(transaction const &t) {
        std::lock_guard<std::mutex> l(mutex);
        out << t;
    }    
};

请注意 transaction class 支持链接,但 sync_stream 不支持(您唯一可以插入其中的是 transaction)。要使用它们,我们做这样的事情:

for (int i=0; i<10; i++)
    threads[i] = std::thread([&]{ 
        for (int i=0; i<10; i++) 
            s << (transaction() << "Thread: " << std::this_thread::get_id() << "\n");
    });

这样,线程认为是单个输出的内容实际上是单个输出,因此我们的结果可能如下所示:

Thread: 140375947724544
Thread: 140376068564736
Thread: 140375964509952
Thread: 140375964509952
Thread: 140375972902656
Thread: 140375964509952

当然,您会得到与我不同的线程 ID,并且行的顺序可能会有所不同——但每一行都将作为一个完整的单元编写。

总结

工作线程根本不应该直接与互斥锁一起工作。这应该是自动化的,这样工作线程就可以专注于它的工作,并且只需花费最少的精力在它完成工作所需的底层机制上。