在 Reactor 中处理异步副作用的最佳方法是什么?

What is the best way to do async side effect in Reactor?

假设我有一个带有一些处理的主链,我想对每个处理的项目调用计费服务,然后 return 处理的结果进一步像这样:

... ... ... // chain starts somewhere here
.flatMap(item -> processItem(item))
.doOnNext(item -> billingService.bill(item))
... ... ... // continue to work with processed items

即我想在链上做一些副作用操作。如果 billingService.bill() 是同步的,这会起作用,但它 returns Mono。 所以,这是我必须做的:

... ... ... // chain starts somewhere here
.flatMap(item -> processItem(item))
.flatMap(item -> billingService.bill(item).thenReturn(item))
... ... ... // continue to work with processed items

有更好的方法吗?我觉得有点尴尬...

回答了我的问题。所以在我的例子中它看起来像这样:

... ... ... // chain starts somewhere here
.flatMap(item -> processItem(item))
.doOnNext(item -> billingService.bill(item).subscribe())
... ... ... // continue to work with processed items