Spark Streaming:foreachPartition 内的 NullPointerException
Spark Streaming: NullPointerException inside foreachPartition
我有一个 spark 流作业,它从 Kafka 读取并与 Postgres 中现有的 table 进行一些比较,然后再写入 Postrges。这是它的样子:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
println("First")
kafkaDF.foreachPartition(
i =>{
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql://...",
"dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()
createConnection()
i.foreach(
row =>{
println("Second")
connection.sendToTable()
}
)
closeConnection()
}
)
此代码在行 val jbdcDF = ...
处出现 NullPointerException
我做错了什么?此外,我的日志 "First"
有效,但 "Second"
未显示在日志中的任何位置。我用 kafkaDF.collect().foreach(...)
尝试了整个代码,它运行良好,但性能很差。我希望将其替换为 foreachPartition
。
谢谢
不清楚 createConnection
、closeConnection
或 connection.sendToTable
中是否存在任何问题,但根本问题是尝试嵌套操作/转换。 Spark 不支持它,Spark Streaming 也不例外。
这意味着嵌套的 DataFrame
初始化 (val jdbcDF = sqlContext.read.format ...
) 根本无法工作,应该被删除。如果您将其用作参考,则应在与 kafkaDF
相同的级别创建它并使用标准转换(unionAll
、join
、...)进行引用。
如果由于某种原因它不是 acceptable 解决方案,您可以在 forEachPartition
中创建纯 JDBC 连接并在 PostgreSQL table 上运行(我猜它是你已经在 sendToTable
).
中做了什么
正如@zero323 正确指出的那样,你不能广播你的 jdbc 连接,你也不能创建嵌套的 RDD。 Spark 根本不支持在现有闭包中使用 sparkContext 或 sqlContext,即 foreachPartition,因此出现空指针异常。
有效解决这个问题的唯一方法是在 foreachPartition 中创建一个 JDBC 连接并直接在其上执行 SQL 以执行您想要的任何操作,然后使用相同的连接写回记录.
关于你的第二个编辑问题:
变化:
kafkaDF.foreachPartition(..)
到
kafkaDF.repartition(numPartition).foreachPartition(..)
其中 numPartition 是所需的分区数。这将增加分区的数量。如果你有多个执行器(每个执行器有多个任务),这些将 运行 并行。
我有一个 spark 流作业,它从 Kafka 读取并与 Postgres 中现有的 table 进行一些比较,然后再写入 Postrges。这是它的样子:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
println("First")
kafkaDF.foreachPartition(
i =>{
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql://...",
"dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()
createConnection()
i.foreach(
row =>{
println("Second")
connection.sendToTable()
}
)
closeConnection()
}
)
此代码在行 val jbdcDF = ...
我做错了什么?此外,我的日志 "First"
有效,但 "Second"
未显示在日志中的任何位置。我用 kafkaDF.collect().foreach(...)
尝试了整个代码,它运行良好,但性能很差。我希望将其替换为 foreachPartition
。
谢谢
不清楚 createConnection
、closeConnection
或 connection.sendToTable
中是否存在任何问题,但根本问题是尝试嵌套操作/转换。 Spark 不支持它,Spark Streaming 也不例外。
这意味着嵌套的 DataFrame
初始化 (val jdbcDF = sqlContext.read.format ...
) 根本无法工作,应该被删除。如果您将其用作参考,则应在与 kafkaDF
相同的级别创建它并使用标准转换(unionAll
、join
、...)进行引用。
如果由于某种原因它不是 acceptable 解决方案,您可以在 forEachPartition
中创建纯 JDBC 连接并在 PostgreSQL table 上运行(我猜它是你已经在 sendToTable
).
正如@zero323 正确指出的那样,你不能广播你的 jdbc 连接,你也不能创建嵌套的 RDD。 Spark 根本不支持在现有闭包中使用 sparkContext 或 sqlContext,即 foreachPartition,因此出现空指针异常。
有效解决这个问题的唯一方法是在 foreachPartition 中创建一个 JDBC 连接并直接在其上执行 SQL 以执行您想要的任何操作,然后使用相同的连接写回记录.
关于你的第二个编辑问题:
变化:
kafkaDF.foreachPartition(..)
到
kafkaDF.repartition(numPartition).foreachPartition(..)
其中 numPartition 是所需的分区数。这将增加分区的数量。如果你有多个执行器(每个执行器有多个任务),这些将 运行 并行。