这是 Spark 1.3 中的回归错误吗?
Is this a regression bug in Spark 1.3?
spark SQL 1.2.1 中没有弃用警告,以下代码在 1.3
中停止工作
在 1.2.1 中工作(没有任何弃用警告)
val sqlContext = new HiveContext(sc)
import sqlContext._
val jsonRDD = sqlContext.jsonFile(jsonFilePath)
jsonRDD.registerTempTable("jsonTable")
val jsonResult = sql(s"select * from jsonTable")
val foo = jsonResult.zipWithUniqueId().map {
case (Row(...), uniqueId) => // do something useful
...
}
foo.registerTempTable("...")
Stopped working in 1.3.0(根本无法编译,我所做的只是更改为 1.3)
jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
解决方法无效:
尽管这可能会给我一个 RDD[Row]:
jsonResult.rdd.zipWithUniqueId()
现在这行不通了,因为 RDD[Row]
当然没有 registerTempTable
方法
foo.registerTempTable("...")
这是我的问题
- 有解决办法吗? (例如,我只是做错了吗?)
- 这是一个错误吗? (我认为任何停止编译的东西在以前的版本中工作,没有@deprecated 警告显然是一个回归错误)
这不是错误,但很抱歉造成混淆!在 Spark 1.3 之前,Spark SQL 被标记为 Alpha 组件,因为 API 仍在不断变化。使用 Spark 1.3,我们毕业并稳定了 API。可以在 the documentation.
中找到有关移植时需要执行的操作的完整说明
我还可以回答您的具体问题,并说明我们进行这些更改的原因
Stopped working in 1.3.0 (simply does not compile, and all I did was change to 1.3)
jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
DataFrames 现在是跨 Scala 和 Java 的统一接口。但是,由于我们必须为 1.X 的其余部分保持与现有 RDD API 的兼容性,因此 DataFrames
不是 RDD
。要获得 RDD 表示,您可以调用 df.rdd
或 df.javaRDD
此外,因为我们担心隐式转换可能会造成一些混淆,所以我们规定您必须显式调用 rdd.toDF
才能从 RDD 进行转换。但是,只有当您的 RDD 包含继承自 Product
的对象(即元组或大小写 类)时,此转换才会自动工作。
回到最初的问题,如果你想对具有任意模式的行进行转换,你需要在映射操作后明确告诉 Spark SQL 关于数据的结构(因为编译器不能) .
import org.apache.spark.sql.types._
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil))
val newSchema =
StructType(
StructField("uniqueId", IntegerType) +: jsonData.schema.fields)
val augmentedRows = jsonData.rdd.zipWithUniqueId.map {
case (row, id) =>
Row.fromSeq(id +: row.toSeq)
}
val newDF = sqlContext.createDataFrame(augmentedRows, newSchema)
spark SQL 1.2.1 中没有弃用警告,以下代码在 1.3
中停止工作在 1.2.1 中工作(没有任何弃用警告)
val sqlContext = new HiveContext(sc)
import sqlContext._
val jsonRDD = sqlContext.jsonFile(jsonFilePath)
jsonRDD.registerTempTable("jsonTable")
val jsonResult = sql(s"select * from jsonTable")
val foo = jsonResult.zipWithUniqueId().map {
case (Row(...), uniqueId) => // do something useful
...
}
foo.registerTempTable("...")
Stopped working in 1.3.0(根本无法编译,我所做的只是更改为 1.3)
jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
解决方法无效:
尽管这可能会给我一个 RDD[Row]:
jsonResult.rdd.zipWithUniqueId()
现在这行不通了,因为 RDD[Row]
当然没有 registerTempTable
方法
foo.registerTempTable("...")
这是我的问题
- 有解决办法吗? (例如,我只是做错了吗?)
- 这是一个错误吗? (我认为任何停止编译的东西在以前的版本中工作,没有@deprecated 警告显然是一个回归错误)
这不是错误,但很抱歉造成混淆!在 Spark 1.3 之前,Spark SQL 被标记为 Alpha 组件,因为 API 仍在不断变化。使用 Spark 1.3,我们毕业并稳定了 API。可以在 the documentation.
中找到有关移植时需要执行的操作的完整说明我还可以回答您的具体问题,并说明我们进行这些更改的原因
Stopped working in 1.3.0 (simply does not compile, and all I did was change to 1.3)
jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
DataFrames 现在是跨 Scala 和 Java 的统一接口。但是,由于我们必须为 1.X 的其余部分保持与现有 RDD API 的兼容性,因此 DataFrames
不是 RDD
。要获得 RDD 表示,您可以调用 df.rdd
或 df.javaRDD
此外,因为我们担心隐式转换可能会造成一些混淆,所以我们规定您必须显式调用 rdd.toDF
才能从 RDD 进行转换。但是,只有当您的 RDD 包含继承自 Product
的对象(即元组或大小写 类)时,此转换才会自动工作。
回到最初的问题,如果你想对具有任意模式的行进行转换,你需要在映射操作后明确告诉 Spark SQL 关于数据的结构(因为编译器不能) .
import org.apache.spark.sql.types._
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil))
val newSchema =
StructType(
StructField("uniqueId", IntegerType) +: jsonData.schema.fields)
val augmentedRows = jsonData.rdd.zipWithUniqueId.map {
case (row, id) =>
Row.fromSeq(id +: row.toSeq)
}
val newDF = sqlContext.createDataFrame(augmentedRows, newSchema)