如何 运行 来自 SBT 的数据流上的 Scio 管道(本地)

How to run a Scio pipeline on Dataflow from SBT (local)

我正在尝试 运行 我在 Dataflow 上的第一个 Scio 管道。

可以找到有问题的代码 here。不过我觉得这不是太重要。
我的第一个实验是使用 DirecRunner 读取一些本地 CSV 文件并编写另一个本地 CSV 文件。效果如预期。

现在,我正在尝试从 GCS 读取文件,将输出写入 BigQuery 和 运行 管道使用 DataflowRunner。我已经进行了所有必要的更改 (或者这就是我所相信的)。但是我做不到 运行.

我已经 gcloud auth application-default login 并且当我这样做时

sbt run --runner=DataflowRunner --project=project-id --input-path=gs://path/to/data --output-table=dataset.table

我可以看到 Jb 已在 Dataflow 中提交。但是,一小时后作业失败并显示以下错误消息。

Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h.

(注意,作业一直没有做任何事情,因为这是一个实验,所以数据太简单,无法花费超过几分钟的时间).

检查 StackDriver 我可以发现以下错误:

java.lang.ClassNotFoundException: scala.collection.Seq

与一些 jackson 相关的事情:

java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: Provider com.fasterxml.jackson.module.scala.DefaultScalaModule could not be instantiated

这就是一开始杀死每个执行者的原因。我真的不明白为什么我找不到Scala标准库。

我还尝试先创建一个模板,然后 运行 稍后使用:

sbt run --runner=DataflowRunner --project=project-id --input-path=gs://path/to/data --output-table=dataset.table --stagingLocation=gs://path/to/staging --templateLocation=gs://path/to/templates/template-1

但是,在 运行 设置模板后,我得到了同样的错误。
此外,我注意到 staging 文件夹中有很多 jar,但 scala-library.jar 不在其中。

我遗漏了一些明显的东西?

这是 sbt 1.3.0 的一个已知问题,它引入了一些重大更改 w.r.t。 class 装载机。试试 1.2.8?

此外,杰克逊问题可能与 Java 11 或更高版本有关。暂时留在 Java 8。

通过设置 sbt 修复 classLoaderLayeringStrategy:

run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat

sbt 为 运行 和 run 的应用程序使用了一个新的类加载器。这会导致其他 类 已经被 JVM(例如 Predef)加载,从而减少启动时间。有关详细信息,请参阅 in-process classloaders

这不能很好地与 Beam DataflowRunner 一起使用,因为它明确地不从父类加载器阶段 类,请参阅 PipelineResources.java#L51:

Attempts to detect all the resources the class loader has access to. This does not recurse to class loader parents stopping it from pulling in resources from the system class loader.

因此解决方法是强制将您的应用程序使用的所有 类 加载到同一个类加载器中,以便 DataflowRunner 暂存所有内容。

希望对您有所帮助