为什么我向 kafka 生成 65536 条消息,但只消耗了数百条消息?
why I produces 65536 messages to kafka but only got hundreds consumed?
卡夫卡版本:1.0.0
萨拉玛版本:1.15.0
去版本:1.9.1
代码示例如下:
func main() {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
// config.Producer.Flush.Frequency = 10 * time.Second
// config.Producer.Flush.Bytes = 1024 * 1024
// config.Producer.Flush.MaxMessages = 1024
producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
if err != nil {
panic(err)
}
// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var (
wg sync.WaitGroup
enqueued, successes, errors int
)
wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range producer.Errors() {
log.Println(err)
errors++
}
}()
counter := 0
ProducerLoop:
for {
if counter >= 65536 {
producer.AsyncClose() // Trigger a shutdown of the producer.
break ProducerLoop
}
message := &sarama.ProducerMessage{
Topic: *topics,
// Key: sarama.StringEncoder(fmt.Sprintf("%d", counter)),
// Partition: int32(counter),
Value: sarama.StringEncoder(fmt.Sprintf("%d,%d", counter, time.Now().UnixNano())),
// Timestamp: time.Now(),
}
select {
case producer.Input() <- message:
enqueued++
case <-signals:
producer.AsyncClose() // Trigger a shutdown of the producer.
break ProducerLoop
}
if *verbose {
fmt.Printf(".")
}
if *sleep {
// fmt.Println(100 * time.Millisecond)
time.Sleep(1 * time.Millisecond)
}
counter++
}
wg.Wait()
log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}
从日志中我发现有65536条消息发送到kafka,但是我用kafka官方消费客户端消费时,只有一百条消息,我很迷茫
我用的是官方消费工具:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKERS --topic fire-8 --from-beginning
关于制作时的偏移量和分区,我也记录了它们。贴在这里太长了。计数匹配,我没有发现任何奇怪的东西
如有任何解释,我们将不胜感激。
我也在 GitHub 上发布了一个 issue。
更新 1
我发现如果我设置睡眠,消费和生产之间的差距会减少。
更新 2
我在日志中找到了我发送的数据,但我无法消费,我猜是消费者出错了
更新3
我在较旧的 kafka 集群 (0.10.1.0) 上重放了上述步骤,一切都按预期进行
应该是kafka go客户端sarama的bug。我从我发布的 GitHub issue 中找到了解决方案:删除 kafka 版本配置并且它有效。但这失去了一些新版本带来的新特性。如果你没有指定你使用的版本,默认是最早的
卡夫卡版本:1.0.0 萨拉玛版本:1.15.0 去版本:1.9.1
代码示例如下:
func main() {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
// config.Producer.Flush.Frequency = 10 * time.Second
// config.Producer.Flush.Bytes = 1024 * 1024
// config.Producer.Flush.MaxMessages = 1024
producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
if err != nil {
panic(err)
}
// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var (
wg sync.WaitGroup
enqueued, successes, errors int
)
wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range producer.Errors() {
log.Println(err)
errors++
}
}()
counter := 0
ProducerLoop:
for {
if counter >= 65536 {
producer.AsyncClose() // Trigger a shutdown of the producer.
break ProducerLoop
}
message := &sarama.ProducerMessage{
Topic: *topics,
// Key: sarama.StringEncoder(fmt.Sprintf("%d", counter)),
// Partition: int32(counter),
Value: sarama.StringEncoder(fmt.Sprintf("%d,%d", counter, time.Now().UnixNano())),
// Timestamp: time.Now(),
}
select {
case producer.Input() <- message:
enqueued++
case <-signals:
producer.AsyncClose() // Trigger a shutdown of the producer.
break ProducerLoop
}
if *verbose {
fmt.Printf(".")
}
if *sleep {
// fmt.Println(100 * time.Millisecond)
time.Sleep(1 * time.Millisecond)
}
counter++
}
wg.Wait()
log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}
从日志中我发现有65536条消息发送到kafka,但是我用kafka官方消费客户端消费时,只有一百条消息,我很迷茫
我用的是官方消费工具:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKERS --topic fire-8 --from-beginning
关于制作时的偏移量和分区,我也记录了它们。贴在这里太长了。计数匹配,我没有发现任何奇怪的东西
如有任何解释,我们将不胜感激。
我也在 GitHub 上发布了一个 issue。
更新 1我发现如果我设置睡眠,消费和生产之间的差距会减少。
更新 2我在日志中找到了我发送的数据,但我无法消费,我猜是消费者出错了
更新3
我在较旧的 kafka 集群 (0.10.1.0) 上重放了上述步骤,一切都按预期进行
应该是kafka go客户端sarama的bug。我从我发布的 GitHub issue 中找到了解决方案:删除 kafka 版本配置并且它有效。但这失去了一些新版本带来的新特性。如果你没有指定你使用的版本,默认是最早的