当卡夫卡生产者设置为同步时,卡夫卡消费者不会被调用
Kafka Consumer not getting invoked when the kafka Producer is set to Sync
我有一个要求,其中有 2 个主题需要维护,1 个使用同步方法,另一个使用异步方法。
异步方法按预期调用消费者记录,但在同步方法中,消费者代码未被调用。
下面是配置文件中声明的代码
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
我在这里启用了 autoFlush true
@Bean( name="KafkaPayloadSyncTemplate")
public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
}
此后控件停止,在返回 recordMetadataResults 对象后不对使用者进行任何调用
private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {
final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
KafkaPayload kafkaPayload = constructKafkaPayload();
ListenableFuture<SendResult<String,KafkaPayload>>
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
SendResult<String, KafkaPayload> results;
results = future.get();
recordMetadataResults.add(results.getRecordMetadata());
return recordMetadataResults;
}
消费者代码
public class KafkaTestListener {
@Autowired
TestServiceImpl TestServiceImpl;
public final CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
TestServiceImpl.consumeKafkaMessage(record);
System.out.println("Acknowledgment : " + acknowledgment);
acknowledgment.acknowledge();
}
}
基于这个问题,我有2个问题
- 我们是否应该在侦听器 Class 中手动调用 listen() 当它是同步生产者时。如果是,该怎么做?
- 如果侦听器 (
@KafkaListener
) 被自动调用,我还需要添加什么 setup/configurations 才能使其正常工作。
感谢您的提前投入
-斯里坎特
您应该确保使用 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
作为消费者属性。
不确定您所说的 sync/async 是什么意思,但是生产和消费是完全不同的操作。而且您不能从生产者方面影响消费者。因为中间有Kafka Broker。
我有一个要求,其中有 2 个主题需要维护,1 个使用同步方法,另一个使用异步方法。 异步方法按预期调用消费者记录,但在同步方法中,消费者代码未被调用。
下面是配置文件中声明的代码
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
我在这里启用了 autoFlush true
@Bean( name="KafkaPayloadSyncTemplate")
public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
}
此后控件停止,在返回 recordMetadataResults 对象后不对使用者进行任何调用
private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {
final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
KafkaPayload kafkaPayload = constructKafkaPayload();
ListenableFuture<SendResult<String,KafkaPayload>>
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
SendResult<String, KafkaPayload> results;
results = future.get();
recordMetadataResults.add(results.getRecordMetadata());
return recordMetadataResults;
}
消费者代码
public class KafkaTestListener {
@Autowired
TestServiceImpl TestServiceImpl;
public final CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
TestServiceImpl.consumeKafkaMessage(record);
System.out.println("Acknowledgment : " + acknowledgment);
acknowledgment.acknowledge();
}
}
基于这个问题,我有2个问题
- 我们是否应该在侦听器 Class 中手动调用 listen() 当它是同步生产者时。如果是,该怎么做?
- 如果侦听器 (
@KafkaListener
) 被自动调用,我还需要添加什么 setup/configurations 才能使其正常工作。
感谢您的提前投入
-斯里坎特
您应该确保使用 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
作为消费者属性。
不确定您所说的 sync/async 是什么意思,但是生产和消费是完全不同的操作。而且您不能从生产者方面影响消费者。因为中间有Kafka Broker。