Alpakka/Kafka - 分区消耗速度比其他分区快
Alpakka/Kafka - Partitions consumed faster than others
我一直在使用 alpakka kafka 从 kafka 主题流式传输数据。我正在使用:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
最近我试图在一个有 15 个分区的主题上向更多消费者发送垃圾邮件,例如 3。当我插入更多具有相同 group id 的消费者时,它会很好地为每个消费者拆分 5 个分区,但它似乎不会同时消费所有分区,它似乎是一个一个地读取,或者读取特定分区比其他分区快得多.
|Partition|LogSize |Consumer Offset|Lag |
|0 |8,429,145| 6,087,144|2,342,001|
|1 |8,424,948| 6,223,257|2,201,691|
|2 |8,428,121| 7,764,854| 663,267|
|3 |8,421,528| 6,071,425|2,350,103|
|4 |8,434,659| 7,351,552|1,083,107|
|5 |8,428,323| 5,935,336|2,492,987|
|6 |8,424,974| 6,455,301|1,969,673|
|7 |8,431,820| 7,763,984| 667,836|
|8 |8,425,999| 6,370,962|2,055,037|
|9 |8,416,354| 6,681,093|1,735,261|
|10 |8,416,217| 6,814,949|1,601,268|
|11 |8,428,026| 5,878,703|2,549,323|
|12 |8,424,604| 8,424,589| 15|
|13 |8,431,019| 8,431,019| 0|
|14 |8,423,218| 8,423,218| 0|
这是我 运行 生产应用程序的真实示例。所以我有一些问题:
读取某些分区比其他分区快得多可以吗?
请注意,此行为仅在我启动多个消费者时发生。
我应该改变我的消费方式吗?我应该为每个分区使用源代码,还是有不同的选项?
更新
我怀疑是不是插入了多个consumer(读取了多个application),今天只用了一个consumer就发生了,看一下consumer group就知道了,就是一样。
在它发生时,我有 20MM 的消息仍在等待处理(滞后)。上图是我们公司Kafka管理器的截图
我们通过删除将消息从一个主题复制到另一个主题的组件之一来解决这个问题。
本质上,生产者正在写一个主题,这个组件将这些消息复制到另一个主题,启用压缩,保持给定 id 的最后状态。事实证明,该组件无法正常工作,并且与此压缩主题相关的消费者遇到了一些问题。
所以,到底谁需要一个compaction topic,还是让生产者直接写吧。
我一直在使用 alpakka kafka 从 kafka 主题流式传输数据。我正在使用:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
最近我试图在一个有 15 个分区的主题上向更多消费者发送垃圾邮件,例如 3。当我插入更多具有相同 group id 的消费者时,它会很好地为每个消费者拆分 5 个分区,但它似乎不会同时消费所有分区,它似乎是一个一个地读取,或者读取特定分区比其他分区快得多.
|Partition|LogSize |Consumer Offset|Lag |
|0 |8,429,145| 6,087,144|2,342,001|
|1 |8,424,948| 6,223,257|2,201,691|
|2 |8,428,121| 7,764,854| 663,267|
|3 |8,421,528| 6,071,425|2,350,103|
|4 |8,434,659| 7,351,552|1,083,107|
|5 |8,428,323| 5,935,336|2,492,987|
|6 |8,424,974| 6,455,301|1,969,673|
|7 |8,431,820| 7,763,984| 667,836|
|8 |8,425,999| 6,370,962|2,055,037|
|9 |8,416,354| 6,681,093|1,735,261|
|10 |8,416,217| 6,814,949|1,601,268|
|11 |8,428,026| 5,878,703|2,549,323|
|12 |8,424,604| 8,424,589| 15|
|13 |8,431,019| 8,431,019| 0|
|14 |8,423,218| 8,423,218| 0|
这是我 运行 生产应用程序的真实示例。所以我有一些问题:
读取某些分区比其他分区快得多可以吗?
请注意,此行为仅在我启动多个消费者时发生。
我应该改变我的消费方式吗?我应该为每个分区使用源代码,还是有不同的选项?
更新
我怀疑是不是插入了多个consumer(读取了多个application),今天只用了一个consumer就发生了,看一下consumer group就知道了,就是一样。
在它发生时,我有 20MM 的消息仍在等待处理(滞后)。上图是我们公司Kafka管理器的截图
我们通过删除将消息从一个主题复制到另一个主题的组件之一来解决这个问题。
本质上,生产者正在写一个主题,这个组件将这些消息复制到另一个主题,启用压缩,保持给定 id 的最后状态。事实证明,该组件无法正常工作,并且与此压缩主题相关的消费者遇到了一些问题。
所以,到底谁需要一个compaction topic,还是让生产者直接写吧。