如何在某些情况下在 Spark DataFrame 中创建新列 'count'

how to create new column 'count' in Spark DataFrame under some condition

我有一个关于连接日志的 DataFrame,其中包含列 IdtargetIPTime。此 DataFrame 中的每条记录都是一个系统的连接事件。 id表示本次连接,targetIP表示本次连接的目标IP地址,Time为连接时间。具有值:

ID Time targetIP
1 1 192.163.0.1
2 2 192.163.0.2
3 3 192.163.0.1
4 5 192.163.0.1
5 6 192.163.0.2
6 7 192.163.0.2
7 8 192.163.0.2

我想在某些条件下创建一个新列:过去2个时间单位内到本次目标IP地址的连接数。所以结果 DataFrame 应该是:

ID Time targetIP count
1 1 192.163.0.1 0
2 2 192.163.0.2 0
3 3 192.163.0.1 1
4 5 192.163.0.1 1
5 6 192.163.0.2 0
6 7 192.163.0.2 1
7 8 192.163.0.2 2

例如ID=7targetIP在过去2个时间单位192.163.0.2连接到系统,即ID=5ID=6,并且他们的 targetIP 也是 192.163.0.2。所以关于 ID=7 的计数是 2.

期待您的帮助。

您可以使用 count over Window,范围介于 - 2 和当前行之间,以获取最近 2 个时间单位的 IP 计数。

使用 Spark SQL 你可以这样做:

df.createOrReplaceTempView("connection_logs")

df1 = spark.sql("""
    SELECT  *,
            COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time 
                          RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
                          ) -1 AS count
    FROM    connection_logs
    ORDER BY ID
""")

df1.show()

#+---+----+-----------+-----+
#| ID|Time|   targetIP|count|
#+---+----+-----------+-----+
#|  1|   1|192.163.0.1|    0|
#|  2|   2|192.163.0.2|    0|
#|  3|   3|192.163.0.1|    1|
#|  4|   5|192.163.0.1|    1|
#|  5|   6|192.163.0.2|    0|
#|  6|   7|192.163.0.2|    1|
#|  7|   8|192.163.0.2|    2|
#+---+----+-----------+-----+

或者使用 DataFrame API:

from pyspark.sql import Window
from pyspark.sql import functions as F

time_unit = lambda x: x

w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)

df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")

df1.show()

所以,您基本上需要的是一个 window 函数。

让我们从您的初始数据开始

import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class Event(ID: Int, Time: Int, targetIP: String)

val events = Seq(
    Event(1, 1, "192.163.0.1"),
    Event(2, 2, "192.163.0.2"),
    Event(3, 3, "192.163.0.1"),
    Event(4, 5, "192.163.0.1"),
    Event(5, 6, "192.163.0.2"),
    Event(6, 7, "192.163.0.2"),
    Event(7, 8, "192.163.0.2")
).toDS()

现在我们需要定义一个window函数本身

val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)

现在是最有趣的部分:如何计算超过 window 的数?没有简单的方法,所以我们将执行以下操作

  1. 将所有 targetIp 聚合到列表中
  2. 过滤列表以查找仅需要的 ips
  3. 计算列表的大小
val df = events
        .withColumn("tmp", collect_list($"targetIp").over(timeWindow))
        .withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
        .drop($"tmp")

结果将包含我们需要的新列“计数”!

更新:

@blackbishop 编写的没有聚合的更短版本,

val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
        .withColumn("count", count("*").over(timeWindow) - lit(1))
        .explain(true)