使用 Flux.merge() 的竞争条件

Race condition using Flux.merge()

我对响应式编程还很陌生。 我的项目基于 Spring WebFlux.

这是我的合并方法:

//fn1 and fn2 are JS functions with one int param, count - I execute them with param from 0 to count.  
public Flux<String> generateUnordered(String fn1, String fn2, int count) {
    Flux<String> fn1Flux = generateFunctionResultFlux(fn1, count, FUNCTION_1)
            .map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));

    Flux<String> fn2Flux = generateFunctionResultFlux(fn2, count, FUNCTION_2)
            .map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));

    return Flux.merge(fn1Flux, fn2Flux)
            .delayElements(Duration.ofMillis(DELAY));
}

当我使用如下 JS 函数执行它并且 count=2

function fn1(number) {return number +100;}
function fn2(number) {return number +200;}

我得到了类似的东西:

0,FUNCTION_2,200.0,0

0,FUNCTION_1,200.0,0

但是可以清楚地看到,我以某种方式在 fn1 行中得到了 fn2 结果!

我做错了什么,我该如何解决?

GitHub link 项目:WebFlux project

GitHub link 在 class 上:FluxGenerator class

使用合并运算符源被热切订阅。所以我想第一个通量会在订阅后立即发出,第二个会发出。因为它们会立即发射。如果你想看到交错的通量,你需要延迟发射。

        Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();

如果您需要更多运算符示例,可以查看 this

问题出在程序的其他部分。 为了计算 JS 函数,我使用了 class 字段 ScriptEngine,即 NOT thread-safe.

制作 ScriptEngine 本地字段解决了我的问题。