如何使用 Python 在 Spark 中添加两个稀疏向量
How to add two Sparse Vectors in Spark using Python
我到处搜索,但找不到如何使用 Python 添加两个稀疏向量。
我想像这样添加两个稀疏向量:-
(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986, 725041: 0.6931, 749730: 0.6931, 962395: 0.6931})
(1048576, {4471: 1.0986, 725041: 0.6931, 850325: 1.0986, 962395: 0.6931})
像这样的东西应该可以工作:
from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector
import numpy as np
def add(v1, v2):
    """Return the element-wise sum of two SparseVectors of equal size.

    >>> v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
    >>> v2 = Vectors.sparse(3, {1: 1.0})
    >>> add(v1, v2)
    SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0})
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    # Not particularly efficient but we are limited by SPARK-10973
    # Build index -> value maps for both inputs.
    first = dict(zip(v1.indices, v1.values))
    second = dict(zip(v2.indices, v2.values))
    zero = np.float64(0)
    # Walk the union of active indices, keeping only nonzero sums.
    summed = {}
    for idx in set(first) | set(second):
        total = first.get(idx, zero) + second.get(idx, zero)
        if total != zero:
            summed[idx] = total
    return Vectors.sparse(v1.size, summed)
如果您只喜欢单遍并且不关心引入的零,您可以像这样修改上面的代码:
from collections import defaultdict
def add(v1, v2):
    """Single-pass sum of two equally-sized SparseVectors.

    Faster than the union-of-indices approach, but may keep explicit
    zero entries when values cancel out.
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    totals = defaultdict(float)  # missing keys default to 0.0
    # Accumulate the active entries of both vectors into one dict.
    for vec in (v1, v2):
        for idx, val in zip(vec.indices, vec.values):
            totals[idx] += val
    return Vectors.sparse(v1.size, dict(totals))
如果你愿意,可以试试猴子补丁 SparseVector
:
# Monkey-patch SparseVector so the `+` operator dispatches to add().
SparseVector.__add__ = add

# Same vectors as before, built from parallel index/value lists.
v1 = Vectors.sparse(5, [0, 2], [1.0, 3.0])
v2 = Vectors.sparse(5, [0, 2, 4], [-3.0, -3.0, 10])
v1 + v2
## SparseVector(5, {0: -2.0, 4: 10.0})
或者您应该可以使用 scipy.sparse
。
from scipy.sparse import csc_matrix
from pyspark.mllib.regression import LabeledPoint

def _as_column(v):
    """View a pyspark SparseVector as a scipy CSC column vector."""
    col = [0] * v.numNonzeros()  # all nonzeros live in column 0
    return csc_matrix((v.values, (v.indices, col)), shape=(v.size, 1))

m1 = _as_column(v1)
m2 = _as_column(v2)
LabeledPoint(0, m1 + m2)
我遇到了同样的问题,但在中等大小的数据集(约 20M 记录,向量大小 = 10k)上,我无法在不到几个小时的时间内完成其他解决方案
所以我采用了另一种相关方法,只用了几分钟就完成了:
import numpy as np

def to_sparse(v):
    """Convert a dense (NumPy) vector into a pyspark SparseVector,
    keeping only its nonzero entries.
    """
    values = {i: e for i, e in enumerate(v) if e != 0}
    return Vectors.sparse(v.size, values)

# BUG FIX: RDD.aggregate() returns a plain NumPy array on the driver,
# not an RDD, so it has no .map() method. Convert the final dense
# result directly instead of chaining .map(to_sparse).
summed = rdd.aggregate(
    np.zeros(vector_size),
    lambda acc, b: acc + b.toArray(),  # fold one sparse vector into the dense accumulator
    lambda acc, b: acc + b,            # merge per-partition dense accumulators
)
result = to_sparse(summed)
基本思想是不要在 reduce 的每一步都构建稀疏向量,只在最后构建一次,让 numpy 完成所有向量加法工作。即使使用需要打乱密集向量的 aggregateByKey,也只需要几分钟。
以上所有函数都是将两个相同大小的稀疏向量相加。我试图拼接不同长度的稀疏向量,并在一个 Java 实现(原帖附有链接)中找到了与我的要求类似的东西
所以在 python 中编写的函数如下:
def combineSparseVectors(svs):
    """Concatenate an iterable of SparseVectors into one SparseVector.

    The result's size is the sum of the input sizes; each input's
    indices are shifted by the combined size of the vectors before it.

    Returns None when the inputs contain no nonzero entries.
    """
    # Materialize so we can safely iterate twice (generators included).
    svs = list(svs)
    size = 0
    nonzeros = 0
    for sv in svs:
        size += sv.size
        nonzeros += len(sv.indices)
    if nonzeros == 0:
        # BUG FIX: the original returned the undefined name `null`
        # (a Java-ism); Python's null value is None.
        return None
    # Use an integer dtype: SparseVector indices are integers, and
    # np.empty([n]) would default to float64.
    indices = np.empty(nonzeros, dtype=np.int64)
    values = np.empty(nonzeros)
    offset = 0  # running index shift for the current vector
    pos = 0     # write cursor into `indices`
    vpos = 0    # write cursor into `values`
    for sv in svs:
        for i in sv.indices:
            indices[pos] = i + offset
            pos += 1
        offset += sv.size
        for d in sv.values:
            values[vpos] = d
            vpos += 1
    return SparseVector(size, indices, values)
其他答案与 Spark 的编程理念相违背。更简单的是,只需将 pyspark.ml.linalg.SparseVector
(下面代码中的 urOldVec)转换为 scipy.sparse.csc_matrix
对象(即列向量),然后使用“+”运算符添加。
import scipy.sparse as sps
# Wrap the pyspark vector (urOldVec) as a scipy CSC column matrix so
# that the `+` operator performs element-wise sparse addition.
urNewVec = sps.csc_matrix(urOldVec)
urNewVec + urNewVec  # result is another csc_matrix
如 pyspark.ml.linalg
的文档中所述,scipy.sparse
向量可以改为传递到 pyspark。
我到处搜索,但找不到如何使用 Python 添加两个稀疏向量。 我想像这样添加两个稀疏向量:-
(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986, 725041: 0.6931, 749730: 0.6931, 962395: 0.6931})
(1048576, {4471: 1.0986, 725041: 0.6931, 850325: 1.0986, 962395: 0.6931})
像这样的东西应该可以工作:
from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector
import numpy as np
def add(v1, v2):
    """Return the element-wise sum of two SparseVectors of equal size.

    >>> v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
    >>> v2 = Vectors.sparse(3, {1: 1.0})
    >>> add(v1, v2)
    SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0})
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    # Not particularly efficient but we are limited by SPARK-10973
    # Build index -> value maps for both inputs.
    first = dict(zip(v1.indices, v1.values))
    second = dict(zip(v2.indices, v2.values))
    zero = np.float64(0)
    # Walk the union of active indices, keeping only nonzero sums.
    summed = {}
    for idx in set(first) | set(second):
        total = first.get(idx, zero) + second.get(idx, zero)
        if total != zero:
            summed[idx] = total
    return Vectors.sparse(v1.size, summed)
如果您只喜欢单遍并且不关心引入的零,您可以像这样修改上面的代码:
from collections import defaultdict
def add(v1, v2):
    """Single-pass sum of two equally-sized SparseVectors.

    Faster than the union-of-indices approach, but may keep explicit
    zero entries when values cancel out.
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    totals = defaultdict(float)  # missing keys default to 0.0
    # Accumulate the active entries of both vectors into one dict.
    for vec in (v1, v2):
        for idx, val in zip(vec.indices, vec.values):
            totals[idx] += val
    return Vectors.sparse(v1.size, dict(totals))
如果你愿意,可以试试猴子补丁 SparseVector
:
# Monkey-patch SparseVector so the `+` operator dispatches to add().
SparseVector.__add__ = add

# Same vectors as before, built from parallel index/value lists.
v1 = Vectors.sparse(5, [0, 2], [1.0, 3.0])
v2 = Vectors.sparse(5, [0, 2, 4], [-3.0, -3.0, 10])
v1 + v2
## SparseVector(5, {0: -2.0, 4: 10.0})
或者您应该可以使用 scipy.sparse
。
from scipy.sparse import csc_matrix
from pyspark.mllib.regression import LabeledPoint

def _as_column(v):
    """View a pyspark SparseVector as a scipy CSC column vector."""
    col = [0] * v.numNonzeros()  # all nonzeros live in column 0
    return csc_matrix((v.values, (v.indices, col)), shape=(v.size, 1))

m1 = _as_column(v1)
m2 = _as_column(v2)
LabeledPoint(0, m1 + m2)
我遇到了同样的问题,但在中等大小的数据集(约 20M 记录,向量大小 = 10k)上,我无法在不到几个小时的时间内完成其他解决方案
所以我采用了另一种相关方法,只用了几分钟就完成了:
import numpy as np

def to_sparse(v):
    """Convert a dense (NumPy) vector into a pyspark SparseVector,
    keeping only its nonzero entries.
    """
    values = {i: e for i, e in enumerate(v) if e != 0}
    return Vectors.sparse(v.size, values)

# BUG FIX: RDD.aggregate() returns a plain NumPy array on the driver,
# not an RDD, so it has no .map() method. Convert the final dense
# result directly instead of chaining .map(to_sparse).
summed = rdd.aggregate(
    np.zeros(vector_size),
    lambda acc, b: acc + b.toArray(),  # fold one sparse vector into the dense accumulator
    lambda acc, b: acc + b,            # merge per-partition dense accumulators
)
result = to_sparse(summed)
基本思想是不要在 reduce 的每一步都构建稀疏向量,只在最后构建一次,让 numpy 完成所有向量加法工作。即使使用需要打乱密集向量的 aggregateByKey,也只需要几分钟。
以上所有函数都是将两个相同大小的稀疏向量相加。我试图拼接不同长度的稀疏向量,并在一个 Java 实现(原帖附有链接)中找到了与我的要求类似的东西
def combineSparseVectors(svs):
    """Concatenate an iterable of SparseVectors into one SparseVector.

    The result's size is the sum of the input sizes; each input's
    indices are shifted by the combined size of the vectors before it.

    Returns None when the inputs contain no nonzero entries.
    """
    # Materialize so we can safely iterate twice (generators included).
    svs = list(svs)
    size = 0
    nonzeros = 0
    for sv in svs:
        size += sv.size
        nonzeros += len(sv.indices)
    if nonzeros == 0:
        # BUG FIX: the original returned the undefined name `null`
        # (a Java-ism); Python's null value is None.
        return None
    # Use an integer dtype: SparseVector indices are integers, and
    # np.empty([n]) would default to float64.
    indices = np.empty(nonzeros, dtype=np.int64)
    values = np.empty(nonzeros)
    offset = 0  # running index shift for the current vector
    pos = 0     # write cursor into `indices`
    vpos = 0    # write cursor into `values`
    for sv in svs:
        for i in sv.indices:
            indices[pos] = i + offset
            pos += 1
        offset += sv.size
        for d in sv.values:
            values[vpos] = d
            vpos += 1
    return SparseVector(size, indices, values)
其他答案与 Spark 的编程理念相违背。更简单的是,只需将 pyspark.ml.linalg.SparseVector
(下面代码中的 urOldVec)转换为 scipy.sparse.csc_matrix
对象(即列向量),然后使用“+”运算符添加。
import scipy.sparse as sps
# Wrap the pyspark vector (urOldVec) as a scipy CSC column matrix so
# that the `+` operator performs element-wise sparse addition.
urNewVec = sps.csc_matrix(urOldVec)
urNewVec + urNewVec  # result is another csc_matrix
如 pyspark.ml.linalg
的文档中所述,scipy.sparse
向量可以改为传递到 pyspark。