使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal

Using Hooks and a Lift to push Context into ThreadLocal

我问自己是否有办法在订阅者收到 onNext 信号之前将反应上下文推送到 ThreadLocal 变量中。在深入研究 reactor-core 时,我发现了 Hooks class 和 Lift BiFunction.

我创建了一个 class 具有以下实现。 class 由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口。它会将所有调用委托给实际订阅者,如果在实际订阅者上调用 onNext 之前修改了上下文,它还会将上下文推送到 ThreadLocal 变量中。

package com.example.demo;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;

import java.util.function.BiFunction;

public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

    private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);

    private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();

    public static Context getContext() {
        Context context = contextHolder.get();
        if (context == null) {
            context = Context.empty();
            contextHolder.set(context);
        }
        return context;
    }

    public static void setContext(Context context) {
        contextHolder.set(context);
    }

    @Override
    public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
        return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
    }

    final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {

        private CoreSubscriber<? super U> delegate;

        public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Context currentContext() {
            return delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription s) {
            delegate.onSubscribe(s);
        }

        @Override
        public void onNext(U u) {
            Context context = delegate.currentContext();
            if (!context.isEmpty()) {
                Context currentContext = ThreadLocalContextLifter.getContext();
                if (!currentContext.equals(context)) {
                    logger.info("Pushing reactive context to holder {}", context);
                    ThreadLocalContextLifter.setContext(context);
                }
            }
            delegate.onNext(u);
        }

        @Override
        public void onError(Throwable t) {
            delegate.onError(t);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }

}

使用以下代码将实例加载到 Hooks 中:

Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));

我已经 运行 进行了一些测试,它似乎工作正常,但我对解决方案不满意。我猜 hook 会降低 reactor 的性能,或者在我不知道的某些情况下它不会工作。

我的问题很简单:这是个坏主意吗?

我不认为这个想法有什么问题...每个 Reactor 提供的操作员都使用这个钩子。

ContextonNext 之间没有变化,因此电梯 ThreadLocalContextCoreSubscriber 可以在 onSubscribe 中捕获它。但是你仍然需要在 onNext 中至少检查一次 ThreadLocal,因为 onNextonSubscribe can 发生在两个不同的线程上,所以您使用 delegate.currentContext() 的解决方案也有效。最后,您的方法看起来很合理。