Spark SQL 中关于 Dataset.filter 的错误
An error about Dataset.filter in Spark SQL
我只想过滤数据集以包含可以在 MySQL 中找到的记录。
这是数据集:
dataset.show()
+---+-----+
| id| name|
+---+-----+
| 1| a|
| 2| b|
| 3| c|
+---+-----+
这里是 MySQL 中的 table:
+---+-----+
| id| name|
+---+-----+
| 1| a|
| 3| c|
| 4| d|
+---+-----+
这是我的代码(运行 in spark-shell):
import java.util.Properties
case class App(id: Int, name: String)
val data = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val dataFrame = data.map { case (id, name) => App(id, name) }.toDF
val dataset = dataFrame.as[App]
val url = "jdbc:mysql://ip:port/tbl_name"
val table = "my_tbl_name"
val user = "my_user_name"
val password = "my_password"
val properties = new Properties()
properties.setProperty("user", user)
properties.setProperty("password", password)
dataset.filter((x: App) =>
0 != sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count).show()
但我得到 "java.lang.NullPointerException"
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)
我测试过
val x = App(1, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count
val y = App(5, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + y.id.toString), properties).count
我可以得到正确的结果 1 和 0。
过滤器有什么问题?
What's the problem with filter?
您遇到异常是因为您试图在转换 (filter
) 中执行操作(DataFrame
上的 count
)。 Spark 不支持嵌套操作和转换。
正确的解决方案与往常一样 join
在兼容的数据结构上,使用本地数据结构查找或直接查询外部系统(不使用 Spark 数据结构)。
我只想过滤数据集以包含可以在 MySQL 中找到的记录。
这是数据集:
dataset.show()
+---+-----+
| id| name|
+---+-----+
| 1| a|
| 2| b|
| 3| c|
+---+-----+
这里是 MySQL 中的 table:
+---+-----+
| id| name|
+---+-----+
| 1| a|
| 3| c|
| 4| d|
+---+-----+
这是我的代码(运行 in spark-shell):
import java.util.Properties
case class App(id: Int, name: String)
val data = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val dataFrame = data.map { case (id, name) => App(id, name) }.toDF
val dataset = dataFrame.as[App]
val url = "jdbc:mysql://ip:port/tbl_name"
val table = "my_tbl_name"
val user = "my_user_name"
val password = "my_password"
val properties = new Properties()
properties.setProperty("user", user)
properties.setProperty("password", password)
dataset.filter((x: App) =>
0 != sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count).show()
但我得到 "java.lang.NullPointerException"
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)
我测试过
val x = App(1, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count
val y = App(5, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + y.id.toString), properties).count
我可以得到正确的结果 1 和 0。
过滤器有什么问题?
What's the problem with filter?
您遇到异常是因为您试图在转换 (filter
) 中执行操作(DataFrame
上的 count
)。 Spark 不支持嵌套操作和转换。
正确的解决方案与往常一样 join
在兼容的数据结构上,使用本地数据结构查找或直接查询外部系统(不使用 Spark 数据结构)。