Kafka 1.0 Streaming API:来自分区的消息消费延迟

Kafka 1.0 Streaming API: message consumption from partitions get delayed

最近,我将我们的流媒体应用程序从 spark-streaming 2.1 切换到使用 kafka-streaming new API (1.0) 和 kafka 代理服务器 0.11.0.0

我已经实现了自己的处理器class,在处理方法中,我只是打印消息内容。

我有一个 3 台机器的 kafka 集群,我挂钩的主题有 300 个分区。

我 运行 具有 100 个线程的流媒体应用程序,在具有 32 GB RAM 和 8 个内核的机器上。

我的问题是,在某些情况下,一旦到达 kafka,我就会收到消息 topic/partition,而在其他情况下,我会在到达主题 10-15 分钟后收到消息,Don不知道为什么!

我使用下面的命令行来跟踪流媒体应用 group.id 的 kafka 主题的滞后。

./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id

但不幸的是,它并不能一直给出准确的结果,甚至根本不能给出结果,有人知道为什么吗?

我是否错过了流媒体应用程序的某些内容,以便我可以在到达分区后一致地阅读消息? 任何消费者属性都可以解决此问题。

我的kafka-streaming应用结构如下:

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
        config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);

        KStream<String, Document> topicStreams = builder.stream(sourceTopic);

        topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

我弄清楚我的问题是什么了。

事实证明,有些线程在执行高 CPU 密集型工作时卡住了,这导致其他线程停止使用消息,这就是为什么我看到这样的爆发,当我停止这个 cpu 密集的逻辑,一切都超快,消息一旦到达kafka主题就会进入流式作业。