KafkaTemplate 发送方法可以安全使用,无需手动(阻塞)检查返回的未来
KafkaTemplate send method safe to use without manual (blocking) check of returned future
我使用 spring-kafka
的 KafkaTemplate 向 kafka 生成一些消息。
我有一个简单的休息 API 可用于创建消息。
在我的代码中,我使用这样的 KafkaTemplate 向 kafka 生成消息。
kafkaTemplate.send("topic", "key", "data")
对我来说重要的是,只有当消息真正发送时,我才将成功返回给客户端。
但现在我意识到该方法至少是部分异步的。
该方法的签名是:
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
所以它 returns 未来和未来最终可能会在解决时调用失败回调。
我在spring-kafka(版本2.2.6)的java代码中稍微看了下,似乎有一些错误会被直接抛出并且有些只能通过解决未来才能获得,还有一个 javadoc 看起来像:
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
我的主要问题是:我是否必须以阻塞方式解决未来(包括 API 异常)以查明发送时是否出现问题?这样我就可以 100% 确定我的消息已正确发送?
或者当 send
方法不抛出任何异常时,发送本身是否已经得到保证,并且未来的错误只是关于接收回一些元信息的问题? (或类似的东西)。
如果您不想阻塞获取以检测失败,另一种选择是向未来添加回调以异步获取结果。 (这是一个ListenableFuture
)。
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {
}
@FunctionalInterface
public interface SuccessCallback<T> {
/**
* Called when the {@link ListenableFuture} completes with success.
* <p>Note that Exceptions raised by this method are ignored.
* @param result the result
*/
void onSuccess(@Nullable T result);
}
@FunctionalInterface
public interface FailureCallback {
/**
* Called when the {@link ListenableFuture} completes with failure.
* <p>Note that Exceptions raised by this method are ignored.
* @param ex the failure
*/
void onFailure(Throwable ex);
}
如果你只是发送(并祈祷),并不能保证一定会成功。
编辑
调用线程抛出客户端异常(例如序列化),但服务器端将在以后异步完成。
有许多服务器端错误可能会导致异步失败;请参阅 Errors
class...
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
*
* Note that client library will convert an unknown error code to the non-retriable UnknownServerException if the client library
* version is old and does not recognize the newly-added error code. Therefore when a new server-side error is added,
* we may need extra logic to convert the new error code to another existing error code before sending the response back to
* the client if the request version suggests that the client may not recognize the new error code.
*
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
UnknownServerException::new),
NONE(0, null, message -> null),
OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
OffsetOutOfRangeException::new),
CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.",
CorruptRecordException::new),
UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
UnknownTopicOrPartitionException::new),
INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
InvalidFetchSizeException::new),
LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
LeaderNotAvailableException::new),
...
我使用 spring-kafka
的 KafkaTemplate 向 kafka 生成一些消息。
我有一个简单的休息 API 可用于创建消息。 在我的代码中,我使用这样的 KafkaTemplate 向 kafka 生成消息。
kafkaTemplate.send("topic", "key", "data")
对我来说重要的是,只有当消息真正发送时,我才将成功返回给客户端。
但现在我意识到该方法至少是部分异步的。 该方法的签名是:
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
所以它 returns 未来和未来最终可能会在解决时调用失败回调。
我在spring-kafka(版本2.2.6)的java代码中稍微看了下,似乎有一些错误会被直接抛出并且有些只能通过解决未来才能获得,还有一个 javadoc 看起来像:
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
我的主要问题是:我是否必须以阻塞方式解决未来(包括 API 异常)以查明发送时是否出现问题?这样我就可以 100% 确定我的消息已正确发送?
或者当 send
方法不抛出任何异常时,发送本身是否已经得到保证,并且未来的错误只是关于接收回一些元信息的问题? (或类似的东西)。
如果您不想阻塞获取以检测失败,另一种选择是向未来添加回调以异步获取结果。 (这是一个ListenableFuture
)。
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {
}
@FunctionalInterface
public interface SuccessCallback<T> {
/**
* Called when the {@link ListenableFuture} completes with success.
* <p>Note that Exceptions raised by this method are ignored.
* @param result the result
*/
void onSuccess(@Nullable T result);
}
@FunctionalInterface
public interface FailureCallback {
/**
* Called when the {@link ListenableFuture} completes with failure.
* <p>Note that Exceptions raised by this method are ignored.
* @param ex the failure
*/
void onFailure(Throwable ex);
}
如果你只是发送(并祈祷),并不能保证一定会成功。
编辑
调用线程抛出客户端异常(例如序列化),但服务器端将在以后异步完成。
有许多服务器端错误可能会导致异步失败;请参阅 Errors
class...
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
*
* Note that client library will convert an unknown error code to the non-retriable UnknownServerException if the client library
* version is old and does not recognize the newly-added error code. Therefore when a new server-side error is added,
* we may need extra logic to convert the new error code to another existing error code before sending the response back to
* the client if the request version suggests that the client may not recognize the new error code.
*
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
UnknownServerException::new),
NONE(0, null, message -> null),
OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
OffsetOutOfRangeException::new),
CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.",
CorruptRecordException::new),
UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
UnknownTopicOrPartitionException::new),
INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
InvalidFetchSizeException::new),
LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
LeaderNotAvailableException::new),
...