使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal
Using Hooks and a Lift to push Context into ThreadLocal
我问自己是否有办法在订阅者收到 onNext 信号之前将反应上下文推送到 ThreadLocal 变量中。在深入研究 reactor-core 时,我发现了 Hooks class 和 Lift BiFunction.
我创建了一个 class 具有以下实现。 class 由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口。它会将所有调用委托给实际订阅者,如果在实际订阅者上调用 onNext 之前修改了上下文,它还会将上下文推送到 ThreadLocal 变量中。
package com.example.demo;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;
import java.util.function.BiFunction;
public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);
private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();
public static Context getContext() {
Context context = contextHolder.get();
if (context == null) {
context = Context.empty();
contextHolder.set(context);
}
return context;
}
public static void setContext(Context context) {
contextHolder.set(context);
}
@Override
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
}
final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {
private CoreSubscriber<? super U> delegate;
public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
this.delegate = delegate;
}
@Override
public Context currentContext() {
return delegate.currentContext();
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
}
@Override
public void onNext(U u) {
Context context = delegate.currentContext();
if (!context.isEmpty()) {
Context currentContext = ThreadLocalContextLifter.getContext();
if (!currentContext.equals(context)) {
logger.info("Pushing reactive context to holder {}", context);
ThreadLocalContextLifter.setContext(context);
}
}
delegate.onNext(u);
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onComplete() {
delegate.onComplete();
}
}
}
使用以下代码将实例加载到 Hooks 中:
Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));
我已经 运行 进行了一些测试,它似乎工作正常,但我对解决方案不满意。我猜 hook 会降低 reactor 的性能,或者在我不知道的某些情况下它不会工作。
我的问题很简单:这是个坏主意吗?
我不认为这个想法有什么问题...每个 Reactor 提供的操作员都使用这个钩子。
Context
在 onNext
之间没有变化,因此电梯 ThreadLocalContextCoreSubscriber
可以在 onSubscribe
中捕获它。但是你仍然需要在 onNext
中至少检查一次 ThreadLocal,因为 onNext
和 onSubscribe
can 发生在两个不同的线程上,所以您使用 delegate.currentContext()
的解决方案也有效。最后,您的方法看起来很合理。
我问自己是否有办法在订阅者收到 onNext 信号之前将反应上下文推送到 ThreadLocal 变量中。在深入研究 reactor-core 时,我发现了 Hooks class 和 Lift BiFunction.
我创建了一个 class 具有以下实现。 class 由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口。它会将所有调用委托给实际订阅者,如果在实际订阅者上调用 onNext 之前修改了上下文,它还会将上下文推送到 ThreadLocal 变量中。
package com.example.demo;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;
import java.util.function.BiFunction;
public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);
private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();
public static Context getContext() {
Context context = contextHolder.get();
if (context == null) {
context = Context.empty();
contextHolder.set(context);
}
return context;
}
public static void setContext(Context context) {
contextHolder.set(context);
}
@Override
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
}
final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {
private CoreSubscriber<? super U> delegate;
public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
this.delegate = delegate;
}
@Override
public Context currentContext() {
return delegate.currentContext();
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
}
@Override
public void onNext(U u) {
Context context = delegate.currentContext();
if (!context.isEmpty()) {
Context currentContext = ThreadLocalContextLifter.getContext();
if (!currentContext.equals(context)) {
logger.info("Pushing reactive context to holder {}", context);
ThreadLocalContextLifter.setContext(context);
}
}
delegate.onNext(u);
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onComplete() {
delegate.onComplete();
}
}
}
使用以下代码将实例加载到 Hooks 中:
Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));
我已经 运行 进行了一些测试,它似乎工作正常,但我对解决方案不满意。我猜 hook 会降低 reactor 的性能,或者在我不知道的某些情况下它不会工作。
我的问题很简单:这是个坏主意吗?
我不认为这个想法有什么问题...每个 Reactor 提供的操作员都使用这个钩子。
Context
在 onNext
之间没有变化,因此电梯 ThreadLocalContextCoreSubscriber
可以在 onSubscribe
中捕获它。但是你仍然需要在 onNext
中至少检查一次 ThreadLocal,因为 onNext
和 onSubscribe
can 发生在两个不同的线程上,所以您使用 delegate.currentContext()
的解决方案也有效。最后,您的方法看起来很合理。