Google 数据流:根据条件仅向 PubSub 主题之一输出消息
Google Dataflow: output message only to one of the PubSub topics based on condition
在我的管道中,我想根据先前转换的结果将消息输出到 PubSub 主题之一。目前我正在将输出发送到同一主题:
SearchItemGeneratorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SearchItemGeneratorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(...)
//other transformations
.apply("ParseFile", new ParseFile()) // outputs PCollection<Message>, where each Message has a MessageType property with the name of the topic.
.apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
这是我的消息对象:
class Message {
private MessageType messageType;
private String payload;
//constructor, getters
}
我的 ParseFile 转换器输出 PCollection,每个 Message 对象都有一个 属性 messageType。基于消息类型 属性,我想输出到消息的不同 PubSub 主题负载 属性。我在 this 文章段落 中阅读了相同的 PCollection 多个转换过程,但仍然不知道如何在我的案例中应用它或其他解决方案。
更新
感谢@Andrew 提供的解决方案。
我通过使用 TupleTag 解决了我的问题,但方法相似。
我在主管道中创建了两个不同的 TupleTag 对象:
public static final TupleTag<String> full = new TupleTag<>("full");
public static final TupleTag<String> delta = new TupleTag<>("delta");
然后根据我的条件,我用正确的 TupleTag 在 DoFn 中输出消息:
TupleTag tupleTag = //assign full or delta TupleTag
processContext.output(tupleTag, jsonObject.toString());
并在主管道中通过每个 TupleTag 从 PCollectionTuple 中选择发送到 Pub/Sub 个主题。
messages.get(full)
.apply("SendToIndexTopic", PubsubIO.writeStrings().to(options.getOutputIndexTopic()));
messages.get(delta)
.apply("SendToDeltaTopic", PubsubIO.writeStrings().to(options.getOutputDeltaTopic()));
唯一要提的是我的 TupleTag 对象是静态对象。
您可以对管道进行分区以将消息发布到多个 Pub/Sub 主题。分区将允许您将消息分开,而不是将它们复制到不同的 Pub/Sub 主题。您需要提前了解所有 Pub/Sub 主题。参考:Partition.
示例:
// partition pipeline
PCollectionList<Message> msgs = p.apply(Partition.of(2, new PartitionFn<Message>() {
public int partitionFor(Message msg, int numPartitions) {
// TODO: determine how to partition messages
if (msg.messageType == "x") {
return 0;
} else {
return 1;
}
}
}));
// access partitions
PCollection<Message> partition1 = msgs.get(0);
partition1.apply("WriteItemsToTopic1", PubsubIO.writeStrings().to(options.getOutputTopic1()));
PCollection<Message> partition2 = msgs.get(1);
partition2.apply("WriteItemsToTopic2", PubsubIO.writeStrings().to(options.getOutputTopic2()));
在我的管道中,我想根据先前转换的结果将消息输出到 PubSub 主题之一。目前我正在将输出发送到同一主题:
SearchItemGeneratorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SearchItemGeneratorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(...)
//other transformations
.apply("ParseFile", new ParseFile()) // outputs PCollection<Message>, where each Message has a MessageType property with the name of the topic.
.apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
这是我的消息对象:
class Message {
private MessageType messageType;
private String payload;
//constructor, getters
}
我的 ParseFile 转换器输出 PCollection,每个 Message 对象都有一个 属性 messageType。基于消息类型 属性,我想输出到消息的不同 PubSub 主题负载 属性。我在 this 文章段落 中阅读了相同的 PCollection 多个转换过程,但仍然不知道如何在我的案例中应用它或其他解决方案。
更新 感谢@Andrew 提供的解决方案。 我通过使用 TupleTag 解决了我的问题,但方法相似。 我在主管道中创建了两个不同的 TupleTag 对象:
public static final TupleTag<String> full = new TupleTag<>("full");
public static final TupleTag<String> delta = new TupleTag<>("delta");
然后根据我的条件,我用正确的 TupleTag 在 DoFn 中输出消息:
TupleTag tupleTag = //assign full or delta TupleTag
processContext.output(tupleTag, jsonObject.toString());
并在主管道中通过每个 TupleTag 从 PCollectionTuple 中选择发送到 Pub/Sub 个主题。
messages.get(full)
.apply("SendToIndexTopic", PubsubIO.writeStrings().to(options.getOutputIndexTopic()));
messages.get(delta)
.apply("SendToDeltaTopic", PubsubIO.writeStrings().to(options.getOutputDeltaTopic()));
唯一要提的是我的 TupleTag 对象是静态对象。
您可以对管道进行分区以将消息发布到多个 Pub/Sub 主题。分区将允许您将消息分开,而不是将它们复制到不同的 Pub/Sub 主题。您需要提前了解所有 Pub/Sub 主题。参考:Partition.
示例:
// partition pipeline
PCollectionList<Message> msgs = p.apply(Partition.of(2, new PartitionFn<Message>() {
public int partitionFor(Message msg, int numPartitions) {
// TODO: determine how to partition messages
if (msg.messageType == "x") {
return 0;
} else {
return 1;
}
}
}));
// access partitions
PCollection<Message> partition1 = msgs.get(0);
partition1.apply("WriteItemsToTopic1", PubsubIO.writeStrings().to(options.getOutputTopic1()));
PCollection<Message> partition2 = msgs.get(1);
partition2.apply("WriteItemsToTopic2", PubsubIO.writeStrings().to(options.getOutputTopic2()));