Concatenate PySpark rows using windows
I have the following PySpark DataFrame:
+-----+---------------------+-------------------+
| name| someString| timestamp|
+-----+---------------------+-------------------+
|name1| A-aa1-aa2|2012-01-10 00:00:00|
|name1| B-bb|2012-01-11 00:00:00|
|name1| C-cc1-cc2|2012-01-13 00:00:00|
|name1| D-dd1-dd2-dd3|2012-01-14 00:00:00|
|name1| E-ee|2012-01-15 00:00:00|
|name2| F-ff|2012-01-10 00:00:00|
|name2| G-gg1-gg2-gg3|2012-01-11 00:00:00|
|name2| H-hh1-hh2-hh3|2012-01-12 00:00:00|
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|
+-----+---------------------+-------------------+
From this DataFrame, I want to create a new DataFrame (say df2) that has a column (named "concatStrings") which concatenates all of the values of the someString column over a rolling 3-day time window for each unique name (while keeping all the columns of df1).
In the example above, I would like df2 to look like this:
+-----+---------------------+-------------------+--------------------------------------+
| name| someString| timestamp| concatStrings|
+-----+---------------------+-------------------+--------------------------------------+
|name1| A-aa1-aa2|2012-01-10 00:00:00| A-aa1-aa2 |
|name1| B-bb|2012-01-11 00:00:00| A-aa1-aa2-B-bb |
|name1| C-cc1-cc2|2012-01-13 00:00:00| B-bb-C-cc1-cc2 |
|name1| D-dd1-dd2-dd3|2012-01-14 00:00:00| C-cc1-cc2-D-dd1-dd2-dd3 |
|name1| E-ee|2012-01-15 00:00:00| C-cc1-cc2-D-dd1-dd2-dd3-E-ee |
|name2| F-ff|2012-01-10 00:00:00| F-ff |
|name2| G-gg1-gg2-gg3|2012-01-11 00:00:00| F-ff-G-gg1-gg2-gg3 |
|name2| H-hh1-hh2-hh3|2012-01-12 00:00:00| F-ff-G-gg1-gg2-gg3-H-hh1-hh2-hh3 |
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00| H-hh1-hh2-hh3-I-ii1-ii2-ii3-ii4-ii5|
+-----+---------------------+-------------------+--------------------------------------+
How can I do this?
Here is the code I have tried so far:
win_ts = (
    Window.partitionBy("name")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-(3 - 1) * 86400, 0)  # range is in seconds: current row plus the previous 2 days
)
df2 = df1.withColumn("concatStrings", F.concat_ws("-", F.col("someString").over(win_ts)))
However, when I try the snippet above, I get the error below:
Py4JJavaError: An error occurred while calling o235.withColumn.
: org.apache.spark.sql.AnalysisException: Expression 'someString#72' not supported within a window function.;;
Project [name#70, someString#72, timestamp#76, concatStrings#124]
+- Project [name#70, someString#72, timestamp#76, _w0#125L, _we0#126, concat_ws(-, _we0#126) AS concatStrings#124]
+- Window [someString#72 windowspecdefinition(name#70, _w0#125L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -172800, currentrow$())) AS _we0#126], [name#70], [_w0#125L ASC NULLS FIRST]
+- Project [name#70, someString#72, timestamp#76, cast(timestamp#76 as bigint) AS _w0#125L]
+- Project [name#70, someString#72, timestamp#76]
You can generate the DataFrame df1 using the code below:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import TimestampType
df1_Stats = Row("name", "timestamp1", "someString")
df1_stat1 = df1_Stats("name1", "2012-01-10 00:00:00", "A-aa1-aa2")
df1_stat2 = df1_Stats("name1", "2012-01-11 00:00:00", "B-bb")
df1_stat3 = df1_Stats("name1", "2012-01-13 00:00:00", "C-cc1-cc2")
df1_stat4 = df1_Stats("name1", "2012-01-14 00:00:00", "D-dd1-dd2-dd3")
df1_stat5 = df1_Stats("name1", "2012-01-15 00:00:00", "E-ee")
df1_stat6 = df1_Stats("name2", "2012-01-10 00:00:00", "F-ff")
df1_stat7 = df1_Stats("name2", "2012-01-11 00:00:00", "G-gg1-gg2-gg3")
df1_stat8 = df1_Stats("name2", "2012-01-12 00:00:00", "H-hh1-hh2-hh3")
df1_stat9 = df1_Stats("name2", "2012-01-14 00:00:00", "I-ii1-ii2-ii3-ii4-ii5")
df1_stat_lst = [
df1_stat1,
df1_stat2,
df1_stat3,
df1_stat4,
df1_stat5,
df1_stat6,
df1_stat7,
df1_stat8,
df1_stat9,
]
df1 = spark.createDataFrame(df1_stat_lst)
df1 = df1.withColumn( "timestamp" , df1["timestamp1"].cast(TimestampType()) ).drop("timestamp1")
concat_ws cannot be applied over a window, but collect_list can: first collect the values into a list over the window, then convert that list into a string with array_join. This should work:
df2 = df1.withColumn("concatStrings" , F.collect_list("someString").over(win_ts)) \
.withColumn("concatStrings", F.array_join("concatStrings", '-'))
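For reference, here is a minimal end-to-end sketch of that approach, reusing the win_ts window definition from the question (it assumes df1 was built with the snippet above; the show() call at the end is only there to verify the result):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 3-day rolling window per name: the current row plus the previous 2 days,
# expressed in seconds because the ordering column is the timestamp cast to long
win_ts = (
    Window.partitionBy("name")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-(3 - 1) * 86400, 0)
)

df2 = (
    df1
    # collect_list is an aggregate function, so it can be evaluated over a window
    .withColumn("concatStrings", F.collect_list("someString").over(win_ts))
    # join the collected array into a single "-"-separated string
    .withColumn("concatStrings", F.array_join("concatStrings", "-"))
)

df2.show(truncate=False)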