Apache Storm - KafkaSpout 不使用来自 Kafka 主题的消息

Apache Storm - KafkaSpout not consuming messaes from Kafka Topic

我正在尝试使用以下代码将 Kafka 集成到 Storm Topology,但不幸的是 KafkaSpout 没有使用来自 Kafka-topic 的消息。在 Storm UI-Core,发射计数永远保持为 0。

String bootStrapServer = "10.20.10.238:9092";
String topic = "test.topic";

KafkaSpoutConfig.Builder spoutConfigBuilder = KafkaSpoutConfig.builder(bootStrapServer,topic);
spoutConfigBuilder.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG,100*1024*1024);
spoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,100*1024*1024);
spoutConfigBuilder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
Boolean readFromStart = true;
if(readFromStart) {
spoutConfigBuilder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST);
} 
else {
spoutConfigBuilder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST);
}

KafkaSpout spout = new KafkaSpout(spoutConfigBuilder.build());
builder.setSpout("kafkaSpout", spout, 1);
// And a Bolt to see messages
builder.setBolt("fcBolt", new FcBolt(), 1).setNumTasks(1).shuffleGrouping("kafkaSpout");

但是当我试图查看从 CLI 生成的消息时,我可以使用以下命令查看有关主题的所有消息:


bin/kafka-console-consumer.sh --topic test.topic --from-beginning --bootstrap-server 10.20.10.238:9092


Picked up _JAVA_OPTIONS: -Xmx128000m
test
test
test1
....

版本:

Storm : 2.2.0
Kafka : 2.13_2.6.0

在旧版本中,它工作正常!我错过了在新版本中阅读的内容。

感谢任何帮助。提前致谢!

很难知道你有什么,所以考虑展示你的代码的其余部分。 但是从您所拥有的情况来看,您实际上并没有产生任何事件。

如果您尝试在 spout 中使用 kafka 事件进行进一步处理,请确保您实际上订阅了一个在其上创建了事件的主题,然后您将无法通过控制台消费者看到事件输出因为您是在 Storm 中使用它们,而不是生产它们。

如果您尝试通过 Storm 向测试主题生成 kafka 事件,然后尝试通过控制台消费者使用它们,请确保您实际上是在 storm 中生成事件。

希望这能让您走上正确的道路,我建议您在这里复习 Kafka 的基本概念:Kafka Introduction