I get error: "Overload resolution ambiguity" from MapElements transform in Apache Beam when using Kotlin
I get error: "Overload resolution ambiguity" from MapElements transform in Apache Beam when using Kotlin
我正在探索 GoogleCloudPlatform 在 Github 上提供的 Apache Beam 数据流模板。
特别是,我正在将 PubSubToBigQuery 模板从 Java 转换为 Kotlin。
通过这样做,我在 274
行的 MapElements.input(...).via(...)
转换中遇到了 过载歧义解析 错误。错误信息是:
Error:(62, 22) Kotlin: Cannot choose among the following candidates without completing type inference:
public final fun <NewInputT : Any!> via(fn: ((input: BigQueryInsertError!) -> FailsafeElement<String!, String!>!)!): MapElements<BigQueryInsertError!, FailsafeElement<String!, String!>!>! defined in org.apache.beam.sdk.transforms.MapElements
public final fun <NewInputT : Any!> via(fn: ((input: BigQueryInsertError!) -> FailsafeElement<String!, String!>!)!): MapElements<BigQueryInsertError!, FailsafeElement<String!, String!>!>! defined in org.apache.beam.sdk.transforms.MapElements
相关的 Java 代码片段是:
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
Kotlin 转换如下:
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
val failedInserts: PCollection<FailsafeElement<String, String>> =
writeResult.failedInsertsWithErr
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.encodedTypeDescriptor)
.via { e: BigQueryInsertError -> wrapBigQueryInsertError(e) })
.setCoder(FAILSAFE_ELEMENT_CODER)
我不知道如何解决这个问题。任何帮助都会很好。
原因是Java和Kotlin的重载规则略有不同,也就是说在Kotlin中有两个匹配的重载;
public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn)
public <NewInputT> MapElements<NewInputT, OutputT> via(SerializableFunction<NewInputT, OutputT> fn)
最简单的解决方法是将 lambda 显式指定为 SerializableFunction
以获得正确的重载;
.via<BigQueryInsertError> (SerializableFunction { e: BigQueryInsertError -> wrapBigQueryInsertError(e) }))
我正在探索 GoogleCloudPlatform 在 Github 上提供的 Apache Beam 数据流模板。
特别是,我正在将 PubSubToBigQuery 模板从 Java 转换为 Kotlin。
通过这样做,我在 274
行的 MapElements.input(...).via(...)
转换中遇到了 过载歧义解析 错误。错误信息是:
Error:(62, 22) Kotlin: Cannot choose among the following candidates without completing type inference:
public final fun <NewInputT : Any!> via(fn: ((input: BigQueryInsertError!) -> FailsafeElement<String!, String!>!)!): MapElements<BigQueryInsertError!, FailsafeElement<String!, String!>!>! defined in org.apache.beam.sdk.transforms.MapElements
public final fun <NewInputT : Any!> via(fn: ((input: BigQueryInsertError!) -> FailsafeElement<String!, String!>!)!): MapElements<BigQueryInsertError!, FailsafeElement<String!, String!>!>! defined in org.apache.beam.sdk.transforms.MapElements
相关的 Java 代码片段是:
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
Kotlin 转换如下:
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
val failedInserts: PCollection<FailsafeElement<String, String>> =
writeResult.failedInsertsWithErr
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.encodedTypeDescriptor)
.via { e: BigQueryInsertError -> wrapBigQueryInsertError(e) })
.setCoder(FAILSAFE_ELEMENT_CODER)
我不知道如何解决这个问题。任何帮助都会很好。
原因是Java和Kotlin的重载规则略有不同,也就是说在Kotlin中有两个匹配的重载;
public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn)
public <NewInputT> MapElements<NewInputT, OutputT> via(SerializableFunction<NewInputT, OutputT> fn)
最简单的解决方法是将 lambda 显式指定为 SerializableFunction
以获得正确的重载;
.via<BigQueryInsertError> (SerializableFunction { e: BigQueryInsertError -> wrapBigQueryInsertError(e) }))