使用来自 google 数据流的现有发布订阅订阅

Using existing pub sub subscription from google data flow

我正在使用 Google 数据流,其中一个步骤是使用已创建的订阅订阅 pub sub 中的主题。 这是代码片段

CustomPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(customPipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device  data from PubSub")                 .subscription("projects/<projectID>/subscriptions/<subscriptionname>)
            .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
            .timestampLabel("ts")
            .withCoder(TableRowJsonCoder.of()));

以上代码执行时出现如下错误: 错误处理管道。原因:(b5e276ef8c76419f):步骤 s1 的无法识别的输入 pubsub_subscription。

正在传递正确的订阅名称和项目 ID。 不知道为什么我仍然收到上述错误。

请大家帮忙

指定 2 个来源之一就足够了:主题或订阅。

我建议你试试:

PCollection<TableRow> datastream = p
        .apply(PubsubIO.Read.named("Read device data from PubSub")
        .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
        .timestampLabel("ts")
        .withCoder(TableRowJsonCoder.of()));

另外:我想您使用的是 Dataflow 1.9 SDK?您可能想考虑搬到 new Beam 2.0.0 release. You can find the reference for PubSub in that SDK here.