如何将属性添加到 Google Dataflow PubSubIO Write

How to add attributes to Google Dataflow PubSubIO Write

我有一个场景可以订阅 Pubsub 主题,读取有效负载并将其发布回另一个具有基于有效负载的附加属性的 PubSub 主题。找不到用于填充自定义属性的任何示例。有人可以帮助选择选项或示例吗?

流水线 。申请( “阅读 PubSub 事件”, PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())) .apply("写 PubSub 事件", PubsubIO.writeMessages().to(options.getOutputTopic()));

提前致谢

给你!

    final List<String> elements = Arrays.asList(
            //Name, Product,Epoch time millis
            "Robert, TV, 1613141590000",
            "Maria, Phone, 1612718280000",
            "Juan, Laptop, 1611618000000",
            "Rebeca, Videogame, 1610000000000"
    );

    p
            .apply(Create.of(elements))
            .apply("to PubSubMessage", ParDo.of(new DoFn<String, PubsubMessage>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String[] columns = c.element().split(", ");

                    HashMap<String, String> attributes = new HashMap<String, String>();
                    attributes.put("timestamp", columns[2]);
                    attributes.put("buyer", columns[0]);

                    PubsubMessage message = new PubsubMessage(c.element().getBytes(StandardCharsets.UTF_8), attributes);

                    c.output(message);
                }
            }))
            .apply(PubsubIO.writeMessages().to(options.getTopic()));