如何将复杂的 Java Class 对象作为参数传递给 Spark 中的 Scala UDF?
How to pass complex Java Class Object as parameter to Scala UDF in Spark?
我有一个 Java 客户端 class(用作 spark-shell
的依赖 Jar)响应 API 调用 - 让我们调用 class SomeAPIRequester
.
在简单的 Java 中,它会 return 使用以下示例代码得到我想要的结果 -
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123")) // result: {"id123": "item123"}
我想通过存储在 spark 数据帧(在 scala 中)中的 ID 的 RDD 以分布式方式调用此 API -
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
我将我的 UDF 定义为 -
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
requester.getSomeItem(id)
})
并将此 UDF 称为 -
inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
// or directly with RDD ? udf, or a plain scala function ..
inputIdRdd.foreach{ id => test(id, requester) }
当我 运行 在结果上 .show()
或 .take()
时,我在请求者 java class 上得到 NullPointerException
。
我也尝试发送文字 (lit
),我在 scala 中读到了 typedLit
,但我无法转换 Java Requester
class 转换为 scala 中任何允许的 typedLit
类型。
有没有办法通过 UDF 调用此 Java class 对象并从 API 中获取结果?
编辑:
我还尝试在 RDD 的 foreach 块中初始化请求者 class -
inputIdRdd.foreach(x =>{
val apiRequester = SomeAPIRequester.builder()...(argPool).build()
try {
apiRequester.getSomeItem(x)
} catch {
case ex: Exception => println(ex.printStackTrace()); ""
}
})
但是这个 return 没有响应 - 无法初始化 class 等
谢谢!
使用自定义 classes 使用 Spark 需要了解 Spark 的工作原理。不要将您的实例作为参数放在 udf 中。 udfs 中的参数是从数据帧的行中提取的,在这种情况下空指针异常是可以理解的。您可以尝试以下选项:
首先将实例放入udf的范围内:
val requester: SomeAPIRequester = ???
val test: UserDefinedFunction = udf((id: String) => {
requester.getSomeItem(id)
})
此时您需要尽可能将 class 标记为可序列化,否则将出现 NotSerializableException。
如果您的 class 不是 Seriazable 因为它来自第三方,您可以将您的实例标记为 lazy transient val 如您所见在 https://mengdong.github.io/2016/08/16/spark-serialization-memo/ or https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d.
如果您在 RDD 领域工作,您可以使用 mapPartitions 为每个分区创建一个实例。
我有一个 Java 客户端 class(用作 spark-shell
的依赖 Jar)响应 API 调用 - 让我们调用 class SomeAPIRequester
.
在简单的 Java 中,它会 return 使用以下示例代码得到我想要的结果 -
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123")) // result: {"id123": "item123"}
我想通过存储在 spark 数据帧(在 scala 中)中的 ID 的 RDD 以分布式方式调用此 API -
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
我将我的 UDF 定义为 -
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
requester.getSomeItem(id)
})
并将此 UDF 称为 -
inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
// or directly with RDD ? udf, or a plain scala function ..
inputIdRdd.foreach{ id => test(id, requester) }
当我 运行 在结果上 .show()
或 .take()
时,我在请求者 java class 上得到 NullPointerException
。
我也尝试发送文字 (lit
),我在 scala 中读到了 typedLit
,但我无法转换 Java Requester
class 转换为 scala 中任何允许的 typedLit
类型。
有没有办法通过 UDF 调用此 Java class 对象并从 API 中获取结果?
编辑:
我还尝试在 RDD 的 foreach 块中初始化请求者 class -
inputIdRdd.foreach(x =>{
val apiRequester = SomeAPIRequester.builder()...(argPool).build()
try {
apiRequester.getSomeItem(x)
} catch {
case ex: Exception => println(ex.printStackTrace()); ""
}
})
但是这个 return 没有响应 - 无法初始化 class 等
谢谢!
使用自定义 classes 使用 Spark 需要了解 Spark 的工作原理。不要将您的实例作为参数放在 udf 中。 udfs 中的参数是从数据帧的行中提取的,在这种情况下空指针异常是可以理解的。您可以尝试以下选项:
首先将实例放入udf的范围内:
val requester: SomeAPIRequester = ??? val test: UserDefinedFunction = udf((id: String) => { requester.getSomeItem(id) })
此时您需要尽可能将 class 标记为可序列化,否则将出现 NotSerializableException。
如果您的 class 不是 Seriazable 因为它来自第三方,您可以将您的实例标记为 lazy transient val 如您所见在 https://mengdong.github.io/2016/08/16/spark-serialization-memo/ or https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d.
如果您在 RDD 领域工作,您可以使用 mapPartitions 为每个分区创建一个实例。