How to convert a Spark rdd containing np.array (or list) to a Spark DataFrame?
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import *
# Create a SparkSession
sc = SparkSession.builder.appName("SparkSQL").getOrCreate()
rdd = sc.sparkContext.parallelize(np.array([1.1,2.3,3,4,5,6,7,8,9,10]))
print(rdd.collect())
#[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
Schema = StructType([
    StructField('name', ArrayType(FloatType(), True))])
spark_df = rdd.toDF(Schema)
spark_df.show()
[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
22/04/03 16:04:28 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...
File "/home/zenbook/.local/lib/python3.9/site-packages/pyspark/sql/types.py", line 1398, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 1.1 in type <class 'numpy.float64'>
Note: I removed some of the Spark error logs for clarity.
I tried with a plain list and got a very similar error:
rdd = sc.sparkContext.parallelize([1.1,2.3,3,4,5,6,7,8,9,10])
print(rdd.collect())
#[1.1, 2.3, 3, 4, 5, 6, 7, 8, 9, 10]
Schema = StructType([
    StructField('name', ArrayType(FloatType(), True))])
spark_df = rdd.toDF(Schema)
spark_df.show()
TypeError: StructType can not accept object 1.1 in type <class 'float'>
In the Learning Spark book (O'Reilly), it says that ArrayType maps to the Python types list, tuple, or array.
Why I need this: I could not convert a byte_array (hex) to a float_array with a Spark DataFrame (tried for 3 days), so I converted them with a Spark RDD:
def get_msp(row):
    """ extract MSN from row as numpy array"""
    return np.frombuffer(row.MSP, dtype=np.float64)  # or a list by adding .tolist()

spectra = df.rdd.map(get_msp)
Maybe it is currently not possible to convert a Spark RDD of lists/arrays to a Spark DataFrame?
Since I want to use this array/list with Spark ML (PCA and LLA), a solution using VectorAssembler would also be great.
Spark's DataFrame does not know numpy.float64, but you can convert the values in the rdd to Python floats:
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType())
Demo:
rdd.collect()
# [1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
# rdd datatypes are numpy.float64
rdd.map(lambda x: type(x)).collect()
# [numpy.float64,
# numpy.float64,
# . . .
# numpy.float64]
# create dataframe
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType()).show()
# +-----+
# |value|
# +-----+
# | 1.1|
# | 2.3|
# | 3.0|
# | 4.0|
# | 5.0|
# | 6.0|
# | 7.0|
# | 8.0|
# | 9.0|
# | 10.0|
# +-----+
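To get the array layout you asked about (and vectors for Spark ML), the same cast works element-wise. Here is a minimal sketch, assuming a SparkSession named spark, numpy imported as np, and an RDD whose records are numpy arrays (like the one your get_msp produces): wrap each record in a one-element tuple holding a plain Python list of floats for an ArrayType column, or build a pyspark.ml.linalg DenseVector, which PCA can consume directly without VectorAssembler.

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType

# hypothetical RDD of numpy arrays, one spectrum per record
arr_rdd = spark.sparkContext.parallelize(
    [np.array([1.1, 2.3, 3.0]), np.array([4.0, 5.0, 6.0])])

# ArrayType column: each record must become a tuple whose field is a plain
# Python list of floats, not a bare numpy array or a bare scalar
schema = StructType([StructField('name', ArrayType(FloatType(), True), True)])
array_df = arr_rdd.map(lambda a: ([float(x) for x in a],)).toDF(schema)
array_df.show(truncate=False)

# For Spark ML, build DenseVectors instead of an ArrayType column
vec_df = arr_rdd.map(lambda a: (Vectors.dense([float(x) for x in a]),)).toDF(['features'])
pca = PCA(k=2, inputCol='features', outputCol='pca_features')
# model = pca.fit(vec_df)   # then model.transform(vec_df)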