Creating an Apache Spark RDD of a Class in PySpark

I need to convert some Scala code to Python.

The Scala code converts an RDD of strings into an RDD of a case class. Here is the code:

case class Stock(
                  stockName: String,
                  dt: String,
                  openPrice: Double,
                  highPrice: Double,
                  lowPrice: Double,
                  closePrice: Double,
                  adjClosePrice: Double,
                  volume: Double
                )


  def parseStock(inputRecord: String, stockName: String): Stock = {
    val column = inputRecord.split(",")
    Stock(
      stockName,
      column(0),
      column(1).toDouble,
      column(2).toDouble,
      column(3).toDouble,
      column(4).toDouble,
      column(5).toDouble,
      column(6).toDouble)
  }

  def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
    val header = rdd.first
    rdd.filter((data) => {
      data(0) != header(0) && !data.contains("null")
    })
      .map(data => parseStock(data, stockName))
  }  

Is it possible to do the same in PySpark? I tried the following code, but it throws an error:

from dataclasses import dataclass

@dataclass(eq=True, frozen=True)
class Stock:
    stockName: str
    dt: str
    openPrice: float
    highPrice: float
    lowPrice: float
    closePrice: float
    adjClosePrice: float
    volume: float


def parseStock(inputRecord, stockName):
    # Split the CSV line and convert the numeric columns,
    # mirroring the .toDouble calls in the Scala version.
    column = inputRecord.split(",")
    return Stock(stockName,
                 column[0],
                 float(column[1]),
                 float(column[2]),
                 float(column[3]),
                 float(column[4]),
                 float(column[5]),
                 float(column[6]))

def parseRDD(rdd, stockName):
    # Drop the header line and records containing "null", then parse each record.
    header = rdd.first()
    res = rdd.filter(lambda data: data != header and "null" not in data) \
             .map(lambda data: parseStock(data, stockName))
    return res

The error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 31, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'main' on

The Dataset API is not available in Python.

"A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar."

https://spark.apache.org/docs/latest/sql-programming-guide.html
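For reference, here is a minimal sketch of the DataFrame route the quoted docs point to. It assumes a CSV file at "stock.csv" with a header row, a stock name of "AAPL", and a SparkSession named spark; all of these are placeholders for illustration, and the column order follows the Scala case class above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

# Read the CSV, rename the columns to match the Scala case class,
# and tag every record with the stock name.
stocks = (spark.read.csv("stock.csv", header=True, inferSchema=True)
          .toDF("dt", "openPrice", "highPrice", "lowPrice",
                "closePrice", "adjClosePrice", "volume")
          .withColumn("stockName", lit("AAPL"))
          .na.drop())  # rough stand-in for the Scala !data.contains("null") check

# Each record is a Row whose fields are accessible by name, as the quote describes.
first = stocks.first()
print(first.stockName, first.dt, first.closePrice)

If an RDD of class instances is still the goal, the parseRDD above can work in PySpark as well; in a notebook, the "Can't get attribute" error usually means the workers cannot unpickle a class that was defined interactively, and a common workaround is to put Stock in a separate .py module (shipped with sc.addPyFile) or to use collections.namedtuple instead of a dataclass.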