Rx 缓冲区 运行 一次并完成
Rx Buffer running once and complete
我喜欢收集流的第一个值 3 秒,然后将它们连接到相同数据类型的另一个流。第一个 Observable 是一个单独的、已完成的数据块,它在任何情况下都应该完成,这样我就可以使用 Concat() 连接第一个和第二个。使用 Concat() 需要保持数据流的完整性。
private IObservable<3DPoints> _1stBuffer = Observable.Empty<3DPoints>();
..
_1stBuffer = someRawStreamObservableReceivingOnNext
.Buffer(TimeSpan.FromSeconds(3), 100)
.Where(item => item.Any())
.SelectMany(item => item);
// later
var streamObservable = _1stBuffer.Concat(_some2ndStream); // doesn't emit, since 1stBuffer doesn't complete
我试过这个:
_1stBuffer = someRawStreamObservableReceivingOnNext
// completes the observable, but i want the buffer so far, no emptiness !
.TimeOut(TimeSpan.FromSeconds(3), Observable.Empty<3DPoints>)
.Buffer(TimeSpan.FromSeconds(3), 100)
.Where(item => item.Any())
.SelectMany(item => item);
超时使 bufferObservable 执行 Concat(),但继续 Observable.Empty<3DPoints> 不是选项。我怎样才能获取填充了 3 秒的第一个缓冲区,并连接它们?
您可能正在寻找 Take
变体之一:
source.Take(1)
将从源中获取第一个元素,然后完成。
source.TakeUntil(otherObservable)
将从源获取直到 otherObservable 的第一个元素发出。
我喜欢收集流的第一个值 3 秒,然后将它们连接到相同数据类型的另一个流。第一个 Observable 是一个单独的、已完成的数据块,它在任何情况下都应该完成,这样我就可以使用 Concat() 连接第一个和第二个。使用 Concat() 需要保持数据流的完整性。
private IObservable<3DPoints> _1stBuffer = Observable.Empty<3DPoints>();
..
_1stBuffer = someRawStreamObservableReceivingOnNext
.Buffer(TimeSpan.FromSeconds(3), 100)
.Where(item => item.Any())
.SelectMany(item => item);
// later
var streamObservable = _1stBuffer.Concat(_some2ndStream); // doesn't emit, since 1stBuffer doesn't complete
我试过这个:
_1stBuffer = someRawStreamObservableReceivingOnNext
// completes the observable, but i want the buffer so far, no emptiness !
.TimeOut(TimeSpan.FromSeconds(3), Observable.Empty<3DPoints>)
.Buffer(TimeSpan.FromSeconds(3), 100)
.Where(item => item.Any())
.SelectMany(item => item);
超时使 bufferObservable 执行 Concat(),但继续 Observable.Empty<3DPoints> 不是选项。我怎样才能获取填充了 3 秒的第一个缓冲区,并连接它们?
您可能正在寻找 Take
变体之一:
source.Take(1)
将从源中获取第一个元素,然后完成。source.TakeUntil(otherObservable)
将从源获取直到 otherObservable 的第一个元素发出。