How to make pairs of nodes using filtering in Spark?
I have the following DataFrame in Spark and Scala:
nodeId typeFrom typeTo date
1 A G 2016-10-12T12:10:00.000Z
2 B A 2016-10-12T12:00:00.000Z
3 A B 2016-10-12T12:05:00.000Z
4 D C 2016-10-12T12:30:00.000Z
5 G D 2016-10-12T12:35:00.000Z
I want to make pairs of nodeId whenever the typeFrom value of one row is the same as the typeTo value of another row.
The expected output for the example above would be:
nodeId_1 nodeId_2 type date
1 2 A 2016-10-12T12:10:00.000Z
3 2 A 2016-10-12T12:05:00.000Z
2 3 B 2016-10-12T12:00:00.000Z
4 5 C 2016-10-12T12:30:00.000Z
5 1 G 2016-10-12T12:35:00.000Z
I don't know how to make the pairs of nodeId:
df
  .filter($"typeFrom" === $"typeTo")
  .???
You can use a self-join to match typeFrom with typeTo:
import spark.implicits._  // for toDF and the $ column syntax (already in scope in spark-shell)

val df = Seq(
  (1, "A", "G", "2016-10-12T12:10:00.000Z"),
  (2, "B", "A", "2016-10-12T12:00:00.000Z"),
  (3, "A", "B", "2016-10-12T12:05:00.000Z"),
  (4, "D", "C", "2016-10-12T12:30:00.000Z"),
  (5, "G", "D", "2016-10-12T12:35:00.000Z")
).toDF("nodeId", "typeFrom", "typeTo", "date")

// Self-join: alias the DataFrame twice, keep rows where one row's typeFrom
// equals another row's typeTo, then select the columns for the output.
df.as("df1").join(
  df.as("df2"),
  $"df1.typeFrom" === $"df2.typeTo"
).select(
  $"df1.nodeId".as("nodeId_1"),
  $"df2.nodeId".as("nodeId_2"),
  $"df1.typeFrom".as("type"),
  $"df1.date"
).show(truncate = false)
// +--------+--------+----+------------------------+
// |nodeId_1|nodeId_2|type|date |
// +--------+--------+----+------------------------+
// |1 |2 |A |2016-10-12T12:10:00.000Z|
// |2 |3 |B |2016-10-12T12:00:00.000Z|
// |3 |2 |A |2016-10-12T12:05:00.000Z|
// |4 |5 |D |2016-10-12T12:30:00.000Z|
// |5 |1 |G |2016-10-12T12:35:00.000Z|
// +--------+--------+----+------------------------+
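If you prefer SQL, the same self-join can be written against a temporary view. This is a minimal sketch under the assumption that a SparkSession named spark is in scope; the view name "nodes" is only illustrative:

// Register the DataFrame as a temp view and express the self-join in SQL
// (assumes `spark` is the SparkSession; "nodes" is an illustrative view name).
df.createOrReplaceTempView("nodes")
spark.sql("""
  SELECT a.nodeId   AS nodeId_1,
         b.nodeId   AS nodeId_2,
         a.typeFrom AS type,
         a.date
  FROM nodes a
  JOIN nodes b
    ON a.typeFrom = b.typeTo
""").show(truncate = false)

The aliases a and b play the same role as df1 and df2 above: without distinct aliases Spark cannot tell the two sides of a self-join apart.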