禁用 Flowable/Observable 缓冲区
Disable Flowable/Observable Buffer
我始终无法从使用 combineLatest 运算符获得的组合中获取最后一个值。
我有 2 个热流 (a, b) 以高频率生成 事件 (每 100 毫秒一个事件):
Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);
结合 combineLatest,完成它的工作需要将近 300 毫秒。
Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB, OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
Thread.sleep(300);
}
在 combiner 执行一次后,我想处理生成的事件的最后一个组合,意思是(lastA,lastB ).
组合流的默认行为是将所有事件组合缓存在它自己的缓冲区中,以便组合流接收非常旧的组合并且这个时间间隔正在爆炸。
我应该如何更改我的代码以禁用此缓冲区并始终接收最后的组合?
您可以在 flowA
和 flowB
上应用 onBackpressureLatest
并使用 overload of combineLatest
来指定预取数量。
Flowable.combineLatest(
Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)
不幸的是,没有同时采用 BiFunction 和 bufferSize 的重载,因此您必须恢复为转换数组元素。
编辑
应用第二个 onBackpressureLatest
并限制 observeOn
上的缓冲区大小应该会让您更接近所需的模式,尽管 combineLatest
并非针对此用例。您可能需要一些多样本运算符。
我始终无法从使用 combineLatest 运算符获得的组合中获取最后一个值。
我有 2 个热流 (a, b) 以高频率生成 事件 (每 100 毫秒一个事件):
Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);
结合 combineLatest,完成它的工作需要将近 300 毫秒。
Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB, OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
Thread.sleep(300);
}
在 combiner 执行一次后,我想处理生成的事件的最后一个组合,意思是(lastA,lastB ).
组合流的默认行为是将所有事件组合缓存在它自己的缓冲区中,以便组合流接收非常旧的组合并且这个时间间隔正在爆炸。
我应该如何更改我的代码以禁用此缓冲区并始终接收最后的组合?
您可以在 flowA
和 flowB
上应用 onBackpressureLatest
并使用 overload of combineLatest
来指定预取数量。
Flowable.combineLatest(
Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)
不幸的是,没有同时采用 BiFunction 和 bufferSize 的重载,因此您必须恢复为转换数组元素。
编辑
应用第二个 onBackpressureLatest
并限制 observeOn
上的缓冲区大小应该会让您更接近所需的模式,尽管 combineLatest
并非针对此用例。您可能需要一些多样本运算符。