有没有好的方法可以将 FutureLocal.java 添加到自定义 Future.java 扩展 CompletableFuture? (下面的示例代码)
Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)
我有以下代码,除了当我调用 super.thenCompose 并且它 returns CompletableFuture 而不是我的 Custom Future.java 时,它有点接近。我正在尝试复制 twitter 的 scala futures
- 能够像 twitter scala 的 futures 添加取消链
可以让请求上下文流过 thenApply 和 thenCompose 链以修复 slf4j 中的 MDC(很像 ThreadLocal,但它在每个 lambda 之前重新应用 运行 如所见在下面的代码中)
public class 未来扩展 CompletableFuture {
@Override
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
Map<String, Object> state = FutureLocal.fetchState();
MyFunction f = new MyFunction(state, fn);
return super.thenApply(f);
}
@Override
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
Map<String, Object> state = FutureLocal.fetchState();
MyFunction f = new MyFunction(state, fn);
return super.thenCompose(f);
}
@SuppressWarnings("hiding")
private class MyFunction implements Function {
private Map<String, Object> state;
private Function fn;
public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
this.state = state;
this.fn = fn;
}
@Override
public Object apply(Object t) {
try {
FutureLocal.restoreState(state);
return fn.apply(t);
} finally {
FutureLocal.restoreState(null);
}
}
}
@Override
public boolean complete(T value) {
return super.complete(value);
}
@Override
public boolean completeExceptionally(Throwable ex) {
return super.completeExceptionally(ex);
}
}
这是我用来 运行 该代码的一些代码,但是在地图中记录 "test" 在第 3 次远程调用时开始失败,这意味着 slf4j MDC 将崩溃。
public class TestCustomFutures {
private Executor exec = Executors.newFixedThreadPool(3);
@Test
public void testFutureContext() throws InterruptedException, ExecutionException {
Set<Integer> hashSet = new HashSet<Integer>();
FutureLocal.put("test", 100);
CompletableFuture<Integer> f = myRemoteCall(4)
.thenCompose(s -> myRemoteCall(3))
.thenCompose(s -> myRemoteCall(2));
f.get();
}
private Future<Integer> myRemoteCall(int i) {
System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());
Future<Integer> f = new Future<Integer>();
exec.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
f.completeExceptionally(e);
}
f.complete(i);
}
});
return f;
}
}
然后输出是这样的
result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2
注意最后一个值我们不想为空
啊哈,我错过了一件简单的事情,因为我在 jdk8 中。但是在 jdk11 中,你可以覆盖这个...
@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new Future<U>();
}
在 jdk8 中,出于某种原因,这不会编译,也不会调用它 :(。糟糕,我不想升级到 11,因为一些用法仍然在 jdk8 上:(。
我有以下代码,除了当我调用 super.thenCompose 并且它 returns CompletableFuture 而不是我的 Custom Future.java 时,它有点接近。我正在尝试复制 twitter 的 scala futures
- 能够像 twitter scala 的 futures 添加取消链
可以让请求上下文流过 thenApply 和 thenCompose 链以修复 slf4j 中的 MDC(很像 ThreadLocal,但它在每个 lambda 之前重新应用 运行 如所见在下面的代码中)
public class 未来扩展 CompletableFuture {
@Override public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) { Map<String, Object> state = FutureLocal.fetchState(); MyFunction f = new MyFunction(state, fn); return super.thenApply(f); } @Override public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) { Map<String, Object> state = FutureLocal.fetchState(); MyFunction f = new MyFunction(state, fn); return super.thenCompose(f); } @SuppressWarnings("hiding") private class MyFunction implements Function { private Map<String, Object> state; private Function fn; public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) { this.state = state; this.fn = fn; } @Override public Object apply(Object t) { try { FutureLocal.restoreState(state); return fn.apply(t); } finally { FutureLocal.restoreState(null); } } } @Override public boolean complete(T value) { return super.complete(value); } @Override public boolean completeExceptionally(Throwable ex) { return super.completeExceptionally(ex); }
}
这是我用来 运行 该代码的一些代码,但是在地图中记录 "test" 在第 3 次远程调用时开始失败,这意味着 slf4j MDC 将崩溃。
public class TestCustomFutures {
private Executor exec = Executors.newFixedThreadPool(3);
@Test
public void testFutureContext() throws InterruptedException, ExecutionException {
Set<Integer> hashSet = new HashSet<Integer>();
FutureLocal.put("test", 100);
CompletableFuture<Integer> f = myRemoteCall(4)
.thenCompose(s -> myRemoteCall(3))
.thenCompose(s -> myRemoteCall(2));
f.get();
}
private Future<Integer> myRemoteCall(int i) {
System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());
Future<Integer> f = new Future<Integer>();
exec.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
f.completeExceptionally(e);
}
f.complete(i);
}
});
return f;
}
}
然后输出是这样的
result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2
注意最后一个值我们不想为空
啊哈,我错过了一件简单的事情,因为我在 jdk8 中。但是在 jdk11 中,你可以覆盖这个...
@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new Future<U>();
}
在 jdk8 中,出于某种原因,这不会编译,也不会调用它 :(。糟糕,我不想升级到 11,因为一些用法仍然在 jdk8 上:(。