如何将属性添加到 Google Dataflow PubSubIO Write
How to add attributes to Google Dataflow PubSubIO Write
我有一个场景可以订阅 Pubsub 主题,读取有效负载并将其发布回另一个具有基于有效负载的附加属性的 PubSub 主题。找不到用于填充自定义属性的任何示例。有人可以帮助选择选项或示例吗?
流水线
。申请(
“阅读 PubSub 事件”,
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply("写 PubSub 事件", PubsubIO.writeMessages().to(options.getOutputTopic()));
提前致谢
给你!
final List<String> elements = Arrays.asList(
//Name, Product,Epoch time millis
"Robert, TV, 1613141590000",
"Maria, Phone, 1612718280000",
"Juan, Laptop, 1611618000000",
"Rebeca, Videogame, 1610000000000"
);
p
.apply(Create.of(elements))
.apply("to PubSubMessage", ParDo.of(new DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] columns = c.element().split(", ");
HashMap<String, String> attributes = new HashMap<String, String>();
attributes.put("timestamp", columns[2]);
attributes.put("buyer", columns[0]);
PubsubMessage message = new PubsubMessage(c.element().getBytes(StandardCharsets.UTF_8), attributes);
c.output(message);
}
}))
.apply(PubsubIO.writeMessages().to(options.getTopic()));
我有一个场景可以订阅 Pubsub 主题,读取有效负载并将其发布回另一个具有基于有效负载的附加属性的 PubSub 主题。找不到用于填充自定义属性的任何示例。有人可以帮助选择选项或示例吗?
流水线 。申请( “阅读 PubSub 事件”, PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())) .apply("写 PubSub 事件", PubsubIO.writeMessages().to(options.getOutputTopic()));
提前致谢
给你!
final List<String> elements = Arrays.asList(
//Name, Product,Epoch time millis
"Robert, TV, 1613141590000",
"Maria, Phone, 1612718280000",
"Juan, Laptop, 1611618000000",
"Rebeca, Videogame, 1610000000000"
);
p
.apply(Create.of(elements))
.apply("to PubSubMessage", ParDo.of(new DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] columns = c.element().split(", ");
HashMap<String, String> attributes = new HashMap<String, String>();
attributes.put("timestamp", columns[2]);
attributes.put("buyer", columns[0]);
PubsubMessage message = new PubsubMessage(c.element().getBytes(StandardCharsets.UTF_8), attributes);
c.output(message);
}
}))
.apply(PubsubIO.writeMessages().to(options.getTopic()));