有没有好的方法可以将 FutureLocal.java 添加到自定义 Future.java 扩展 CompletableFuture? (下面的示例代码)

Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)

我有以下代码,除了当我调用 super.thenCompose 并且它 returns CompletableFuture 而不是我的 Custom Future.java 时,它有点接近。我正在尝试复制 twitter 的 scala futures

  1. 能够像 twitter scala 的 futures 添加取消链
  2. 可以让请求上下文流过 thenApply 和 thenCompose 链以修复 slf4j 中的 MDC(很像 ThreadLocal,但它在每个 lambda 之前重新应用 运行 如所见在下面的代码中)

    public class 未来扩展 CompletableFuture {

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);       
    
        return super.thenApply(f);
    }
    
    @Override
    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);
    
        return super.thenCompose(f);
    }
    
    @SuppressWarnings("hiding")
    private class MyFunction implements Function {
    
        private Map<String, Object> state;
        private Function fn;
    
        public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
            this.state = state;
            this.fn = fn;
    
        }
    
        @Override
        public Object apply(Object t) {
    
            try {
                FutureLocal.restoreState(state);
    
                return fn.apply(t);
    
            } finally {
                FutureLocal.restoreState(null);
            }
    
    
        }
    
    }
    
    @Override
    public boolean complete(T value) {
        return super.complete(value);
    }
    
    @Override
    public boolean completeExceptionally(Throwable ex) {
        return super.completeExceptionally(ex);
    }
    

    }

这是我用来 运行 该代码的一些代码,但是在地图中记录 "test" 在第 3 次远程调用时开始失败,这意味着 slf4j MDC 将崩溃。

public class TestCustomFutures {

    private Executor exec = Executors.newFixedThreadPool(3);

    @Test
    public void testFutureContext() throws InterruptedException, ExecutionException {

        Set<Integer> hashSet = new HashSet<Integer>();

        FutureLocal.put("test", 100);

        CompletableFuture<Integer> f = myRemoteCall(4)
            .thenCompose(s -> myRemoteCall(3))
            .thenCompose(s -> myRemoteCall(2));

        f.get();
    }

    private Future<Integer> myRemoteCall(int i) {
        System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());

        Future<Integer> f = new Future<Integer>();

        exec.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    f.completeExceptionally(e);
                }

                f.complete(i);
            }
        });

        return f;
    }
}

然后输出是这样的

result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2

注意最后一个值我们不想为空

啊哈,我错过了一件简单的事情,因为我在 jdk8 中。但是在 jdk11 中,你可以覆盖这个...

@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
    return new Future<U>();
}

在 jdk8 中,出于某种原因,这不会编译,也不会调用它 :(。糟糕,我不想升级到 11,因为一些用法仍然在 jdk8 上:(。