combineByKey on a DStream throws an error
I have a DStream of (String, Int) tuples.
When I try combineByKey on it, the compiler says I have to specify the parameter Partitioner:
my_dstream.combineByKey(
  (v) => (v, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
However, when I use it on the underlying RDDs, it works fine:
my_dstream.foreachRDD( rdd =>
  rdd.combineByKey(
    (v) => (v, 1),
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ))
Where can I get this partitioner?
You can create one yourself. Spark ships with two partitioners out of the box: HashPartitioner and RangePartitioner, the former being the default. Unlike RDD.combineByKey, which has an overload that falls back to the default partitioner, the DStream version requires you to pass one explicitly. You can instantiate a HashPartitioner through its constructor, passing it the desired number of partitions:
import org.apache.spark.HashPartitioner

val numOfPartitions = 4 // example value; specify the amount you want
val hashPartitioner = new HashPartitioner(numOfPartitions)

my_dstream.combineByKey(
  (v) => (v, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),
  hashPartitioner
)
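For context, here is a minimal end-to-end sketch showing where the partitioner fits in a streaming job. The socket source on localhost:9999, the local master, the 5-second batch interval, and the input format ("word 3" per line) are all assumptions for illustration, not part of the original question; the combiners build (sum, count) per key, which mapValues then turns into a per-key average:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CombineByKeyExample {
  def main(args: Array[String]): Unit = {
    // Local two-thread master and 5-second batches, chosen only for the demo.
    val conf = new SparkConf().setAppName("CombineByKeyExample").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Hypothetical source: lines like "word 3" arriving on a socket,
    // parsed into (String, Int) pairs.
    val my_dstream = ssc.socketTextStream("localhost", 9999)
      .map(_.split(" "))
      .map(parts => (parts(0), parts(1).toInt))

    val hashPartitioner = new HashPartitioner(4)

    // Accumulate (sum, count) per key, then derive a per-key average.
    val averages = my_dstream.combineByKey(
      (v: Int) => (v, 1),
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),
      hashPartitioner
    ).mapValues { case (sum, count) => sum.toDouble / count }

    averages.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

A reasonable starting point for the partition count is the number of cores available to the job; HashPartitioner simply buckets each key by its hashCode modulo the number of partitions.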