Create new pyspark DataFrame column by concatenating values of another column based on a conditional
I have a dataframe in pyspark as below:
df.show()
+-------+--------------------+--------------------+
| Dev_No| model| Tested|
+-------+--------------------+--------------------+
|BTA16C5| Windows PC| N|
|BTA16C5| SRL| N|
|BTA16C5| Hewlett Packard| N|
|CTA16C5| Android Devices| Y|
|CTA16C5| Hewlett Packard| N|
|4MY16A5| Other| N|
|4MY16A5| Other| N|
|4MY16A5| Tablet| Y|
|4MY16A5| Other| N|
|4MY16A5| Cable STB| Y|
|4MY16A5| Other| N|
|4MY16A5| Windows PC| Y|
|4MY16A5| Windows PC| Y|
|4MY16A5| Smart Watch| Y|
+-------+--------------------+--------------------+
Now, using the above dataframe, I want to create the dataframe below with a new column called Tested_devices and populate it, for each Dev_No, with the values of model where Tested is Y, with all the values comma separated.
df1.show()
+-------+--------------------+--------------------+------------------------------------------------------+
| Dev_No| model| Tested| Tested_devices|
+-------+--------------------+--------------------+------------------------------------------------------+
|BTA16C5| Windows PC| N| |
|BTA16C5| SRL| N| |
|BTA16C5| Hewlett Packard| N| |
|CTA16C5| Android Devices| Y| Android Devices|
|CTA16C5| Hewlett Packard| N| |
|4MY16A5| Other| N| |
|4MY16A5| Other| N| |
|4MY16A5| Tablet| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Other| N| |
|4MY16A5| Cable STB| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Other| N| |
|4MY16A5| Windows PC| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Windows PC| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Smart Watch| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
+-------+--------------------+--------------------+------------------------------------------------------+
I tried something like the following to select Dev_No and model where Tested is Y:
a = df.select("Dev_No", "model"), when(df.Tested == 'Y')
I could not get the result. It gave me the following error:
TypeError: when() takes exactly 2 arguments (1 given)
How can I achieve what I want?
Update
For Spark 1.6, you will need an alternative approach. One way to do this without using a udf or any Window functions is to create a second, temporary DataFrame with the collected values and then join it back to the original DataFrame.
First group by both Dev_No and Tested, and aggregate using concat_ws and collect_list. After aggregating, filter the DataFrame down to only the tested devices.
import pyspark.sql.functions as f
# create temporary DataFrame
df2 = df.groupBy('Dev_No', 'Tested')\
.agg(f.concat_ws(", ", f.collect_list('model')).alias('Tested_devices'))\
.where(f.col('Tested') == 'Y')
df2.show(truncate=False)
#+-------+------+------------------------------------------------------+
#|Dev_No |Tested|Tested_devices |
#+-------+------+------------------------------------------------------+
#|CTA16C5|Y |Android Devices |
#|4MY16A5|Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+------+------------------------------------------------------+
Now do a left join of df with df2, using the Dev_No and Tested columns as the join keys:
df.join(df2, on=['Dev_No', 'Tested'], how='left')\
.select('Dev_No', 'model', 'Tested', 'Tested_devices')\
.show(truncate=False)
The purpose of the select at the end is to bring the columns back into the same order as the original DataFrame for display - you can remove this step if you like.
This will result in the following output (the same output as the concat_ws example below):
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+------------------------------------------------------+
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|CTA16C5|Hewlett Packard|N |null |
#|BTA16C5|Windows PC |N |null |
#|BTA16C5|SRL |N |null |
#|BTA16C5|Hewlett Packard|N |null |
#|CTA16C5|Android Devices|Y |Android Devices |
#|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+---------------+------+------------------------------------------------------+
Original answer: (for later versions of Spark)
You can use two pyspark.sql.functions.when() statements - one of them within a call to pyspark.sql.functions.collect_list() over a Window - taking advantage of the fact that when() defaults to null when the condition is not met, and that null values are not added to the list by collect_list():
from pyspark.sql import Window
import pyspark.sql.functions as f
df.select(
"*",
f.when(
f.col("Tested") == "Y",
f.collect_list(
f.when(
f.col("Tested") == "Y",
f.col('model')
)
).over(Window.partitionBy("Dev_No"))
).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+--------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+--------------------------------------------------------+
#|BTA16C5|Windows PC |N |null |
#|BTA16C5|SRL |N |null |
#|BTA16C5|Hewlett Packard|N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Tablet |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other |N |null |
#|4MY16A5|Cable STB |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other |N |null |
#|4MY16A5|Windows PC |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Windows PC |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Smart Watch |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|CTA16C5|Android Devices|Y |[Android Devices] |
#|CTA16C5|Hewlett Packard|N |null |
#+-------+---------------+------+--------------------------------------------------------+
Instead, if you want the output to be exactly as you showed in your question - a string of comma separated values rather than a list, and an empty string rather than null - you can modify this slightly as follows:
Use pyspark.sql.functions.concat_ws to concatenate the output of collect_list into a string. I use ", " as the separator. This is the equivalent of doing ", ".join(some_list) in python. Next, we add .otherwise(f.lit("")) to the end of the outer when() call, to specify that we want to return a literal empty string if the condition is False.
df.select(
"*",
f.when(
f.col("Tested") == "Y",
f.concat_ws(
", ",
f.collect_list(
f.when(
f.col("Tested") == "Y",
f.col('model')
)
).over(Window.partitionBy("Dev_No"))
)
).otherwise(f.lit("")).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+------------------------------------------------------+
#|BTA16C5|Windows PC |N | |
#|BTA16C5|SRL |N | |
#|BTA16C5|Hewlett Packard|N | |
#|4MY16A5|Other |N | |
#|4MY16A5|Other |N | |
#|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other |N | |
#|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other |N | |
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|CTA16C5|Android Devices|Y |Android Devices |
#|CTA16C5|Hewlett Packard|N | |
#+-------+---------------+------+------------------------------------------------------+
Using pyspark-sql syntax, the first example above is equivalent to:
df.registerTempTable("df")
query = """
SELECT *,
CASE
WHEN Tested = 'Y'
THEN COLLECT_LIST(
CASE
WHEN Tested = 'Y'
THEN model
END
) OVER (PARTITION BY Dev_No)
END AS Tested_devices
FROM df
"""
sqlCtx.sql(query).show(truncate=False)
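For reference, here is a sketch of how the second example (comma separated string, empty string instead of null) might be written in the same pyspark-sql syntax; this is an assumption following the same pattern, reusing the temp table df registered above:
# hedged sketch: SQL form of the concat_ws / empty-string variant
query2 = """
    SELECT *,
           CASE
               WHEN Tested = 'Y'
               THEN CONCAT_WS(', ',
                        COLLECT_LIST(
                            CASE WHEN Tested = 'Y' THEN model END
                        ) OVER (PARTITION BY Dev_No))
               ELSE ''
           END AS Tested_devices
    FROM df
"""
sqlCtx.sql(query2).show(truncate=False)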
Comments are included below for clarity and explanation.
pyspark > 1.6
# window spec to partition the rows by Dev_No
from pyspark.sql import Window
windowSpec = Window.partitionBy("Dev_No")
from pyspark.sql import functions as f
from pyspark.sql import types as t
# udf to convert the collected list to a string, and to check whether the Tested column is Y or N
@f.udf(t.StringType())
def populatedUdfFunc(tested, models):
    if tested == "Y":
        return ", ".join(models)
    else:
        return ""
# collect models where Tested is Y over the window defined above,
# then convert the list to a string with the udf
df.withColumn(
    "Tested_devices",
    populatedUdfFunc(
        f.col("Tested"),
        f.collect_list(
            f.when(f.col("Tested") == "Y", f.col("model")).otherwise(None)
        ).over(windowSpec)
    )
).show(truncate=False)
which should give you
+-------+---------------+------+------------------------------------------------------+
|Dev_No |model |Tested|Tested_devices |
+-------+---------------+------+------------------------------------------------------+
|BTA16C5|Windows PC |N | |
|BTA16C5|SRL |N | |
|BTA16C5|Hewlett Packard|N | |
|4MY16A5|Other |N | |
|4MY16A5|Other |N | |
|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Other |N | |
|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Other |N | |
|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|CTA16C5|Android Devices|Y |Android Devices |
|CTA16C5|Hewlett Packard|N | |
+-------+---------------+------+------------------------------------------------------+
spark = 1.6
For pyspark 1.6, collect_list cannot be used with window functions, and there is no collect_list function defined in SQLContext. So you will have to do it without a window function and use HiveContext instead of SQLContext.
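As a minimal sketch of that setup (assuming a SparkContext named sc already exists, as in a default pyspark 1.6 shell):
from pyspark.sql import HiveContext

# HiveContext provides collect_list, which the plain SQLContext in 1.6 does not
sqlContext = HiveContext(sc)
# df would then be created or read through this sqlContext so that collect_list is available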
from pyspark.sql import functions as f
from pyspark.sql import types as t
# udf to convert the collected list to a string
def populatedUdfFunc(models):
    return ", ".join(models)
populateUdf = f.udf(populatedUdfFunc, t.StringType())
# collect models where Tested is Y, grouped by Dev_No (no window function here)
tempdf = df.groupBy("Dev_No").agg(
    populateUdf(
        f.collect_list(f.when(f.col("Tested") == "Y", f.col("model")).otherwise(None))
    ).alias("Tested_devices")
)
df.join(
tempdf,
(df["Dev_No"] == tempdf["Dev_No"]) & (df["Tested"] == f.lit("Y")), "left").show(truncate=False)
You will get the same output as above.
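If you want to trim the joined result to exactly the column layout shown earlier, with empty strings instead of nulls for the untested rows, a sketch along these lines (assuming the join above) should work:
# hedged sketch: keep only the original columns plus Tested_devices,
# and replace the nulls produced by the left join with empty strings
result = df.join(
    tempdf,
    (df["Dev_No"] == tempdf["Dev_No"]) & (df["Tested"] == f.lit("Y")),
    "left"
).select(
    df["Dev_No"], df["model"], df["Tested"], tempdf["Tested_devices"]
).fillna({"Tested_devices": ""})
result.show(truncate=False)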