在 PySpark 中创建 Class 的 Apache Spark RDD
Creating an Apache Spark RDD of a Class in PySpark
我必须将 Scala 代码转换为 python。
scala 代码将字符串的 RDD 转换为 case-class 的 RDD。代码如下:
case class Stock(
stockName: String,
dt: String,
openPrice: Double,
highPrice: Double,
lowPrice: Double,
closePrice: Double,
adjClosePrice: Double,
volume: Double
)
def parseStock(inputRecord: String, stockName: String): Stock = {
val column = inputRecord.split(",")
Stock(
stockName,
column(0),
column(1).toDouble,
column(2).toDouble,
column(3).toDouble,
column(4).toDouble,
column(5).toDouble,
column(6).toDouble)
}
def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
val header = rdd.first
rdd.filter((data) => {
data(0) != header(0) && !data.contains("null")
})
.map(data => parseStock(data, stockName))
}
是否可以在 PySpark 中实现它?我尝试使用以下代码,但出现错误
from dataclasses import dataclass
@dataclass(eq=True,frozen=True)
class Stock:
stockName : str
dt: str
openPrice: float
highPrice: float
lowPrice: float
closePrice: float
adjClosePrice: float
volume: float
def parseStock(inputRecord, stockName):
column = inputRecord.split(",")
return Stock(stockName,
column[0],
column[1],
column[2],
column[3],
column[4],
column[5],
column[6])
def parseRDD(rdd, stockName):
header = rdd.first()
res = rdd.filter(lambda data : data != header).map(lambda data : parseStock(data, stockName))
return res
错误
Py4JJavaError:调用 z:org.apache.spark.api.python.PythonRDD.collectAndServe 时发生错误。
:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 21.0 中的任务 0 失败 1 次,最近的失败:阶段 21.0 中丢失任务 0.0(TID 31,本地主机,执行程序驱动程序):org.apache.spark.api.python.PythonException:追溯(最近通话最后一次):
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py”,第 364 行,在 main
func, 分析器, 反序列化器, 序列化器 = read_command(pickleSer, infile)
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py”,第 69 行,在 read_command
命令 = serializer._read_with_length(文件)
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py”,第 173 行,在 _read_with_length
return self.loads(对象)
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py”,第 587 行,加载
return pickle.loads(obj, encoding=编码)
AttributeError:无法在
上获取属性 'main'
数据集 API 不适用于 python。
"A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar."
https://spark.apache.org/docs/latest/sql-programming-guide.html
我必须将 Scala 代码转换为 python。
scala 代码将字符串的 RDD 转换为 case-class 的 RDD。代码如下:
case class Stock(
stockName: String,
dt: String,
openPrice: Double,
highPrice: Double,
lowPrice: Double,
closePrice: Double,
adjClosePrice: Double,
volume: Double
)
def parseStock(inputRecord: String, stockName: String): Stock = {
val column = inputRecord.split(",")
Stock(
stockName,
column(0),
column(1).toDouble,
column(2).toDouble,
column(3).toDouble,
column(4).toDouble,
column(5).toDouble,
column(6).toDouble)
}
def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
val header = rdd.first
rdd.filter((data) => {
data(0) != header(0) && !data.contains("null")
})
.map(data => parseStock(data, stockName))
}
是否可以在 PySpark 中实现它?我尝试使用以下代码,但出现错误
from dataclasses import dataclass
@dataclass(eq=True,frozen=True)
class Stock:
stockName : str
dt: str
openPrice: float
highPrice: float
lowPrice: float
closePrice: float
adjClosePrice: float
volume: float
def parseStock(inputRecord, stockName):
column = inputRecord.split(",")
return Stock(stockName,
column[0],
column[1],
column[2],
column[3],
column[4],
column[5],
column[6])
def parseRDD(rdd, stockName):
header = rdd.first()
res = rdd.filter(lambda data : data != header).map(lambda data : parseStock(data, stockName))
return res
错误 Py4JJavaError:调用 z:org.apache.spark.api.python.PythonRDD.collectAndServe 时发生错误。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 21.0 中的任务 0 失败 1 次,最近的失败:阶段 21.0 中丢失任务 0.0(TID 31,本地主机,执行程序驱动程序):org.apache.spark.api.python.PythonException:追溯(最近通话最后一次):
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py”,第 364 行,在 main
func, 分析器, 反序列化器, 序列化器 = read_command(pickleSer, infile)
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py”,第 69 行,在 read_command
命令 = serializer._read_with_length(文件)
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py”,第 173 行,在 _read_with_length
return self.loads(对象)
文件“/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py”,第 587 行,加载
return pickle.loads(obj, encoding=编码)
AttributeError:无法在
数据集 API 不适用于 python。
"A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar."
https://spark.apache.org/docs/latest/sql-programming-guide.html