How to pass an array column and convert it to a numpy array in pyspark
I have a dataframe as below:
from pyspark import SparkContext, SparkConf, SQLContext
import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql.functions import lit, countDistinct, udf, array, struct
import pyspark.sql.functions as F

config = SparkConf().setMaster("local")
sc = SparkContext(conf=config)
sqlContext = SQLContext(sc)
@udf("float")
def myfunction(x):
    y = np.array([1, 3, 9])
    x = np.array(x)
    return cosine(x, y)
df = sqlContext.createDataFrame([("doc_3", 1, 3, 9), ("doc_1", 9, 6, 0), ("doc_2", 9, 9, 3)]) \
    .withColumnRenamed("_1", "doc") \
    .withColumnRenamed("_2", "word1") \
    .withColumnRenamed("_3", "word2") \
    .withColumnRenamed("_4", "word3")
df2 = df.select("doc", array([c for c in df.columns if c not in {'doc'}]).alias("words"))
df2 = df2.withColumn("cosine", myfunction("words"))
This throws an error:
19/10/02 21:24:58 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
net.razorvine.pickle.PickleException: expected zero arguments for
construction of ClassDict (for numpy.dtype) at
net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) at
net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) at
net.razorvine.pickle.Unpickler.load(Unpickler.java:99) at
net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
I am not sure why it can't convert the list type to a numpy array. Any help is appreciated.
This is essentially the same problem as in your other question: you created a udf and told Spark the function will return a float, but you actually return an object of type numpy.float64. That is what the PickleException above is complaining about.
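You can see the mismatch without Spark at all; a minimal sketch on the driver (the exact scalar type can vary between scipy versions, but here it is a numpy scalar):

import numpy as np
from scipy.spatial.distance import cosine

val = cosine(np.array([9, 6, 0]), np.array([1, 3, 9]))
print(type(val))         # <class 'numpy.float64'> -- not a plain Python float
print(type(val.item()))  # <class 'float'>         -- what the udf should return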
You can convert the numpy type to a plain Python type by calling item(), like this:
import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql.functions import lit, countDistinct, udf, array, struct
import pyspark.sql.functions as F
@udf("float")
def myfunction(x):
    y = np.array([1, 3, 9])
    x = np.array(x)
    return cosine(x, y).item()
df = spark.createDataFrame([("doc_3", 1, 3, 9), ("doc_1", 9, 6, 0), ("doc_2", 9, 9, 3)]) \
    .withColumnRenamed("_1", "doc") \
    .withColumnRenamed("_2", "word1") \
    .withColumnRenamed("_3", "word2") \
    .withColumnRenamed("_4", "word3")
df2 = df.select("doc", array([c for c in df.columns if c not in {'doc'}]).alias("words"))
df2 = df2.withColumn("cosine", myfunction("words"))
df2.show(truncate=False)
Output:
+-----+---------+----------+
|doc  |words    |cosine    |
+-----+---------+----------+
|doc_3|[1, 3, 9]|0.0       |
|doc_1|[9, 6, 0]|0.7383323 |
|doc_2|[9, 9, 3]|0.49496463|
+-----+---------+----------+
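As a side note, if you are on Spark 3.0+ with pyarrow installed, a vectorized pandas_udf does the same job without pickling each row separately. A sketch under those assumptions (it reuses np and cosine from the imports above, and df2 from the code above):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def cosine_vec(words: pd.Series) -> pd.Series:
    y = np.array([1, 3, 9])
    # elements of an array column arrive as numpy arrays; wrap the numpy
    # scalar that cosine returns in float() before handing it back to Spark
    return words.apply(lambda x: float(cosine(np.array(x), y)))

df2.withColumn("cosine", cosine_vec("words")).show(truncate=False)

Calling float() here plays the same role as item() in the udf above: it makes sure Spark receives plain Python floats rather than numpy scalars.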