如何将复杂的 Java Class 对象作为参数传递给 Spark 中的 Scala UDF?

How to pass complex Java Class Object as parameter to Scala UDF in Spark?

我有一个 Java 客户端 class(用作 spark-shell 的依赖 Jar)响应 API 调用 - 让我们调用 class SomeAPIRequester.

在简单的 Java 中,它会 return 使用以下示例代码得到我想要的结果 -

SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123"))  // result: {"id123": "item123"}

我想通过存储在 spark 数据帧(在 scala 中)中的 ID 的 RDD 以分布式方式调用此 API -

val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...))  // sample RDD of IDs i want to call the API for

我将我的 UDF 定义为 -

val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
   requester.getSomeItem(id)
})

并将此 UDF 称为 -

inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester)  // requester as built with SomeAPIRequester.builder()....

// or directly with RDD ? udf, or a plain scala function .. 
inputIdRdd.foreach{ id => test(id, requester) }

当我 运行 在结果上 .show().take() 时,我在请求者 java class 上得到 NullPointerException

我也尝试发送文字 (lit),我在 scala 中读到了 typedLit,但我无法转换 Java Requester class 转换为 scala 中任何允许的 typedLit 类型。

有没有办法通过 UDF 调用此 Java class 对象并从 API 中获取结果?

编辑:

我还尝试在 RDD 的 foreach 块中初始化请求者 class -

inputIdRdd.foreach(x =>{
  val apiRequester = SomeAPIRequester.builder()...(argPool).build()

  try {
    apiRequester.getSomeItem(x)
  } catch {
    case ex: Exception => println(ex.printStackTrace()); ""
  }
})

但是这个 return 没有响应 - 无法初始化 class 等

谢谢!

使用自定义 classes 使用 Spark 需要了解 Spark 的工作原理。不要将您的实例作为参数放在 udf 中。 udfs 中的参数是从数据帧的行中提取的,在这种情况下空指针异常是可以理解的。您可以尝试以下选项:

  1. 首先将实例放入udf的范围内:

    val requester: SomeAPIRequester = ???
    
    val test: UserDefinedFunction = udf((id: String) => {
         requester.getSomeItem(id)
    })
    

此时您需要尽可能将 class 标记为可序列化,否则将出现 NotSerializableException。

  1. 如果您的 class 不是 Seriazable 因为它来自第三方,您可以将您的实例标记为 lazy transient val 如您所见在 https://mengdong.github.io/2016/08/16/spark-serialization-memo/ or https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d.

  2. 如果您在 RDD 领域工作,您可以使用 mapPartitions 为每个分区创建一个实例。