使用队列在 D 中的线程之间进行通信
Use a Queue to Communicate Between Threads in D
在 D 中使用 std.container.dlist
.
创建队列类型很容易
我希望有多个线程,但让它们与队列通信,而不是通过消息传递 (https://tour.dlang.org/tour/en/multithreading/message-passing)。据我了解,消息旨在始终在代码中的特定点接收数据;接收线程将阻塞,直到收到预期的数据。
(编辑:我被告知 receiveTimeout 但没有超时,在这种情况下只检查更合适(可能超时为 0?)。我也不确定消息是什么 API 如果在收到任何消息之前发送了多条消息。我将不得不玩这个。)
void main() {
spawn(&worker, thisTid);
// This line will block until the expected message is received.
receive (
(string message) {
writeln("Received the message: ", text);
},
)
}
我需要的是仅在有数据的情况下接收数据。像这样:
void main() {
Queue!string queue// custom `Queue` type based on DList
spawn(&worker, queue);
while (true) {
// Go through any messages (while consuming `queue`)
for (string message; queue) {
writeln("Received a message: ", text);
}
// Do other stuff
}
}
我尝试使用 shared
变量 (https://tour.dlang.org/tour/en/multithreading/synchronization-sharing),但 DMD 抱怨 "Aliases to mutable thread-local data not allowed." 或其他一些错误,具体取决于。
这在 D 中如何完成?或者,有没有办法使用消息来进行这种通信?
这并没有回答具体问题,但我确实澄清了我认为对传递消息的误解 api...
只需调用 receiveTimeout
而不是普通的 receive
http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html
我用这个:
shared class Queue(T) {
private T[] queue;
synchronized void opOpAssign(string op)(T object) if(op == "~") {
queue ~= object;
}
synchronized size_t length(){
return queue.length;
}
synchronized T pop(){
assert(queue.length, "Please check queue length, is 0");
auto first = queue[0];
queue = queue[1..$];
return first;
}
synchronized shared(T[]) consume(){
auto copy = queue;
queue = [];
return copy;
}
}
我得到了我需要的答案。
简单地说,使用core.thread
而不是std.concurrency
。 std.concurrency
为您管理消息,不允许您自己管理。 core.thread
是 std.concurrency
内部使用的。
较长的答案,这是我完全实现它的方式。
我创建了一个基于 Singly Linked List but maintains a pointer of the last element. The Queue
also uses standard component inputRange and outputRange (or at least I think it does) per Walter Brights vision (https://www.youtube.com/watch?v=cQkBOCo8UrE 的 Queue
类型。
Queue
也被构建为允许一个线程写入,另一个线程读取,内部互斥很少,因此它应该很快。
我在这里分享的队列https://pastebin.com/ddyPpLrp
让第二个线程读取输入的简单实现:
Queue!string inputQueue = new Queue!string;
ThreadInput threadInput = new ThreadInput(inputQueue);
threadInput.start;
while (true) {
foreach (string value; inputQueue) {
writeln(value);
}
}
ThreadInput
被定义为:
class ThreadInput : Thread {
private Queue!string queue;
this(Queue!string queue) {
super(&run);
this.queue = queue;
}
private void run() {
while (true) {
queue.put(readln);
}
}
}
代码https://pastebin.com/w5jwRVrL
Queue
再https://pastebin.com/ddyPpLrp
在 D 中使用 std.container.dlist
.
我希望有多个线程,但让它们与队列通信,而不是通过消息传递 (https://tour.dlang.org/tour/en/multithreading/message-passing)。据我了解,消息旨在始终在代码中的特定点接收数据;接收线程将阻塞,直到收到预期的数据。
(编辑:我被告知 receiveTimeout 但没有超时,在这种情况下只检查更合适(可能超时为 0?)。我也不确定消息是什么 API 如果在收到任何消息之前发送了多条消息。我将不得不玩这个。)
void main() {
spawn(&worker, thisTid);
// This line will block until the expected message is received.
receive (
(string message) {
writeln("Received the message: ", text);
},
)
}
我需要的是仅在有数据的情况下接收数据。像这样:
void main() {
Queue!string queue// custom `Queue` type based on DList
spawn(&worker, queue);
while (true) {
// Go through any messages (while consuming `queue`)
for (string message; queue) {
writeln("Received a message: ", text);
}
// Do other stuff
}
}
我尝试使用 shared
变量 (https://tour.dlang.org/tour/en/multithreading/synchronization-sharing),但 DMD 抱怨 "Aliases to mutable thread-local data not allowed." 或其他一些错误,具体取决于。
这在 D 中如何完成?或者,有没有办法使用消息来进行这种通信?
这并没有回答具体问题,但我确实澄清了我认为对传递消息的误解 api...
只需调用 receiveTimeout
而不是普通的 receive
http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html
我用这个:
shared class Queue(T) {
private T[] queue;
synchronized void opOpAssign(string op)(T object) if(op == "~") {
queue ~= object;
}
synchronized size_t length(){
return queue.length;
}
synchronized T pop(){
assert(queue.length, "Please check queue length, is 0");
auto first = queue[0];
queue = queue[1..$];
return first;
}
synchronized shared(T[]) consume(){
auto copy = queue;
queue = [];
return copy;
}
}
我得到了我需要的答案。
简单地说,使用core.thread
而不是std.concurrency
。 std.concurrency
为您管理消息,不允许您自己管理。 core.thread
是 std.concurrency
内部使用的。
较长的答案,这是我完全实现它的方式。
我创建了一个基于 Singly Linked List but maintains a pointer of the last element. The Queue
also uses standard component inputRange and outputRange (or at least I think it does) per Walter Brights vision (https://www.youtube.com/watch?v=cQkBOCo8UrE 的 Queue
类型。
Queue
也被构建为允许一个线程写入,另一个线程读取,内部互斥很少,因此它应该很快。
我在这里分享的队列https://pastebin.com/ddyPpLrp
让第二个线程读取输入的简单实现:
Queue!string inputQueue = new Queue!string;
ThreadInput threadInput = new ThreadInput(inputQueue);
threadInput.start;
while (true) {
foreach (string value; inputQueue) {
writeln(value);
}
}
ThreadInput
被定义为:
class ThreadInput : Thread {
private Queue!string queue;
this(Queue!string queue) {
super(&run);
this.queue = queue;
}
private void run() {
while (true) {
queue.put(readln);
}
}
}
代码https://pastebin.com/w5jwRVrL
Queue
再https://pastebin.com/ddyPpLrp