如何使用同一个键组合一对 DStream 键和值?
How can I combine a DStream pair of key and value using the same key?
我想使用 spark 将第一个 DStream 更改为第二个。但我不知道该怎么做?我已经尝试过 groupByKey(),它不起作用,而 aggregateByKey(),它只使用 RDD 而不是 DStream。
这是当前结果:
DStream [(1,value1),(2,value2),(3,value3),(1,value4),(1,value5),(2,value6)]
这是我想要的结果:
DStream(1,(value1,value4,value5)) ,(2,(value2,value5)) ,(3,(value3))
感谢您的回复。
groupByKey
正是这样做的。它将 DStream[K, V]
转换为 DStream[(K, Seq[V])]
。我怀疑您对输出的期望可能是错误的。由于 DStream
只是 RDDs
组的无限序列,因此单独应用于每个 RDD
。所以如果第一批包含:
(1,value1),(2,value2),(3,value3),(1,value4)
和第二个
(1,value5),(2,value6)
你会得到
(1, [value1, value4]), (2, [value2]), (3, value3)
和
(1,[value5]),(2,[value6])
分别。
虽然 DStreams
支持有状态操作 (updateStateByKey
),但您不太可能希望将其用于不断增长的集合。
我想使用 spark 将第一个 DStream 更改为第二个。但我不知道该怎么做?我已经尝试过 groupByKey(),它不起作用,而 aggregateByKey(),它只使用 RDD 而不是 DStream。
这是当前结果:
DStream [(1,value1),(2,value2),(3,value3),(1,value4),(1,value5),(2,value6)]
这是我想要的结果:
DStream(1,(value1,value4,value5)) ,(2,(value2,value5)) ,(3,(value3))
感谢您的回复。
groupByKey
正是这样做的。它将 DStream[K, V]
转换为 DStream[(K, Seq[V])]
。我怀疑您对输出的期望可能是错误的。由于 DStream
只是 RDDs
组的无限序列,因此单独应用于每个 RDD
。所以如果第一批包含:
(1,value1),(2,value2),(3,value3),(1,value4)
和第二个
(1,value5),(2,value6)
你会得到
(1, [value1, value4]), (2, [value2]), (3, value3)
和
(1,[value5]),(2,[value6])
分别。
虽然 DStreams
支持有状态操作 (updateStateByKey
),但您不太可能希望将其用于不断增长的集合。