Storm 中的延迟队列实现——Kafka、Cassandra、Redis 或 Beanstalk?
Delayed Queue implementation in Storm – Kafka, Cassandra, Redis or Beanstalk?
我有一个风暴拓扑来处理来自 Kafka 的消息,并根据手头的任务在 Cassandra 中进行 HTTP 调用/保存。我一收到消息就处理。由于来自外部源(如 HTTP)的响应,很少有消息未被完全处理。如果 HTTP 服务器没有 respond/returns 一段时间后重试的错误消息,我想实现一个重试的指数退避机制。我想不出几个可以用来实现它们的想法。如果有任何其他我可以使用的容错解决方案,我也想知道它们中的哪一个将是更好的解决方案。由于这用于实现指数退避,每条消息将具有不同的延迟时间。
- 在 Kafka 中发送另一个主题,稍后使用。 我的首选解决方案。我知道我们可以使用 Kafka 偏移量,以便在后期使用消息。我怎么找不到 documentation/Sample 代码来做同样的事情。如果有人能帮助我解决这个问题,那将非常有帮助。
- 编写消息Cassandra / Redis并编写一个调度程序来获取未处理并准备好被消费的消息并将其发送到Kafka以便我的风暴拓扑可以消耗它。 (其他遗留项目(非 Storm)中的现有解决方案)
- 延迟发送到 Beanstalk(其他遗留项目(非 Storm)中的现有解决方案。我想如何避免使用此解决方案并仅在我需要时使用它别无选择)。
虽然这正是我想做的。我找不到文档来实现
中提到的 delayProcessingUntil
我在过去使用 Beanstalk 完成了数据存储中的计划作业并延迟,但我更愿意使用 Kafka。
我认为您的用例描述了对数据库而不是队列的需求。您希望临时存储记录,直到它们出现,然后将其删除,这样它们就不会出现在以后的搜索中。如您的分析所示,尝试在队列中这样做充其量是尴尬的。
我建议您在 Cassandra 中创建另一个列族来保存这些延迟的请求。您将存储请求本身以及重试时间。您是否还想要一系列失败的 HTTP 尝试和相关数据取决于您。当延迟的请求最终得到满足时,您将从 CF 中删除相应的行。搜索延迟请求也很简单。
当然,任何数据库,即使是本地驱动器或 HDFS 中的文件也可以。
Kafka spout 具有内置的指数退避消息重试功能。您可以通过 spout 配置来配置初始延迟、延迟乘数和最大延迟。如果bolt有错误,可以调用collector.fail(input)。之后你就让它喷出重试。
您可能对 Kafka Retry 项目感兴趣 https://github.com/IBM/kafka-retry。它使用单个重试主题提供延迟重试队列。
我有一个风暴拓扑来处理来自 Kafka 的消息,并根据手头的任务在 Cassandra 中进行 HTTP 调用/保存。我一收到消息就处理。由于来自外部源(如 HTTP)的响应,很少有消息未被完全处理。如果 HTTP 服务器没有 respond/returns 一段时间后重试的错误消息,我想实现一个重试的指数退避机制。我想不出几个可以用来实现它们的想法。如果有任何其他我可以使用的容错解决方案,我也想知道它们中的哪一个将是更好的解决方案。由于这用于实现指数退避,每条消息将具有不同的延迟时间。
- 在 Kafka 中发送另一个主题,稍后使用。 我的首选解决方案。我知道我们可以使用 Kafka 偏移量,以便在后期使用消息。我怎么找不到 documentation/Sample 代码来做同样的事情。如果有人能帮助我解决这个问题,那将非常有帮助。
- 编写消息Cassandra / Redis并编写一个调度程序来获取未处理并准备好被消费的消息并将其发送到Kafka以便我的风暴拓扑可以消耗它。 (其他遗留项目(非 Storm)中的现有解决方案)
- 延迟发送到 Beanstalk(其他遗留项目(非 Storm)中的现有解决方案。我想如何避免使用此解决方案并仅在我需要时使用它别无选择)。
虽然这正是我想做的。我找不到文档来实现
我在过去使用 Beanstalk 完成了数据存储中的计划作业并延迟,但我更愿意使用 Kafka。
我认为您的用例描述了对数据库而不是队列的需求。您希望临时存储记录,直到它们出现,然后将其删除,这样它们就不会出现在以后的搜索中。如您的分析所示,尝试在队列中这样做充其量是尴尬的。
我建议您在 Cassandra 中创建另一个列族来保存这些延迟的请求。您将存储请求本身以及重试时间。您是否还想要一系列失败的 HTTP 尝试和相关数据取决于您。当延迟的请求最终得到满足时,您将从 CF 中删除相应的行。搜索延迟请求也很简单。
当然,任何数据库,即使是本地驱动器或 HDFS 中的文件也可以。
Kafka spout 具有内置的指数退避消息重试功能。您可以通过 spout 配置来配置初始延迟、延迟乘数和最大延迟。如果bolt有错误,可以调用collector.fail(input)。之后你就让它喷出重试。
您可能对 Kafka Retry 项目感兴趣 https://github.com/IBM/kafka-retry。它使用单个重试主题提供延迟重试队列。