Apache Storm 加入模式 - 至少一次
Apache Storm Join Pattern - At least once
我正在 Storm 中实现一个 bolt,它从 RabbitMQ spout (https://github.com/ppat/storm-rabbitmq) 接收消息。
我必须在 Storm 中处理的每个事件都作为来自 Rabbit 的两条消息到达,所以我在螺栓上有一个 fieldsGrouping 以便两条消息到达同一个螺栓。
我的第一个方法是:
- 收到第一个元组并将消息保存在内存中
- 确认第一个元组
- 当第二个元组到达时,从内存中获取第一个元组,并从 spout 发出一个锚定到第二个元组的新元组。
这行得通,但如果工作人员死亡,我可能会丢失消息,因为我会在获取第二个元组并进行处理之前确认第一个元组。
我将其更改为:
- 接收第一个元组并将其保存在内存中
- 当第二个元组到达时从内存中获取第一个元组,发出一个锚定到两个输入元组的新元组并确认两个输入元组。
内存缓存是一个 Guava 缓存,有时间到期,当元组因超时而被逐出时,我将在拓扑中 fail() 它,以便稍后重新处理它。
这似乎可行,但是当我进行一些测试时,我遇到了系统停止从 Rabbit 队列中获取消息的情况。
队列上的预取设置为 5,setMaxSpoutPending 设置为 7。在 Rabbit 界面中,我看到 5 条 Unacked 消息。
在风暴日志中,我看到相同的元组一遍又一遍地从缓存中逐出。
我知道问题是 spout 只会获取 5 条消息,它们都是一对消息的第一部分。我可以增加预取,但不能保证这不会在生产中发生。
所以我的问题是:如何在 Storm 中处理这些问题时实现连接?
Storm 没有为此提供好的解决方案...您需要的是 可靠 存储来缓冲第一个元组(即有状态运算符)。因此,您可以立即确认第一个元组并在失败后恢复状态。
- 据我所知,Trident 支持一些状态处理。但是我没用过
- 作为第二种选择,您可以使用分布式键值存储(如 Casandra)作为缓冲区。当然,这会是一个手写的解决方案,即您需要自己编写所有Casandra交互代码。
- 最后但同样重要的是,您可以切换到支持有状态运算符(如 Apache Flink)的流处理系统。 (免责声明:我是 Flink 的提交者)
我正在 Storm 中实现一个 bolt,它从 RabbitMQ spout (https://github.com/ppat/storm-rabbitmq) 接收消息。
我必须在 Storm 中处理的每个事件都作为来自 Rabbit 的两条消息到达,所以我在螺栓上有一个 fieldsGrouping 以便两条消息到达同一个螺栓。
我的第一个方法是:
- 收到第一个元组并将消息保存在内存中
- 确认第一个元组
- 当第二个元组到达时,从内存中获取第一个元组,并从 spout 发出一个锚定到第二个元组的新元组。
这行得通,但如果工作人员死亡,我可能会丢失消息,因为我会在获取第二个元组并进行处理之前确认第一个元组。
我将其更改为:
- 接收第一个元组并将其保存在内存中
- 当第二个元组到达时从内存中获取第一个元组,发出一个锚定到两个输入元组的新元组并确认两个输入元组。
内存缓存是一个 Guava 缓存,有时间到期,当元组因超时而被逐出时,我将在拓扑中 fail() 它,以便稍后重新处理它。
这似乎可行,但是当我进行一些测试时,我遇到了系统停止从 Rabbit 队列中获取消息的情况。
队列上的预取设置为 5,setMaxSpoutPending 设置为 7。在 Rabbit 界面中,我看到 5 条 Unacked 消息。
在风暴日志中,我看到相同的元组一遍又一遍地从缓存中逐出。
我知道问题是 spout 只会获取 5 条消息,它们都是一对消息的第一部分。我可以增加预取,但不能保证这不会在生产中发生。
所以我的问题是:如何在 Storm 中处理这些问题时实现连接?
Storm 没有为此提供好的解决方案...您需要的是 可靠 存储来缓冲第一个元组(即有状态运算符)。因此,您可以立即确认第一个元组并在失败后恢复状态。
- 据我所知,Trident 支持一些状态处理。但是我没用过
- 作为第二种选择,您可以使用分布式键值存储(如 Casandra)作为缓冲区。当然,这会是一个手写的解决方案,即您需要自己编写所有Casandra交互代码。
- 最后但同样重要的是,您可以切换到支持有状态运算符(如 Apache Flink)的流处理系统。 (免责声明:我是 Flink 的提交者)