Spark Streaming:foreachPartition 内的 NullPointerException

Spark Streaming: NullPointerException inside foreachPartition

我有一个 spark 流作业,它从 Kafka 读取并与 Postgres 中现有的 table 进行一些比较,然后再写入 Postrges。这是它的样子:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)
    println("First")

    kafkaDF.foreachPartition(
      i =>{
        val jdbcDF = sqlContext.read.format("jdbc").options(
          Map("url" -> "jdbc:postgresql://...",
            "dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()

        createConnection()
        i.foreach(
          row =>{
            println("Second")
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )

此代码在行 val jbdcDF = ...

处出现 NullPointerException

我做错了什么?此外,我的日志 "First" 有效,但 "Second" 未显示在日志中的任何位置。我用 kafkaDF.collect().foreach(...) 尝试了整个代码,它运行良好,但性能很差。我希望将其替换为 foreachPartition

谢谢

不清楚 createConnectioncloseConnectionconnection.sendToTable 中是否存在任何问题,但根本问题是尝试嵌套操作/转换。 Spark 不支持它,Spark Streaming 也不例外。

这意味着嵌套的 DataFrame 初始化 (val jdbcDF = sqlContext.read.format ...) 根本无法工作,应该被删除。如果您将其用作参考,则应在与 kafkaDF 相同的级别创建它并使用标准转换(unionAlljoin、...)进行引用。

如果由于某种原因它不是 acceptable 解决方案,您可以在 forEachPartition 中创建纯 JDBC 连接并在 PostgreSQL table 上运行(我猜它是你已经在 sendToTable).

中做了什么

正如@zero323 正确指出的那样,你不能广播你的 jdbc 连接,你也不能创建嵌套的 RDD。 Spark 根本不支持在现有闭包中使用 sparkContext 或 sqlContext,即 foreachPartition,因此出现空指针异常。

有效解决这个问题的唯一方法是在 foreachPartition 中创建一个 JDBC 连接并直接在其上执行 SQL 以执行您想要的任何操作,然后使用相同的连接写回记录.

关于你的第二个编辑问题:

变化:

kafkaDF.foreachPartition(..)

kafkaDF.repartition(numPartition).foreachPartition(..)

其中 numPartition 是所需的分区数。这将增加分区的数量。如果你有多个执行器(每个执行器有多个任务),这些将 运行 并行。