System.Reactive Observable of string 将项目合并为单个项目作为新的 Observable

System.Reactive Observable of string combine items into single item as new Observable

我有一个 observable,它的项目发出一个字符串值,它只是我想public作为 observable 提供的整个消息的一部分。

如果物品是这样进来的:

我想提供一个 public observable,其项目的发射方式如下:

而且我知道消息部分何时是完整的消息。

我一直在努力让 Buffer 运算符工作,因为它似乎是适合我的场景的运算符,但我不知道如何告诉缓冲区关闭条件是什么或者是否这甚至是可能的。

Buffer 是最好的方法:

var source = new Subject<string>();

var result = source.Publish(_source => _source
        .Buffer(_source.Where(s => s.EndsWith(".")))
    )
    .Select(l => l.Aggregate ((x, y) => x + y));

result.Subscribe(s => Console.WriteLine(s));

source.OnNext("This is ");
source.OnNext("only part of");
source.OnNext(" the message.");
source.OnNext("Not. A. Full. Message ");
source.OnNext("but end of stream anyway");
source.OnCompleted();

Buffer 接受一个参数,指定组拆分应该发生的位置,我们用 where 子句指定。 Buffer 将消息聚合到一个列表中,然后我们使用 Linq 的 Aggregate 对其进行聚合。


编辑:

Publish 避免重新订阅。如果您要删除 Publish,解决方案将如下所示,并且有效:

var result2 = source.Buffer(
        source.Where(s => s.EndsWith("."))
    )
    .Select(l => l.Aggregate((x, y) => x + y));

但是,result2 会被订阅两次 source,这可能是错误的来源,特别是如果 source 没有很好地实现或表现得很好。因此,当你订阅一个 observable 两次时,最好使用 Publish,这实际上是 'forwards' 从一个订阅到多个订阅的消息。