是否可以使用数据流将数据从 pubsub 流式传输到数据存储?

Is it able to stream data from pubsub to datastore using dataflow?

我尝试使用数据流将数据从 pubsub 流式传输到数据存储。

参考: https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/src/main/java/com/google/cloud/teleport/templates

我尝试构建模板,但它根本不起作用。 所以,我认为这是不可能的。

怎么样? 请给我一些建议。

您可能无意中发现了该特定模板中的错误。其中有两个不同的问题,第一个是这个 SO 问题 中回答的问题,它指向缺少的 errorTag,第二个是 Datastore 的编写器实际上使用 GroupByKey 时它将实体写入数据存储区。

如果你 运行 带有 -e 选项的 maven 编译命令,它将向你显示错误消息 GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey。为什么要这样做?它与消息从 PubSub 流式传输而不是批处理(这是我们所期望的)这一事实有关。这意味着没有有限的项目集流入,而是永无止境的项目流。为了使用它,我们需要将其限制为 window 项的流式传输 window 可以被聚合函数(例如 GroupByKey)考虑的项目。帮助将实体写入数据存储的 DatastoreConverters class 实际上会检查我们是否试图多次写入相同的密钥,它通过使用 GroupByKey 函数来做到这一点。

简单的解决方案,只需给它一个流 window 即可使用,这里在管道中添加了第三个 .apply(...) 将 windows 流放在一起并允许您使用此处的数据存储编写器:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

... 

  public static void main(String[] args) {
    PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(PubsubToDatastoreOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    pipeline
        .apply(PubsubIO.readStrings()
            .fromTopic(options.getPubsubReadTopic()))
        .apply(TransformTextViaJavascript.newBuilder()
            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
            .setFunctionName(options.getJavascriptTextTransformFunctionName())
            .build())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(WriteJsonEntities.newBuilder()
            .setProjectId(options.getDatastoreWriteProjectId())
            .setErrorTag(errorTag)
            .build());

    pipeline.run();
  }

现在有其他方法,而且很可能更好,但是这将使您的模板编译和工作。此示例显示 1 秒的 FixedWindow,还有其他选项可以执行此操作,请查看相关文档 Google DataFlow - Windowing

编译您的模板:

mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"

然后启动作业:

gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"

现在您应该会在 GCP 控制台中看到您的作业 运行ning,如果您在那里查看:

请注意,这个特定的解决方案和选择的 window 将意味着 PubSub 消息在数据存储中结束最多延迟一秒。缩短 window 可能会有所帮助,但为了获得更高的吞吐量,您需要与此处显示的管道不同的管道。