Concatenate PySpark rows using windows

I have the following PySpark dataframe:

+-----+---------------------+-------------------+
| name|           someString|          timestamp|
+-----+---------------------+-------------------+
|name1|            A-aa1-aa2|2012-01-10 00:00:00|
|name1|                 B-bb|2012-01-11 00:00:00|
|name1|            C-cc1-cc2|2012-01-13 00:00:00|
|name1|        D-dd1-dd2-dd3|2012-01-14 00:00:00|
|name1|                 E-ee|2012-01-15 00:00:00|
|name2|                 F-ff|2012-01-10 00:00:00|
|name2|        G-gg1-gg2-gg3|2012-01-11 00:00:00|
|name2|        H-hh1-hh2-hh3|2012-01-12 00:00:00|
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|
+-----+---------------------+-------------------+

From this dataframe, I want to create a new dataframe (say df2) that has a column (named "concatStrings") concatenating all the elements of the someString column over a rolling 3-day time window for each unique name (while keeping all of df1's columns).

For the example above, I'd like df2 to look as follows:

+-----+---------------------+-------------------+--------------------------------------+
| name|           someString|          timestamp|                         concatStrings|
+-----+---------------------+-------------------+--------------------------------------+
|name1|            A-aa1-aa2|2012-01-10 00:00:00|                             A-aa1-aa2|
|name1|                 B-bb|2012-01-11 00:00:00|                        A-aa1-aa2-B-bb|
|name1|            C-cc1-cc2|2012-01-13 00:00:00|                        B-bb-C-cc1-cc2|
|name1|        D-dd1-dd2-dd3|2012-01-14 00:00:00|               C-cc1-cc2-D-dd1-dd2-dd3|
|name1|                 E-ee|2012-01-15 00:00:00|          C-cc1-cc2-D-dd1-dd2-dd3-E-ee|
|name2|                 F-ff|2012-01-10 00:00:00|                                  F-ff|
|name2|        G-gg1-gg2-gg3|2012-01-11 00:00:00|                    F-ff-G-gg1-gg2-gg3|
|name2|        H-hh1-hh2-hh3|2012-01-12 00:00:00|      F-ff-G-gg1-gg2-gg3-H-hh1-hh2-hh3|
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|   H-hh1-hh2-hh3-I-ii1-ii2-ii3-ii4-ii5|
+-----+---------------------+-------------------+--------------------------------------+

How can I do this?

Here is the code I have tried so far:

# Rolling 3-day window per name: rangeBetween bounds are in the units of the
# orderBy expression (here epoch seconds), so -(3 - 1) * 86400 reaches back
# two full days before the current row.
win_ts = (
    Window.partitionBy("name")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-(3 - 1) * 86400, 0)
)

df2 = df1.withColumn("concatStrings", F.concat_ws("-", F.col("someString").over(win_ts)))

However, I get the error below when I run the snippet above:

Py4JJavaError: An error occurred while calling o235.withColumn.
: org.apache.spark.sql.AnalysisException: Expression 'someString#72' not supported within a window function.;;
Project [name#70, someString#72, timestamp#76, concatStrings#124]
+- Project [name#70, someString#72, timestamp#76, _w0#125L, _we0#126, concat_ws(-, _we0#126) AS concatStrings#124]
   +- Window [someString#72 windowspecdefinition(name#70, _w0#125L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -172800, currentrow$())) AS _we0#126], [name#70], [_w0#125L ASC NULLS FIRST]
      +- Project [name#70, someString#72, timestamp#76, cast(timestamp#76 as bigint) AS _w0#125L]
         +- Project [name#70, someString#72, timestamp#76]

You can generate the dataframe df1 with the code below:

from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window

df1_Stats = Row("name", "timestamp1", "someString")

df1_stat1 = df1_Stats("name1", "2012-01-10 00:00:00", "A-aa1-aa2")
df1_stat2 = df1_Stats("name1", "2012-01-11 00:00:00", "B-bb")
df1_stat3 = df1_Stats("name1", "2012-01-13 00:00:00", "C-cc1-cc2")
df1_stat4 = df1_Stats("name1", "2012-01-14 00:00:00", "D-dd1-dd2-dd3")
df1_stat5 = df1_Stats("name1", "2012-01-15 00:00:00", "E-ee")
df1_stat6 = df1_Stats("name2", "2012-01-10 00:00:00", "F-ff")
df1_stat7 = df1_Stats("name2", "2012-01-11 00:00:00", "G-gg1-gg2-gg3")
df1_stat8 = df1_Stats("name2", "2012-01-12 00:00:00", "H-hh1-hh2-hh3")
df1_stat9 = df1_Stats("name2", "2012-01-14 00:00:00", "I-ii1-ii2-ii3-ii4-ii5")

df1_stat_lst = [
    df1_stat1,
    df1_stat2,
    df1_stat3,
    df1_stat4,
    df1_stat5,
    df1_stat6,
    df1_stat7,
    df1_stat8,
    df1_stat9,
]

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(df1_stat_lst)
df1 = df1.withColumn("timestamp", df1["timestamp1"].cast(TimestampType())).drop("timestamp1")

concat_ws is not an aggregate function, so Spark cannot apply it over a window frame; that is what the AnalysisException is complaining about. You first need to collect the frame's values into a list with collect_list, then turn that list into a string with array_join. This should work:

# collect_list gathers the frame's someString values in frame order;
# array_join then concatenates the resulting array with "-"
df2 = df1.withColumn("concatStrings", F.collect_list("someString").over(win_ts)) \
         .withColumn("concatStrings", F.array_join("concatStrings", "-"))
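
The two steps can also be folded into a single expression; a minimal sketch, assuming the win_ts window and the df1 built above:

df2 = df1.withColumn(
    "concatStrings",
    # collect_list is an aggregate, so it is allowed over a window frame;
    # array_join then flattens the resulting array into a "-"-separated string
    F.array_join(F.collect_list("someString").over(win_ts), "-"),
)

df2.orderBy("name", "timestamp").show(truncate=False)

Because the frame is ordered by the timestamp, collect_list returns the values in date order, which is why each concatStrings value reads chronologically.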