使用 Flux.merge() 的竞争条件
Race condition using Flux.merge()
我对响应式编程还很陌生。
我的项目基于 Spring WebFlux.
这是我的合并方法:
//fn1 and fn2 are JS functions with one int param, count - I execute them with param from 0 to count.
public Flux<String> generateUnordered(String fn1, String fn2, int count) {
Flux<String> fn1Flux = generateFunctionResultFlux(fn1, count, FUNCTION_1)
.map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));
Flux<String> fn2Flux = generateFunctionResultFlux(fn2, count, FUNCTION_2)
.map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));
return Flux.merge(fn1Flux, fn2Flux)
.delayElements(Duration.ofMillis(DELAY));
}
当我使用如下 JS 函数执行它并且 count=2
function fn1(number) {return number +100;}
function fn2(number) {return number +200;}
我得到了类似的东西:
0,FUNCTION_2,200.0,0
0,FUNCTION_1,200.0,0
但是可以清楚地看到,我以某种方式在 fn1 行中得到了 fn2 结果!
我做错了什么,我该如何解决?
GitHub link 项目:WebFlux project
GitHub link 在 class 上:FluxGenerator class
使用合并运算符源被热切订阅。所以我想第一个通量会在订阅后立即发出,第二个会发出。因为它们会立即发射。如果你想看到交错的通量,你需要延迟发射。
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
如果您需要更多运算符示例,可以查看 this
问题出在程序的其他部分。
为了计算 JS 函数,我使用了 class 字段 ScriptEngine,即 NOT thread-safe.
制作 ScriptEngine 本地字段解决了我的问题。
我对响应式编程还很陌生。 我的项目基于 Spring WebFlux.
这是我的合并方法:
//fn1 and fn2 are JS functions with one int param, count - I execute them with param from 0 to count.
public Flux<String> generateUnordered(String fn1, String fn2, int count) {
Flux<String> fn1Flux = generateFunctionResultFlux(fn1, count, FUNCTION_1)
.map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));
Flux<String> fn2Flux = generateFunctionResultFlux(fn2, count, FUNCTION_2)
.map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));
return Flux.merge(fn1Flux, fn2Flux)
.delayElements(Duration.ofMillis(DELAY));
}
当我使用如下 JS 函数执行它并且 count=2
function fn1(number) {return number +100;}
function fn2(number) {return number +200;}
我得到了类似的东西:
0,FUNCTION_2,200.0,0
0,FUNCTION_1,200.0,0
但是可以清楚地看到,我以某种方式在 fn1 行中得到了 fn2 结果!
我做错了什么,我该如何解决?
GitHub link 项目:WebFlux project
GitHub link 在 class 上:FluxGenerator class
使用合并运算符源被热切订阅。所以我想第一个通量会在订阅后立即发出,第二个会发出。因为它们会立即发射。如果你想看到交错的通量,你需要延迟发射。
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
如果您需要更多运算符示例,可以查看 this
问题出在程序的其他部分。 为了计算 JS 函数,我使用了 class 字段 ScriptEngine,即 NOT thread-safe.
制作 ScriptEngine 本地字段解决了我的问题。