Convert Sparse Vector to Dense Vector in Pyspark
I have a sparse vector like this:
>>> countVectors.rdd.map(lambda vector: vector[1]).collect()
[SparseVector(13, {0: 1.0, 2: 1.0, 3: 1.0, 6: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 12: 1.0}), SparseVector(13, {0: 1.0, 1: 1.0, 2: 1.0, 4: 1.0}), SparseVector(13, {0: 1.0, 1: 1.0, 3: 1.0, 4: 1.0, 7: 1.0}), SparseVector(13, {1: 1.0, 2: 1.0, 5: 1.0, 11: 1.0})]
I am trying to convert it to a dense vector in pyspark 2.0.0 like this:
>>> frequencyVectors = countVectors.rdd.map(lambda vector: vector[1])
>>> frequencyVectors.map(lambda vector: Vectors.dense(vector)).collect()
and I am getting an error like this:
16/12/26 14:03:35 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID 13)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<stdin>", line 1, in <lambda>
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 878, in dense
return DenseVector(elements)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 286, in __init__
ar = np.array(ar, dtype=np.float64)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 701, in __getitem__
raise ValueError("Index %d out of bounds." % index)
ValueError: Index 13 out of bounds.
How can I achieve this conversion? What is wrong here?
This solved my problem:

from pyspark.ml.linalg import DenseVector  # either pyspark.ml.linalg or pyspark.mllib.linalg DenseVector works once toArray() is used

frequencyDenseVectors = frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))
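Why the original attempt fails: judging from the traceback, the vectors coming out of the DataFrame are pyspark.ml.linalg.SparseVector objects, while Vectors.dense resolves to pyspark.mllib.linalg; while building its NumPy array, mllib's DenseVector ends up indexing the ml vector past its last element, and the ml SparseVector raises the "Index 13 out of bounds" ValueError. Converting with toArray() first sidesteps this. A minimal end-to-end sketch (countVectors and the column layout are taken from the question; the pyspark.ml.linalg import path and the SparkSession named spark are assumptions):

from pyspark.ml.linalg import DenseVector

frequencyVectors = countVectors.rdd.map(lambda row: row[1])
# toArray() materialises the sparse vector as a NumPy array before the
# DenseVector is built, so no out-of-bounds lookups happen.
frequencyDenseVectors = frequencyVectors.map(lambda v: DenseVector(v.toArray()))

# Optionally wrap each vector in a one-field tuple to get a DataFrame back;
# the vector schema (VectorUDT) is inferred automatically.
denseDF = spark.createDataFrame(frequencyDenseVectors.map(lambda v: (v,)), ["features"])
denseDF.show(truncate=False)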
# To convert a Spark vector column in a PySpark DataFrame to a dense array column
from pyspark.sql.functions import udf
from pyspark.sql import types as T
from pyspark.ml.linalg import DenseVector

@udf(T.ArrayType(T.FloatType()))
def toDense(v):
    # toArray() handles both sparse and dense input vectors safely
    v = DenseVector(v.toArray())
    new_array = [float(x) for x in v]
    return new_array

df.withColumn('features', toDense('features')).show()
# here the 'features' column is of vector type
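If a true vector column (VectorUDT) is wanted rather than an array<float>, a small variation of the same idea is to declare VectorUDT as the UDF return type. This is a sketch, not part of the original answer; the DataFrame df and its 'features' column are assumed from above:

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

# Returns a real DenseVector column instead of an array of floats.
to_dense_vector = udf(lambda v: DenseVector(v.toArray()), VectorUDT())
df.withColumn('features_dense', to_dense_vector('features')).show()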
If your PySpark DataFrame is a DataFrame[SparseVector], the following worked for me:
from pyspark.ml.feature import VectorAssembler

df2 = df.select("features")
# feat_cols is the list of input column names to assemble (here it could simply be ["features"])
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features_dense")
df3 = assembler.transform(df2).select("features_dense")
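On more recent Spark versions (3.0+) there is also a built-in helper for this kind of conversion; as a sketch outside the original answers, assuming df has a vector column named 'features':

from pyspark.ml.functions import vector_to_array

# vector_to_array expands a sparse or dense vector column into a plain
# array<double> column without needing a Python UDF (Spark 3.0+).
df_dense = df.withColumn("features_dense", vector_to_array("features"))
df_dense.show(truncate=False)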