pyspark:稀疏向量到 scipy 稀疏矩阵
pyspark: sparse vectors to scipy sparse matrix
我有一个 spark 数据框,其中有一列是短句,还有一列是分类变量。我想对句子执行 tf-idf
,对分类变量执行 one-hot-encoding
,然后一旦它的大小小得多(对于 scikit-learn 模型),就将其输出到驱动程序上的稀疏矩阵。
以稀疏形式从 spark 中获取数据的最佳方法是什么?似乎在稀疏向量上只有一个 toArray()
方法,它输出 numpy
数组。但是,文档确实说 scipy 稀疏数组 can be used in the place of spark sparse arrays.
还要记住 tf_idf 值实际上是一列稀疏数组。理想情况下,将所有这些特征放入一个大的稀疏矩阵中会很好。
一种可能的解决方案可以表述如下:
将特征转换为RDD
并提取向量:
from pyspark.ml.linalg import SparseVector
from operator import attrgetter
df = sc.parallelize([
(SparseVector(3, [0, 2], [1.0, 3.0]), ),
(SparseVector(3, [1], [4.0]), )
]).toDF(["features"])
features = df.rdd.map(attrgetter("features"))
添加行索引:
indexed_features = features.zipWithIndex()
扁平化为元组的 RDD (i, j, value)
:
def explode(row):
vec, i = row
for j, v in zip(vec.indices, vec.values):
yield i, j, v
entries = indexed_features.flatMap(explode)
收集并重塑:
row_indices, col_indices, data = zip(*entries.collect())
计算形状:
shape = (
df.count(),
df.rdd.map(attrgetter("features")).first().size
)
创建稀疏矩阵:
from scipy.sparse import csr_matrix
mat = csr_matrix((data, (row_indices, col_indices)), shape=shape)
快速完整性检查:
mat.todense()
预期结果:
matrix([[ 1., 0., 3.],
[ 0., 4., 0.]])
另一个:
将 features
的每一行转换为矩阵:
import numpy as np
def as_matrix(vec):
data, indices = vec.values, vec.indices
shape = 1, vec.size
return csr_matrix((data, indices, np.array([0, vec.values.size])), shape)
mats = features.map(as_matrix)
并减少 vstack
:
from scipy.sparse import vstack
mat = mats.reduce(lambda x, y: vstack([x, y]))
或collect
和vstack
mat = vstack(mats.collect())
我有一个 spark 数据框,其中有一列是短句,还有一列是分类变量。我想对句子执行 tf-idf
,对分类变量执行 one-hot-encoding
,然后一旦它的大小小得多(对于 scikit-learn 模型),就将其输出到驱动程序上的稀疏矩阵。
以稀疏形式从 spark 中获取数据的最佳方法是什么?似乎在稀疏向量上只有一个 toArray()
方法,它输出 numpy
数组。但是,文档确实说 scipy 稀疏数组 can be used in the place of spark sparse arrays.
还要记住 tf_idf 值实际上是一列稀疏数组。理想情况下,将所有这些特征放入一个大的稀疏矩阵中会很好。
一种可能的解决方案可以表述如下:
将特征转换为
RDD
并提取向量:from pyspark.ml.linalg import SparseVector from operator import attrgetter df = sc.parallelize([ (SparseVector(3, [0, 2], [1.0, 3.0]), ), (SparseVector(3, [1], [4.0]), ) ]).toDF(["features"]) features = df.rdd.map(attrgetter("features"))
添加行索引:
indexed_features = features.zipWithIndex()
扁平化为元组的 RDD
(i, j, value)
:def explode(row): vec, i = row for j, v in zip(vec.indices, vec.values): yield i, j, v entries = indexed_features.flatMap(explode)
收集并重塑:
row_indices, col_indices, data = zip(*entries.collect())
计算形状:
shape = ( df.count(), df.rdd.map(attrgetter("features")).first().size )
创建稀疏矩阵:
from scipy.sparse import csr_matrix mat = csr_matrix((data, (row_indices, col_indices)), shape=shape)
快速完整性检查:
mat.todense()
预期结果:
matrix([[ 1., 0., 3.], [ 0., 4., 0.]])
另一个:
将
features
的每一行转换为矩阵:import numpy as np def as_matrix(vec): data, indices = vec.values, vec.indices shape = 1, vec.size return csr_matrix((data, indices, np.array([0, vec.values.size])), shape) mats = features.map(as_matrix)
并减少
vstack
:from scipy.sparse import vstack mat = mats.reduce(lambda x, y: vstack([x, y]))
或
collect
和vstack
mat = vstack(mats.collect())