Implementing Inner Join using Apache Beam

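I am learning the Apache Beam framework. Following the example described at the link, I have implemented an inner join, but I can't tell whether it is completely correct.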

    // Read the customer order file
    PCollection<String> pCollection1 = pipeline.apply(TextIO.read().from(
            "src/main/resources/Join/InnerJoin/CustomerOrder.csv"));

    // Read the customer file
    PCollection<String> pCollection2 = pipeline.apply(TextIO.read()
            .from("src/main/resources/Join/InnerJoin/Customer.csv"));

    // Parse customer order data into a PCollection of key-value pairs
    PCollection<KV<String, String>> customerOrderCollection = pCollection1
            .apply(ParDo.of(new CustomerOrderParsing()));

    // Parse customer data into a PCollection of key-value pairs
    PCollection<KV<String, String>> customerCollection = pCollection2
            .apply(ParDo.of(new CustomerParsing()));

    // Create one TupleTag per input collection
    TupleTag<String> customerOrderrTuple = new TupleTag<String>();
    TupleTag<String> customerTuple = new TupleTag<String>();
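
The parsing DoFns `CustomerOrderParsing` and `CustomerParsing` are not shown. For the join to work, both must emit KV pairs keyed by the same customer id. A minimal plain-Java sketch of that per-line keying logic, assuming (hypothetically) comma-separated lines whose first column is the customer id; the real layouts of CustomerOrder.csv and Customer.csv are not given, and `KeyByCustomerId` / `keyByFirstColumn` are illustrative names only:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper, not the question's actual CustomerOrderParsing or
// CustomerParsing classes: it only illustrates keying a CSV line by its
// join column.
class KeyByCustomerId {

    // Splits "customerId,rest..." into (customerId, rest).
    // Assumes the customer id is the first comma-separated column.
    static Map.Entry<String, String> keyByFirstColumn(String line) {
        int comma = line.indexOf(',');
        if (comma < 0) {
            // Only a key, no further columns: use an empty value.
            return new SimpleEntry<>(line.trim(), "");
        }
        String key = line.substring(0, comma).trim();
        String rest = line.substring(comma + 1);
        return new SimpleEntry<>(key, rest);
    }

    public static void main(String[] args) {
        // Sample lines are made up for illustration.
        System.out.println(keyByFirstColumn("C1,Order-100,2019-01-05")); // prints C1=Order-100,2019-01-05
        System.out.println(keyByFirstColumn("C1,Alice"));                // prints C1=Alice
    }
}
```

Inside a Beam DoFn, the same logic would sit in the `@ProcessElement` method and emit `KV.of(key, rest)` instead of returning a map entry.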

I don't understand why we do this. From the documentation I gather that we are grouping all elements by key, but why do we need to do that?

    PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
            .of(customerOrderrTuple, customerOrderCollection).and(customerTuple, customerCollection)
            .apply(CoGroupByKey.<String>create());

I don't understand what is happening here:

    PCollection<String> output = result.apply(ParDo.of(
            new DoFn<KV<String, CoGbkResult>, String>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                    String strKey = context.element().getKey();
                    CoGbkResult valueObject = context.element().getValue();
                    Iterable<String> customerOrderTable = valueObject.getAll(customerOrderrTuple);
                    Iterable<String> customerTable = valueObject.getAll(customerTuple);
                    
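                    // One output per (order, customer) pair that shares this key.
                    // If either side has no values for the key, nothing is emitted,
                    // which is what makes this an inner join.
                    for (String order : customerOrderTable) {
                        for (String user : customerTable) {
                            context.output(strKey + "," + order + "," + user);
                        }
                    }
                }
            }));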

    PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
            .of(customerOrderrTuple, customerOrderCollection).and(customerTuple, customerCollection)
            .apply(CoGroupByKey.<String>create());

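This is actually where your join happens. It returns a PCollection of key-value pairs in which the key is the join key and the value (a `CoGbkResult`) holds all the values for that key from each of the joined collections. You then need to iterate over the values from all the joined collections for that key and combine them the way you need. That is why there is another DoFn downstream that does this in a user-specific way.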
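
To make the two steps concrete, here is a plain-Java model (no Beam dependency) of what `CoGroupByKey` and the downstream nested-loop DoFn compute together. The class and method names are hypothetical, and the model ignores everything Beam adds on top: distributed execution, ordering (which Beam does not guarantee), and the `Iterable`s that `CoGbkResult.getAll` actually returns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Beam code) of CoGroupByKey followed by the
// question's nested-loop DoFn. Names are hypothetical.
class CoGroupInnerJoinModel {

    // Per key: all values from each input, the role a CoGbkResult plays.
    static final class Grouped {
        final List<String> orders = new ArrayList<>();
        final List<String> customers = new ArrayList<>();
    }

    // Step 1 ("CoGroupByKey"): gather the values of BOTH inputs under their key.
    // Each input element is a {key, value} pair.
    static Map<String, Grouped> coGroupByKey(List<String[]> orders, List<String[]> customers) {
        Map<String, Grouped> grouped = new LinkedHashMap<>();
        for (String[] kv : orders) {
            grouped.computeIfAbsent(kv[0], k -> new Grouped()).orders.add(kv[1]);
        }
        for (String[] kv : customers) {
            grouped.computeIfAbsent(kv[0], k -> new Grouped()).customers.add(kv[1]);
        }
        return grouped;
    }

    // Step 2 (the downstream DoFn): emit every (order, customer) pair per key.
    // A key present on only one side has an empty list on the other, so one of
    // the loops never runs and nothing is emitted: inner-join semantics.
    static List<String> innerJoin(Map<String, Grouped> grouped) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Grouped> e : grouped.entrySet()) {
            for (String order : e.getValue().orders) {
                for (String user : e.getValue().customers) {
                    out.add(e.getKey() + "," + order + "," + user);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> orders = List.of(
            new String[] {"C1", "O1"},
            new String[] {"C1", "O2"},
            new String[] {"C3", "O3"});   // C3 has no customer record
        List<String[]> customers = List.of(
            new String[] {"C1", "Alice"},
            new String[] {"C2", "Bob"});  // C2 has no orders
        System.out.println(innerJoin(coGroupByKey(orders, customers)));
        // prints [C1,O1,Alice, C1,O2,Alice]
    }
}
```

Because the loops run once per value on each side, a key with m orders and n customer records yields m × n outputs, and a key missing from either input yields none. So the question's DoFn does implement an inner join.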

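I agree that this API is quite verbose and can be error-prone (since it is fairly low-level). Have you considered using schema-aware PCollections? With that API, joins and all other relational operations are much easier.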

For example, an inner join could look like this (much simpler), and the same API already supports other join types as well:

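    // Join is org.apache.beam.sdk.schemas.transforms.Join. Transaction and Review must
    // have schemas (e.g. annotated with @DefaultSchema(JavaFieldSchema.class)).
    PCollection<Transaction> transactions = readTransactions();
    PCollection<Review> reviews = readReviews();
    // Inner join on equal userId AND productId; the result is a PCollection of Rows.
    PCollection<Row> joined = transactions.apply(
        Join.innerJoin(reviews).using("userId", "productId"));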
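
Joining with `using("userId", "productId")` pairs a transaction with a review only when both fields match. To make that concrete without a Beam runner, here is a plain-Java model of the result. The `Transaction` and `Review` classes below are hypothetical stand-ins (the answer's own types are not shown), and the map-based lookup is a simple hash join chosen for brevity; it models what the join returns, not how Beam executes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Beam code) of the result of
// Join.innerJoin(...).using("userId", "productId"): a pair of records is
// joined only when BOTH fields are equal.
class CompositeKeyJoinModel {

    // Hypothetical, minimal stand-ins for the Transaction and Review types.
    static final class Transaction {
        final String userId, productId, amount;
        Transaction(String userId, String productId, String amount) {
            this.userId = userId; this.productId = productId; this.amount = amount;
        }
    }

    static final class Review {
        final String userId, productId, text;
        Review(String userId, String productId, String text) {
            this.userId = userId; this.productId = productId; this.text = text;
        }
    }

    static List<String> innerJoin(List<Transaction> transactions, List<Review> reviews) {
        // Index reviews by the composite key (userId, productId).
        // List.of has value-based equals/hashCode, so it works as a map key
        // (it rejects nulls, so this sketch assumes non-null fields).
        Map<List<String>, List<Review>> byKey = new HashMap<>();
        for (Review r : reviews) {
            byKey.computeIfAbsent(List.of(r.userId, r.productId), k -> new ArrayList<>()).add(r);
        }
        // Emit one output per matching (transaction, review) pair.
        List<String> out = new ArrayList<>();
        for (Transaction t : transactions) {
            for (Review r : byKey.getOrDefault(List.of(t.userId, t.productId), List.of())) {
                out.add(t.userId + "," + t.productId + "," + t.amount + "," + r.text);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Transaction> transactions = List.of(
            new Transaction("u1", "p1", "10"),
            new Transaction("u1", "p2", "20"),
            new Transaction("u2", "p1", "30"));
        List<Review> reviews = List.of(
            new Review("u1", "p1", "great"),
            new Review("u2", "p2", "meh"));
        // Only (u1, p1) matches on both fields; (u2, p1) matches on userId alone.
        System.out.println(innerJoin(transactions, reviews)); // prints [u1,p1,10,great]
    }
}
```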