如何在某些情况下在 Spark DataFrame 中创建新列 'count'
how to create new column 'count' in Spark DataFrame under some condition
我有一个关于连接日志的 DataFrame,其中包含列 Id
、targetIP
、Time
。此 DataFrame 中的每条记录都是一个系统的连接事件。 id表示本次连接,targetIP
表示本次连接的目标IP地址,Time为连接时间。具有值:
ID
Time
targetIP
1
1
192.163.0.1
2
2
192.163.0.2
3
3
192.163.0.1
4
5
192.163.0.1
5
6
192.163.0.2
6
7
192.163.0.2
7
8
192.163.0.2
我想在某些条件下创建一个新列:过去2个时间单位内到本次目标IP地址的连接数。所以结果 DataFrame 应该是:
ID
Time
targetIP
count
1
1
192.163.0.1
0
2
2
192.163.0.2
0
3
3
192.163.0.1
1
4
5
192.163.0.1
1
5
6
192.163.0.2
0
6
7
192.163.0.2
1
7
8
192.163.0.2
2
例如ID=7
,targetIP
在过去2个时间单位192.163.0.2
连接到系统,即ID=5
和ID=6
,并且他们的 targetIP
也是 192.163.0.2
。所以关于 ID=7
的计数是 2.
期待您的帮助。
您可以使用 count
over Window,范围介于 - 2 和当前行之间,以获取最近 2 个时间单位的 IP 计数。
使用 Spark SQL 你可以这样做:
df.createOrReplaceTempView("connection_logs")
df1 = spark.sql("""
SELECT *,
COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time
RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
) -1 AS count
FROM connection_logs
ORDER BY ID
""")
df1.show()
#+---+----+-----------+-----+
#| ID|Time| targetIP|count|
#+---+----+-----------+-----+
#| 1| 1|192.163.0.1| 0|
#| 2| 2|192.163.0.2| 0|
#| 3| 3|192.163.0.1| 1|
#| 4| 5|192.163.0.1| 1|
#| 5| 6|192.163.0.2| 0|
#| 6| 7|192.163.0.2| 1|
#| 7| 8|192.163.0.2| 2|
#+---+----+-----------+-----+
或者使用 DataFrame API:
from pyspark.sql import Window
from pyspark.sql import functions as F
time_unit = lambda x: x
w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")
df1.show()
所以,您基本上需要的是一个 window 函数。
让我们从您的初始数据开始
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class Event(ID: Int, Time: Int, targetIP: String)
val events = Seq(
Event(1, 1, "192.163.0.1"),
Event(2, 2, "192.163.0.2"),
Event(3, 3, "192.163.0.1"),
Event(4, 5, "192.163.0.1"),
Event(5, 6, "192.163.0.2"),
Event(6, 7, "192.163.0.2"),
Event(7, 8, "192.163.0.2")
).toDS()
现在我们需要定义一个window函数本身
val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)
现在是最有趣的部分:如何计算超过 window 的数?没有简单的方法,所以我们将执行以下操作
- 将所有 targetIp 聚合到列表中
- 过滤列表以查找仅需要的 ips
- 计算列表的大小
val df = events
.withColumn("tmp", collect_list($"targetIp").over(timeWindow))
.withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
.drop($"tmp")
结果将包含我们需要的新列“计数”!
更新:
@blackbishop 编写的没有聚合的更短版本,
val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
.withColumn("count", count("*").over(timeWindow) - lit(1))
.explain(true)
我有一个关于连接日志的 DataFrame,其中包含列 Id
、targetIP
、Time
。此 DataFrame 中的每条记录都是一个系统的连接事件。 id表示本次连接,targetIP
表示本次连接的目标IP地址,Time为连接时间。具有值:
ID | Time | targetIP |
---|---|---|
1 | 1 | 192.163.0.1 |
2 | 2 | 192.163.0.2 |
3 | 3 | 192.163.0.1 |
4 | 5 | 192.163.0.1 |
5 | 6 | 192.163.0.2 |
6 | 7 | 192.163.0.2 |
7 | 8 | 192.163.0.2 |
我想在某些条件下创建一个新列:过去2个时间单位内到本次目标IP地址的连接数。所以结果 DataFrame 应该是:
ID | Time | targetIP | count |
---|---|---|---|
1 | 1 | 192.163.0.1 | 0 |
2 | 2 | 192.163.0.2 | 0 |
3 | 3 | 192.163.0.1 | 1 |
4 | 5 | 192.163.0.1 | 1 |
5 | 6 | 192.163.0.2 | 0 |
6 | 7 | 192.163.0.2 | 1 |
7 | 8 | 192.163.0.2 | 2 |
例如ID=7
,targetIP
在过去2个时间单位192.163.0.2
连接到系统,即ID=5
和ID=6
,并且他们的 targetIP
也是 192.163.0.2
。所以关于 ID=7
的计数是 2.
期待您的帮助。
您可以使用 count
over Window,范围介于 - 2 和当前行之间,以获取最近 2 个时间单位的 IP 计数。
使用 Spark SQL 你可以这样做:
df.createOrReplaceTempView("connection_logs")
df1 = spark.sql("""
SELECT *,
COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time
RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
) -1 AS count
FROM connection_logs
ORDER BY ID
""")
df1.show()
#+---+----+-----------+-----+
#| ID|Time| targetIP|count|
#+---+----+-----------+-----+
#| 1| 1|192.163.0.1| 0|
#| 2| 2|192.163.0.2| 0|
#| 3| 3|192.163.0.1| 1|
#| 4| 5|192.163.0.1| 1|
#| 5| 6|192.163.0.2| 0|
#| 6| 7|192.163.0.2| 1|
#| 7| 8|192.163.0.2| 2|
#+---+----+-----------+-----+
或者使用 DataFrame API:
from pyspark.sql import Window
from pyspark.sql import functions as F
time_unit = lambda x: x
w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")
df1.show()
所以,您基本上需要的是一个 window 函数。
让我们从您的初始数据开始
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class Event(ID: Int, Time: Int, targetIP: String)
val events = Seq(
Event(1, 1, "192.163.0.1"),
Event(2, 2, "192.163.0.2"),
Event(3, 3, "192.163.0.1"),
Event(4, 5, "192.163.0.1"),
Event(5, 6, "192.163.0.2"),
Event(6, 7, "192.163.0.2"),
Event(7, 8, "192.163.0.2")
).toDS()
现在我们需要定义一个window函数本身
val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)
现在是最有趣的部分:如何计算超过 window 的数?没有简单的方法,所以我们将执行以下操作
- 将所有 targetIp 聚合到列表中
- 过滤列表以查找仅需要的 ips
- 计算列表的大小
val df = events
.withColumn("tmp", collect_list($"targetIp").over(timeWindow))
.withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
.drop($"tmp")
结果将包含我们需要的新列“计数”!
更新:
@blackbishop 编写的没有聚合的更短版本,
val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
.withColumn("count", count("*").over(timeWindow) - lit(1))
.explain(true)