确保已使用 REST 代理从 Kafka 主题读取所有消息
Ensuring that all messages have been read from Kafka topic using REST Proxy
我是 Kafka 的新手,我们的团队正在研究服务间通信的模式。
目标
我们有两个服务,P(生产者)和 C(消费者)。 P 是 C 需要的一组数据的真实来源。当 C 启动时,它需要将 P 中的所有当前数据加载到它的缓存中,然后订阅更改通知。 (换句话说,我们要在服务之间同步数据。)
数据总量比较少,变化不频繁。同步的短暂延迟是可以接受的(最终一致性)。
我们想解耦服务,这样 P 和 C 就不需要知道对方了。
提案
当 P 启动时,它会将其所有数据发布到启用了日志压缩的 Kafka 主题。每条消息都是一个 aggregate 及其 ID 键。
当 C 启动时,它会从主题开头读取所有消息并填充其缓存。然后它继续从其偏移量读取以收到更新通知。
当 P 更新其数据时,它会为已更改的聚合发布一条消息。 (此消息与原始消息具有相同的架构。)
当 C 收到新消息时,它会更新其缓存中的相应数据。
约束条件
我们正在使用 Confluent REST Proxy 与 Kafka 通信。
问题
当 C 启动时,它如何知道它何时从主题中读取所有消息以便它可以安全地开始处理?
如果C没有立即注意到P在一秒钟前发送的消息,这是可以接受的。如果 C 在消费 P 一个小时前发送的消息之前就开始处理,这是不可接受的。请注意,我们不知道何时会更新 P 的数据。
我们不希望 C 在使用每条消息后必须等待 REST 代理的轮询间隔。
如果你想找到一个消费组的结束分区,为了知道你什么时候得到了某个时间点的所有数据,你可以使用
POST /consumers/(string: group_name)/instances/(string: instance)/positions/end
请注意,您必须在搜索之前进行投票 (GET /consumers/.../records
),但您不需要提交。
如果您不想影响现有消费者组的偏移量,则必须post一个单独的消费者组。
然后您可以使用
查询偏移量
GET /consumers/(string: group_name)/instances/(string: instance)/offsets
请注意,在计算结束偏移量和实际到达终点之间可能会有数据写入主题,因此您可能希望进行一些额外的设置,以便在最终到达终点后进行更多消费。
备选方案(未测试):
如果消费者同时充当生产者呢?
- 当 C 启动时,它会使用不会与 P 中的键重叠的键向压缩主题(它要从中读取的同一主题)发布消息。值是 GUID 或随机数字;基本上是随机数。
- C 订阅压缩主题并开始消费。
- 当 C 收到它的唯一键和一个与它发送的内容匹配的随机数时(如果清洁线程还没有压缩日志,它可能会多次获得该键),它知道它可以安全地开始处理。
这确实假设一个分区。
我是 Kafka 的新手,我们的团队正在研究服务间通信的模式。
目标
我们有两个服务,P(生产者)和 C(消费者)。 P 是 C 需要的一组数据的真实来源。当 C 启动时,它需要将 P 中的所有当前数据加载到它的缓存中,然后订阅更改通知。 (换句话说,我们要在服务之间同步数据。)
数据总量比较少,变化不频繁。同步的短暂延迟是可以接受的(最终一致性)。
我们想解耦服务,这样 P 和 C 就不需要知道对方了。
提案
当 P 启动时,它会将其所有数据发布到启用了日志压缩的 Kafka 主题。每条消息都是一个 aggregate 及其 ID 键。
当 C 启动时,它会从主题开头读取所有消息并填充其缓存。然后它继续从其偏移量读取以收到更新通知。
当 P 更新其数据时,它会为已更改的聚合发布一条消息。 (此消息与原始消息具有相同的架构。)
当 C 收到新消息时,它会更新其缓存中的相应数据。
约束条件
我们正在使用 Confluent REST Proxy 与 Kafka 通信。
问题
当 C 启动时,它如何知道它何时从主题中读取所有消息以便它可以安全地开始处理?
如果C没有立即注意到P在一秒钟前发送的消息,这是可以接受的。如果 C 在消费 P 一个小时前发送的消息之前就开始处理,这是不可接受的。请注意,我们不知道何时会更新 P 的数据。
我们不希望 C 在使用每条消息后必须等待 REST 代理的轮询间隔。
如果你想找到一个消费组的结束分区,为了知道你什么时候得到了某个时间点的所有数据,你可以使用
POST /consumers/(string: group_name)/instances/(string: instance)/positions/end
请注意,您必须在搜索之前进行投票 (GET /consumers/.../records
),但您不需要提交。
如果您不想影响现有消费者组的偏移量,则必须post一个单独的消费者组。
然后您可以使用
查询偏移量GET /consumers/(string: group_name)/instances/(string: instance)/offsets
请注意,在计算结束偏移量和实际到达终点之间可能会有数据写入主题,因此您可能希望进行一些额外的设置,以便在最终到达终点后进行更多消费。
备选方案(未测试):
如果消费者同时充当生产者呢?
- 当 C 启动时,它会使用不会与 P 中的键重叠的键向压缩主题(它要从中读取的同一主题)发布消息。值是 GUID 或随机数字;基本上是随机数。
- C 订阅压缩主题并开始消费。
- 当 C 收到它的唯一键和一个与它发送的内容匹配的随机数时(如果清洁线程还没有压缩日志,它可能会多次获得该键),它知道它可以安全地开始处理。
这确实假设一个分区。