从内部更改 `whenever` 块的目标

Changing the target of a `whenever` block from the inside

以下代码尝试对一个 Supply 做出反应,然后根据某些消息的内容改变主意并对来自另一个 Supply 的消息做出反应。它试图提供与 Supply.migrate 类似的行为,但具有更多的控制权。

my $c1 = Supplier.new;
my $c2 = Supplier.new;

my $s = supply {
    my $currently-listening-to = $c1.Supply;
    my $other-var = 'foo';
    whenever $currently-listening-to {
        say "got: $_";
        if .starts-with('3') {
            say "listening to something new";
            $currently-listening-to = $c2.Supply;
            $other-var = 'bar';
            say $other-var;
        }
    }
}

$s.tap;

for ^7 {
    $c1.emit: "$_ from $c1";
    $c2.emit: "$_ from $c2";
}
sleep 10;

如果我正确理解 supply 块的语义(非常怀疑!),这个块应该对 supply 块内声明的任何变量具有独占和可变访问权限。因此,我希望它从 $c1 中获取前 4 个值,然后切换到 $c2。然而,事实并非如此。这是输出:

ot: 0 from $c1
got: 1 from $c1
got: 2 from $c1
got: 3 from $c1
listening to something new
bar
got: 4 from $c1
got: 5 from $c1
got: 6 from $c1

如该输出所示,更改 $other-var 的效果与我预期的一样,但更改 $currently-listening-to 的尝试失败(无提示)。

这种行为是否正确?如果是这样,关于解释此行为的 supply blocks/other 构造的语义,我缺少什么?我使用 react 块得到了相同的结果,并且在使用 Channel 而不是 Supply 时得到了相同的结果,因此该行为在多个并发结构中是一致的。

(为了避免 X-Y 问题,触发此问题的用例是尝试实施 Erlang-style 错误处理。为此,我希望有一个监督 supply 块听其 children 并且可以 kill/re-launch 任何进入不良状态的 children。但这意味着要听新的 children – 这直接导致上述问题。)

我倾向于将 whenever 视为 for 的反应等价物。 (它甚至支持 LAST 循环移相器在点击的 Supplydone 时执行某些操作,并支持 nextlastredo 就像一个普通的 for 循环!)考虑一下:

my $x = (1,2,3);
for $x<> {
    .say;
    $x = (4,5,6);
}

输出为:

1
2
3

因为在 for 循环的设置阶段,我们获得了一个迭代器,然后对其进行处理,而不是在每次迭代时再次读取 $x 。它与 whenever 相同:它点击 Supply,然后每个 emit 事件调用正文。

因此需要另一个 whenever 来实现下一个 Supply 的点击,同时关闭当前的点击。当只考虑两个Supply时,简单的写法是这样的:

my $c1 = Supplier.new;
my $c2 = Supplier.new;

my $s = supply {
    whenever $c1 {
        say "got: $_";
        if .starts-with('3') {
            say "listening to something new";
            # Tap the next Supply...
            whenever $c2 {
                say "got: $_";
            }
            # ...and close the tap on the current one.
            last;
        }
    }
}

$s.tap;

for ^7 {
    $c1.emit: "$_ from $c1";
    $c2.emit: "$_ from $c2";
}

这将产生:

got: 0 from $c1
got: 1 from $c1
got: 2 from $c1
got: 3 from $c1
listening to something new
got: 3 from $c2
got: 4 from $c2
got: 5 from $c2
got: 6 from $c2

(请注意,我删除了 sleep 10,因为不需要它;我们在此示例中没有引入任何并发性,因此一切都同步运行。)

显然,如果要在十几个 Supply 之间移动,那么这种方法将无法很好地扩展。那么 migrate 是如何工作的呢?缺少的关键部分是我们可以在使用 whenever 时获得 Tap 句柄,因此我们可以从 whenever 的主体外部关闭它。这正是 migrate 的工作原理(从标准库复制,添加了注释):

method migrate(Supply:D:) {
    supply {
        # The Tap of the Supply we are currently emitting values from
        my $current;
        # Tap the Supply of Supply that we'll migrate between
        whenever self -> \inner {
            # Make sure we produce a sensible error
            X::Supply::Migrate::Needs.new.throw
                unless inner ~~ Supply;
            # Close the tap on whatever we are currently tapping
            $current.close if $current;
            # Tap the new thing and store the Tap handle
            $current = do whenever inner -> \value {
                emit(value);
            }
        }
    }
}

简而言之:您不会更改 whenever 的目标,而是开始一个新的 whenever 并终止前一个。