PySpark: boolean previous values with conditions
My data looks like this:
data = [("110125","James","2021-12-05","NY","PA",60000),("110125","James","2021-12-07","NY","PA",3000),("110125","James","2021-12-07","NY","AT",3000),
("5225","Michael","2021-12-25","LA","AT",60000),("5225","Michael","2021-12-17","LA","PA",15000),("5225","Michael","2021-12-17","LA","PA",65000)]
columns = ["id","Name","Date","Local","Office","salary"]
df = spark.createDataFrame(data = data, schema = columns)
Input:
+--------+--------+----------+-----+------+------+
| id |Name |Date |Local|Office|salary|
+--------+--------+----------+-----+------+------+
| 110125| James |2021-12-05|NY |PA | 60000|
| 110125| James |2021-12-07|NY |PA | 3000 |
| 110125| James |2021-12-07|NY |AT | 3000 |
| 5225 | Michael|2021-12-25|LA |AT | 60000|
| 5225 | Michael|2021-12-17|LA |PA | 15000|
| 5225 | Michael|2021-12-17|LA |PA | 65000|
+--------+--------+----------+-----+------+------+
I want a new column "Check" that is True when any of the 4 values Date, Local, Office, salary differs from the previous row for the same id and Name.
Expected output:
+--------+--------+----------+-----+------+------+-----+
| id |Name |Date |Local|Office|salary|Check|
+--------+--------+----------+-----+------+------+-----+
| 110125| James |2021-12-05|NY |PA | 60000| |
| 110125| James |2021-12-07|NY |PA | 3000 | True|
| 110125| James |2021-12-07|NY |AT | 3000 | True|
| 5225 | Michael|2021-12-25|LA |AT | 60000| |
| 5225 | Michael|2021-12-17|LA |PA | 15000| True|
| 5225 | Michael|2021-12-17|LA |PA | 65000| True|
+--------+--------+----------+-----+------+------+-----+
My PySpark code:
df.groupby("ID", "Name").withColumn("Check", F.when((F.col('Local') == F.lag('Local')) |(F.col('Office') == F.lag('Office'))|
(F.col('Date') == F.lag('Date'))|(F.col('salary') == F.lag('salary')), False ).otherwise(True))
AttributeError: 'GroupedData' object has no attribute 'withColumn'
You want to use a window:
from pyspark.sql import Window, functions as F

# Partition by id/Name and order by Date so lag() looks at the previous row per person
w = Window.partitionBy("id", "Name").orderBy("Date")

df = df.withColumn(
    "Check",
    # True when any of the four columns differs from the previous row;
    # null on the first row of each partition, where lag() has nothing to compare against
    ~((F.col('Local') == F.lag('Local').over(w))
      & (F.col('Office') == F.lag('Office').over(w))
      & (F.col('Date') == F.lag('Date').over(w))
      & (F.col('salary') == F.lag('salary').over(w))
    )
)
df.show()
#+------+-------+----------+-----+------+------+-----+
#| id| Name| Date|Local|Office|salary|Check|
#+------+-------+----------+-----+------+------+-----+
#|110125| James|2021-12-05| NY| PA| 60000| null|
#|110125| James|2021-12-07| NY| PA| 3000| true|
#|110125| James|2021-12-07| NY| AT| 3000| true|
#| 5225|Michael|2021-12-17| LA| PA| 15000| null|
#| 5225|Michael|2021-12-17| LA| PA| 65000| true|
#| 5225|Michael|2021-12-25| LA| AT| 60000| true|
#+------+-------+----------+-----+------+------+-----+
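If you prefer the first row of each group to show False rather than null, one possible variation (a sketch reusing the df and w defined above) is to compare all four columns at once as a struct and coalesce the missing previous row to False:

from pyspark.sql import Window, functions as F

w = Window.partitionBy("id", "Name").orderBy("Date")

# Bundle the four columns into a struct; struct inequality is True if any field differs.
prev = F.lag(F.struct("Date", "Local", "Office", "salary")).over(w)
df = df.withColumn(
    "Check",
    # On the first row per group, lag() is null, so the comparison is null; coalesce it to False.
    F.coalesce(F.struct("Date", "Local", "Office", "salary") != prev, F.lit(False)),
)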