Implementing an Inner Join Using Apache Beam
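I am learning the Apache Beam framework. Following the examples described in the link, I have implemented an inner join. It runs, but I do not fully understand how it works.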
// Read the customer order file
PCollection<String> pCollection1 = pipeline.apply(TextIO.read().from(
    "src/main/resources/Join/InnerJoin/CustomerOrder.csv"));
// Read the customer file
PCollection<String> pCollection2 = pipeline.apply(TextIO.read()
    .from("src/main/resources/Join/InnerJoin/Customer.csv"));
// Parse the customer order lines into a PCollection of key/value pairs
PCollection<KV<String, String>> customerOrderCollection = pCollection1
    .apply(ParDo.of(new CustomerOrderParsing()));
// Parse the customer lines into a PCollection of key/value pairs
PCollection<KV<String, String>> customerCollection = pCollection2
    .apply(ParDo.of(new CustomerParsing()));
// Create one TupleTag per input so each input can be identified in the join result
TupleTag<String> customerOrderrTuple = new TupleTag<String>();
TupleTag<String> customerTuple = new TupleTag<String>();
I do not understand why we do the following. From the documentation, I gather that we are aggregating all elements by key. But why do we do this?
PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
    .of(customerOrderrTuple, customerOrderCollection)
    .and(customerTuple, customerCollection)
    .apply(CoGroupByKey.<String>create());
I also do not know what is happening here:
PCollection<String> output = result.apply(ParDo.of(
    new DoFn<KV<String, CoGbkResult>, String>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String strKey = context.element().getKey();
            CoGbkResult valueObject = context.element().getValue();
            Iterable<String> customerOrderTable = valueObject.getAll(customerOrderrTuple);
            Iterable<String> customerTable = valueObject.getAll(customerTuple);
            for (String order : customerOrderTable) {
                for (String user : customerTable) {
                    context.output(strKey + "," + order + "," + user);
                }
            }
        }
    }
));
PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
    .of(customerOrderrTuple, customerOrderCollection)
    .and(customerTuple, customerCollection)
    .apply(CoGroupByKey.<String>create());
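This is where your join actually happens. It returns a PCollection of key/value pairs, where the key is the join key and the value contains all values for that key from both inputs. You then need to iterate over the values from each joined collection for the same key and combine them in whatever way you need, which is why there is another DoFn downstream that does this in a user-specific way. With two nested loops, a key that is missing from either input produces no output, which gives inner-join behavior.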
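To make the two steps concrete, here is a minimal plain-Java sketch that imitates them without Beam: first group both inputs by key, then pair up the values per key. It is only an illustration of the idea, not how Beam implements CoGroupByKey; the class `CoGroupSketch`, the `Grouped` holder (a stand-in for `CoGbkResult`), and the sample data are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CoGroupSketch {

    // Hypothetical stand-in for CoGbkResult: all values from both inputs for one key.
    public static final class Grouped {
        public final List<String> orders = new ArrayList<>();
        public final List<String> customers = new ArrayList<>();
    }

    // Step 1: group both inputs by key, imitating the output shape of CoGroupByKey.
    public static Map<String, Grouped> coGroup(List<Map.Entry<String, String>> orders,
                                               List<Map.Entry<String, String>> customers) {
        Map<String, Grouped> byKey = new TreeMap<>(); // sorted keys for stable output
        for (Map.Entry<String, String> e : orders) {
            byKey.computeIfAbsent(e.getKey(), k -> new Grouped()).orders.add(e.getValue());
        }
        for (Map.Entry<String, String> e : customers) {
            byKey.computeIfAbsent(e.getKey(), k -> new Grouped()).customers.add(e.getValue());
        }
        return byKey;
    }

    // Step 2: emit every order/customer pairing per key, like the downstream DoFn.
    // If either list is empty for a key, the nested loops emit nothing (inner join).
    public static List<String> innerJoin(List<Map.Entry<String, String>> orders,
                                         List<Map.Entry<String, String>> customers) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Grouped> e : coGroup(orders, customers).entrySet()) {
            for (String order : e.getValue().orders) {
                for (String customer : e.getValue().customers) {
                    out.add(e.getKey() + "," + order + "," + customer);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data: c2 has no orders, c3 has no customer record.
        List<Map.Entry<String, String>> orders = List.of(
            Map.entry("c1", "order-1"), Map.entry("c1", "order-2"), Map.entry("c3", "order-3"));
        List<Map.Entry<String, String>> customers = List.of(
            Map.entry("c1", "Alice"), Map.entry("c2", "Bob"));
        innerJoin(orders, customers).forEach(System.out::println);
    }
}
```

In this sketch the grouping step still produces an entry for every key (c1, c2, and c3); it is the nested loop that drops c2 and c3. That is why the grouped result can also serve as the basis for outer joins, by handling the empty side explicitly instead of skipping it.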
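I agree that this is a very verbose and potentially error-prone API (because it is quite low-level), so have you considered using schema-aware PCollections? With that API, joins and all other relational operations are much easier.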
For example, an inner join could look like this (much simpler), and the API already supports different types of joins:
PCollection<Transaction> transactions = readTransactions();
PCollection<Review> reviews = readReviews();
PCollection<Row> joined = transactions.apply(
Join.innerJoin(reviews).using("userId", "productId"));