ReplyingKafkaTemplate 没有得到回复
ReplyingKafkaTemplate not getting response back
您好,我在 project.Configuration 中使用 ReplyingKafkaTemplate,如下所示
@Bean
public KafkaMessageListenerContainer<Object, Object> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties("account-get-response",
"account-put-response",
"account-post-response",
"account-delete-response");
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
}
@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyKafkaTemplate(ProducerFactory<Object, Object> pf,
KafkaMessageListenerContainer<Object, Object> lc) {
ReplyingKafkaTemplate<Object, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, lc);
replyKafkaTemplate.setReplyTimeout(300000);
replyKafkaTemplate.setSharedReplyTopic(true);
return replyKafkaTemplate;
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ProducerFactory<Object, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grp");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaRequestTimeoutMs);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaDeliveryTimeoutMs);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaRetryBackofMs);
props.put(ProducerConfig.ACKS_CONFIG, kafkaackconfig);
return props;
}
代码如下
在这种情况下,requestTopic 是 account-post-request,responseTopic 是 account-post-response
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>(requestTopic, jsonNode);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, responseTopic.getBytes()));
record.headers().add(new RecordHeader("TransactionID", clientTxnId.getBytes()));
LOGGER.info("TransactionID: " + clientTxnId + " Produced record on topic " + requestTopic);
RequestReplyFuture<Object, Object, Object> sendAndReceive = replyKafkaTemplate.sendAndReceive(record);
try {
LOGGER.info("TransactionID: " + clientTxnId + " Waiting for consumer response ");
reply = sendAndReceive.get().value();
LOGGER.info("TransactionID: " + clientTxnId + " Got the response from consumer for topic :" + responseTopic);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
在这种情况下,消费者正在使用响应主题中发送回响应的请求。但在生产者端(回复kafkatemplate)是否卡在
reply = sendAndReceive.get().value();
表示等待响应
但是当我检查了 kafka 工具时,响应被写在相同相关 ID 的期望主题上。
我无法弄清楚为什么它不能从响应主题中读取它,即使它可用?
提前致谢。
这可能是竞争条件;将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为 earliest
。
默认为latest
;如果响应在消费者订阅响应主题之前到达,它将不会收到。
您好,我在 project.Configuration 中使用 ReplyingKafkaTemplate,如下所示
@Bean
public KafkaMessageListenerContainer<Object, Object> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties("account-get-response",
"account-put-response",
"account-post-response",
"account-delete-response");
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
}
@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyKafkaTemplate(ProducerFactory<Object, Object> pf,
KafkaMessageListenerContainer<Object, Object> lc) {
ReplyingKafkaTemplate<Object, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, lc);
replyKafkaTemplate.setReplyTimeout(300000);
replyKafkaTemplate.setSharedReplyTopic(true);
return replyKafkaTemplate;
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ProducerFactory<Object, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grp");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaRequestTimeoutMs);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaDeliveryTimeoutMs);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaRetryBackofMs);
props.put(ProducerConfig.ACKS_CONFIG, kafkaackconfig);
return props;
}
代码如下 在这种情况下,requestTopic 是 account-post-request,responseTopic 是 account-post-response
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>(requestTopic, jsonNode);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, responseTopic.getBytes()));
record.headers().add(new RecordHeader("TransactionID", clientTxnId.getBytes()));
LOGGER.info("TransactionID: " + clientTxnId + " Produced record on topic " + requestTopic);
RequestReplyFuture<Object, Object, Object> sendAndReceive = replyKafkaTemplate.sendAndReceive(record);
try {
LOGGER.info("TransactionID: " + clientTxnId + " Waiting for consumer response ");
reply = sendAndReceive.get().value();
LOGGER.info("TransactionID: " + clientTxnId + " Got the response from consumer for topic :" + responseTopic);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
在这种情况下,消费者正在使用响应主题中发送回响应的请求。但在生产者端(回复kafkatemplate)是否卡在
reply = sendAndReceive.get().value();
表示等待响应 但是当我检查了 kafka 工具时,响应被写在相同相关 ID 的期望主题上。
我无法弄清楚为什么它不能从响应主题中读取它,即使它可用? 提前致谢。
这可能是竞争条件;将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为 earliest
。
默认为latest
;如果响应在消费者订阅响应主题之前到达,它将不会收到。