共享供应 运行 可以同时使用多个 tap 方块吗?
Can a shared supply run multiple tap blocks simultaneously?
考虑这段代码,其中点击需要一段时间才能完成。所有块同时运行(立即输出)然后休眠。大多数人没有完成,因为程序结束得比他们完成的早:
my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1 $^a"; sleep 5; };
sleep 5;
输出(省略)有 25 行(5 秒内每 0.2 个刻度一行):
1. 0
1. 1
...
1. 24
然后我将该供应更改为 .share
:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;
我只看到一行输入,但我期望得到相同的输出:
1. 1
.share
使得多次点击获得相同的值成为可能。
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
输出仍然只有第一次点击的输出并且仍然只有一行。我预计每行 25 行:
1. 1
Supply
的点击在单个线程中按顺序 运行。因此,第二次点击的代码只会在第一次点击(休眠 5 秒)之后变为 运行。这显示在以下代码中:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4
所以目前的答案是:否
Supply
的基本规则是:
- 没有明确要求就不会引入并发
- Back-pressure 通过 sender-pays 模型
- 一条消息在下一条消息之前被完整处理(所以
.map({ ...something with state... })
可以相信不会引起状态冲突)
规则 3 并不真正适用于 share
,因为在那之后有单独的下游操作链,但规则 1 和 2 适用。 share
的目的是允许 publish/subscribe,并且还提供 re-use 由多个下游消息处理器处理的块。引入并行消息处理是一个独立的问题。
有多种选择。一种是将并行处理的消息放入Channel
。这明确地引入了一个消息缓冲的地方(好吧,直到你 运行 内存不足......这正是 Supply
带有 sender-pays back-pressure 模型的原因).将 Channel
强制返回 Supply
获取从 Channel
提取的值,并在池线程的 Supply
上发出。这样看起来像:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
请注意,由于 whenever
会自动强制要求它对 Supply
做出反应的事物,因此它看起来像 whenever $supply.Channel { }
,这使它成为一个非常简短的解决方案 -但同时又非常明确地表明 正常的 back-pressure 机制是如何 side-stepped 的。此解决方案的另一个 属性 是它保留了消息的顺序,并且仍然在 Channel
.
的下游提供 one-at-a-time 处理
另一种方法是通过启动一些异步工作来处理每条消息,从而对每条消息做出反应。 Supply
上的 start
操作会为接收到的每条消息调度它传递给线程池上 运行 的块,因此不会阻塞下一条消息的到达。结果是 Supply
的 Supply
。这迫使人们挖掘每个内部 Supply
以实际发生任何事情,一开始看起来有点 counter-intuitive,但实际上是为了程序员的利益:它清楚地表明还有一些额外的异步工作保持对...的跟踪。我强烈建议将它与 react
/whenever
语法结合使用,后者会自动进行订阅管理和错误传播。题中代码最直接的改造是:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
尽管也可以将其写为:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
这表明写 parallelize
Supply
组合子的可能性:
my $supply = Supply.interval(0.2).share;
my $tap = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
sub parallelize(Supply $messages, &operation) {
supply {
whenever $messages -> $value {
whenever start operation($value) {
emit $_;
}
}
}
}
这种方法的输出结果与 Channel
方法有很大不同,因为消息一进入,操作就全部开始。而且它不保留消息顺序。仍然有一个隐式队列(与使用 Channel
方法的显式队列不同),只是现在是线程池调度程序的工作队列和 OS 调度程序必须跟踪 in-progress 工作。同样,没有 back-pressure,但请注意,完全有可能通过跟踪未完成的 Promises
并使用 await Promise.anyof(@outstanding)
.
阻止进一步传入的消息来实现它
最后,我会注意到 hyper whenever
和 race whenever
结构的一些考虑,以提供一些 language-level 机制来处理 Supply
消息的并行处理.然而,它们的语义,以及它们如何影响 supply
-block 设计目标和安全属性,代表了重大的设计挑战。
考虑这段代码,其中点击需要一段时间才能完成。所有块同时运行(立即输出)然后休眠。大多数人没有完成,因为程序结束得比他们完成的早:
my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1 $^a"; sleep 5; };
sleep 5;
输出(省略)有 25 行(5 秒内每 0.2 个刻度一行):
1. 0
1. 1
...
1. 24
然后我将该供应更改为 .share
:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;
我只看到一行输入,但我期望得到相同的输出:
1. 1
.share
使得多次点击获得相同的值成为可能。
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
输出仍然只有第一次点击的输出并且仍然只有一行。我预计每行 25 行:
1. 1
Supply
的点击在单个线程中按顺序 运行。因此,第二次点击的代码只会在第一次点击(休眠 5 秒)之后变为 运行。这显示在以下代码中:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4
所以目前的答案是:否
Supply
的基本规则是:
- 没有明确要求就不会引入并发
- Back-pressure 通过 sender-pays 模型
- 一条消息在下一条消息之前被完整处理(所以
.map({ ...something with state... })
可以相信不会引起状态冲突)
规则 3 并不真正适用于 share
,因为在那之后有单独的下游操作链,但规则 1 和 2 适用。 share
的目的是允许 publish/subscribe,并且还提供 re-use 由多个下游消息处理器处理的块。引入并行消息处理是一个独立的问题。
有多种选择。一种是将并行处理的消息放入Channel
。这明确地引入了一个消息缓冲的地方(好吧,直到你 运行 内存不足......这正是 Supply
带有 sender-pays back-pressure 模型的原因).将 Channel
强制返回 Supply
获取从 Channel
提取的值,并在池线程的 Supply
上发出。这样看起来像:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
请注意,由于 whenever
会自动强制要求它对 Supply
做出反应的事物,因此它看起来像 whenever $supply.Channel { }
,这使它成为一个非常简短的解决方案 -但同时又非常明确地表明 正常的 back-pressure 机制是如何 side-stepped 的。此解决方案的另一个 属性 是它保留了消息的顺序,并且仍然在 Channel
.
另一种方法是通过启动一些异步工作来处理每条消息,从而对每条消息做出反应。 Supply
上的 start
操作会为接收到的每条消息调度它传递给线程池上 运行 的块,因此不会阻塞下一条消息的到达。结果是 Supply
的 Supply
。这迫使人们挖掘每个内部 Supply
以实际发生任何事情,一开始看起来有点 counter-intuitive,但实际上是为了程序员的利益:它清楚地表明还有一些额外的异步工作保持对...的跟踪。我强烈建议将它与 react
/whenever
语法结合使用,后者会自动进行订阅管理和错误传播。题中代码最直接的改造是:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
尽管也可以将其写为:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
这表明写 parallelize
Supply
组合子的可能性:
my $supply = Supply.interval(0.2).share;
my $tap = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
sub parallelize(Supply $messages, &operation) {
supply {
whenever $messages -> $value {
whenever start operation($value) {
emit $_;
}
}
}
}
这种方法的输出结果与 Channel
方法有很大不同,因为消息一进入,操作就全部开始。而且它不保留消息顺序。仍然有一个隐式队列(与使用 Channel
方法的显式队列不同),只是现在是线程池调度程序的工作队列和 OS 调度程序必须跟踪 in-progress 工作。同样,没有 back-pressure,但请注意,完全有可能通过跟踪未完成的 Promises
并使用 await Promise.anyof(@outstanding)
.
最后,我会注意到 hyper whenever
和 race whenever
结构的一些考虑,以提供一些 language-level 机制来处理 Supply
消息的并行处理.然而,它们的语义,以及它们如何影响 supply
-block 设计目标和安全属性,代表了重大的设计挑战。