在 WebFlux 中生成 UUID

Generating UUID in WebFlux

假设我在 class 中有这样的方法实现 BeforeConvertCall<Object>

 @Override
    public Publisher<Object> onBeforeConvert(Object o, SqlIdentifier sqlIdentifier) {
        
        return Mono.fromCallable(() -> enhance(o)); (A)
        //
        return Mono.just(o)
        .map(this::enhance); (B)

    }

private Object enhance(Object o) {

    if (o instanceof Foo && ((Foo) o).getId() == null) {
        ((Foo) o).setId(UUID.randomUUID().toString());
    } 
    return o;
}
  1. 这段代码是否阻塞?我的意思是应该担心吗?
  2. UUID 代包装在 ThreadLocal 反应器中是否有意义?
  3. (A)和(B)调用有什么区别吗?

我试过把这个耗时的操作也放到enhance方法里面:

for (int i = 0; i < Integer.MAX_VALUE; i++) {
        String id = UUID.randomUUID().toString();
    }

并发出了 3 个并发请求。 生成了 4 reactor-http-nio 个线程,它们都处于可运行状态。首先锁定 SecureRandom 并休息 3:

- locked <0x0000000600c4b510> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x0000000600c4b4b0> (a sun.nio.ch.KQueueSelectorImpl)
  1. 为什么其余 3 个线程没有因为 SecureRandom 实例上的锁而被阻塞?

我试图将相同的 for 循环放入 doNext() 钩子内的主管道中,突然间其余线程按预期被阻塞。

  1. 为什么他们在这里被屏蔽了,但在 onBeforeConvert() 中使用相同的方法却没有? 此外,当我尝试用 ThreadLocal 包装调用时,它没有帮助。

java.lang.Thread.State: BLOCKED (on object monitor) at sun.security.provider.SecureRandom.engineNextBytes(java.base@11.0.10/SecureRandom.java:216)

  • waiting to lock <0x0000000600118b28> (a sun.security.provider.SecureRandom)
  1. 我希望所有 4 个线程都有自己的 SecureRandom.. 实例吗? 是否只有主线程拥有该实例并将工作填充到没有 SecureRandom 线程本地的工作线程?

最后一个实验:

 @Override
    public Publisher<Object> onBeforeConvert(Object o, SqlIdentifier sqlIdentifier) {
        
        return Mono.fromCallable(() -> enhance(o))
        .map(this::enhance)
        .subscribeOn(Schedulers.boundedElastic()); (C)

    }

添加行(C)和运行 4个并发请求后,boundedElastic-2,3,4确实被阻塞了。

  1. 为什么之前的方法reactor-http-nio没有被屏蔽,却被屏蔽了?
  1. 是的,在您显示此代码段的方式中 - 在这里,UUID 生成被阻止。

  2. 直到 subscribeOn 没有被调用,这两者之间没有显着差异。 just 立即为所有订阅者评估,而 fromCallable 在订阅时单独评估每个订阅者。 在此处查看更多信息:

最好的办法是使用:

return Mono.fromCallable(() -> enhance(o))
        .subscribeOn(Schedulers.boundedElastic());

使用这种方法,您将阻塞逻辑委托给单独的线程池Schedulers.boundedElastic(),这样主反应堆线程池就不会被阻塞,并且不会对整个应用程序产生影响