使用 ValueProvider 从多个 Pubsub 订阅中读取
Read from multiple Pubsub subscriptions using ValueProvider
我有多个来自 Cloud PubSub 的订阅,要使用 Apache Beam 根据特定前缀模式进行阅读。我扩展了 PTransform
class 并实现了 expand()
方法以从多个订阅中读取数据,并对 PCollectionList
进行 Flatten
转换(每个 PCollection
subscription
)。我在将订阅前缀作为 ValueProvider
传递到 expand()
方法时遇到问题,因为 expand()
是在模板创建时调用的,而不是在启动作业时调用的。但是,如果我只使用 1 个订阅,我可以将 ValueProvider
传递给 PubsubIO.readStrings().fromSubscription()
。
这是一些示例代码。
public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {
private ValueProvider<String> prefixPubsub;
public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
super(name);
this.prefixPubsub = prefixPubsub;
}
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
List<String> myList = null;
try {
// prefixPubsub.get() will return error
myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
} catch (Exception e) {
LogHelper.error(String.format("Error getting list of subscription : %s",e.toString()));
}
List<PCollection<PubsubMessage>> collectionList = new ArrayList<PCollection<PubsubMessage>>();
if(myList != null && !myList.isEmpty()){
for(String subs : myList){
PCollection<PubsubMessage> pCollection = input
.apply("ReadPubSub", PubsubIO.readMessagesWithAttributes().fromSubscription(this.prefixPubsub));
collectionList.add(pCollection);
}
PCollection<PubsubMessage> pubsubMessagePCollection = PCollectionList.of(collectionList)
.apply("FlattenPcollections", Flatten.pCollections());
return pubsubMessagePCollection;
} else {
LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
return null;
}
}
public static MultiPubSubIO read(ValueProvider<String> prefixPubsub){
return new MultiPubSubIO(null, prefixPubsub);
}
}
所以我在想如何用同样的方式PubsubIO.read().fromSubscription()
读取ValueProvider
。还是我遗漏了什么?
搜索到的链接:
- extract-value-from-valueprovider-in-apache-beam - 回答谈到使用 DoFn,而我需要接收 PBegin 的 PTransform。
很遗憾,目前无法做到这一点:
a ValueProvider
的值不可能影响变换扩展——在扩展时,它是未知的;到知道的时候,管线的形状已经确定了。
目前没有像 PubsubIO.read()
这样可以接受 PCollection
主题名称的转换。最终会有(它由 Splittable DoFn 启用),但这需要一段时间 - 目前没有人在做这件事。
您可以使用来自 apache beam io 模块的 MultipleReadFromPubSub
https://beam.apache.org/releases/pydoc/2.27.0/_modules/apache_beam/io/gcp/pubsub.html
topic_1 = PubSubSourceDescriptor('projects/myproject/topics/a_topic')
topic_2 = PubSubSourceDescriptor(
'projects/myproject2/topics/b_topic',
'my_label',
'my_timestamp_attribute')
subscription_1 = PubSubSourceDescriptor(
'projects/myproject/subscriptions/a_subscription')
results = pipeline | MultipleReadFromPubSub(
[topic_1, topic_2, subscription_1])
我有多个来自 Cloud PubSub 的订阅,要使用 Apache Beam 根据特定前缀模式进行阅读。我扩展了 PTransform
class 并实现了 expand()
方法以从多个订阅中读取数据,并对 PCollectionList
进行 Flatten
转换(每个 PCollection
subscription
)。我在将订阅前缀作为 ValueProvider
传递到 expand()
方法时遇到问题,因为 expand()
是在模板创建时调用的,而不是在启动作业时调用的。但是,如果我只使用 1 个订阅,我可以将 ValueProvider
传递给 PubsubIO.readStrings().fromSubscription()
。
这是一些示例代码。
public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {
private ValueProvider<String> prefixPubsub;
public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
super(name);
this.prefixPubsub = prefixPubsub;
}
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
List<String> myList = null;
try {
// prefixPubsub.get() will return error
myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
} catch (Exception e) {
LogHelper.error(String.format("Error getting list of subscription : %s",e.toString()));
}
List<PCollection<PubsubMessage>> collectionList = new ArrayList<PCollection<PubsubMessage>>();
if(myList != null && !myList.isEmpty()){
for(String subs : myList){
PCollection<PubsubMessage> pCollection = input
.apply("ReadPubSub", PubsubIO.readMessagesWithAttributes().fromSubscription(this.prefixPubsub));
collectionList.add(pCollection);
}
PCollection<PubsubMessage> pubsubMessagePCollection = PCollectionList.of(collectionList)
.apply("FlattenPcollections", Flatten.pCollections());
return pubsubMessagePCollection;
} else {
LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
return null;
}
}
public static MultiPubSubIO read(ValueProvider<String> prefixPubsub){
return new MultiPubSubIO(null, prefixPubsub);
}
}
所以我在想如何用同样的方式PubsubIO.read().fromSubscription()
读取ValueProvider
。还是我遗漏了什么?
搜索到的链接:
- extract-value-from-valueprovider-in-apache-beam - 回答谈到使用 DoFn,而我需要接收 PBegin 的 PTransform。
很遗憾,目前无法做到这一点:
a
ValueProvider
的值不可能影响变换扩展——在扩展时,它是未知的;到知道的时候,管线的形状已经确定了。目前没有像
PubsubIO.read()
这样可以接受PCollection
主题名称的转换。最终会有(它由 Splittable DoFn 启用),但这需要一段时间 - 目前没有人在做这件事。
您可以使用来自 apache beam io 模块的 MultipleReadFromPubSub
https://beam.apache.org/releases/pydoc/2.27.0/_modules/apache_beam/io/gcp/pubsub.html
topic_1 = PubSubSourceDescriptor('projects/myproject/topics/a_topic')
topic_2 = PubSubSourceDescriptor(
'projects/myproject2/topics/b_topic',
'my_label',
'my_timestamp_attribute')
subscription_1 = PubSubSourceDescriptor(
'projects/myproject/subscriptions/a_subscription')
results = pipeline | MultipleReadFromPubSub(
[topic_1, topic_2, subscription_1])