Rx 缓冲区 运行 一次并完成

Rx Buffer running once and complete

我喜欢收集流的第一个值 3 秒,然后将它们连接到相同数据类型的另一个流。第一个 Observable 是一个单独的、已完成的数据块,它在任何情况下都应该完成,这样我就可以使用 Concat() 连接第一个和第二个。使用 Concat() 需要保持数据流的完整性。

            private IObservable<3DPoints> _1stBuffer = Observable.Empty<3DPoints>();
            ..

           _1stBuffer = someRawStreamObservableReceivingOnNext
                .Buffer(TimeSpan.FromSeconds(3), 100)
                .Where(item => item.Any())
                .SelectMany(item => item);


// later 
            var streamObservable = _1stBuffer.Concat(_some2ndStream); // doesn't emit, since 1stBuffer doesn't complete

我试过这个:

            _1stBuffer = someRawStreamObservableReceivingOnNext

                // completes the observable, but i want the buffer so far, no emptiness !
                .TimeOut(TimeSpan.FromSeconds(3), Observable.Empty<3DPoints>)

                .Buffer(TimeSpan.FromSeconds(3), 100)
                .Where(item => item.Any())
                .SelectMany(item => item);

超时使 bufferObservable 执行 Concat(),但继续 Observable.Empty<3DPoints> 不是选项。我怎样才能获取填充了 3 秒的第一个缓冲区,并连接它们?

您可能正在寻找 Take 变体之一:

  • source.Take(1) 将从源中获取第一个元素,然后完成。
  • source.TakeUntil(otherObservable) 将从源获取直到 otherObservable 的第一个元素发出。