Google Cloud Dataflow:使用 DirectPipelineRunner(本地作业)在管道中访问 Google Cloud Pub/Sub?

Google Cloud Dataflow: Accessing Google Cloud Pub/Sub in pipeline with DirectPipelineRunner (local job)?

我已经使用 Google Cloud Dataflow SDK 编写了流式传输管道,但我想在本地测试我的管道。我的管道从 Google Pub/Sub 获取输入数据。

是否可以 运行 使用 DirectPipelineRunner 访问 Pub/Sub (pubsubIO) 的作业(本地执行,而不是在 Google 云中)?

我在以普通用户帐户登录时遇到权限问题运行。我是项目的所有者,我正在尝试访问 pub/sub 主题。

DirectPipelineRunner 当前不支持 PubsubIO。在本地使用的时候会报错说有"no evaluator registered for PubsubIO.Read".

您的权限问题可能来自其他来源。

其实是可以的,但是DirectPipelineRunner不支持无限数据源。因此,您必须像这样设置 maxReadTimemaxNumRecords

PubsubIO.Read.topic("projects/<project-id>/topics/<topic>").maxNumRecords(1000);

来自 PubSub documentation

A PTransform that continuously reads from a Cloud Pub/Sub stream and returns a PCollection of Strings containing the items from the stream. When running with a PipelineRunner that only supports bounded PCollections (such as DirectPipelineRunner), only a bounded portion of the input Pub/Sub stream can be processed. As such, either PubsubIO.Read.Bound.maxNumRecords(int) or PubsubIO.Read.Bound.maxReadTime(Duration) must be set.

InProcessPipelineRunner is a new version of the DirectPipelineRunner introduce in Dataflow SDK for Java 1.6.0 包括对无限 PCollections 的支持。

(注意:在 Apache Beam 中,此功能已添加到 DirectRunner,但在 Java 的数据流 SDK 中,我们要到 2.0 才能这样做,因为它可以更好地检查模型导致额外的测试失败,我们认为这是向后不兼容的更改。因此暂时添加了伴随的 InProcessPipelineRunner。)

还有一些很棒的新功能 support 可用于测试延迟和乱序数据。

只是为了帮助任何搜索此内容的人,

使用最新版本,您可以做到这一点。如果您想 运行 本地管道,请使用 "DirectRunner" 到 运行 本地管道。在云中使用 "DataflowRunner" 到 运行。

设置暂存位置和 运行ner,如下所示。

streamingOption.setStagingLocation(PipelineConstants.PUBSUB_STAGING_LOCATION);

streamingOption.setRunner(DataflowRunner.class);

或将其作为参数传递。

您能否详细说明您遇到的权限问题?