在 Apache Bean、Dataflow 中使用 PCollections 生成图形表示
Generate graph representation using PCollections in Apache Bean, Dataflow
我在以下架构中有一个数据源 - “People ID”、“Address ID”。我已将数据加载到 PCollection 中。
每个人可以有多个地址id,每个地址可以分配给多人。我要查找的是所有相关人员或共享相同地址 ID 的人员,而不仅仅是第一级到“n”级。
假设 P1 与 A1 和 A2 有边,P2 与 A2 有边,A3 和 P3 与 A3 和 A4 有边。在这种情况下,如果我画一个图,我可以发现 P1 与 P2 相关,因为两者共享 A2。 P1 也与 P3 相关,因为 P1 通过 A2 与 P2 相关,而 P2 通过 A3 与 P3 相关。
我的最终目标是找到这群有关系的人(通过地址 ID)。到目前为止我所做的是尝试利用 Join.innerJoin 形成临时 table 结构并将其循环到我们需要的任何级别。
PCollection<PeopleAddress> PA = readEdges(); //
PCollection<KV<String, PeopleAddress>> KAddressPA = transform(PA); // String is the address ID
PCollection<KV<String, KV<PeopleAddress, PeopleAddress>> data = Join.innerJoin(KAddressPA, KAddressPA);
//Above PCollection will give all first level edges, from this we will form a PeopleToPeople connection
PCollection<PeoplePeopleConnection> PP = getConnection(data); // From LHS and RHS we will read the ids and store.
// With the new set of People People Connection we can get new set of PeopleAddress edges..
Class PeoplePeopleConnection { String basePId; String cPId; }
Class PeopleAddress { String pId; String aId; }
我正在考虑将上面的代码循环n次以获得N级连接。但感觉有点过劳。很多边是重复的。我想知道在 PCollection 中是否有任何方法可以做到这一点。就像当我们找到与现有 Person 的地址的连接时,如何 link 它到现有的 person 对象。一些如何将新的一组 PeopleConnection 或 PeopleAddress 连接合并回单个 PColleciton。
解决问题的不同视角?
好的,到目前为止,你会有一对住在同一地址的人,对吧?
PCollection<PeoplePeopleConnection> PP = getConnection(data);
这些对形成了一个没有地址的图表 - 只有人,并且有 distance=1
。我喜欢这个,因为它可以让我们专注于人,而放弃地址。
那么,给定 (P1, P2)
,(P2, P3)
- 我们如何也达到 (P1, P3)
?
我们可以这样做:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId),
KV.of(pair.cPId, pair.basePId))));
然后,我们可以加入他们,就像你以前那样:
PCollection<KV<String, Iterable<String>> groupedData = twoWayPairs
.apply(GroupByKey.create());
给定 (P1, P2)
和 (P2, P3)
作为输入,此 returns (P2, [P1, P3])
、(P1, [P2])
、(P3, [P2])
。从这一对中,我们可以得出 (P1, P3)
作为 distance=2
个邻居的列表。
groupedData.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
为什么要检查 newPairs
是否为空?因为当 newPairs
元素为空时,我们就会遇到没有链接到其他元素的对的情况(例如前面的 (P1, [P2])
)。
所以,最后,你应该能够做这样的事情:
// We get the distance=1 elements:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId),
KV.of(pair.cPId, pair.basePId))));
for(int i = 1; i < MAX_DISTANCE; i++) {
twoWayPairs = twoWayPairs
.apply(GroupByKey.create())
.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
}
这应该有助于生成具有 distance<N
的邻居。
考虑到在这种情况下,正在改组的数据显着增加,因此在进行非常大的距离之前要小心。
我在以下架构中有一个数据源 - “People ID”、“Address ID”。我已将数据加载到 PCollection 中。
每个人可以有多个地址id,每个地址可以分配给多人。我要查找的是所有相关人员或共享相同地址 ID 的人员,而不仅仅是第一级到“n”级。
假设 P1 与 A1 和 A2 有边,P2 与 A2 有边,A3 和 P3 与 A3 和 A4 有边。在这种情况下,如果我画一个图,我可以发现 P1 与 P2 相关,因为两者共享 A2。 P1 也与 P3 相关,因为 P1 通过 A2 与 P2 相关,而 P2 通过 A3 与 P3 相关。
我的最终目标是找到这群有关系的人(通过地址 ID)。到目前为止我所做的是尝试利用 Join.innerJoin 形成临时 table 结构并将其循环到我们需要的任何级别。
PCollection<PeopleAddress> PA = readEdges(); //
PCollection<KV<String, PeopleAddress>> KAddressPA = transform(PA); // String is the address ID
PCollection<KV<String, KV<PeopleAddress, PeopleAddress>> data = Join.innerJoin(KAddressPA, KAddressPA);
//Above PCollection will give all first level edges, from this we will form a PeopleToPeople connection
PCollection<PeoplePeopleConnection> PP = getConnection(data); // From LHS and RHS we will read the ids and store.
// With the new set of People People Connection we can get new set of PeopleAddress edges..
Class PeoplePeopleConnection { String basePId; String cPId; }
Class PeopleAddress { String pId; String aId; }
我正在考虑将上面的代码循环n次以获得N级连接。但感觉有点过劳。很多边是重复的。我想知道在 PCollection 中是否有任何方法可以做到这一点。就像当我们找到与现有 Person 的地址的连接时,如何 link 它到现有的 person 对象。一些如何将新的一组 PeopleConnection 或 PeopleAddress 连接合并回单个 PColleciton。
解决问题的不同视角?
好的,到目前为止,你会有一对住在同一地址的人,对吧?
PCollection<PeoplePeopleConnection> PP = getConnection(data);
这些对形成了一个没有地址的图表 - 只有人,并且有 distance=1
。我喜欢这个,因为它可以让我们专注于人,而放弃地址。
那么,给定 (P1, P2)
,(P2, P3)
- 我们如何也达到 (P1, P3)
?
我们可以这样做:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId),
KV.of(pair.cPId, pair.basePId))));
然后,我们可以加入他们,就像你以前那样:
PCollection<KV<String, Iterable<String>> groupedData = twoWayPairs
.apply(GroupByKey.create());
给定 (P1, P2)
和 (P2, P3)
作为输入,此 returns (P2, [P1, P3])
、(P1, [P2])
、(P3, [P2])
。从这一对中,我们可以得出 (P1, P3)
作为 distance=2
个邻居的列表。
groupedData.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
为什么要检查 newPairs
是否为空?因为当 newPairs
元素为空时,我们就会遇到没有链接到其他元素的对的情况(例如前面的 (P1, [P2])
)。
所以,最后,你应该能够做这样的事情:
// We get the distance=1 elements:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId),
KV.of(pair.cPId, pair.basePId))));
for(int i = 1; i < MAX_DISTANCE; i++) {
twoWayPairs = twoWayPairs
.apply(GroupByKey.create())
.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
}
这应该有助于生成具有 distance<N
的邻居。
考虑到在这种情况下,正在改组的数据显着增加,因此在进行非常大的距离之前要小心。