等待 x 毫秒或直到条件变为真

Wait x milliseconds or until a condition becomes true

我有一个代码,我将数据发送到我们的队列,然后队列发回确认,说他们已经收到数据,所以我等待 X 时间,然后再检查他们是否收到数据。下面是执行此操作并且有效的代码:

  public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(800);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // if key is not present, then acknowledgement was received successfully
    sent = !acknowledgementCache.asMap().containsKey(address);
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    if (!sent)
      removeFromRetryBucket(address);
    return sent;
  }

现在上面代码的问题是 - 我等待 800 milliseconds 无论如何,这是错误的。确认可能会在 100 毫秒后返回,但我仍在等待 800,所以我想在确认返回后立即 return,而不是等待那 X 时间。

所以我想出了下面使用 awaitility 的代码,但由于某种原因它没有按预期工作。意思是,即使确认很快返回,它仍然超时。我也尝试将超时值增加到非常高的数字,但它仍然超时,所以看起来有问题。有没有更好的方法来做到这一点?

  public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
      try {
        // if key is not present, then acknowledgement was received successfully
        Awaitility.await().atMost(800, TimeUnit.MILLISECONDS)
            .untilTrue(new AtomicBoolean(!acknowledgementCache.asMap().containsKey(address)));
        return true;
      } catch (ConditionTimeoutException ex) {
      }
    }
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    removeFromRetryBucket(address);
    return false;
  }

注意:我目前正在使用 Java 7。我确实可以访问 Guava,所以如果除了等待之外还有更好的东西,那么我也可以使用它。

为了能够在 Java 7 中检查您需要编写一个可调用对象。

@Test
public void send() {
    //when
    boolean sent = sendAsync(address, records, socket, true);
    //then
    if (sent) {
        await().until(receivedPackageCount(), equalTo(false));
    }
}

private Callable receivedPackageCount(String address) {
    return new Callable() {
        @Override
        public boolean call() throws Exception {
            return acknowledgementCache.asMap().containsKey(address);
        }
    };
}

肯定和上面类似。可能会有编译错误,因为我写的时候没有 ide.