在风暴拓扑中存储中间数据
storing intermediate data in storm topology
我正在读取 2 个 kafka 主题的数据。可以描述为:
Topic1 data content: VehicleRegistrationNo, Timestamp, Location
Topic2 data content: VehicleRegistrationNo, Timestamp, Speed
我需要根据最近的时间戳合并这两条消息,并将元组输出为消息 VehicleRegistrationNo, Timestamp, Speed, Location
。我正在通过 2 个 spout S1
和 S2
阅读这些主题。然后 bolt MergeS1andS2
从这些 spouts 获取输入并作为:
if (message from S1):
save present message from S1 along with 2 previous messages (3 consecutive locations) to LocationHashMap
elseif (message from S2):
get locations details from LocationHashmap and merge speed for same Vehicle with location info, then send details to next bolt as tuple
我知道 HashMap 不是在多节点中存储数据的有效方式。所以我阅读了有关 Trident 和 Redis 来存储中间数据的信息。在这个可以在分布式拓扑中工作的 senario 中,我应该用什么来存储我的中间数据。
任何没有-sql 的数据库都可以。选择一个唯一标识元组的键,无论来自哪个主题。逻辑类似于:
- 尝试从数据库中查找元组。
- 如果数据库中不存在元组,则将从主题中获取的元组存储到数据库中。
- 如果元组存在,合并数据库元组和主题元组的内容,并将生成的元组存回数据库(覆盖数据库中先前元组的内容)
我正在读取 2 个 kafka 主题的数据。可以描述为:
Topic1 data content: VehicleRegistrationNo, Timestamp, Location
Topic2 data content: VehicleRegistrationNo, Timestamp, Speed
我需要根据最近的时间戳合并这两条消息,并将元组输出为消息 VehicleRegistrationNo, Timestamp, Speed, Location
。我正在通过 2 个 spout S1
和 S2
阅读这些主题。然后 bolt MergeS1andS2
从这些 spouts 获取输入并作为:
if (message from S1):
save present message from S1 along with 2 previous messages (3 consecutive locations) to LocationHashMap
elseif (message from S2):
get locations details from LocationHashmap and merge speed for same Vehicle with location info, then send details to next bolt as tuple
我知道 HashMap 不是在多节点中存储数据的有效方式。所以我阅读了有关 Trident 和 Redis 来存储中间数据的信息。在这个可以在分布式拓扑中工作的 senario 中,我应该用什么来存储我的中间数据。
任何没有-sql 的数据库都可以。选择一个唯一标识元组的键,无论来自哪个主题。逻辑类似于:
- 尝试从数据库中查找元组。
- 如果数据库中不存在元组,则将从主题中获取的元组存储到数据库中。
- 如果元组存在,合并数据库元组和主题元组的内容,并将生成的元组存回数据库(覆盖数据库中先前元组的内容)