尝试将转换器函数附加到 Google 数据流管道时出现 NullPointerException

NullPointerException while trying to attach a transformer function to a Google Dataflow pipeline

在通过 Google Cloud Dataflow WordCount Pipeline Example 并为 运行 本地管道创建 Scala 应用程序时,我遇到以下异常:

Exception in thread "main" java.lang.NullPointerException
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:89)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.<init>(ParDo.java:700)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.of(ParDo.java:661)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:551)
    at apps.MiniDataFlowApp$.delayedEndpoint$apps$MiniDataFlowApp(MiniDataFlowApp.scala:32)
    at apps.MiniDataFlowApp$delayedInit$body.apply(MiniDataFlowApp.scala:17)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at apps.MiniDataFlowApp$.main(MiniDataFlowApp.scala:17)
    at apps.MiniDataFlowApp.main(MiniDataFlowApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

其中 MiniDataFlowApp.scala:32 对应于以下创建管道的片段中的 .apply(ParDo.of(extractWords))

val p: Pipeline = Pipeline.create(options)
p.apply(TextIO.Read.from("some.input.txt"))
.apply(ParDo.of(extractWords))
.apply(Count.perElement[String]())
.apply(ParDo.of(formatOutput))
.apply(TextIO.Write.to("some.output.txt"))

extractWords 实现 DoFn 如下:

val extractWords = new DoFn[String, String]() {
    override def processElement(c: DoFn[String, String]#ProcessContext) {
      c.element.split("[^a-zA-Z']+").filter(_.nonEmpty).map(_ => c.output(_))
    }
}

中描述的问题似乎相似。但是,我不认为我有一个无法序列化的 class 就像导致该问题的问题一样。至少,如果那是问题所在,我不明白为什么我可能会遇到序列化问题。

感谢您花时间阅读我的问题和任何见解!

这是一个初始化顺序问题。在 Scala 中,class 主体中的 val(对象是相应 class 的单例实例)按声明顺序初始化。

这意味着当 p 被初始化时,extractWords 和 formatOutput 还没有被初始化,并且为空。 (我看过 OP 代码的其余部分;这些成员是 val p 之后的 val。)

至少有3种解法:

1) 更改 val 的顺序,使依赖项(extractWords、formatOutput)排在第一位。

2) 使 extractWords 和 formatOutput 惰性值。这将使它们在访问时得到初始化,并保证它们只被初始化一次。

3) 制作 extractWords 和 formatOutput defs。这将使它们在每次访问时都被重新计算,这可能会也可能不会。