rxdart:在流订阅取消时获取缓冲元素

rxdart: Get buffered elements on stream subscription cancel

我在我的应用程序中使用 rxdart ZipStream 来合并两个传入的蓝牙数据流。这些流与“bufferCount”一起使用,在发射前分别收集 500 个元素。到目前为止一切正常,但如果流订阅在某个时候被取消,这些缓冲区中可能有一些元素在那之后被省略。我可以在取消流订阅之前等待“缓冲周期”完成,但由于这可能需要一些时间,具体取决于配置的采样率,我想知道是否有解决方案可以让这些缓冲区保持原样,即使数量元素可能少于 500。

这里是一些简化的解释代码:

subscription = ZipStream.zip2(
  streamA.bufferCount(500),
  streamB.bufferCount(500),
  (streamABuffer, streamBBuffer) {
    return ...;
  },
).listen((data) {
  ...
});

提前致谢!

所以对于任何想知道的人来说:由于 bufferCount 是使用扩展了 BackpressureStreamTransformer 的 BufferCountStreamTransformer 实现的,因此有一个默认为 true 的 dispatchOnClose 属性。这意味着如果其发射元素被缓冲的底层流被关闭,那么该缓冲区中的剩余元素最终会被发射。这也适用于上面的示例。我的错误是关闭流并立即取消流订阅。等待流关闭并随后取消流订阅,一切都按预期进行。