KafkaTemplate 发送方法可以安全使用,无需手动(阻塞)检查返回的未来

KafkaTemplate send method safe to use without manual (blocking) check of returned future

我使用 spring-kafka 的 KafkaTemplate 向 kafka 生成一些消息。

我有一个简单的休息 API 可用于创建消息。 在我的代码中,我使用这样的 KafkaTemplate 向 kafka 生成消息。

kafkaTemplate.send("topic", "key", "data")

对我来说重要的是,只有当消息真正发送时,我才将成功返回给客户端。

但现在我意识到该方法至少是部分异步的。 该方法的签名是:

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {

所以它 returns 未来和未来最终可能会在解决时调用失败回调。

我在spring-kafka(版本2.2.6)的java代码中稍微看了下,似乎有一些错误会被直接抛出并且有些只能通过解决未来才能获得,还有一个 javadoc 看起来像:

            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly

我的主要问题是:我是否必须以阻塞方式解决未来(包括 API 异常)以查明发送时是否出现问题?这样我就可以 100% 确定我的消息已正确发送?

或者当 send 方法不抛出任何异常时,发送本身是否已经得到保证,并且未来的错误只是关于接收回一些元信息的问题? (或类似的东西)。

如果您不想阻塞获取以检测失败,另一种选择是向未来添加回调以异步获取结果。 (这是一个ListenableFuture)。

public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {

}

@FunctionalInterface
public interface SuccessCallback<T> {

    /**
     * Called when the {@link ListenableFuture} completes with success.
     * <p>Note that Exceptions raised by this method are ignored.
     * @param result the result
     */
    void onSuccess(@Nullable T result);

}

@FunctionalInterface
public interface FailureCallback {

    /**
     * Called when the {@link ListenableFuture} completes with failure.
     * <p>Note that Exceptions raised by this method are ignored.
     * @param ex the failure
     */
    void onFailure(Throwable ex);

}

如果你只是发送(并祈祷),并不能保证一定会成功。

编辑

调用线程抛出客户端异常(例如序列化),但服务器端将在以后异步完成。

有许多服务器端错误可能会导致异步失败;请参阅 Errors class...

/**
 * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
 * are thus part of the protocol. The names can be changed but the error code cannot.
 *
 * Note that client library will convert an unknown error code to the non-retriable UnknownServerException if the client library
 * version is old and does not recognize the newly-added error code. Therefore when a new server-side error is added,
 * we may need extra logic to convert the new error code to another existing error code before sending the response back to
 * the client if the request version suggests that the client may not recognize the new error code.
 *
 * Do not add exceptions that occur only on the client or only on the server here.
 */
public enum Errors {
    UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
            UnknownServerException::new),
    NONE(0, null, message -> null),
    OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
            OffsetOutOfRangeException::new),
    CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.",
            CorruptRecordException::new),
    UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
            UnknownTopicOrPartitionException::new),
    INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
            InvalidFetchSizeException::new),
    LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
            LeaderNotAvailableException::new),
...