在反应流扫描(),原子等中保持状态的有效方法?

Efficient way to keep state in reactive stream scan(), atomic, etc?

上次,我开始实施bitbay.net订单订阅。

问题是 bitbay 正在返回订单的增量,但我总是想保持整个价格深度(所以我必须保持完整的价格深度并在发生某些增量事件时更新它):

bid    ask                                        bid     ask
----------                                        -----------
A      D  ------------>delta-event(removed=D)---> A       F
B      F                                          B       G
C      G                                          C

所以我决定使用

Flux
   .from(eventsFromBitbay)
   .scan(FullPriceDepth.empty(), (pd, e) -> pd.update(e))
   .subscription(...)

我的问题是 Flux.scan(...) 将是一个不错的选择(就效率和线程安全而言)?我说的是高速系统中的数百万个事件。

我的替代方案是制作一些 Atomic... 并在 Flux.create(...).map(e -> atomicHere) 中更新它,或者有更好的东西吗?

Flux.scan()是否比Atomic...更有效率,为什么,为什么不呢?

“我的问题是 Flux.scan(...) 会是一个不错的选择吗?”

当然可以,为什么不呢?如果你问我,这是一个明显的模式。您有一个 class 来保存处理通量所需的信息。不过你应该记住一些事情,主要是通量的顺序很容易改变,例如使用 Flux::flatMap 而不是 Flux::flatMapSequential,所以你可以很容易地以任何顺序得到东西。此外,有人可以将通量放在多个线程上,因此您的 FullPriceDepth 属性可能必须针对并发问题进行编码。