Pyspark 更新特征向量中的值
Pyspark update the value in feature vector
我正在构建文本分类器并使用 spark countVectorizer 创建特征向量。
现在要将此向量与 BIDGL 库一起使用,我需要将特征向量中的所有 0 转换为 1。
这是我的特征向量,它是一个稀疏向量:
vectorizer_df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows
我正在尝试按以下方式更新值。首先将稀疏向量转换为密集向量
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf
update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())
df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))
df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows
一旦我有了密集向量,我就会尝试将所有元素加 1
def add1(x):
return x+1
def array_for(x):
return np.array([add1(xi) for xi in x])
add_udf_one = udf(lambda z: array_for(z), VectorUDT())
df = df.select('features', add_udf_one('features').alias('feature_1'))
df.select('feature_1').show(2)
但现在我得到如下类型错误:
TypeError: cannot serialize array([ ....]) of type <class 'numpy.ndarray'>
完整错误如下
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
348 """
349 if isinstance(truncate, bool) and truncate:
--> 350 print(self._jdf.showString(n, 20, vertical))
351 else:
352 print(self._jdf.showString(n, int(truncate), vertical))
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
return lambda *a: toInternal(f(*a))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
return self._cachedSqlType().toInternal(self.serialize(obj))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([ 1., 1., 1., 1., 2., 1., 326., 1., 1., 1., 1.,
2., 1., 3., 1., 1., 1., 1., 383., 1., 312., 1.,
1., 1., 1., 1., 1., 39., 1., 1., 1., 1., 1.,
180., 1., 1., 1., 167., 4., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 133., 1.,
1., 1., 123., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 96., 1., 7.,
7., 7., 7., 7., 7., 7., 1., 1., 13., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 4., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) of type <class 'numpy.ndarray'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
关于如何更新 pyspark 特征向量的任何建议 >
谢谢
您快完成了,但是 Spark 不支持 NumPy 类型,并且 VectorUDT
无论如何都不会匹配。
而是
import numpy as np
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT
@udf(VectorUDT())
def zeros_to_ones(v):
if v is None:
return v
# Sparse vector will become dense
if isinstance(v, SparseVector):
v = v.toArray()
return DenseVector(np.where(v == 0, 1, v))
if isinstance(v, DenseVector):
return DenseVector(np.where(v.array == 0, 1, v.array))
用法:
df = spark.createDataFrame(
[(1, Vectors.dense([0, 1, 0, 3])), (2, Vectors.sparse(4, [0, 3], [0, 1]))],
("id", "features")
)
df.withColumn("features_no_zeros", zeros_to_ones("features")).show(truncate=False)
+---+-------------------+-----------------+
|id |features |features_no_zeros|
+---+-------------------+-----------------+
|1 |[0.0,1.0,0.0,3.0] |[1.0,1.0,1.0,3.0]|
|2 |(4,[0,3],[0.0,1.0])|[1.0,1.0,1.0,1.0]|
+---+-------------------+-----------------+
我正在构建文本分类器并使用 spark countVectorizer 创建特征向量。
现在要将此向量与 BIDGL 库一起使用,我需要将特征向量中的所有 0 转换为 1。
这是我的特征向量,它是一个稀疏向量:
vectorizer_df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows
我正在尝试按以下方式更新值。首先将稀疏向量转换为密集向量
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf
update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())
df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))
df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows
一旦我有了密集向量,我就会尝试将所有元素加 1
def add1(x):
return x+1
def array_for(x):
return np.array([add1(xi) for xi in x])
add_udf_one = udf(lambda z: array_for(z), VectorUDT())
df = df.select('features', add_udf_one('features').alias('feature_1'))
df.select('feature_1').show(2)
但现在我得到如下类型错误:
TypeError: cannot serialize array([ ....]) of type <class 'numpy.ndarray'>
完整错误如下
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
348 """
349 if isinstance(truncate, bool) and truncate:
--> 350 print(self._jdf.showString(n, 20, vertical))
351 else:
352 print(self._jdf.showString(n, int(truncate), vertical))
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
return lambda *a: toInternal(f(*a))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
return self._cachedSqlType().toInternal(self.serialize(obj))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([ 1., 1., 1., 1., 2., 1., 326., 1., 1., 1., 1.,
2., 1., 3., 1., 1., 1., 1., 383., 1., 312., 1.,
1., 1., 1., 1., 1., 39., 1., 1., 1., 1., 1.,
180., 1., 1., 1., 167., 4., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 133., 1.,
1., 1., 123., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 96., 1., 7.,
7., 7., 7., 7., 7., 7., 1., 1., 13., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 4., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) of type <class 'numpy.ndarray'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
关于如何更新 pyspark 特征向量的任何建议 >
谢谢
您快完成了,但是 Spark 不支持 NumPy 类型,并且 VectorUDT
无论如何都不会匹配。
而是
import numpy as np
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT
@udf(VectorUDT())
def zeros_to_ones(v):
if v is None:
return v
# Sparse vector will become dense
if isinstance(v, SparseVector):
v = v.toArray()
return DenseVector(np.where(v == 0, 1, v))
if isinstance(v, DenseVector):
return DenseVector(np.where(v.array == 0, 1, v.array))
用法:
df = spark.createDataFrame(
[(1, Vectors.dense([0, 1, 0, 3])), (2, Vectors.sparse(4, [0, 3], [0, 1]))],
("id", "features")
)
df.withColumn("features_no_zeros", zeros_to_ones("features")).show(truncate=False)
+---+-------------------+-----------------+
|id |features |features_no_zeros|
+---+-------------------+-----------------+
|1 |[0.0,1.0,0.0,3.0] |[1.0,1.0,1.0,3.0]|
|2 |(4,[0,3],[0.0,1.0])|[1.0,1.0,1.0,1.0]|
+---+-------------------+-----------------+