Ratpack 的 Promise.cache 在 ParallelBatch 中具有多个下游承诺
Ratpack's Promise.cache with multiple downstream promises in ParallelBatch
当我将 Ratpack 的 Promise.cache
与多个下游承诺和 ParallelBatch
结合使用时,我 运行 进入了 Ratpack 的内部 NullPointerException
,目前还不清楚从文档中向我询问我的用法是否不正确,或者这是否代表 Ratpack 中的错误。
这是一个演示问题的简化测试用例:
@Test
public void foo() throws Exception {
List<Promise<Integer>> promises = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Promise<Integer> p = Promise.value(12);
p = p.cache();
promises.add(p.map(v -> v + 1));
promises.add(p.map(v -> v + 2));
}
final List<Integer> results = ExecHarness.yieldSingle(c ->
ParallelBatch.of(promises).yield()
).getValueOrThrow();
}
运行 本地测试 10000 次导致失败率约为 10 / 10000,NullPointerException
如下所示:
java.lang.NullPointerException
at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
at ratpack.exec.internal.CachingUpstream.lambda$connect[=12=](CachingUpstream.java:116)
at ratpack.exec.internal.CachingUpstream$$Lambda/1438461739.connect(Unknown Source)
at ratpack.exec.internal.DefaultExecution.lambda$null(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$$Lambda/2092087501.execute(Unknown Source)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController.lambda$start[=12=](DefaultExecController.java:195)
at ratpack.exec.internal.DefaultExecController$$Lambda/1411892748.call(Unknown Source)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread[=12=](DefaultExecController.java:136)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda/1157058691.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
在这个测试用例中不使用 cache
会使问题消失,因为没有订阅每个缓存的承诺两次。
我的问题是:这是对 Ratpack API 的错误使用,还是它代表了框架中的错误?如果是前者,你能指出文档中解释为什么这种用法错误的内容吗?
即使您的示例不是缓存承诺的最佳用例(重新创建和缓存每个迭代步骤都具有相同值的承诺没有多大意义),您实际上已经在 CachingUpstream<T>
class.
我做了一些实验来弄清楚发生了什么,这是我的发现。首先,我创建了一个价值 12
的承诺,它提供 CachingUpstream<T>
对象的自定义(更详细)实现。我采用了 Promise.value(12)
的主体,并且覆盖了内联方法 cacheResultIf(Predicate<? super ExecResult<T>> shouldCache)
,默认情况下 returns CachingUpstream<T>
instance:
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
continuation.resume(() -> down.success(12))
)) {
@Override
public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
return transform(up -> {
return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
});
}
};
接下来我创建了一个 class TestCachingUpstream<T>
只是通过复制原始 class 的主体,我添加了一些东西,例如
- 我让每个
TestCachingUpstream<T>
都具有内部 ID(随机 UUID),以便更轻松地跟踪承诺的执行情况。
- 当承诺执行期间发生特定事情时,我添加了一些详细的日志消息。
我没有更改方法的实现,我只是想跟踪执行流程并保持原始实现不变。我的自定义 class 看起来像这样:
private static class TestCachingUpstream<T> implements Upstream<T> {
private final String id = UUID.randomUUID().toString();
private Upstream<? extends T> upstream;
private final Clock clock;
private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
private final Function<? super ExecResult<T>, Duration> ttlFunc;
private final AtomicBoolean pending = new AtomicBoolean();
private final AtomicBoolean draining = new AtomicBoolean();
private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
this(upstream, ttl, Clock.systemUTC());
}
@VisibleForTesting
TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
this.upstream = upstream;
this.ttlFunc = ttl;
this.clock = clock;
}
private void tryDrain() {
if (draining.compareAndSet(false, true)) {
try {
TestCachingUpstream.Cached<? extends T> cached = ref.get();
if (needsFetch(cached)) {
if (pending.compareAndSet(false, true)) {
Downstream<? super T> downstream = waiting.poll();
System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
if (downstream == null) {
pending.set(false);
} else {
try {
yield(downstream);
} catch (Throwable e) {
System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
receiveResult(downstream, ExecResult.of(Result.error(e)));
}
}
}
} else {
System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
Downstream<? super T> downstream = waiting.poll();
while (downstream != null) {
downstream.accept(cached.result);
downstream = waiting.poll();
}
}
} finally {
draining.set(false);
}
}
if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
tryDrain();
}
}
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
private void yield(final Downstream<? super T> downstream) throws Exception {
System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
upstream.connect(new Downstream<T>() {
public void error(Throwable throwable) {
System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
}
@Override
public void success(T value) {
System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.success(value)));
}
@Override
public void complete() {
System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
receiveResult(downstream, CompleteExecResult.get());
}
});
}
@Override
public void connect(Downstream<? super T> downstream) throws Exception {
TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
if (needsFetch(cached)) {
Promise.<T>async(d -> {
waiting.add(d);
tryDrain();
}).result(downstream::accept);
} else {
downstream.accept(cached.result);
}
}
private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
Duration ttl = Duration.ofSeconds(0);
try {
ttl = ttlFunc.apply(result);
} catch (Throwable e) {
if (result.isError()) {
//noinspection ThrowableResultOfMethodCallIgnored
result.getThrowable().addSuppressed(e);
} else {
result = ExecResult.of(Result.error(e));
}
}
Instant expiresAt;
if (ttl.isNegative()) {
expiresAt = null; // eternal
System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
upstream = null; // release
} else if (ttl.isZero()) {
expiresAt = clock.instant().minus(Duration.ofSeconds(1));
} else {
expiresAt = clock.instant().plus(ttl);
}
ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
pending.set(false);
downstream.accept(result);
tryDrain();
}
static class Cached<T> {
final ExecResult<T> result;
final Instant expireAt;
Cached(ExecResult<T> result, Instant expireAt) {
this.result = result;
this.expireAt = expireAt;
}
}
}
我已将 for 循环中的步骤数从 25 步减少到 3 步,以使控制台输出更加简洁。
测试执行成功(无竞争条件)
让我们看看正确执行的流程是什么样的:
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...
如您所见,每次迭代都会导致缓存的承诺生成 5 行控制台日志。
- 当第一次调用
tryDrain
方法时,没有缓存结果,它转到 yield(downstream);
方法调用
- 调用
yield(downstream)
成功完成,receiveResult(downstream, ExecResult.of(Result.success(value)));
从内部调用 success
回调
Promise.cache()
通过使用负持续时间来使用无限到期日期,这就是为什么 receiveResult()
方法 releases upstream
object by setting it's value to null
receiveResult()
方法在完成 sets cached result using ref
internal object 之前调用 tryDrain()
就在退出方法之前。
tryDrain()
方法 sees previously cached result 用于下一次调用缓存的承诺 (p.map(v -> v + 2)
),因此它将缓存的结果直接传递给下游。
对于在 for 循环中创建的所有 3 个 promise,都会重复这种情况。
测试执行失败(竞争条件)
运行 那些 System.out.printf()
的测试使测试失败的次数减少了几次,主要是因为这个 I/O 操作消耗了一些 CPU 周期和不同步的部分代码有几个周期以避免竞争条件。但是它仍然会发生,现在让我们看看失败测试的输出是什么样的:
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null})
java.lang.NullPointerException
at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect[=13=](AnotherPromiseTest.java:146)
at ratpack.exec.internal.DefaultExecution.lambda$null(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController.lambda$start[=13=](DefaultExecController.java:195)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread[=13=](DefaultExecController.java:136)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
这是失败测试的输出 - 我在 IntelliJ IDEA 中 运行 它并且我已将此测试的执行配置为重复执行直到失败。我花了一些时间才让这个测试失败,但是在 运行 测试几次之后,它最终在迭代次数 1500 左右失败了。在这种情况下,我们可以看到竞争条件发生在 for-环形。您可以看到在 receiveResult()
方法
中释放上游对象后
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
并在退出方法前调用tryDrain
,下一次执行缓存的promise还没有看到之前缓存的结果,它运行又回到了yield(downstream)
方法。 upstream
对象已经通过将其值设置为 null
来释放。并且 yield(downstream)
方法期望上游对象被正确初始化,否则它会抛出 NPE。
我正在尝试调试方法:
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
这是决定是否需要获取缓存的承诺的方法。但是,当我添加任何日志记录语句时,它开始导致 WhosebugError
。我猜测在极少数情况下 cached.expireAt.isBefore(clock.instant())
returns false
,因为 cached
对象来自 AtomicReference
所以这个对象应该在方法执行之间正确传递。
这是我在实验中使用的完整测试 class:
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class AnotherPromiseTest {
@Test
public void foo() throws Exception {
List<Promise<Integer>> promises = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
continuation.resume(() -> down.success(12))
)) {
@Override
public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
return transform(up -> {
return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
});
}
};
p = p.cache();
promises.add(p.map(v -> v + 1));
promises.add(p.map(v -> v + 2));
}
ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
}
private static class TestCachingUpstream<T> implements Upstream<T> {
private final String id = UUID.randomUUID().toString();
private Upstream<? extends T> upstream;
private final Clock clock;
private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
private final Function<? super ExecResult<T>, Duration> ttlFunc;
private final AtomicBoolean pending = new AtomicBoolean();
private final AtomicBoolean draining = new AtomicBoolean();
private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
this(upstream, ttl, Clock.systemUTC());
}
@VisibleForTesting
TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
this.upstream = upstream;
this.ttlFunc = ttl;
this.clock = clock;
}
private void tryDrain() {
if (draining.compareAndSet(false, true)) {
try {
TestCachingUpstream.Cached<? extends T> cached = ref.get();
if (needsFetch(cached)) {
if (pending.compareAndSet(false, true)) {
Downstream<? super T> downstream = waiting.poll();
System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
if (downstream == null) {
pending.set(false);
} else {
try {
yield(downstream);
} catch (Throwable e) {
System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
receiveResult(downstream, ExecResult.of(Result.error(e)));
}
}
}
} else {
System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
Downstream<? super T> downstream = waiting.poll();
while (downstream != null) {
downstream.accept(cached.result);
downstream = waiting.poll();
}
}
} finally {
draining.set(false);
}
}
if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
tryDrain();
}
}
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
private void yield(final Downstream<? super T> downstream) throws Exception {
System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
upstream.connect(new Downstream<T>() {
public void error(Throwable throwable) {
System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
}
@Override
public void success(T value) {
System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.success(value)));
}
@Override
public void complete() {
System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
receiveResult(downstream, CompleteExecResult.get());
}
});
}
@Override
public void connect(Downstream<? super T> downstream) throws Exception {
TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
if (needsFetch(cached)) {
Promise.<T>async(d -> {
waiting.add(d);
tryDrain();
}).result(downstream::accept);
} else {
downstream.accept(cached.result);
}
}
private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
Duration ttl = Duration.ofSeconds(0);
try {
ttl = ttlFunc.apply(result);
} catch (Throwable e) {
if (result.isError()) {
//noinspection ThrowableResultOfMethodCallIgnored
result.getThrowable().addSuppressed(e);
} else {
result = ExecResult.of(Result.error(e));
}
}
Instant expiresAt;
if (ttl.isNegative()) {
expiresAt = null; // eternal
System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
upstream = null; // release
} else if (ttl.isZero()) {
expiresAt = clock.instant().minus(Duration.ofSeconds(1));
} else {
expiresAt = clock.instant().plus(ttl);
}
ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
pending.set(false);
downstream.accept(result);
tryDrain();
}
static class Cached<T> {
final ExecResult<T> result;
final Instant expireAt;
Cached(ExecResult<T> result, Instant expireAt) {
this.result = result;
this.expireAt = expireAt;
}
}
}
}
希望对您有所帮助。
当我将 Ratpack 的 Promise.cache
与多个下游承诺和 ParallelBatch
结合使用时,我 运行 进入了 Ratpack 的内部 NullPointerException
,目前还不清楚从文档中向我询问我的用法是否不正确,或者这是否代表 Ratpack 中的错误。
这是一个演示问题的简化测试用例:
@Test
public void foo() throws Exception {
List<Promise<Integer>> promises = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Promise<Integer> p = Promise.value(12);
p = p.cache();
promises.add(p.map(v -> v + 1));
promises.add(p.map(v -> v + 2));
}
final List<Integer> results = ExecHarness.yieldSingle(c ->
ParallelBatch.of(promises).yield()
).getValueOrThrow();
}
运行 本地测试 10000 次导致失败率约为 10 / 10000,NullPointerException
如下所示:
java.lang.NullPointerException
at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
at ratpack.exec.internal.CachingUpstream.lambda$connect[=12=](CachingUpstream.java:116)
at ratpack.exec.internal.CachingUpstream$$Lambda/1438461739.connect(Unknown Source)
at ratpack.exec.internal.DefaultExecution.lambda$null(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$$Lambda/2092087501.execute(Unknown Source)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController.lambda$start[=12=](DefaultExecController.java:195)
at ratpack.exec.internal.DefaultExecController$$Lambda/1411892748.call(Unknown Source)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread[=12=](DefaultExecController.java:136)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda/1157058691.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
在这个测试用例中不使用 cache
会使问题消失,因为没有订阅每个缓存的承诺两次。
我的问题是:这是对 Ratpack API 的错误使用,还是它代表了框架中的错误?如果是前者,你能指出文档中解释为什么这种用法错误的内容吗?
即使您的示例不是缓存承诺的最佳用例(重新创建和缓存每个迭代步骤都具有相同值的承诺没有多大意义),您实际上已经在 CachingUpstream<T>
class.
我做了一些实验来弄清楚发生了什么,这是我的发现。首先,我创建了一个价值 12
的承诺,它提供 CachingUpstream<T>
对象的自定义(更详细)实现。我采用了 Promise.value(12)
的主体,并且覆盖了内联方法 cacheResultIf(Predicate<? super ExecResult<T>> shouldCache)
,默认情况下 returns CachingUpstream<T>
instance:
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
continuation.resume(() -> down.success(12))
)) {
@Override
public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
return transform(up -> {
return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
});
}
};
接下来我创建了一个 class TestCachingUpstream<T>
只是通过复制原始 class 的主体,我添加了一些东西,例如
- 我让每个
TestCachingUpstream<T>
都具有内部 ID(随机 UUID),以便更轻松地跟踪承诺的执行情况。 - 当承诺执行期间发生特定事情时,我添加了一些详细的日志消息。
我没有更改方法的实现,我只是想跟踪执行流程并保持原始实现不变。我的自定义 class 看起来像这样:
private static class TestCachingUpstream<T> implements Upstream<T> {
private final String id = UUID.randomUUID().toString();
private Upstream<? extends T> upstream;
private final Clock clock;
private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
private final Function<? super ExecResult<T>, Duration> ttlFunc;
private final AtomicBoolean pending = new AtomicBoolean();
private final AtomicBoolean draining = new AtomicBoolean();
private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
this(upstream, ttl, Clock.systemUTC());
}
@VisibleForTesting
TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
this.upstream = upstream;
this.ttlFunc = ttl;
this.clock = clock;
}
private void tryDrain() {
if (draining.compareAndSet(false, true)) {
try {
TestCachingUpstream.Cached<? extends T> cached = ref.get();
if (needsFetch(cached)) {
if (pending.compareAndSet(false, true)) {
Downstream<? super T> downstream = waiting.poll();
System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
if (downstream == null) {
pending.set(false);
} else {
try {
yield(downstream);
} catch (Throwable e) {
System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
receiveResult(downstream, ExecResult.of(Result.error(e)));
}
}
}
} else {
System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
Downstream<? super T> downstream = waiting.poll();
while (downstream != null) {
downstream.accept(cached.result);
downstream = waiting.poll();
}
}
} finally {
draining.set(false);
}
}
if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
tryDrain();
}
}
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
private void yield(final Downstream<? super T> downstream) throws Exception {
System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
upstream.connect(new Downstream<T>() {
public void error(Throwable throwable) {
System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
}
@Override
public void success(T value) {
System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.success(value)));
}
@Override
public void complete() {
System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
receiveResult(downstream, CompleteExecResult.get());
}
});
}
@Override
public void connect(Downstream<? super T> downstream) throws Exception {
TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
if (needsFetch(cached)) {
Promise.<T>async(d -> {
waiting.add(d);
tryDrain();
}).result(downstream::accept);
} else {
downstream.accept(cached.result);
}
}
private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
Duration ttl = Duration.ofSeconds(0);
try {
ttl = ttlFunc.apply(result);
} catch (Throwable e) {
if (result.isError()) {
//noinspection ThrowableResultOfMethodCallIgnored
result.getThrowable().addSuppressed(e);
} else {
result = ExecResult.of(Result.error(e));
}
}
Instant expiresAt;
if (ttl.isNegative()) {
expiresAt = null; // eternal
System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
upstream = null; // release
} else if (ttl.isZero()) {
expiresAt = clock.instant().minus(Duration.ofSeconds(1));
} else {
expiresAt = clock.instant().plus(ttl);
}
ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
pending.set(false);
downstream.accept(result);
tryDrain();
}
static class Cached<T> {
final ExecResult<T> result;
final Instant expireAt;
Cached(ExecResult<T> result, Instant expireAt) {
this.result = result;
this.expireAt = expireAt;
}
}
}
我已将 for 循环中的步骤数从 25 步减少到 3 步,以使控制台输出更加简洁。
测试执行成功(无竞争条件)
让我们看看正确执行的流程是什么样的:
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...
如您所见,每次迭代都会导致缓存的承诺生成 5 行控制台日志。
- 当第一次调用
tryDrain
方法时,没有缓存结果,它转到yield(downstream);
方法调用 - 调用
yield(downstream)
成功完成,receiveResult(downstream, ExecResult.of(Result.success(value)));
从内部调用success
回调 Promise.cache()
通过使用负持续时间来使用无限到期日期,这就是为什么receiveResult()
方法 releasesupstream
object by setting it's value tonull
receiveResult()
方法在完成 sets cached result usingref
internal object 之前调用tryDrain()
就在退出方法之前。tryDrain()
方法 sees previously cached result 用于下一次调用缓存的承诺 (p.map(v -> v + 2)
),因此它将缓存的结果直接传递给下游。
对于在 for 循环中创建的所有 3 个 promise,都会重复这种情况。
测试执行失败(竞争条件)
运行 那些 System.out.printf()
的测试使测试失败的次数减少了几次,主要是因为这个 I/O 操作消耗了一些 CPU 周期和不同步的部分代码有几个周期以避免竞争条件。但是它仍然会发生,现在让我们看看失败测试的输出是什么样的:
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null})
java.lang.NullPointerException
at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect[=13=](AnotherPromiseTest.java:146)
at ratpack.exec.internal.DefaultExecution.lambda$null(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController.lambda$start[=13=](DefaultExecController.java:195)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread[=13=](DefaultExecController.java:136)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
这是失败测试的输出 - 我在 IntelliJ IDEA 中 运行 它并且我已将此测试的执行配置为重复执行直到失败。我花了一些时间才让这个测试失败,但是在 运行 测试几次之后,它最终在迭代次数 1500 左右失败了。在这种情况下,我们可以看到竞争条件发生在 for-环形。您可以看到在 receiveResult()
方法
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
并在退出方法前调用tryDrain
,下一次执行缓存的promise还没有看到之前缓存的结果,它运行又回到了yield(downstream)
方法。 upstream
对象已经通过将其值设置为 null
来释放。并且 yield(downstream)
方法期望上游对象被正确初始化,否则它会抛出 NPE。
我正在尝试调试方法:
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
这是决定是否需要获取缓存的承诺的方法。但是,当我添加任何日志记录语句时,它开始导致 WhosebugError
。我猜测在极少数情况下 cached.expireAt.isBefore(clock.instant())
returns false
,因为 cached
对象来自 AtomicReference
所以这个对象应该在方法执行之间正确传递。
这是我在实验中使用的完整测试 class:
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class AnotherPromiseTest {
@Test
public void foo() throws Exception {
List<Promise<Integer>> promises = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
continuation.resume(() -> down.success(12))
)) {
@Override
public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
return transform(up -> {
return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
});
}
};
p = p.cache();
promises.add(p.map(v -> v + 1));
promises.add(p.map(v -> v + 2));
}
ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
}
private static class TestCachingUpstream<T> implements Upstream<T> {
private final String id = UUID.randomUUID().toString();
private Upstream<? extends T> upstream;
private final Clock clock;
private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
private final Function<? super ExecResult<T>, Duration> ttlFunc;
private final AtomicBoolean pending = new AtomicBoolean();
private final AtomicBoolean draining = new AtomicBoolean();
private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
this(upstream, ttl, Clock.systemUTC());
}
@VisibleForTesting
TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
this.upstream = upstream;
this.ttlFunc = ttl;
this.clock = clock;
}
private void tryDrain() {
if (draining.compareAndSet(false, true)) {
try {
TestCachingUpstream.Cached<? extends T> cached = ref.get();
if (needsFetch(cached)) {
if (pending.compareAndSet(false, true)) {
Downstream<? super T> downstream = waiting.poll();
System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
if (downstream == null) {
pending.set(false);
} else {
try {
yield(downstream);
} catch (Throwable e) {
System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
receiveResult(downstream, ExecResult.of(Result.error(e)));
}
}
}
} else {
System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
Downstream<? super T> downstream = waiting.poll();
while (downstream != null) {
downstream.accept(cached.result);
downstream = waiting.poll();
}
}
} finally {
draining.set(false);
}
}
if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
tryDrain();
}
}
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
private void yield(final Downstream<? super T> downstream) throws Exception {
System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
upstream.connect(new Downstream<T>() {
public void error(Throwable throwable) {
System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
}
@Override
public void success(T value) {
System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.success(value)));
}
@Override
public void complete() {
System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
receiveResult(downstream, CompleteExecResult.get());
}
});
}
@Override
public void connect(Downstream<? super T> downstream) throws Exception {
TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
if (needsFetch(cached)) {
Promise.<T>async(d -> {
waiting.add(d);
tryDrain();
}).result(downstream::accept);
} else {
downstream.accept(cached.result);
}
}
private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
Duration ttl = Duration.ofSeconds(0);
try {
ttl = ttlFunc.apply(result);
} catch (Throwable e) {
if (result.isError()) {
//noinspection ThrowableResultOfMethodCallIgnored
result.getThrowable().addSuppressed(e);
} else {
result = ExecResult.of(Result.error(e));
}
}
Instant expiresAt;
if (ttl.isNegative()) {
expiresAt = null; // eternal
System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
upstream = null; // release
} else if (ttl.isZero()) {
expiresAt = clock.instant().minus(Duration.ofSeconds(1));
} else {
expiresAt = clock.instant().plus(ttl);
}
ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
pending.set(false);
downstream.accept(result);
tryDrain();
}
static class Cached<T> {
final ExecResult<T> result;
final Instant expireAt;
Cached(ExecResult<T> result, Instant expireAt) {
this.result = result;
this.expireAt = expireAt;
}
}
}
}
希望对您有所帮助。