了解点供应块(按需供应)

Understanding the point of supply blocks (on-demand supplies)

我无法理解他们创建的 supply {…} blocks/the 按需供应的目的。

Live 供给(即来自 Supplier 并在 Supplier 发出值时获得新值的类型)对我——它们是异步流的一个版本,我可以使用它从一个或多个发送者向一个或多个接收者广播消息。很容易看到响应实时消息流的用例:我可能想在每次从 GUI 接口收到 UI 事件或每次收到 chat application broadcasts that it has received a new message.

但是按需 供应没有类似的意义。 docs

An on-demand broadcast is like Netflix: everyone who starts streaming a movie (taps a supply), always starts it from the beginning (gets all the values), regardless of how many people are watching it right now.

好的,很公平。但是 why/when 我想要那些语义吗?

这些例子也让我有些摸不着头脑。 Concurancy 页面目前提供了三个 supply 块的示例,但其中两个只是从 for 循环中发出值。第三个是bit more detailed:

my $bread-supplier = Supplier.new;
my $vegetable-supplier = Supplier.new;
 
my $supply = supply {
    whenever $bread-supplier.Supply {
        emit("We've got bread: " ~ $_);
    };
    whenever $vegetable-supplier.Supply {
        emit("We've got a vegetable: " ~ $_);
    };
}
$supply.tap( -> $v { say "$v" });
 
$vegetable-supplier.emit("Radish");   # OUTPUT: «We've got a vegetable: Radish␤» 
$bread-supplier.emit("Thick sliced"); # OUTPUT: «We've got bread: Thick sliced␤» 
$vegetable-supplier.emit("Lettuce");  # OUTPUT: «We've got a vegetable: Lettuce␤» 

那里,supply 块正在做某事。具体来说,它对两个不同(实时)Supplier 的输入做出反应,然后将它们合并为一个 Supply。这看起来确实很有用。

... 除了如果我想转换两个 Supplier 的输出并将它们的输出合并为一个组合流,我可以只使用

my $supply = Supply.merge: 
                 $bread-supplier.Supply.map(    { "We've got bread: $_" }),
                 $vegetable-supplier.Supply.map({ "We've got a vegetable: $_" });

而且,实际上,如果我用上面的 map/merge 替换该示例中的 supply 块,我会得到完全相同的输出。此外,如果将 tap 移动到对 .emit 的调用下方,则 supply 块版本和 map/merge 版本都不会产生任何输出,这表明supply 块的“按需”方面在这里并没有真正发挥作用。

在更一般的层面上,我不相信 Raku(或 Cro)文档提供了 supply 块的任何示例,该块不会以某种方式转换输出基于 for 循环或 Supply.interval 的实时 Supply 或发射值。 None 其中的用例似乎特别引人注目,而不是作为转换 Supply 的不同方式。

考虑到以上所有情况,除了作为某些 [=19= 的可能替代语法之外,我很想将 supply 块作为一个不是那么有用的结构来注销] 组合器。但是,我在

上有它

while Supplier is often reached for, many times one would be better off writing a supply block that emits the values.

鉴于此,我愿意冒险大胆地猜测我缺少 一些东西 关于 supply 个块。如果能深入了解那可能是什么,我将不胜感激。

鉴于您提到的 Supply.merge,让我们从这个开始。想象一下它不在 Raku 标准库中,我们必须实现它。为了实现正确的实施,我们必须注意什么?至少:

  1. 产生一个 Supply 结果,当点击时,将...
  2. 点击(即订阅)所有输入供应。
  3. 当其中一个输入提供 emit 值时,emit 将其提供给我们的窃听器...
  4. ...但请确保我们遵循串行供应规则,即我们一次只 emit 一条消息;我们的两个输入电源可能会同时从不同的线程 emit 值,所以这不是自动 属性.
  5. 当我们所有供应商都发送了他们的 done 事件后,请同时发送 done 事件。
  6. 如果我们分接的任何输入电源发送 quit 事件,中继它,同时关闭所有其他输入电源的分接。
  7. 确保我们没有任何会导致破坏供应语法的奇数竞赛 emit* [done|quit]
  8. 当关闭我们生产的结果 Supply 上的分接头时,请务必关闭我们分接头的所有(仍处于活动状态的)输入电源上的分接头。

祝你好运!

那么标准库是怎么做到的呢?像这样:

method merge(*@s) {
    @s.unshift(self) if self.DEFINITE;  # add if instance method
    # [I elided optimizations for when there are 0 or 1 things to merge]
    supply {
        for @s {
            whenever $_ -> \value { emit(value) }
        }
    }
}

supply 块的要点是大大简化 正确地 在一个或多个 Supply 上实施 可重用 操作]s。它旨在消除的主要风险是:

  • 在我们点击多个 Supply 的情况下无法正确处理并发到达的消息,可能会导致我们损坏状态(因为我们可能希望编写的许多供应组合器也会有状态;merge 很简单,不至于)。 supply 块向我们保证我们一次只会处理一条消息,从而消除了这种危险。
  • 失去对订阅的跟踪,从而泄漏资源,这将成为任何更长的 运行ning 程序的问题。

第二个很容易被忽视,尤其是在使用像 Raku 这样的垃圾收集语言时。事实上,如果我开始迭代一些 Seq 然后在到达它的末尾之前停止这样做,迭代器变得无法访问并且 GC 会在一段时间内吃掉它。如果我遍历文件的行并且那里有一个隐式文件句柄,我冒着文件没有及时关闭的风险,如果我不走运,可能 运行 超出句柄,但至少有一些 关闭它并释放资源的路径。

响应式编程并非如此:引用从生产者指向消费者,因此如果消费者“停止关心”但没有关闭水龙头,那么生产者将保留其对消费者的引用(从而导致记忆泄漏)并继续向它发送消息(从而进行一次性工作)。这最终会导致应用程序崩溃。链接的 Cro 聊天示例是一个示例:

my $chat = Supplier.new;

get -> 'chat' {
    web-socket -> $incoming {
        supply {
            whenever $incoming -> $message {
                $chat.emit(await $message.body-text);
            }
            whenever $chat -> $text {
                emit $text;
            }
        }
    }
}

当 WebSocket 客户端断开连接时会发生什么?我们使用 supply 块返回的 Supply 上的点击被关闭,导致传入 WebSocket 消息和 $chat 的隐式点击 close。如果没有这个,$chat Supplier 的订阅者列表将无限制地增长,反过来也会为每个先前的连接保持一定大小的对象图。

因此,即使在直播 Supply 非常直接参与的情况下,我们也经常会随着时间的推移订阅它。按需供应主要是资源的获取和释放;有时,该资源将是实时订阅 Supply.

一个公平的问题是我们是否可以在没有 supply 块的情况下编写这个示例。是的,我们可以;这可能有效:

my $chat = Supplier.new;

get -> 'chat' {
    web-socket -> $incoming {
        my $emit-and-discard = $incoming.map(-> $message {
                $chat.emit(await $message.body-text);
                Supply.from-list()
            }).flat;
        Supply.merge($chat, $emit-and-discard)
    }
}

注意到在 Supply-space 中需要付出一些努力才能映射到任何内容。我个人觉得可读性较差——这甚至没有避免 supply 块,它只是隐藏在 merge 的实现中。更棘手的情况是,被窃取的供应品数量会随着时间的推移而变化,such as in recursive file watching 可能会出现新的要观看的目录。我真的不知道如何用标准库中出现的组合器来表达它。

我花了一些时间教授响应式编程(不是用 Raku,而是用 .Net)。对于一个异步流,事情很容易,但当我们开始处理多个异步流时,事情就变得更加困难了。有些东西很自然地适合组合器,例如“merge”或“zip”或“combine latest”。其他人可以通过足够的创造力被打成这些形状——但我常常觉得它扭曲而不是富有表现力。当问题无法在组合器中表达时会发生什么?在 Raku 术语中,一个人创建输出 Suppliers,挖掘输入电源,编写将输入中的东西发射到输出中的逻辑,等等。每次都必须处理订阅管理、错误传播、完成传播和并发控制——而且很容易搞砸。

当然,supply 块的存在并没有停止在 Raku 中走脆弱的道路。这就是我说的意思:

while Supplier is often reached for, many times one would be better off writing a supply block that emits the values

我在这里没有考虑 publish/subscribe 的情况,在这种情况下我们确实想要广播值并且处于反应链的入口点。我在考虑这样的情况:我们挖掘一个或多个 Supply,取值,做一些事情,然后 emit 事情变成另一个 SupplierHere is an example where I migrated such code towards a supply block; here is another example 稍后出现在同一代码库中。希望这些例子能澄清我的想法。