使用队列在 D 中的线程之间进行通信

Use a Queue to Communicate Between Threads in D

在 D 中使用 std.container.dlist.

创建队列类型很容易

我希望有多个线程,但让它们与队列通信,而不是通过消息传递 (https://tour.dlang.org/tour/en/multithreading/message-passing)。据我了解,消息旨在始终在代码中的特定点接收数据;接收线程将阻塞,直到收到预期的数据。

(编辑:我被告知 receiveTimeout 但没有超时,在这种情况下只检查更合适(可能超时为 0?)。我也不确定消息是什么 API 如果在收到任何消息之前发送了多条消息。我将不得不玩这个。)

void main() {
    spawn(&worker, thisTid);

    // This line will block until the expected message is received.
    receive (
        (string message) {
            writeln("Received the message: ", text);
        },
    )
}

我需要的是仅在有数据的情况下接收数据。像这样:

void main() {
    Queue!string queue// custom `Queue` type based on DList

    spawn(&worker, queue);

    while (true) {
        // Go through any messages (while consuming `queue`)
        for (string message; queue) {
            writeln("Received a message: ", text);
        }
        // Do other stuff
    }
}

我尝试使用 shared 变量 (https://tour.dlang.org/tour/en/multithreading/synchronization-sharing),但 DMD 抱怨 "Aliases to mutable thread-local data not allowed." 或其他一些错误,具体取决于。

这在 D 中如何完成?或者,有没有办法使用消息来进行这种通信?

这并没有回答具体问题,但我确实澄清了我认为对传递消息的误解 api...

只需调用 receiveTimeout 而不是普通的 receive

http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html

我用这个:

shared class Queue(T) {

    private T[] queue;

    synchronized void opOpAssign(string op)(T object) if(op == "~") {
        queue ~= object;
    }

    synchronized size_t length(){
        return queue.length;
    }

    synchronized T pop(){
        assert(queue.length, "Please check queue length, is 0");
        auto first = queue[0];
        queue = queue[1..$];
        return first;
    }

    synchronized shared(T[]) consume(){
        auto copy = queue;
        queue = [];
        return copy;
    }

}

我得到了我需要的答案。

简单地说,使用core.thread而不是std.concurrencystd.concurrency 为您管理消息,不允许您自己管理。 core.threadstd.concurrency 内部使用的。

较长的答案,这是我完全实现它的方式。

我创建了一个基于 Singly Linked List but maintains a pointer of the last element. The Queue also uses standard component inputRange and outputRange (or at least I think it does) per Walter Brights vision (https://www.youtube.com/watch?v=cQkBOCo8UrEQueue 类型。 Queue 也被构建为允许一个线程写入,另一个线程读取,内部互斥很少,因此它应该很快。
我在这里分享的队列https://pastebin.com/ddyPpLrp

让第二个线程读取输入的简单实现:

Queue!string inputQueue = new Queue!string;
ThreadInput threadInput = new ThreadInput(inputQueue);
threadInput.start;

while (true) {
    foreach (string value; inputQueue) {
        writeln(value);
    }
}

ThreadInput 被定义为:

class ThreadInput : Thread {
    private Queue!string queue;

    this(Queue!string queue) {
        super(&run);
        this.queue = queue;
    }

    private void run() {
        while (true) {
            queue.put(readln);
        }
    }
}

代码https://pastebin.com/w5jwRVrL
Queuehttps://pastebin.com/ddyPpLrp