共享供应 运行 可以同时使用多个 tap 方块吗?

Can a shared supply run multiple tap blocks simultaneously?

考虑这段代码,其中点击需要一段时间才能完成。所有块同时运行(立即输出)然后休眠。大多数人没有完成,因为程序结束得比他们完成的早:

my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1 $^a"; sleep 5;  };
sleep 5;

输出(省略)有 25 行(5 秒内每 0.2 个刻度一行):

1. 0
1. 1
...
1. 24

然后我将该供应更改为 .share:

my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;

我只看到一行输入,但我期望得到相同的输出:

1. 1

.share 使得多次点击获得相同的值成为可能。

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

输出仍然只有第一次点击的输出并且仍然只有一行。我预计每行 25 行:

1. 1

Supply 的点击在单个线程中按顺序 运行。因此,第二次点击的代码只会在第一次点击(休眠 5 秒)之后变为 运行。这显示在以下代码中:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4

所以目前的答案是:否

Supply 的基本规则是:

  1. 没有明确要求就不会引入并发
  2. Back-pressure 通过 sender-pays 模型
  3. 一条消息在下一条消息之前被完整处理(所以.map({ ...something with state... })可以相信不会引起状态冲突)

规则 3 并不真正适用于 share,因为在那之后有单独的下游操作链,但规则 1 和 2 适用。 share 的目的是允许 publish/subscribe,并且还提供 re-use 由多个下游消息处理器处理的块。引入并行消息处理是一个独立的问题。

有多种选择。一种是将并行处理的消息放入Channel。这明确地引入了一个消息缓冲的地方(好吧,直到你 运行 内存不足......这正是 Supply 带有 sender-pays back-pressure 模型的原因).将 Channel 强制返回 Supply 获取从 Channel 提取的值,并在池线程的 Supply 上发出。这样看起来像:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

请注意,由于 whenever 会自动强制要求它对 Supply 做出反应的事物,因此它看起来像 whenever $supply.Channel { },这使它成为一个非常简短的解决方案 -但同时又非常明确地表明 正常的 back-pressure 机制是如何 side-stepped 的。此解决方案的另一个 属性 是它保留了消息的顺序,并且仍然在 Channel.

的下游提供 one-at-a-time 处理

另一种方法是通过启动一些异步工作来处理每条消息,从而对每条消息做出反应。 Supply 上的 start 操作会为接收到的每条消息调度它传递给线程池上 运行 的块,因此不会阻塞下一条消息的到达。结果是 SupplySupply。这迫使人们挖掘每个内部 Supply 以实际发生任何事情,一开始看起来有点 counter-intuitive,但实际上是为了程序员的利益:它清楚地表明还有一些额外的异步工作保持对...的跟踪。我强烈建议将它与 react/whenever 语法结合使用,后者会自动进行订阅管理和错误传播。题中代码最直接的改造是:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

尽管也可以将其写为:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

这表明写 parallelize Supply 组合子的可能性:

my $supply = Supply.interval(0.2).share;
my $tap  = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

sub parallelize(Supply $messages, &operation) {
    supply {
        whenever $messages -> $value {
            whenever start operation($value) {
                emit $_;
            }
        }
     }
}

这种方法的输出结果与 Channel 方法有很大不同,因为消息一进入,操作就全部开始。而且它不保留消息顺序。仍然有一个隐式队列(与使用 Channel 方法的显式队列不同),只是现在是线程池调度程序的工作队列和 OS 调度程序必须跟踪 in-progress 工作。同样,没有 back-pressure,但请注意,完全有可能通过跟踪未完成的 Promises 并使用 await Promise.anyof(@outstanding).

阻止进一步传入的消息来实现它

最后,我会注意到 hyper wheneverrace whenever 结构的一些考虑,以提供一些 language-level 机制来处理 Supply 消息的并行处理.然而,它们的语义,以及它们如何影响 supply-block 设计目标和安全属性,代表了重大的设计挑战。