在反应流扫描(),原子等中保持状态的有效方法?
Efficient way to keep state in reactive stream scan(), atomic, etc?
上次,我开始实施bitbay.net订单订阅。
问题是 bitbay 正在返回订单的增量,但我总是想保持整个价格深度(所以我必须保持完整的价格深度并在发生某些增量事件时更新它):
bid ask bid ask
---------- -----------
A D ------------>delta-event(removed=D)---> A F
B F B G
C G C
所以我决定使用
Flux
.from(eventsFromBitbay)
.scan(FullPriceDepth.empty(), (pd, e) -> pd.update(e))
.subscription(...)
我的问题是 Flux.scan(...) 将是一个不错的选择(就效率和线程安全而言)?我说的是高速系统中的数百万个事件。
我的替代方案是制作一些 Atomic...
并在 Flux.create(...).map(e -> atomicHere)
中更新它,或者有更好的东西吗?
Flux.scan()
是否比Atomic...
更有效率,为什么,为什么不呢?
“我的问题是 Flux.scan(...) 会是一个不错的选择吗?”
当然可以,为什么不呢?如果你问我,这是一个明显的模式。您有一个 class 来保存处理通量所需的信息。不过你应该记住一些事情,主要是通量的顺序很容易改变,例如使用 Flux::flatMap
而不是 Flux::flatMapSequential
,所以你可以很容易地以任何顺序得到东西。此外,有人可以将通量放在多个线程上,因此您的 FullPriceDepth
属性可能必须针对并发问题进行编码。
上次,我开始实施bitbay.net订单订阅。
问题是 bitbay 正在返回订单的增量,但我总是想保持整个价格深度(所以我必须保持完整的价格深度并在发生某些增量事件时更新它):
bid ask bid ask
---------- -----------
A D ------------>delta-event(removed=D)---> A F
B F B G
C G C
所以我决定使用
Flux
.from(eventsFromBitbay)
.scan(FullPriceDepth.empty(), (pd, e) -> pd.update(e))
.subscription(...)
我的问题是 Flux.scan(...) 将是一个不错的选择(就效率和线程安全而言)?我说的是高速系统中的数百万个事件。
我的替代方案是制作一些 Atomic...
并在 Flux.create(...).map(e -> atomicHere)
中更新它,或者有更好的东西吗?
Flux.scan()
是否比Atomic...
更有效率,为什么,为什么不呢?
“我的问题是 Flux.scan(...) 会是一个不错的选择吗?”
当然可以,为什么不呢?如果你问我,这是一个明显的模式。您有一个 class 来保存处理通量所需的信息。不过你应该记住一些事情,主要是通量的顺序很容易改变,例如使用 Flux::flatMap
而不是 Flux::flatMapSequential
,所以你可以很容易地以任何顺序得到东西。此外,有人可以将通量放在多个线程上,因此您的 FullPriceDepth
属性可能必须针对并发问题进行编码。