在 goroutine 中扭曲 sarama-cluster 消耗动作,然后它无法消耗任何东西
Warp sarama-cluster consuming action in a goroutine, then it fails to consume anything
我正在使用 sarama-cluster
库在后端服务中创建一个 kafka 组消费者。来自 godoc 的示例代码有效:
for {
if msg, ok := <-consumer.Messages(); ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}
因为是死循环,所以我把它放在一个goroutine中,避免阻塞其他活动,然后它就不能再消费任何消息了:
go func() {
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}()
(服务是运行,所以这个goroutine并没有终止。它只是消费失败)
知道如何解决这个问题吗?
sarama-cluster
已经有一段时间没有维护了 with the following notice:
Please note that since https://github.com/Shopify/sarama/pull/1099 was
merged and released (>= v1.19.0) this library is officially
deprecated. The native implementation supports a variety of use cases
that are not available through this library.
我建议您改用 github.com/Shopify/sarama。它具有 sarama-cluster
的所有功能,并且得到积极维护。
您可以遵循一个简单的消费者组示例from their repository。
我正在使用 sarama-cluster
库在后端服务中创建一个 kafka 组消费者。来自 godoc 的示例代码有效:
for {
if msg, ok := <-consumer.Messages(); ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}
因为是死循环,所以我把它放在一个goroutine中,避免阻塞其他活动,然后它就不能再消费任何消息了:
go func() {
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}()
(服务是运行,所以这个goroutine并没有终止。它只是消费失败)
知道如何解决这个问题吗?
sarama-cluster
已经有一段时间没有维护了 with the following notice:
Please note that since https://github.com/Shopify/sarama/pull/1099 was merged and released (>= v1.19.0) this library is officially deprecated. The native implementation supports a variety of use cases that are not available through this library.
我建议您改用 github.com/Shopify/sarama。它具有 sarama-cluster
的所有功能,并且得到积极维护。
您可以遵循一个简单的消费者组示例from their repository。