How to convert a Spark RDD containing np.array (or list) to a Spark DataFrame?

from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import *

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

rdd = spark.sparkContext.parallelize(np.array([1.1, 2.3, 3, 4, 5, 6, 7, 8, 9, 10]))
print(rdd.collect())
#[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]

Schema = StructType([       
    StructField('name', ArrayType(FloatType(), True))])

spark_df = rdd.toDF(Schema)
spark_df.show()
22/04/03 16:04:28 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  ...
  File "/home/zenbook/.local/lib/python3.9/site-packages/pyspark/sql/types.py", line 1398, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 1.1 in type <class 'numpy.float64'>

Note: I trimmed some of the Spark error log for clarity.

I tried a plain Python list and got a very similar error:

rdd = spark.sparkContext.parallelize([1.1, 2.3, 3, 4, 5, 6, 7, 8, 9, 10])
print(rdd.collect())
#[1.1, 2.3, 3, 4, 5, 6, 7, 8, 9, 10]

Schema = StructType([       
    StructField('name', ArrayType(FloatType(), True))])

spark_df = rdd.toDF(Schema)
spark_df.show()
TypeError: StructType can not accept object 1.1 in type <class 'float'>

In the Learning Spark book (O'Reilly), it says that ArrayType corresponds to the Python types list, tuple, or array.

Why I need this: I could not convert a byte_array (hex) column to a float_array using the Spark DataFrame API (I tried for 3 days), so I converted it with the RDD API instead:
def get_msp(row):
    """Extract the MSP field from a row as a numpy array."""
    return np.frombuffer(row.MSP, dtype=np.float64)  # append .tolist() to get a plain list instead

spectra = df.rdd.map(get_msp)
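For reference, here is a self-contained stand-in for that decoding step (the byte payload below is fabricated to mimic one cell of the MSP byte_array column):

import numpy as np

# Fabricated stand-in for one MSP value: three float64 values packed as raw bytes.
buf = np.array([1.1, 2.3, 3.0], dtype=np.float64).tobytes()

print(np.frombuffer(buf, dtype=np.float64))           # [1.1 2.3 3. ]
print(np.frombuffer(buf, dtype=np.float64).tolist())  # [1.1, 2.3, 3.0]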

Is it perhaps simply not possible at the moment to convert a Spark RDD of lists/arrays to a Spark DataFrame?

Because I want to use this array/list with SparkML (PCA and LLA), a solution using VectorAssembler would also be great.
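For reference, the end goal is roughly the sketch below, here with a toy vector column (the features column name and k=2 are just placeholders):

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# Toy DataFrame with the vector-typed column that SparkML estimators expect.
toy_df = spark.createDataFrame(
    [(Vectors.dense([1.1, 2.3, 3.0]),),
     (Vectors.dense([4.0, 5.0, 6.0]),),
     (Vectors.dense([7.0, 8.0, 9.0]),)],
    ["features"])

pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(toy_df)
model.transform(toy_df).show(truncate=False)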

Spark DataFrames don't know anything about numpy.float64, but you can cast the values in the RDD to Python floats:

spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType())

Demo:

rdd.collect()
# [1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]

# rdd datatypes are numpy.float64
rdd.map(lambda x: type(x)).collect() 
# [numpy.float64,
#  numpy.float64,
# . . . 
#  numpy.float64]

# create dataframe
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType()).show()
# +-----+
# |value|
# +-----+
# |  1.1|
# |  2.3|
# |  3.0|
# |  4.0|
# |  5.0|
# |  6.0|
# |  7.0|
# |  8.0|
# |  9.0|
# | 10.0|
# +-----+
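The same cast fixes the original array-per-row case: convert each numpy array to a plain Python list (which turns the np.float64 elements into Python floats) and wrap it in a tuple, so each RDD element becomes one row with one array column. A sketch, assuming spectra is the RDD of np.float64 arrays from the question (DoubleType matches np.float64, but the question's FloatType also verifies once the values are Python floats):

import numpy as np
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType

# Stand-in for the question's `spectra` RDD of numpy float64 arrays.
spectra = spark.sparkContext.parallelize(
    [np.array([1.1, 2.3, 3.0]), np.array([4.0, 5.0, 6.0])])

schema = StructType([StructField('features', ArrayType(DoubleType()), True)])

# Each element must be a row (tuple) whose single field is a Python list.
arr_df = spark.createDataFrame(spectra.map(lambda arr: (arr.tolist(),)), schema)
arr_df.show(truncate=False)
# +---------------+
# |features       |
# +---------------+
# |[1.1, 2.3, 3.0]|
# |[4.0, 5.0, 6.0]|
# +---------------+

For the SparkML part you can skip the array column entirely and build a vector column directly (or, on Spark 3.1+, convert an existing array column with pyspark.ml.functions.array_to_vector):

from pyspark.ml.linalg import Vectors

# Vector-typed "features" column, ready to feed PCA's inputCol.
vec_df = spectra.map(lambda arr: (Vectors.dense(arr.tolist()),)).toDF(['features'])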