Spring Kafka 异步发送调用块
Spring Kafka asynchronous send calls block
我正在使用 Spring-Kafka 版本 1.2.1,当 Kafka 服务器为 down/unreachable 时,异步发送调用会阻塞一段时间。这似乎是 TCP 超时。代码是这样的:
ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
@Override
public void onSuccess(SendResult<K, V> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
我快速浏览了 Spring-Kafka 代码,它似乎只是将任务传递给 kafka 客户端库,将回调交互转换为未来的对象交互。查看kafka客户端库,代码变得更加复杂,我没有花时间去理解它,但我猜它可能在同一个线程中进行远程调用(至少是元数据?)。
作为用户,我希望 Spring-Kafka 方法 return 将来会立即 return,即使无法访问远程 kafka 服务器。
如果我的理解有误或者这是一个错误,欢迎任何确认。我最终暂时将其设为异步。
还有一个问题是Spring-Kafka的文档一开始就说它提供了同步和异步发送方法。我找不到任何不 return 期货的方法,也许文档需要更新。
如果需要,我很乐意提供更多详细信息。谢谢
只是为了确定。您是否应用了@EnableAsync 注释?我想说这可能是指定 Future<>
行为的关键
除了配置上的 @EnableAsync 注释 class,还需要在调用此代码的方法上使用 @Async 注释。
http://www.baeldung.com/spring-async
这里有一些代码碎片。 Kafka 生产者配置:
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
}
@Bean
public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
}
@Bean
public Producer producer() {
return new Producer();
}
}
以及制作人本身:
public class Producer {
public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, GenericMessage> kafkaTemplate;
@Async
public void send(String topic, GenericMessage message) {
ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {
@Override
public void onSuccess(final SendResult<String, GenericMessage> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
最佳解决方案是在生产者级别添加 'Callback' 侦听器。
@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate() {
KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
kt.setProducerListener(new ProducerListener<String, WebUserOperation>() {
@Override
public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata) {
System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = "
+ recordMetadata.partition() +" with offset= " + recordMetadata.offset()
+ " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
}
@Override
public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception) {
System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
exception.printStackTrace();
}
});
return kt;
}
下面的代码可以让我异步获取响应
ProducerRecord<UUID, Person> person = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey,Person);
Runnable runnable = () -> kafkaTemplate.send(person).addCallback(new MessageAckHandler());
new Thread(runnable).start();
public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>> {
@Override
public void onFailure(Throwable exception) {
log.error("unable to send message: " + exception.getMessage());
}
@Override
public void onSuccess(SendResult<UUID, ScreeningEvent> result) {
log.debug("sent message with offset={} messageID={}", result.getRecordMetadata().offset(), result.getProducerRecord().key());
}
}
public class SendResult<K, V> {
private final ProducerRecord<K, V> producerRecord;
private final RecordMetadata recordMetadata;
public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
this.producerRecord = producerRecord;
this.recordMetadata = recordMetadata;
}
public ProducerRecord<K, V> getProducerRecord() {
return this.producerRecord;
}
public RecordMetadata getRecordMetadata() {
return this.recordMetadata;
}
@Override
public String toString() {
return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
}
}
如果我看KafkaProducer本身,发送消息有两个部分:
- 正在将消息存储到内部缓冲区中。
- 正在将缓冲区中的消息上传到 Kafka。
KafkaProducer 对于第二部分是异步的,而不是第一部分。
send() 方法仍然可以在第一部分被阻塞并最终抛出 TimeoutExceptions,例如:
- 主题的元数据未缓存或过时,因此生产者尝试从服务器获取元数据以了解主题是否仍然存在以及它有多少个分区。
- 缓冲区已满(默认为 32MB)。
如果服务器完全没有响应,您可能会遇到这两个问题。
更新:
我在 Kafka 2.2.1 中测试并确认了这一点。看起来这种行为在 2.4 and/or 2.6 中可能有所不同:KAFKA-3720
我正在使用 Spring-Kafka 版本 1.2.1,当 Kafka 服务器为 down/unreachable 时,异步发送调用会阻塞一段时间。这似乎是 TCP 超时。代码是这样的:
ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
@Override
public void onSuccess(SendResult<K, V> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
我快速浏览了 Spring-Kafka 代码,它似乎只是将任务传递给 kafka 客户端库,将回调交互转换为未来的对象交互。查看kafka客户端库,代码变得更加复杂,我没有花时间去理解它,但我猜它可能在同一个线程中进行远程调用(至少是元数据?)。
作为用户,我希望 Spring-Kafka 方法 return 将来会立即 return,即使无法访问远程 kafka 服务器。
如果我的理解有误或者这是一个错误,欢迎任何确认。我最终暂时将其设为异步。
还有一个问题是Spring-Kafka的文档一开始就说它提供了同步和异步发送方法。我找不到任何不 return 期货的方法,也许文档需要更新。
如果需要,我很乐意提供更多详细信息。谢谢
只是为了确定。您是否应用了@EnableAsync 注释?我想说这可能是指定 Future<>
行为的关键除了配置上的 @EnableAsync 注释 class,还需要在调用此代码的方法上使用 @Async 注释。
http://www.baeldung.com/spring-async
这里有一些代码碎片。 Kafka 生产者配置:
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
}
@Bean
public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
}
@Bean
public Producer producer() {
return new Producer();
}
}
以及制作人本身:
public class Producer {
public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, GenericMessage> kafkaTemplate;
@Async
public void send(String topic, GenericMessage message) {
ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {
@Override
public void onSuccess(final SendResult<String, GenericMessage> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
最佳解决方案是在生产者级别添加 'Callback' 侦听器。
@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate() {
KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
kt.setProducerListener(new ProducerListener<String, WebUserOperation>() {
@Override
public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata) {
System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = "
+ recordMetadata.partition() +" with offset= " + recordMetadata.offset()
+ " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
}
@Override
public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception) {
System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
exception.printStackTrace();
}
});
return kt;
}
下面的代码可以让我异步获取响应
ProducerRecord<UUID, Person> person = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey,Person);
Runnable runnable = () -> kafkaTemplate.send(person).addCallback(new MessageAckHandler());
new Thread(runnable).start();
public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>> {
@Override
public void onFailure(Throwable exception) {
log.error("unable to send message: " + exception.getMessage());
}
@Override
public void onSuccess(SendResult<UUID, ScreeningEvent> result) {
log.debug("sent message with offset={} messageID={}", result.getRecordMetadata().offset(), result.getProducerRecord().key());
}
}
public class SendResult<K, V> {
private final ProducerRecord<K, V> producerRecord;
private final RecordMetadata recordMetadata;
public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
this.producerRecord = producerRecord;
this.recordMetadata = recordMetadata;
}
public ProducerRecord<K, V> getProducerRecord() {
return this.producerRecord;
}
public RecordMetadata getRecordMetadata() {
return this.recordMetadata;
}
@Override
public String toString() {
return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
}
}
如果我看KafkaProducer本身,发送消息有两个部分:
- 正在将消息存储到内部缓冲区中。
- 正在将缓冲区中的消息上传到 Kafka。
KafkaProducer 对于第二部分是异步的,而不是第一部分。
send() 方法仍然可以在第一部分被阻塞并最终抛出 TimeoutExceptions,例如:
- 主题的元数据未缓存或过时,因此生产者尝试从服务器获取元数据以了解主题是否仍然存在以及它有多少个分区。
- 缓冲区已满(默认为 32MB)。
如果服务器完全没有响应,您可能会遇到这两个问题。
更新:
我在 Kafka 2.2.1 中测试并确认了这一点。看起来这种行为在 2.4 and/or 2.6 中可能有所不同:KAFKA-3720