Kafka 1.0 Streaming API:来自分区的消息消费延迟
Kafka 1.0 Streaming API: message consumption from partitions get delayed
最近,我将我们的流媒体应用程序从 spark-streaming 2.1 切换到使用 kafka-streaming new API (1.0) 和 kafka 代理服务器 0.11.0.0
我已经实现了自己的处理器class,在处理方法中,我只是打印消息内容。
我有一个 3 台机器的 kafka 集群,我挂钩的主题有 300 个分区。
我 运行 具有 100 个线程的流媒体应用程序,在具有 32 GB RAM 和 8 个内核的机器上。
我的问题是,在某些情况下,一旦到达 kafka,我就会收到消息 topic/partition,而在其他情况下,我会在到达主题 10-15 分钟后收到消息,Don不知道为什么!
我使用下面的命令行来跟踪流媒体应用 group.id 的 kafka 主题的滞后。
./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id
但不幸的是,它并不能一直给出准确的结果,甚至根本不能给出结果,有人知道为什么吗?
我是否错过了流媒体应用程序的某些内容,以便我可以在到达分区后一致地阅读消息?
任何消费者属性都可以解决此问题。
我的kafka-streaming应用结构如下:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);
KStream<String, Document> topicStreams = builder.stream(sourceTopic);
topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
我弄清楚我的问题是什么了。
事实证明,有些线程在执行高 CPU 密集型工作时卡住了,这导致其他线程停止使用消息,这就是为什么我看到这样的爆发,当我停止这个 cpu 密集的逻辑,一切都超快,消息一旦到达kafka主题就会进入流式作业。
最近,我将我们的流媒体应用程序从 spark-streaming 2.1 切换到使用 kafka-streaming new API (1.0) 和 kafka 代理服务器 0.11.0.0
我已经实现了自己的处理器class,在处理方法中,我只是打印消息内容。
我有一个 3 台机器的 kafka 集群,我挂钩的主题有 300 个分区。
我 运行 具有 100 个线程的流媒体应用程序,在具有 32 GB RAM 和 8 个内核的机器上。
我的问题是,在某些情况下,一旦到达 kafka,我就会收到消息 topic/partition,而在其他情况下,我会在到达主题 10-15 分钟后收到消息,Don不知道为什么!
我使用下面的命令行来跟踪流媒体应用 group.id 的 kafka 主题的滞后。
./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id
但不幸的是,它并不能一直给出准确的结果,甚至根本不能给出结果,有人知道为什么吗?
我是否错过了流媒体应用程序的某些内容,以便我可以在到达分区后一致地阅读消息? 任何消费者属性都可以解决此问题。
我的kafka-streaming应用结构如下:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);
KStream<String, Document> topicStreams = builder.stream(sourceTopic);
topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
我弄清楚我的问题是什么了。
事实证明,有些线程在执行高 CPU 密集型工作时卡住了,这导致其他线程停止使用消息,这就是为什么我看到这样的爆发,当我停止这个 cpu 密集的逻辑,一切都超快,消息一旦到达kafka主题就会进入流式作业。