使用来自 google 数据流的现有发布订阅订阅
Using existing pub sub subscription from google data flow
我正在使用 Google 数据流,其中一个步骤是使用已创建的订阅订阅 pub sub 中的主题。
这是代码片段
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(customPipelineOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device data from PubSub") .subscription("projects/<projectID>/subscriptions/<subscriptionname>)
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()));
以上代码执行时出现如下错误:
错误处理管道。原因:(b5e276ef8c76419f):步骤 s1 的无法识别的输入 pubsub_subscription。
正在传递正确的订阅名称和项目 ID。
不知道为什么我仍然收到上述错误。
请大家帮忙
指定 2 个来源之一就足够了:主题或订阅。
我建议你试试:
PCollection<TableRow> datastream = p
.apply(PubsubIO.Read.named("Read device data from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()));
另外:我想您使用的是 Dataflow 1.9 SDK?您可能想考虑搬到 new Beam 2.0.0 release. You can find the reference for PubSub in that SDK here.
我正在使用 Google 数据流,其中一个步骤是使用已创建的订阅订阅 pub sub 中的主题。 这是代码片段
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(customPipelineOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device data from PubSub") .subscription("projects/<projectID>/subscriptions/<subscriptionname>)
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()));
以上代码执行时出现如下错误: 错误处理管道。原因:(b5e276ef8c76419f):步骤 s1 的无法识别的输入 pubsub_subscription。
正在传递正确的订阅名称和项目 ID。 不知道为什么我仍然收到上述错误。
请大家帮忙
指定 2 个来源之一就足够了:主题或订阅。
我建议你试试:
PCollection<TableRow> datastream = p
.apply(PubsubIO.Read.named("Read device data from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()));
另外:我想您使用的是 Dataflow 1.9 SDK?您可能想考虑搬到 new Beam 2.0.0 release. You can find the reference for PubSub in that SDK here.