用于事件过滤的 Kafka Consumer API 与 Streams API
Kafka Consumer API vs Streams API for event filtering
对于这个用例,我应该使用 Kafka Consumer API 还是 Kafka Streams API?我有一个主题,许多消费者群体都在消费它。本主题包含一种类型的事件,它是一个 JSON 消息,内部隐藏了一个类型字段。有些消息会被某些消费者群体消费,而不会被其他消费者群体消费,一个消费者群体可能根本不会消费很多消息。
我的问题是:
我应该使用消费者 API,然后在每个事件上读取类型字段并根据类型字段删除或处理事件。
或者,我应该使用流 API、过滤方法和谓词进行过滤吗?
在我使用一个事件后,计划是处理该事件(数据库删除、更新或其他取决于服务),然后如果出现故障,我将生成一个单独的队列,我将重新处理稍后。
谢谢。
这个好像见仁见智吧。我个人会选择 Streams/KSQL,可能是您必须维护的更小的代码。您可以有另一个包含已清理数据的中间主题,然后您可以将其附加到 Connect 接收器、其他消费者或其他 Stream 和 KSQL 进程。使用流,您可以在不同的机器上扩展单个应用程序,您可以存储状态,拥有备用副本等等,这将是一个 PITA 来完成所有事情。
对于这个用例,我应该使用 Kafka Consumer API 还是 Kafka Streams API?我有一个主题,许多消费者群体都在消费它。本主题包含一种类型的事件,它是一个 JSON 消息,内部隐藏了一个类型字段。有些消息会被某些消费者群体消费,而不会被其他消费者群体消费,一个消费者群体可能根本不会消费很多消息。
我的问题是: 我应该使用消费者 API,然后在每个事件上读取类型字段并根据类型字段删除或处理事件。
或者,我应该使用流 API、过滤方法和谓词进行过滤吗?
在我使用一个事件后,计划是处理该事件(数据库删除、更新或其他取决于服务),然后如果出现故障,我将生成一个单独的队列,我将重新处理稍后。
谢谢。
这个好像见仁见智吧。我个人会选择 Streams/KSQL,可能是您必须维护的更小的代码。您可以有另一个包含已清理数据的中间主题,然后您可以将其附加到 Connect 接收器、其他消费者或其他 Stream 和 KSQL 进程。使用流,您可以在不同的机器上扩展单个应用程序,您可以存储状态,拥有备用副本等等,这将是一个 PITA 来完成所有事情。