Scala中使用Flink的leftOuterJoinLateral时出现NullPointerException异常

NullPointerException exception when using Flink's leftOuterJoinLateral in Scala

我正在尝试遵循 the documentation 并创建一个 Table 函数来“扁平化”一些数据。 Table 函数在使用 joinLateral 进行展平时似乎工作正常。但是,当使用 leftOuterJoinLateral 时,出现以下错误。我正在使用 Scala 并尝试了 Table API 和 SQL,结果相同:

Caused by: java.lang.NullPointerException: Null result cannot be stored in a Case Class.

这是我的工作:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction

object example_job{
  // Split the List[Int] into multiple rows
  class Split() extends TableFunction[Int] {
    def eval(nums: List[Int]): Unit = {
      nums.foreach(x =>
        if(x != 3) {
          collect(x)
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    val splitMe = new Split()

    // Create some dummy data
    val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))
    
    val table = tableEnv.fromDataStream(events, 'name, 'numbers)
      .leftOuterJoinLateral(splitMe('numbers) as 'number)
      .select('name, 'number)
    table.toAppendStream[(String, Int)].print()
    env.execute("Flink jira ticket example")
  }
}

当我将 .leftOuterJoinLateral 更改为 .joinLateral 时,我得到了预期的结果:

(simon,1)
(simon,2)

当使用 .leftOuterJoinLateral 时,我会期望这样的东西:

(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)

这似乎是 Scala 的一个错误 API?我想在提票之前先在这里检查一下,以防我做错了什么!

问题是 Flink 默认情况下确实期望一行的所有字段都是非空的。这就是程序在看到外连接操作的 null 结果时失败的原因。为了接受 null 值,您需要通过

禁用空检查
val tableConfig = tableEnv.getConfig
tableConfig.setNullCheck(false)

或者您必须指定结果类型以容忍空值,例如指定自定义 POJO 输出类型:

table.toAppendStream[MyOutput].print()

class MyOutput(var name: String, var number: Integer) {
  def this() {
    this(null, null)
  }

  override def toString: String = s"($name, $number)"
}