Dart:如何创建一个流来聚合来自另一个流的事件?

Dart : How to create a stream that aggregates events from another stream?

创建需要聚合来自另一个流的多个事件的流的最佳方法是什么?

我的目标是创建一个流来聚合来自另一个流的事件,直到它有足够的事件来构建消息。在我的例子中,我正在从套接字流中读取数据,因此一条消息可能分布在不同的事件中,并且一个事件可能包含各种消息的数据,因此我不能只对每个元素应用映射操作。

似乎正确的方法是使用 Stream Transformer,但我无法找到有关如何在没有太多样板代码的情况下正确实现它的信息。

我在阅读了有关如何创建流的内容后想到了一个解决方案,但我不确定这是否可以接受,也不是最好的方法。

这是我的解决方案示例:

Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
  var prevWord= '';
  await for (var i in a) {
    prevWord += i;
    if(i.startsWith('C')){
      yield prevWord;
      prevWord = '';
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = joinWordsIfStartWithC(intStream);

  sStream.listen((s) => print(s));
}

我会说你的解决方案似乎不错,但如果你想制作一个流转换器,那么通过扩展 StreamTransformerBase:

就相当容易了
import 'dart:async';

class JoinWordsIfStartWithCTransformer extends StreamTransformerBase<String, String> {
  Stream<String> bind(Stream<String> a) async* {
    var prevWord = '';
    await for (var i in a) {
      prevWord += i;
      if (i.startsWith('C')) {
        yield prevWord;
        prevWord = '';
      }
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());

  sStream.listen((s) => print(s));
}