在融合的kafka中读取来自kafka主题的消息时如何使用确认?
How to use acknowledgement when reading messages from kafka topic in confluent kafka go?
我正在开发一个向客户端发送许多消息的推送通知。消息发布到主题中,订阅者从同一主题读取消息。如果在从主题偏移量读取消息后立即出现错误,即使我无法发送消息,我的订阅者也需要读取下一条消息并发送它。错误是指服务器出现故障或出现严重问题。
如何阅读带有确认信息的消息?
我不确定我是否理解你的意思
In case of errors right after reading message from topic offset is
incremented and even though I could not send message my Subscriber
needs to read the next message and send it
据我了解,您想管理消费者处理确认的方式(提交给 _consumer_offsets)。
因此 Kafka 允许消费者通过在 __consumer_offsets 主题中向 Kafka 生成消息来跟踪他们在每个分区中的位置(偏移量)。
有 3 个选项可用:
- 自动提交:如果enable.auto.commit=true,每隔auto.commit.interval.ms(默认5秒)自动提交。
- 同步提交:使用 commitSync() 显式同步提交,它提交 poll() 返回的最新偏移量,如果失败则重试,直到得到确认。
- 异步提交:以前的方法会等到代理响应确认提交,这会使事情变得更慢。我们可以使用不阻塞的 commitAsync(),如果失败也不会重试。它更快。我们可以将回调传递给 commitAsync()。
所以基本上,您可以让提交自动处理。同步提交并等待代理的确认或使用回调异步提交。
希望这能回答您的问题。
此致。
我正在开发一个向客户端发送许多消息的推送通知。消息发布到主题中,订阅者从同一主题读取消息。如果在从主题偏移量读取消息后立即出现错误,即使我无法发送消息,我的订阅者也需要读取下一条消息并发送它。错误是指服务器出现故障或出现严重问题。
如何阅读带有确认信息的消息?
我不确定我是否理解你的意思
In case of errors right after reading message from topic offset is incremented and even though I could not send message my Subscriber needs to read the next message and send it
据我了解,您想管理消费者处理确认的方式(提交给 _consumer_offsets)。
因此 Kafka 允许消费者通过在 __consumer_offsets 主题中向 Kafka 生成消息来跟踪他们在每个分区中的位置(偏移量)。
有 3 个选项可用:
- 自动提交:如果enable.auto.commit=true,每隔auto.commit.interval.ms(默认5秒)自动提交。
- 同步提交:使用 commitSync() 显式同步提交,它提交 poll() 返回的最新偏移量,如果失败则重试,直到得到确认。
- 异步提交:以前的方法会等到代理响应确认提交,这会使事情变得更慢。我们可以使用不阻塞的 commitAsync(),如果失败也不会重试。它更快。我们可以将回调传递给 commitAsync()。
所以基本上,您可以让提交自动处理。同步提交并等待代理的确认或使用回调异步提交。
希望这能回答您的问题。
此致。