How to use the Google-provided template [Pub/Sub to Datastore]?

I want to use this Google-provided template to stream data from Pub/Sub to Datastore: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToDatastore.java

I followed the steps written in this document: https://github.com/GoogleCloudPlatform/DataflowTemplates

This step succeeded:

    mvn clean && mvn compile

But the next step failed with the following error:

    [INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ google-cloud-teleport-java ---
    2018-08-17 13:36:19 INFO  DataflowRunner:266 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 117 files. Enable logging at DEBUG level to see which files will be staged.
    [WARNING]
    java.lang.IllegalStateException: Missing required properties: errorTag
            at com.google.cloud.teleport.templates.common.AutoValue_DatastoreConverters_WriteJsonEntities$Builder.build(AutoValue_DatastoreConverters_WriteJsonEntities.java:89)
            at com.google.cloud.teleport.templates.PubsubToDatastore.main(PubsubToDatastore.java:65)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:282)
            at java.lang.Thread.run(Thread.java:748)
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 35.348 s
    [INFO] Finished at: 2018-08-17T13:36:20+09:00
    [INFO] Final Memory: 59M/146M
    [INFO] ------------------------------------------------------------------------
    [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project google-cloud-teleport-java: An exception occured while executing the Java class. Missing required properties: errorTag -> [Help 1]
    [ERROR]
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR]
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

I then tried the DatastoreToPubsub template and the GCS Text to Datastore (TextToDatastore) template, and both of those succeeded.

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/DatastoreToPubsub.java

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/TextToDatastore.java

So I don't understand what the problem is. What am I doing wrong?

Any advice would be appreciated.
Regards.

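It looks like you've found a bug in that particular Dataflow template: the pipeline doesn't configure an error path, even though one is required when writing JSON entities. The fix is relatively simple and should be pushed to master soon. In the meantime, you can get the pipeline working with two changes to the PubsubToDatastore pipeline code.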
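As background on the error itself: the stack trace points at the `build()` method of the AutoValue-generated builder for `WriteJsonEntities`, which rejects any required property that was never set. The following is a minimal hand-written sketch of that pattern, not the generated code. `WriteConfig`, its two string fields, and `tryBuild` are hypothetical stand-ins used only for illustration.

```java
public class RequiredPropertyDemo {

    /** Hypothetical stand-in for an AutoValue class such as WriteJsonEntities. */
    static final class WriteConfig {
        final String projectId;
        final String errorTag;

        private WriteConfig(String projectId, String errorTag) {
            this.projectId = projectId;
            this.errorTag = errorTag;
        }

        static Builder newBuilder() {
            return new Builder();
        }

        static final class Builder {
            private String projectId;
            private String errorTag;

            Builder setProjectId(String projectId) {
                this.projectId = projectId;
                return this;
            }

            Builder setErrorTag(String errorTag) {
                this.errorTag = errorTag;
                return this;
            }

            /** Collects every unset property and fails before constructing the value. */
            WriteConfig build() {
                StringBuilder missing = new StringBuilder();
                if (projectId == null) {
                    missing.append(" projectId");
                }
                if (errorTag == null) {
                    missing.append(" errorTag");
                }
                if (missing.length() > 0) {
                    throw new IllegalStateException("Missing required properties:" + missing);
                }
                return new WriteConfig(projectId, errorTag);
            }
        }
    }

    /** Returns the failure message from build(), or null if build() succeeds. */
    static String tryBuild(WriteConfig.Builder builder) {
        try {
            builder.build();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Only projectId is set, mirroring the template before the fix.
        System.out.println(tryBuild(WriteConfig.newBuilder().setProjectId("my-project")));
        // Both properties set: build() succeeds, so tryBuild returns null.
        System.out.println(tryBuild(
            WriteConfig.newBuilder().setProjectId("my-project").setErrorTag("errors")));
    }
}
```

In this sketch the first call reports `Missing required properties: errorTag`, the same message as in your log, because the template only called `setProjectId` before `build()`.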

First, modify the code so that PubsubToDatastoreOptions extends the ErrorWriteOptions interface. Your new options declaration should look similar to the following:

    interface PubsubToDatastoreOptions
        extends
        PipelineOptions,
        PubsubReadOptions,
        JavascriptTextTransformerOptions,
        DatastoreWriteOptions,
        ErrorWriteOptions {}  // adds getErrorWritePath(), used in the main method

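Then modify the code in the main method so that the pipeline configures an error TupleTag and routes any error messages to the LogErrors transform. This ensures that any data which fails to be written to Datastore is captured and stored on GCS. Your new main method should look similar to the following: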

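    public static void main(String[] args) {
      // Parse the template options from the command-line arguments.
      PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .as(PubsubToDatastoreOptions.class);

      // Tag identifying the output that carries records which failed to write.
      TupleTag<String> errorTag = new TupleTag<String>(){};

      Pipeline pipeline = Pipeline.create(options);

      pipeline
          .apply(PubsubIO.readStrings()
              .fromTopic(options.getPubsubReadTopic()))
          .apply(TransformTextViaJavascript.newBuilder()
              .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
              .setFunctionName(options.getJavascriptTextTransformFunctionName())
              .build())
          // The missing setErrorTag call is what caused the IllegalStateException.
          .apply(WriteJsonEntities.newBuilder()
              .setProjectId(options.getDatastoreWriteProjectId())
              .setErrorTag(errorTag)
              .build())
          // Write anything emitted under errorTag to the configured error path.
          .apply(LogErrors.newBuilder()
              .setErrorWritePath(options.getErrorWritePath())
              .setErrorTag(errorTag)
              .build());

      pipeline.run();
    }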
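To illustrate the idea behind the error tag without a Beam dependency, here is a plain-Java sketch that splits incoming messages into a main output and an error output. It is only an analogy for what the tagged outputs achieve, not how Beam implements `TupleTag`. The `Routed` class, the `route` method, and the trivial "looks like a JSON object" check are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ErrorRoutingSketch {

    /** Holds the two outputs: messages accepted for writing and messages that failed. */
    static final class Routed {
        final List<String> written = new ArrayList<>();
        final List<String> errors = new ArrayList<>();
    }

    /** Hypothetical validity check standing in for the JSON-to-entity conversion. */
    static boolean looksLikeJsonObject(String message) {
        String trimmed = message.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    /** Sends each message to exactly one of the two outputs. */
    static Routed route(List<String> messages) {
        Routed result = new Routed();
        for (String message : messages) {
            if (looksLikeJsonObject(message)) {
                result.written.add(message);
            } else {
                // Failed records are kept instead of being dropped silently.
                result.errors.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Routed routed = route(Arrays.asList("{\"key\": 1}", "not json", "{}"));
        System.out.println("written=" + routed.written.size() + " errors=" + routed.errors);
    }
}
```

In the real pipeline, `WriteJsonEntities` plays the role of `route`, and `LogErrors` consumes the error output and writes it under the path given by `getErrorWritePath()`.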