Pyspark 更新特征向量中的值

Pyspark update the value in feature vector

我正在构建文本分类器并使用 spark countVectorizer 创建特征向量。

现在要将此向量与 BIDGL 库一起使用,我需要将特征向量中的所有 0 转换为 1。

这是我的特征向量,它是一个稀疏向量:

vectorizer_df.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows

我正在尝试按以下方式更新值。首先将稀疏向量转换为密集向量

from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf

update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())


df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))

df.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows

一旦我有了密集向量,我就会尝试将所有元素加 1

def add1(x):
    return x+1
def array_for(x):
    return np.array([add1(xi) for xi in x])

add_udf_one = udf(lambda z: array_for(z), VectorUDT())

df = df.select('features', add_udf_one('features').alias('feature_1'))

df.select('feature_1').show(2)

但现在我得到如下类型错误:

TypeError: cannot serialize array([  ....]) of type <class 'numpy.ndarray'>

完整错误如下

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    348         """
    349         if isinstance(truncate, bool) and truncate:
--> 350             print(self._jdf.showString(n, 20, vertical))
    351         else:
    352             print(self._jdf.showString(n, int(truncate), vertical))

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
    return self._cachedSqlType().toInternal(self.serialize(obj))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
    raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([  1.,   1.,   1.,   1.,   2.,   1., 326.,   1.,   1.,   1.,   1.,
         2.,   1.,   3.,   1.,   1.,   1.,   1., 383.,   1., 312.,   1.,
         1.,   1.,   1.,   1.,   1.,  39.,   1.,   1.,   1.,   1.,   1.,
       180.,   1.,   1.,   1., 167.,   4.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1., 133.,   1.,
         1.,   1., 123.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,  96.,   1.,   7.,
         7.,   7.,   7.,   7.,   7.,   7.,   1.,   1.,  13.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   4.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.]) of type <class 'numpy.ndarray'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

关于如何更新 pyspark 特征向量的任何建议 >

谢谢

您快完成了,但是 Spark 不支持 NumPy 类型,并且 VectorUDT 无论如何都不会匹配。

而是

import numpy as np
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT

@udf(VectorUDT())
def zeros_to_ones(v):
    if v is None:
        return v
    # Sparse vector will become dense
    if isinstance(v, SparseVector):
        v = v.toArray()
        return DenseVector(np.where(v == 0, 1, v))
    if isinstance(v, DenseVector):
        return DenseVector(np.where(v.array == 0, 1, v.array))

用法:

df = spark.createDataFrame(
    [(1, Vectors.dense([0, 1, 0, 3])), (2, Vectors.sparse(4, [0, 3], [0, 1]))], 
    ("id", "features")
)

df.withColumn("features_no_zeros", zeros_to_ones("features")).show(truncate=False)
+---+-------------------+-----------------+                                     
|id |features           |features_no_zeros|
+---+-------------------+-----------------+
|1  |[0.0,1.0,0.0,3.0]  |[1.0,1.0,1.0,3.0]|
|2  |(4,[0,3],[0.0,1.0])|[1.0,1.0,1.0,1.0]|
+---+-------------------+-----------------+