将列传递给 pyspark 中的 when 函数
Passing column to when function in pyspark
我有两个 pyspark 数据帧:
第一个数据框:plants
+-----+--------+
|plant|station |
+-----+--------+
|Kech | st1 |
|Casa | st2 |
+-----+--------+
第二个数据帧:站
+-------+--------+
|program|station |
+-------+--------+
|pr1 | null|
|pr2 | st1 |
+-------+--------+
我想要的是用第一个数据帧中的所有列站替换第二个数据帧站中的空值。像这样:
+-------+--------------+
|program|station |
+-------+--------------+
|pr1 | [st1, st2]|
|pr2 | st1 |
+-------+--------------+
我这样做了:
stList = plants.select(F.col('station')).rdd.map(lambda x: x[0]).collect()
stations = stations.select(
F.col('program')
F.when(stations.station.isNull(), stList).otherwise(stations.station).alias('station')
)
但是当不接受 python 列表作为参数时它给我一个错误
快速解决方法是
F.lit(str(stList))
这应该工作。
为了更好地进行类型转换,请使用下面提到的代码。
stations = stations.select(
F.col('program'),
F.when(stations.station.isNull(), func.array([func.lit(x) for x in stList]))
.otherwise(func.array(stations.station)).alias('station')
)
感谢您的回复。
我通过将列转换为 pandas 找到了解决方案。
stList = list(plants.select(F.col('station')).toPandas()['station'])
然后使用:
F.when(stations.station.isNull(), F.array([F.lit(x) for x in station])).otherwise(stations['station']).alias('station')
直接给出数组
首先,station 列中不能保留不同的数据类型,需要保持一致。
+-------+--------------+
|program|station |
+-------+--------------+
|pr1 | [st1, st2]| # this is array
|pr2 | st1 | # this is string
+-------+--------------+
其次,这应该可以解决问题:
from pyspark.sql import functions as F
# Create the stList as a string.
stList = ",".join(plants.select(F.col('station')).rdd.map(lambda x: x[0]).collect())
# coalesce the variables and then apply pyspark.sql.functions.split function
stations = (stations.select(
F.col('program'),
F.split(F.coalesce(stations.station, F.lit(stList)), ",").alias('station')))
stations.show()
输出:
+-------+----------+
|program| station|
+-------+----------+
| pr1|[st1, st2]|
| pr2| [st1]|
+-------+----------+
我有两个 pyspark 数据帧:
第一个数据框:plants
+-----+--------+
|plant|station |
+-----+--------+
|Kech | st1 |
|Casa | st2 |
+-----+--------+
第二个数据帧:站
+-------+--------+
|program|station |
+-------+--------+
|pr1 | null|
|pr2 | st1 |
+-------+--------+
我想要的是用第一个数据帧中的所有列站替换第二个数据帧站中的空值。像这样:
+-------+--------------+
|program|station |
+-------+--------------+
|pr1 | [st1, st2]|
|pr2 | st1 |
+-------+--------------+
我这样做了:
stList = plants.select(F.col('station')).rdd.map(lambda x: x[0]).collect()
stations = stations.select(
F.col('program')
F.when(stations.station.isNull(), stList).otherwise(stations.station).alias('station')
)
但是当不接受 python 列表作为参数时它给我一个错误
快速解决方法是
F.lit(str(stList))
这应该工作。
为了更好地进行类型转换,请使用下面提到的代码。
stations = stations.select(
F.col('program'),
F.when(stations.station.isNull(), func.array([func.lit(x) for x in stList]))
.otherwise(func.array(stations.station)).alias('station')
)
感谢您的回复。
我通过将列转换为 pandas 找到了解决方案。
stList = list(plants.select(F.col('station')).toPandas()['station'])
然后使用:
F.when(stations.station.isNull(), F.array([F.lit(x) for x in station])).otherwise(stations['station']).alias('station')
直接给出数组
首先,station 列中不能保留不同的数据类型,需要保持一致。
+-------+--------------+
|program|station |
+-------+--------------+
|pr1 | [st1, st2]| # this is array
|pr2 | st1 | # this is string
+-------+--------------+
其次,这应该可以解决问题:
from pyspark.sql import functions as F
# Create the stList as a string.
stList = ",".join(plants.select(F.col('station')).rdd.map(lambda x: x[0]).collect())
# coalesce the variables and then apply pyspark.sql.functions.split function
stations = (stations.select(
F.col('program'),
F.split(F.coalesce(stations.station, F.lit(stList)), ",").alias('station')))
stations.show()
输出:
+-------+----------+
|program| station|
+-------+----------+
| pr1|[st1, st2]|
| pr2| [st1]|
+-------+----------+