Apache Kafka 按特定顺序使用来自不同主题的事件
Apache Kafka consume events from different topics in specific order
比方说,我有 topicA、topicB 和 topicC,这两个主题都基于域实体由不同的事件类型分隔。 topicA只对eventA进行操作,topicB保留eventB,topicC只对eventC进行操作。所有事件都按业务领域相互关联,但由单独的微服务产生,应按特定顺序处理。
问题是,如何使用Apache Kafka按特定顺序引入消费事件,eventA然后等待接收eventB,然后当eventC接收到时消费所有这些。
感谢任何反馈,欢迎提出任何问题。
一些注意事项:
Kafka Streams 是一种很好的方法,但受到公司政策的限制。
此外,我查看了 Join Pattern 但没有找到任何可靠的实施方法。
如果事件彼此相关,那么它们应该属于同一个主题。所以 microservice-1 应该使用 (key, value) & label (eventA) 推送 eventA。同理,microservice-2和microservice-3应该将数据推送到一个共同的topic。
这将对您在消费者方面有所帮助。
既然你问的是关于在不同主题之间对消息的消费进行排序,那么第一个选择是让一个消费者产生一条消息,为下一个消费者提供信息(这些消费者可能是也可能不是相同的过程):
consumerA 处理消息 -> consumerA 将新消息放在不同的主题上 -> consumerB 拾取该新消息并处理 -> consumerB 将新消息放在第二个主题上 -> 等等......
如果 streams 本质上是在做这个或类似的过程,我不会感到惊讶。可以使用任何其他类型的进程间通信接口:RDP、内存映射文件、互斥体、管道;拿你的选择。
除非万不得已,否则我会尽量避免将不同的事件放在同一主题上。当您将多个事件放在一个 queue/topic 上时,您会以几种方式限制您的消费者:
- 您的合同现在与这两个事件紧密耦合。要仅更改关于该单个主题的其中一个事件的形状,您的消费者必须根据元数据(幻数、键值等)动态反序列化这些事件
- 您的消费模式可能效率较低。如果我只对其中一个事件感兴趣怎么办?我必须阅读该事件,如果它不是我要找的,则将其丢弃。
现实生活中的一个例子是游乐园。假设您有两种类型的游乐园访客:快速通行证和标准客户。您的业务规则规定快速通行证客户可以先于标准客户免排队。
如果您将它们合并为一个 queue/topic,您是怎么做到的?答案是优先排队;你问每一个排队的人是不是fast pass,这样容易出错,效率低下(这是优先级排队,能行,但未必是最好的方案)。大多数游乐园通过设置两个单独的队列来解决这个问题(每种类型的客户一个[event/message])。现在他们可以将客户送入两个单独的服务员(一个快速通行证一个标准),或者他们可以让一个服务员同时处理两个队列,首先清空快速通行证队列。
归根结底,这取决于您的限制条件:是每天 10 条消息,还是 10 亿条消息,您需要即时一致性还是最终一致性,是在 IoT 设备上吗?
想必,解决问题的方法有很多种。这是一对,我可以建议:
引入关联 ID,这将 link 来自主题 A、B 和 C 的事件。然后,按以下方式使用关联 ID:
服务A、B、C产生对应主题的事件,但相关事件具有相同的关联ID
服务 D 使用来自不同主题的事件。每次它从任何主题接收到事件时,服务 D 要么通过关联 ID 将事件数据插入数据库,要么在接收到所有数据时执行某些操作。
例如,当服务 D 收到事件 C 时,它首先发出查询以检查数据库中是否存在与事件 C 相关 ID 的记录:
- 如果没有记录,则存储传入事件C,
- 如果某些记录已经存在,则服务 D 会检查事件 C 是否是最后一个需要消耗所有数据的事件,然后执行最终操作或将事件 C 插入数据库。
每个消耗的事件依此类推。
产生事件的链服务(A、B和C)。例如,可以通过以下方式形成链:
服务 A 向主题 A 生成事件
服务 B 使用主题 A 的事件,并向主题 B 生成事件(可能是事件 A 和 B 的聚合)
服务 C 使用主题 B 的事件,并向主题 C 生成事件(可能聚合事件 A、B 和 C)
最后,服务 D 使用主题 C 的事件(可能与 A、B 和 C 聚合)并执行所需的操作。
这种方法的变体(不在每个中间阶段聚合事件)将链接服务并侦听链中的最后一个事件。当最后一个事件被消费时,然后向相应的主题发出Kafka拉取以获取其他服务产生的事件。
比方说,我有 topicA、topicB 和 topicC,这两个主题都基于域实体由不同的事件类型分隔。 topicA只对eventA进行操作,topicB保留eventB,topicC只对eventC进行操作。所有事件都按业务领域相互关联,但由单独的微服务产生,应按特定顺序处理。
问题是,如何使用Apache Kafka按特定顺序引入消费事件,eventA然后等待接收eventB,然后当eventC接收到时消费所有这些。
感谢任何反馈,欢迎提出任何问题。
一些注意事项: Kafka Streams 是一种很好的方法,但受到公司政策的限制。
此外,我查看了 Join Pattern 但没有找到任何可靠的实施方法。
如果事件彼此相关,那么它们应该属于同一个主题。所以 microservice-1 应该使用 (key, value) & label (eventA) 推送 eventA。同理,microservice-2和microservice-3应该将数据推送到一个共同的topic。
这将对您在消费者方面有所帮助。
既然你问的是关于在不同主题之间对消息的消费进行排序,那么第一个选择是让一个消费者产生一条消息,为下一个消费者提供信息(这些消费者可能是也可能不是相同的过程):
consumerA 处理消息 -> consumerA 将新消息放在不同的主题上 -> consumerB 拾取该新消息并处理 -> consumerB 将新消息放在第二个主题上 -> 等等......
如果 streams 本质上是在做这个或类似的过程,我不会感到惊讶。可以使用任何其他类型的进程间通信接口:RDP、内存映射文件、互斥体、管道;拿你的选择。
除非万不得已,否则我会尽量避免将不同的事件放在同一主题上。当您将多个事件放在一个 queue/topic 上时,您会以几种方式限制您的消费者:
- 您的合同现在与这两个事件紧密耦合。要仅更改关于该单个主题的其中一个事件的形状,您的消费者必须根据元数据(幻数、键值等)动态反序列化这些事件
- 您的消费模式可能效率较低。如果我只对其中一个事件感兴趣怎么办?我必须阅读该事件,如果它不是我要找的,则将其丢弃。
现实生活中的一个例子是游乐园。假设您有两种类型的游乐园访客:快速通行证和标准客户。您的业务规则规定快速通行证客户可以先于标准客户免排队。
如果您将它们合并为一个 queue/topic,您是怎么做到的?答案是优先排队;你问每一个排队的人是不是fast pass,这样容易出错,效率低下(这是优先级排队,能行,但未必是最好的方案)。大多数游乐园通过设置两个单独的队列来解决这个问题(每种类型的客户一个[event/message])。现在他们可以将客户送入两个单独的服务员(一个快速通行证一个标准),或者他们可以让一个服务员同时处理两个队列,首先清空快速通行证队列。
归根结底,这取决于您的限制条件:是每天 10 条消息,还是 10 亿条消息,您需要即时一致性还是最终一致性,是在 IoT 设备上吗?
想必,解决问题的方法有很多种。这是一对,我可以建议:
引入关联 ID,这将 link 来自主题 A、B 和 C 的事件。然后,按以下方式使用关联 ID:
服务A、B、C产生对应主题的事件,但相关事件具有相同的关联ID
服务 D 使用来自不同主题的事件。每次它从任何主题接收到事件时,服务 D 要么通过关联 ID 将事件数据插入数据库,要么在接收到所有数据时执行某些操作。
例如,当服务 D 收到事件 C 时,它首先发出查询以检查数据库中是否存在与事件 C 相关 ID 的记录:
- 如果没有记录,则存储传入事件C,
- 如果某些记录已经存在,则服务 D 会检查事件 C 是否是最后一个需要消耗所有数据的事件,然后执行最终操作或将事件 C 插入数据库。
每个消耗的事件依此类推。
产生事件的链服务(A、B和C)。例如,可以通过以下方式形成链:
服务 A 向主题 A 生成事件
服务 B 使用主题 A 的事件,并向主题 B 生成事件(可能是事件 A 和 B 的聚合)
服务 C 使用主题 B 的事件,并向主题 C 生成事件(可能聚合事件 A、B 和 C)
最后,服务 D 使用主题 C 的事件(可能与 A、B 和 C 聚合)并执行所需的操作。
这种方法的变体(不在每个中间阶段聚合事件)将链接服务并侦听链中的最后一个事件。当最后一个事件被消费时,然后向相应的主题发出Kafka拉取以获取其他服务产生的事件。