是否可以使用数据流将数据从 pubsub 流式传输到数据存储?
Is it able to stream data from pubsub to datastore using dataflow?
我尝试使用数据流将数据从 pubsub 流式传输到数据存储。
我尝试构建模板,但它根本不起作用。
所以,我认为这是不可能的。
怎么样?
请给我一些建议。
您可能无意中发现了该特定模板中的错误。其中有两个不同的问题,第一个是这个 SO 问题 中回答的问题,它指向缺少的 errorTag
,第二个是 Datastore 的编写器实际上使用 GroupByKey
时它将实体写入数据存储区。
如果你 运行 带有 -e
选项的 maven 编译命令,它将向你显示错误消息 GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey
。为什么要这样做?它与消息从 PubSub 流式传输而不是批处理(这是我们所期望的)这一事实有关。这意味着没有有限的项目集流入,而是永无止境的项目流。为了使用它,我们需要将其限制为 window 项的流式传输 window 可以被聚合函数(例如 GroupByKey
)考虑的项目。帮助将实体写入数据存储的 DatastoreConverters
class 实际上会检查我们是否试图多次写入相同的密钥,它通过使用 GroupByKey
函数来做到这一点。
简单的解决方案,只需给它一个流 window 即可使用,这里在管道中添加了第三个 .apply(...)
将 windows 流放在一起并允许您使用此处的数据存储编写器:
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
...
public static void main(String[] args) {
PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubToDatastoreOptions.class);
Pipeline pipeline = Pipeline.create(options);
TupleTag<String> errorTag = new TupleTag<String>("errors") {};
pipeline
.apply(PubsubIO.readStrings()
.fromTopic(options.getPubsubReadTopic()))
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(WriteJsonEntities.newBuilder()
.setProjectId(options.getDatastoreWriteProjectId())
.setErrorTag(errorTag)
.build());
pipeline.run();
}
现在有其他方法,而且很可能更好,但是这将使您的模板编译和工作。此示例显示 1 秒的 FixedWindow,还有其他选项可以执行此操作,请查看相关文档 Google DataFlow - Windowing。
编译您的模板:
mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"
然后启动作业:
gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"
现在您应该会在 GCP 控制台中看到您的作业 运行ning,如果您在那里查看:
请注意,这个特定的解决方案和选择的 window 将意味着 PubSub 消息在数据存储中结束最多延迟一秒。缩短 window 可能会有所帮助,但为了获得更高的吞吐量,您需要与此处显示的管道不同的管道。
我尝试使用数据流将数据从 pubsub 流式传输到数据存储。
我尝试构建模板,但它根本不起作用。 所以,我认为这是不可能的。
怎么样? 请给我一些建议。
您可能无意中发现了该特定模板中的错误。其中有两个不同的问题,第一个是这个 SO 问题 errorTag
,第二个是 Datastore 的编写器实际上使用 GroupByKey
时它将实体写入数据存储区。
如果你 运行 带有 -e
选项的 maven 编译命令,它将向你显示错误消息 GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey
。为什么要这样做?它与消息从 PubSub 流式传输而不是批处理(这是我们所期望的)这一事实有关。这意味着没有有限的项目集流入,而是永无止境的项目流。为了使用它,我们需要将其限制为 window 项的流式传输 window 可以被聚合函数(例如 GroupByKey
)考虑的项目。帮助将实体写入数据存储的 DatastoreConverters
class 实际上会检查我们是否试图多次写入相同的密钥,它通过使用 GroupByKey
函数来做到这一点。
简单的解决方案,只需给它一个流 window 即可使用,这里在管道中添加了第三个 .apply(...)
将 windows 流放在一起并允许您使用此处的数据存储编写器:
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
...
public static void main(String[] args) {
PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubToDatastoreOptions.class);
Pipeline pipeline = Pipeline.create(options);
TupleTag<String> errorTag = new TupleTag<String>("errors") {};
pipeline
.apply(PubsubIO.readStrings()
.fromTopic(options.getPubsubReadTopic()))
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(WriteJsonEntities.newBuilder()
.setProjectId(options.getDatastoreWriteProjectId())
.setErrorTag(errorTag)
.build());
pipeline.run();
}
现在有其他方法,而且很可能更好,但是这将使您的模板编译和工作。此示例显示 1 秒的 FixedWindow,还有其他选项可以执行此操作,请查看相关文档 Google DataFlow - Windowing。
编译您的模板:
mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"
然后启动作业:
gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"
现在您应该会在 GCP 控制台中看到您的作业 运行ning,如果您在那里查看:
请注意,这个特定的解决方案和选择的 window 将意味着 PubSub 消息在数据存储中结束最多延迟一秒。缩短 window 可能会有所帮助,但为了获得更高的吞吐量,您需要与此处显示的管道不同的管道。