如何在 Spark 中以 DenseVector 为键对 RDD 进行 groupByKey?
How to groupByKey a RDD, with DenseVector as key, in Spark?
我创建了一个 RDD,每个成员都是一个键值对,键是 DenseVector
,值是 int
。例如
[(DenseVector([3,4]),10), (DenseVector([3,4]),20)]
现在我想按键 k1
: DenseVector([3,4])
进行分组。我希望该行为将键 k1
的所有值分组,即 10
和 20
。但是我得到的结果是
[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]
而不是
[(DenseVector([3,4]), [10,20])]
如果我遗漏了什么,请告诉我。
相同的代码是:
#simplified version of code
#rdd1 is an rdd containing [(DenseVector([3,4]),10), (DenseVector([3,4]),20)]
rdd1.groupByKey().map(lambda x : (x[0], list(x[1])))
print(rdd1.collect())
好吧,这是一个棘手的问题,简短的回答是你做不到。要了解为什么您必须更深入地研究 DenseVector
实施。 DenseVector
只是 NumPy 的包装器 float64
ndarray
>>> dv1 = DenseVector([3.0, 4.0])
>>> type(dv1.array)
<type 'numpy.ndarray'>
>>> dv1.array.dtype
dtype('float64')
由于 NumPy ndarrays
,与 DenseVector
不同的是,可变的不能以有意义的方式散列,尽管有趣的是提供了 __hash__
方法。有一个有趣的问题涵盖了这个问题(参见:numpy ndarray hashability)。
>>> dv1.array.__hash__() is None
False
>>> hash(dv1.array)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'numpy.ndarray'
DenseVector
从 object
继承了 __hash__
方法,它只是基于一个 id
(给定实例的内存地址):
>>> id(d1) / 16 == hash(d1)
True
不幸的是,这意味着具有相同内容的两个 DenseVectors
具有不同的哈希值:
>>> dv2 = DenseVector([3.0, 4.0])
>>> hash(dv1) == hash(dv2)
False
你能做什么?最简单的方法是使用提供一致 hash
实现的不可变数据结构,例如元组:
rdd.groupBy(lambda (k, v): tuple(k))
注意:实际上使用数组作为键很可能是个坏主意。对于大量元素,散列过程可能非常昂贵而无法使用。尽管如此,如果你真的需要这样的东西,Scala 似乎工作得很好:
import org.apache.spark.mllib.linalg.Vectors
val rdd = sc.parallelize(
(Vectors.dense(3, 4), 10) :: (Vectors.dense(3, 4), 20) :: Nil)
rdd.groupByKey.collect
我创建了一个 RDD,每个成员都是一个键值对,键是 DenseVector
,值是 int
。例如
[(DenseVector([3,4]),10), (DenseVector([3,4]),20)]
现在我想按键 k1
: DenseVector([3,4])
进行分组。我希望该行为将键 k1
的所有值分组,即 10
和 20
。但是我得到的结果是
[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]
而不是
[(DenseVector([3,4]), [10,20])]
如果我遗漏了什么,请告诉我。
相同的代码是:
#simplified version of code
#rdd1 is an rdd containing [(DenseVector([3,4]),10), (DenseVector([3,4]),20)]
rdd1.groupByKey().map(lambda x : (x[0], list(x[1])))
print(rdd1.collect())
好吧,这是一个棘手的问题,简短的回答是你做不到。要了解为什么您必须更深入地研究 DenseVector
实施。 DenseVector
只是 NumPy 的包装器 float64
ndarray
>>> dv1 = DenseVector([3.0, 4.0])
>>> type(dv1.array)
<type 'numpy.ndarray'>
>>> dv1.array.dtype
dtype('float64')
由于 NumPy ndarrays
,与 DenseVector
不同的是,可变的不能以有意义的方式散列,尽管有趣的是提供了 __hash__
方法。有一个有趣的问题涵盖了这个问题(参见:numpy ndarray hashability)。
>>> dv1.array.__hash__() is None
False
>>> hash(dv1.array)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'numpy.ndarray'
DenseVector
从 object
继承了 __hash__
方法,它只是基于一个 id
(给定实例的内存地址):
>>> id(d1) / 16 == hash(d1)
True
不幸的是,这意味着具有相同内容的两个 DenseVectors
具有不同的哈希值:
>>> dv2 = DenseVector([3.0, 4.0])
>>> hash(dv1) == hash(dv2)
False
你能做什么?最简单的方法是使用提供一致 hash
实现的不可变数据结构,例如元组:
rdd.groupBy(lambda (k, v): tuple(k))
注意:实际上使用数组作为键很可能是个坏主意。对于大量元素,散列过程可能非常昂贵而无法使用。尽管如此,如果你真的需要这样的东西,Scala 似乎工作得很好:
import org.apache.spark.mllib.linalg.Vectors
val rdd = sc.parallelize(
(Vectors.dense(3, 4), 10) :: (Vectors.dense(3, 4), 20) :: Nil)
rdd.groupByKey.collect