如何在 Spark SQL 中分配非唯一递增索引(索引标记),在加入来自另一个数据帧的特定值时设置回 0

How to assign non unique incrementing index (index markup) in Spark SQL, set back to 0 on joining the specific value from another dataframe

有一个data的DataFrame像

|timestamp          |value|
|2021-01-01 12:00:00| 10.0|
|2021-01-01 12:00:01| 10.0|
|2021-01-01 12:00:02| 10.0|
|2021-01-01 12:00:03| 10.0|
|2021-01-01 12:00:04| 10.0|
|2021-01-01 12:00:05| 10.0|
|2021-01-01 12:00:06| 10.0|
|2021-01-01 12:00:07| 10.0|

events 的 DataFrame 喜欢

|timestamp          |event|
|2021-01-01 12:00:01| true|
|2021-01-01 12:00:05| true|

基于此,我想在初始 DataFrame 中再添加一列,这是自 event:

开始以来数据的 index
|timestamp          |value|index|
|2021-01-01 12:00:00| 10.0|    1|
|2021-01-01 12:00:01| 10.0|    2|
|2021-01-01 12:00:02| 10.0|    3|
|2021-01-01 12:00:03| 10.0|    4|
|2021-01-01 12:00:04| 10.0|    5|
|2021-01-01 12:00:05| 10.0|    1|
|2021-01-01 12:00:06| 10.0|    2|
|2021-01-01 12:00:07| 10.0|    3|

我试过

.withColumn("index",monotonically_increasing_id())

但是在与其他 DataFrame 连接时无法将其设置回 0。所以,欢迎任何想法。

您可以在 timestamp 上加入 data df 和 event df,然后在 event 列上使用条件累积和来定义组。最后,按group列分区设置行号。

像这样:

import org.apache.spark.sql.expressions.Window

val result = data.join(
    events, 
    Seq("timestamp"), 
    "left"
).withColumn(
    "group",
    sum(when(col("event"), 1).otherwise(0)).over(Window.orderBy("timestamp"))
).withColumn(
    "index",
    row_number().over(Window.partitionBy("group").orderBy("timestamp"))
).drop("group", "event")

result.show
//+-------------------+-----+-----+
//|          timestamp|value|index|
//+-------------------+-----+-----+
//|2021-01-01 12:00:00| 10.0|    1|
//|2021-01-01 12:00:01| 10.0|    1|
//|2021-01-01 12:00:02| 10.0|    2|
//|2021-01-01 12:00:03| 10.0|    3|
//|2021-01-01 12:00:04| 10.0|    4|
//|2021-01-01 12:00:05| 10.0|    1|
//|2021-01-01 12:00:06| 10.0|    2|
//|2021-01-01 12:00:07| 10.0|    3|
//+-------------------+-----+-----+

您可以使用 Window 函数来实现它:

from pyspark.sql import SparkSessionRow, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

加入原始DF后的示例数据(为简单起见,我将timestamp列更改为整数类型):

df = spark.createDataFrame([
    Row(timestamp=0, value='foo', event=True),
    Row(timestamp=1, value='foo', event=None),
    Row(timestamp=2, value='foo', event=None),
    Row(timestamp=3, value='foo', event=None),
    Row(timestamp=4, value='foo', event=None),
    Row(timestamp=5, value='foo', event=True),
    Row(timestamp=6, value='foo', event=None),
    Row(timestamp=7, value='foo', event=None),
])

然后我通过前向填充“组”的第一个时间戳来创建一个带有 group_id 的列。 然后可以使用此 group_id 创建索引 F.row_number():

(
    df
    .withColumn('group_id', F.when(F.col('event'), F.col('timestamp')))
    .withColumn('group_id', F.last('group_id', ignorenulls=True).over(Window.orderBy('timestamp')))
    .withColumn('index', F.row_number().over(Window.partitionBy('group_id').orderBy('timestamp')))
    .show()
)

# Output:
+---------+-----+-----+--------+-----+
|timestamp|value|event|group_id|index|
+---------+-----+-----+--------+-----+
|        0|  foo| true|       0|    1|
|        1|  foo| null|       0|    2|
|        2|  foo| null|       0|    3|
|        3|  foo| null|       0|    4|
|        4|  foo| null|       0|    5|
|        5|  foo| true|       5|    1|
|        6|  foo| null|       5|    2|
|        7|  foo| null|       5|    3|
+---------+-----+-----+--------+-----+