比较风暴螺栓中的上一个和下一个元组
Comparing previous and next tuple in storm bolt
我有由风暴拓扑处理的实时数据。数据可以是四种类型,比方说 A、B、C、D。这些数据中的每一种都由螺栓以随机顺序使用。我需要做的是比较两个相同数据类型的元组。例如,我想将 A 型元组与下一个 A 型元组进行比较,或者说将当前 A 型元组与先前收到的 A 型元组进行比较。有没有办法在螺栓中做到这一点?或者我必须将以前的结果保存在数据库中的某个地方(比如 hbase 或缓存)并查询它以与特定类型的当前元组进行比较。
编辑
假设 A、B、C、D 类型的数据流来自 spout
B4 A4 C7 D2 A3 A2 B3 C6 D1 B2 C5 C4 B1 C3 C2 C1 A1-----> Spout --> BOLT
现在我想比较 A1 和 A2,A2 和 A3,A3 和 A4。类似地,B1 与 B2,B2 与 B3 等
您可以在 spout 中发出元组时指定数据类型。
然后你可以使用字段分组,所以每个类型 A 都会转到同一个线程。这样你最多可以有 4 个不同的线程来执行你的 bolt 代码。每个线程中的顺序是有保证的。
builder.setBolt(BOLT_NAME, new BoltClass(),4)
.fieldsGrouping(SPOUT_NAME,new Fields("type"));
Storm 文档中的字段分组定义:
Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
我有由风暴拓扑处理的实时数据。数据可以是四种类型,比方说 A、B、C、D。这些数据中的每一种都由螺栓以随机顺序使用。我需要做的是比较两个相同数据类型的元组。例如,我想将 A 型元组与下一个 A 型元组进行比较,或者说将当前 A 型元组与先前收到的 A 型元组进行比较。有没有办法在螺栓中做到这一点?或者我必须将以前的结果保存在数据库中的某个地方(比如 hbase 或缓存)并查询它以与特定类型的当前元组进行比较。
编辑
假设 A、B、C、D 类型的数据流来自 spout
B4 A4 C7 D2 A3 A2 B3 C6 D1 B2 C5 C4 B1 C3 C2 C1 A1-----> Spout --> BOLT
现在我想比较 A1 和 A2,A2 和 A3,A3 和 A4。类似地,B1 与 B2,B2 与 B3 等
您可以在 spout 中发出元组时指定数据类型。 然后你可以使用字段分组,所以每个类型 A 都会转到同一个线程。这样你最多可以有 4 个不同的线程来执行你的 bolt 代码。每个线程中的顺序是有保证的。
builder.setBolt(BOLT_NAME, new BoltClass(),4)
.fieldsGrouping(SPOUT_NAME,new Fields("type"));
Storm 文档中的字段分组定义:
Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.