如何在 Spark 中创建节点对?

How to create pairs of nodes in Spark?

我在 Spark 和 Scala 中有以下 DataFrame:

group   nodeId   date
1       1        2016-10-12T12:10:00.000Z
1       2        2016-10-12T12:00:00.000Z
1       3        2016-10-12T12:05:00.000Z
2       1        2016-10-12T12:30:00.000Z
2       2        2016-10-12T12:35:00.000Z

我需要按 group 对记录进行分组,按 date 对它们进行升序排序,并制作成对的顺序 nodeId。此外,date 应转换为 Unix 纪元。

这可以用预期的输出更好地解释:

group   nodeId_1   nodeId_2   date
1       2          3          2016-10-12T12:00:00.000Z
1       3          1          2016-10-12T12:05:00.000Z
2       1          2          2016-10-12T12:30:00.000Z

这是我目前所做的:

df
  .groupBy("group")
  .agg($"nodeId",$"date")
  .orderBy(asc("date"))

但我不知道如何创建 nodeId 对。

如果我正确理解您的要求,您可以在 group 上使用 self-join,在 nodeId 上使用 < 不等式条件:

val df = Seq(
  (1, 1, "2016-10-12T12:10:00.000Z"),
  (1, 2, "2016-10-12T12:00:00.000Z"),
  (1, 3, "2016-10-12T12:05:00.000Z"),
  (2, 1, "2016-10-12T12:30:00.000Z"),
  (2, 2, "2016-10-12T12:35:00.000Z")
).toDF("group", "nodeId", "date")

df.as("df1").join(
  df.as("df2"),
  $"df1.group" === $"df2.group" && $"df1.nodeId" < $"df2.nodeId"
).select(
  $"df1.group", $"df1.nodeId", $"df2.nodeId",
  when($"df1.date" < $"df2.date", $"df1.date").otherwise($"df2.date").as("date")
)

// +-----+------+------+------------------------+
// |group|nodeId|nodeId|date                    |
// +-----+------+------+------------------------+
// |1    |1     |3     |2016-10-12T12:05:00.000Z|
// |1    |1     |2     |2016-10-12T12:00:00.000Z|
// |1    |2     |3     |2016-10-12T12:00:00.000Z|
// |2    |1     |2     |2016-10-12T12:30:00.000Z|
// +-----+------+------+------------------------+

您可以使用 Window 函数和 lead 内置函数来创建对,并使用 to_utc_timestamp 内置函数将日期转换为纪元日期。最后,您必须 filter 未配对的行,因为您在输出中不需要它们。

以下是上述说明的程序。为了清楚起见,我使用了评论

import org.apache.spark.sql.expressions._
def windowSpec = Window.partitionBy("group").orderBy("date")    //defining window function grouping by group and ordering by date

import org.apache.spark.sql.functions._
df.withColumn("date", to_utc_timestamp(col("date"), "Asia/Kathmandu"))     //converting the date to epoch datetime you can choose other timezone as required
  .withColumn("nodeId_2", lead("nodeId", 1).over(windowSpec))  //using window for creating pairs
    .filter(col("nodeId_2").isNotNull)                   //filtering out the unpaired rows
    .select(col("group"), col("nodeId").as("nodeId_1"), col("nodeId_2"), col("date"))  //selecting as required final dataframe
  .show(false)

你应该根据需要得到最终的dataframe

+-----+--------+--------+-------------------+
|group|nodeId_1|nodeId_2|date               |
+-----+--------+--------+-------------------+
|1    |2       |3       |2016-10-12 12:00:00|
|1    |3       |1       |2016-10-12 12:05:00|
|2    |1       |2       |2016-10-12 12:30:00|
+-----+--------+--------+-------------------+

希望回答对你有帮助

注意 为了获得正确的纪元日期,我使用 Asia/Kathmandu 作为时区