Storm 多字段分组
Storm multi-fields grouping
我想做的是按两个字段 ("remote-client-ip", "request-params"
) 对流进行分组,然后计算每组中的元组数。并将它们组合成一张地图。这是我的拓扑结构:
topology.newStream("kafka-spout-stream-1", repeatSpout)
.each(new Fields("str"), new URLParser(), new Fields(fieldNames))
.each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
.groupBy(new Fields("remote-client-ip", "query-string"))
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
.groupBy(new Fields("remote-client-ip"))
.persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));
但是经过调试,我发现一开始数据流被阻塞了groupBy()
,这是一个多字段的分组。在随后的聚合语句中,我没有为 Count()
执行任何操作。
所以我想我误解了一些关于多字段分组和聚合之间交互的概念。
请让我知道我的猜测是对还是错。
谢谢!
您正在使用拓扑中的 Aggregate()
函数对已经分组的字段进行分组。试试这个:
.aggregate(new Count(), new Fields("user-word-count"))
而不是这个:
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
我想做的是按两个字段 ("remote-client-ip", "request-params"
) 对流进行分组,然后计算每组中的元组数。并将它们组合成一张地图。这是我的拓扑结构:
topology.newStream("kafka-spout-stream-1", repeatSpout)
.each(new Fields("str"), new URLParser(), new Fields(fieldNames))
.each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
.groupBy(new Fields("remote-client-ip", "query-string"))
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
.groupBy(new Fields("remote-client-ip"))
.persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));
但是经过调试,我发现一开始数据流被阻塞了groupBy()
,这是一个多字段的分组。在随后的聚合语句中,我没有为 Count()
执行任何操作。
所以我想我误解了一些关于多字段分组和聚合之间交互的概念。
请让我知道我的猜测是对还是错。 谢谢!
您正在使用拓扑中的 Aggregate()
函数对已经分组的字段进行分组。试试这个:
.aggregate(new Count(), new Fields("user-word-count"))
而不是这个:
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))