如何有条件地轮询来自 Kafka 主题的消息
How to conditionally poll messages from Kafka Topic
我在 MongoDB 数据库中有一些任务通知。每个任务都有一个 due_date 和提醒标志。我正在将这些任务推送到 Kafka 主题。有一个 Node JS 应用程序从该主题进行轮询,并根据 due_date 和提醒标志将通知推送到前端应用程序。 due_date 可能已过时或即将到来。
我们需要从 Kafka 向 Node App 发送通知,只要这些基于时间的条件发生,它就会监听:
- 提醒 = 真,距离截止日期还有 X 时间
- 截止日期 = 现在
- 任务仍然存在并且已过期
Kafka 如何做到这一点?
DB 到 Kafka 的交互应该通过源连接器。只要底层 table 发生变化,DB 连接器就可以将事件发布到 Kafka。因此,如果创建新行或更新任何列。
因此,理想的解决方案是在 table 中引入更多的列,或者使用包含列的新实用程序 table 来识别您上面提到的条件。可能是像“IsDueDate”这样的列,它可以是布尔类型。在数据库中创建一个调度程序(不确定 Mongo 但大多数数据库都有此选项)或任何批处理系统(如 Spring batch/boot 应用程序)以验证您的数据并填充这些列。
更新这些列后,它将通过连接器向 Kafka 触发一条消息,您的前端应用程序会轮询 Kafka 以获取新消息,最终可以在有效负载中使用这些标志来识别触发此事件的条件,您可以执行这些操作在前端。
我在 MongoDB 数据库中有一些任务通知。每个任务都有一个 due_date 和提醒标志。我正在将这些任务推送到 Kafka 主题。有一个 Node JS 应用程序从该主题进行轮询,并根据 due_date 和提醒标志将通知推送到前端应用程序。 due_date 可能已过时或即将到来。
我们需要从 Kafka 向 Node App 发送通知,只要这些基于时间的条件发生,它就会监听:
- 提醒 = 真,距离截止日期还有 X 时间
- 截止日期 = 现在
- 任务仍然存在并且已过期
Kafka 如何做到这一点?
DB 到 Kafka 的交互应该通过源连接器。只要底层 table 发生变化,DB 连接器就可以将事件发布到 Kafka。因此,如果创建新行或更新任何列。
因此,理想的解决方案是在 table 中引入更多的列,或者使用包含列的新实用程序 table 来识别您上面提到的条件。可能是像“IsDueDate”这样的列,它可以是布尔类型。在数据库中创建一个调度程序(不确定 Mongo 但大多数数据库都有此选项)或任何批处理系统(如 Spring batch/boot 应用程序)以验证您的数据并填充这些列。
更新这些列后,它将通过连接器向 Kafka 触发一条消息,您的前端应用程序会轮询 Kafka 以获取新消息,最终可以在有效负载中使用这些标志来识别触发此事件的条件,您可以执行这些操作在前端。