使用 ValueProvider 从多个 Pubsub 订阅中读取

Read from multiple Pubsub subscriptions using ValueProvider

我有多个来自 Cloud PubSub 的订阅,要使用 Apache Beam 根据特定前缀模式进行阅读。我扩展了 PTransform class 并实现了 expand() 方法以从多个订阅中读取数据,并对 PCollectionList 进行 Flatten 转换(每个 PCollection subscription)。我在将订阅前缀作为 ValueProvider 传递到 expand() 方法时遇到问题,因为 expand() 是在模板创建时调用的,而不是在启动作业时调用的。但是,如果我只使用 1 个订阅,我可以将 ValueProvider 传递给 PubsubIO.readStrings().fromSubscription()

这是一些示例代码。

public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {

    private ValueProvider<String> prefixPubsub;

    public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
        super(name);
        this.prefixPubsub = prefixPubsub;
    }

    @Override
    public PCollection<PubsubMessage> expand(PBegin input) {
        List<String> myList = null;

        try {
            // prefixPubsub.get() will return error
            myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
        } catch (Exception e) {
            LogHelper.error(String.format("Error getting list of subscription : %s",e.toString()));
        }

        List<PCollection<PubsubMessage>> collectionList = new ArrayList<PCollection<PubsubMessage>>();

        if(myList != null && !myList.isEmpty()){
            for(String subs : myList){
                PCollection<PubsubMessage> pCollection = input
                        .apply("ReadPubSub", PubsubIO.readMessagesWithAttributes().fromSubscription(this.prefixPubsub));    
                collectionList.add(pCollection);
            }

            PCollection<PubsubMessage> pubsubMessagePCollection = PCollectionList.of(collectionList)
                    .apply("FlattenPcollections", Flatten.pCollections());
            return pubsubMessagePCollection;
        } else {
            LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
            return null;
        }
    }

    public static MultiPubSubIO read(ValueProvider<String> prefixPubsub){
        return new MultiPubSubIO(null, prefixPubsub);
    }
}

所以我在想如何用同样的方式PubsubIO.read().fromSubscription()读取ValueProvider。还是我遗漏了什么?

搜索到的链接:

很遗憾,目前无法做到这一点:

  • a ValueProvider 的值不可能影响变换扩展——在扩展时,它是未知的;到知道的时候,管线的形状已经确定了。

  • 目前没有像 PubsubIO.read() 这样可以接受 PCollection 主题名称的转换。最终会有(它由 Splittable DoFn 启用),但这需要一段时间 - 目前没有人在做这件事。

您可以使用来自 apache beam io 模块的 MultipleReadFromPubSub https://beam.apache.org/releases/pydoc/2.27.0/_modules/apache_beam/io/gcp/pubsub.html

topic_1 = PubSubSourceDescriptor('projects/myproject/topics/a_topic')
topic_2 = PubSubSourceDescriptor(
            'projects/myproject2/topics/b_topic',
            'my_label',
            'my_timestamp_attribute')
subscription_1 = PubSubSourceDescriptor(
            'projects/myproject/subscriptions/a_subscription')

results = pipeline | MultipleReadFromPubSub(
            [topic_1, topic_2, subscription_1])