等同于 D 语言中的 ExecutorService?
equivalent of an ExecutorService in D language?
D的文档有点难懂,下面Java代码如何在D中实现?
ExecutorService service = Executors.newFixedThreadPool(num_threads);
for (File f : files) {
service.execute(() -> process(f));
}
service.shutdown();
try {
service.awaitTermination(24, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
我会使用 std.parallelism 还是 std.concurrency 还是标准库中没有此功能。
您发布的示例最好用 std.parallelism
表示。您可以在那里使用 parallel
辅助函数,当它在 foreach 中使用时,它将自动在线程数(工作线程大小)为 totalCPUs - 1
的线程池中执行 foreach 循环的主体。您可以通过在执行任何并行代码之前设置 defaultPoolThreads = x;
或使用自定义任务池来更改此默认值。
基本上您的代码将转换为:
foreach (f; files.parallel) {
process(f); // or just paste what should be done with f in here if it matters
}
std.parallelism是多线程的高级实现。如果你只想拥有一个任务池,你可以创建一个 new TaskPool()
(将工人数量作为可选参数),然后使用 service.parallel(files)
.
执行与上面相同的操作
或者,您可以使用
对大量任务进行排队
foreach (f; files) {
service.put!process(f);
}
service.finish(true); // true = blocking
// you could also do false here in a while true loop with sleeps to implement a timeout
这将允许实现超时。
尽管我建议使用并行,因为它会为您处理上面的代码 + 为每个线程提供一个存储空间以访问本地堆栈,因此您可以像使用普通的非并行 foreach 循环一样使用它。
关于文档的side-note/explanation:
std.concurrency
也非常有用,尽管不是您要在示例中使用的内容。其中有一个 spawn 函数,它使用强大的消息 API 生成一个新线程。通过消息传递 API(发送和接收),您可以在线程之间实现线程安全的值传递,而无需使用套接字、文件或其他变通方法。
当您有一个任务(带有消息传递的线程 API)并在其中调用 receive
时,它将等待传递的超时完成或另一个线程调用 send
函数任务。例如,您可以有一个文件加载队列任务,它总是使用接收等待,例如UI 将文件放入加载队列(只需调用一次或多次发送),它可以处理这些文件并将它们发送回 UI 任务,该任务在主循环中使用超时接收。
std.concurrency
还有一个 FiberScheduler
可用于在单线程中进行线程样式编程。例如,如果你有一个 UI 进行绘图和输入处理以及它可以做的所有事情,那么它可以在主循环中的每个滴答调用 FiberScheduler 并且所有当前 运行 任务将在它们最后的地方继续停止(通过调用 yield
)。当你喜欢一个需要很长时间生成的图像生成器时,这很有用,但你不想阻塞 UI 太久,所以你每次迭代调用 yield()
来停止执行生成器并执行主循环的一步。
当光纤不是 运行 时,它们甚至可以绕过线程传递,因此您可以从 std.parallelism 和自定义 FiberScheduler 实现中获得一个线程池,并进行负载平衡,这在例如网络服务器。
如果你想在没有 FiberScheduler 的情况下创建 Fibers 并将它们称为原始(并检查它们的完成状态并将它们从任何自定义调度程序实现中删除)你可以继承 Fiber
class 来自 core.thread
,它的工作原理和线程完全一样,你只需要在每次等待或认为自己处于CPU密集区时调用Fiber.yield()。
虽然因为大多数 API 不是为纤维制造的,它们会阻塞并使纤维看起来有点无用,所以你肯定想使用一些在那里使用纤维的 API。例如 vibe.d 有很多基于 fiber 的函数,但是有一个自定义的 std.concurrency 实现,所以你需要留意它。
但回到您的问题,TaskPool
或在您的特定情况下,parallel
函数就是您所需要的。
https://dlang.org/phobos/std_parallelism.html#.parallel
https://dlang.org/phobos/std_parallelism.html#.TaskPool.parallel
D的文档有点难懂,下面Java代码如何在D中实现?
ExecutorService service = Executors.newFixedThreadPool(num_threads);
for (File f : files) {
service.execute(() -> process(f));
}
service.shutdown();
try {
service.awaitTermination(24, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
我会使用 std.parallelism 还是 std.concurrency 还是标准库中没有此功能。
您发布的示例最好用 std.parallelism
表示。您可以在那里使用 parallel
辅助函数,当它在 foreach 中使用时,它将自动在线程数(工作线程大小)为 totalCPUs - 1
的线程池中执行 foreach 循环的主体。您可以通过在执行任何并行代码之前设置 defaultPoolThreads = x;
或使用自定义任务池来更改此默认值。
基本上您的代码将转换为:
foreach (f; files.parallel) {
process(f); // or just paste what should be done with f in here if it matters
}
std.parallelism是多线程的高级实现。如果你只想拥有一个任务池,你可以创建一个 new TaskPool()
(将工人数量作为可选参数),然后使用 service.parallel(files)
.
或者,您可以使用
对大量任务进行排队foreach (f; files) {
service.put!process(f);
}
service.finish(true); // true = blocking
// you could also do false here in a while true loop with sleeps to implement a timeout
这将允许实现超时。
尽管我建议使用并行,因为它会为您处理上面的代码 + 为每个线程提供一个存储空间以访问本地堆栈,因此您可以像使用普通的非并行 foreach 循环一样使用它。
关于文档的side-note/explanation:
std.concurrency
也非常有用,尽管不是您要在示例中使用的内容。其中有一个 spawn 函数,它使用强大的消息 API 生成一个新线程。通过消息传递 API(发送和接收),您可以在线程之间实现线程安全的值传递,而无需使用套接字、文件或其他变通方法。
当您有一个任务(带有消息传递的线程 API)并在其中调用 receive
时,它将等待传递的超时完成或另一个线程调用 send
函数任务。例如,您可以有一个文件加载队列任务,它总是使用接收等待,例如UI 将文件放入加载队列(只需调用一次或多次发送),它可以处理这些文件并将它们发送回 UI 任务,该任务在主循环中使用超时接收。
std.concurrency
还有一个 FiberScheduler
可用于在单线程中进行线程样式编程。例如,如果你有一个 UI 进行绘图和输入处理以及它可以做的所有事情,那么它可以在主循环中的每个滴答调用 FiberScheduler 并且所有当前 运行 任务将在它们最后的地方继续停止(通过调用 yield
)。当你喜欢一个需要很长时间生成的图像生成器时,这很有用,但你不想阻塞 UI 太久,所以你每次迭代调用 yield()
来停止执行生成器并执行主循环的一步。
当光纤不是 运行 时,它们甚至可以绕过线程传递,因此您可以从 std.parallelism 和自定义 FiberScheduler 实现中获得一个线程池,并进行负载平衡,这在例如网络服务器。
如果你想在没有 FiberScheduler 的情况下创建 Fibers 并将它们称为原始(并检查它们的完成状态并将它们从任何自定义调度程序实现中删除)你可以继承 Fiber
class 来自 core.thread
,它的工作原理和线程完全一样,你只需要在每次等待或认为自己处于CPU密集区时调用Fiber.yield()。
虽然因为大多数 API 不是为纤维制造的,它们会阻塞并使纤维看起来有点无用,所以你肯定想使用一些在那里使用纤维的 API。例如 vibe.d 有很多基于 fiber 的函数,但是有一个自定义的 std.concurrency 实现,所以你需要留意它。
但回到您的问题,TaskPool
或在您的特定情况下,parallel
函数就是您所需要的。
https://dlang.org/phobos/std_parallelism.html#.parallel https://dlang.org/phobos/std_parallelism.html#.TaskPool.parallel