How to drop rows and columns with nulls in one column in PySpark
Here is your sample data:
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

data = [
    ('5972015', '2021-06-29'),
    ('5972015', '2021-08-28'),
    ('5972015', '2021-08-29'),
    ('5972015', '2020-12-08'),
    ('5752541', '2019-01-09'),
    ('5752541', '2020-04-24'),
    ('5972015', '2020-04-09'),
    ('5972015', '2020-04-10'),
    ('5972015', '2020-01-16'),
    ('11311226', '2020-06-17')
]
df = spark.createDataFrame(data, ['uid', 'date'])
# Cast the string column to a proper DateType so date functions work on it.
df = df.withColumn('date', F.col('date').cast('date'))
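The title asks about nulls; the sample data has none, but if your real date column can contain nulls you could drop those rows first. A minimal sketch using the standard DataFrame.dropna API:

# Keep only rows whose 'date' is not null before running the window logic below.
df = df.dropna(subset=['date'])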
If you want to compute the maximum number of consecutive days per uid:
w1 = Window.partitionBy('uid').orderBy(F.col('date').asc_nulls_last())
# Flag run starts: 0 if this date is exactly one day after the previous date
# for the same uid, 1 otherwise (including the first row of each uid).
# Build the result in a new variable so df keeps the raw rows for the next snippet.
res = df.withColumn('g', F.when(F.datediff('date', F.lag('date', 1).over(w1)) == 1, 0).otherwise(1))
# A running sum of the flags gives each run of consecutive days its own group id.
res = res.withColumn('grp', F.sum('g').over(w1))
# Count the rows in each run, then keep the largest run per uid.
w2 = Window.partitionBy('uid', 'grp')
res = res.withColumn('consecutive', F.count('*').over(w2))
res = res.groupBy('uid').agg(F.max('consecutive').alias('max_consecutive'))
res.show()
#+--------+---------------+
#| uid|max_consecutive|
#+--------+---------------+
#| 5972015| 2|
#| 5752541| 1|
#|11311226| 1|
#+--------+---------------+
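For reference (not part of the original answer), the same runs can be labelled with the classic date-minus-row-number trick: within a uid, every date in one consecutive run maps to the same anchor date. A minimal sketch, assuming no duplicate dates per uid, and using F.expr so date_sub accepts a column on older Spark versions:

w = Window.partitionBy('uid').orderBy('date')
alt = (
    df.withColumn('rn', F.row_number().over(w))
      # Consecutive dates minus their row number collapse to a single anchor.
      .withColumn('anchor', F.expr('date_sub(date, rn)'))
      .groupBy('uid', 'anchor').count()
      .groupBy('uid').agg(F.max('count').alias('max_consecutive'))
)
alt.show()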
If you want to find the number of days from the minimum date to the maximum date:
# With the default window frame (unbounded preceding to current row), a
# descending order makes max('date') the overall max per uid, and an
# ascending order makes min('date') the overall min per uid.
w1 = Window.partitionBy('uid').orderBy(F.col('date').desc_nulls_last())
w2 = Window.partitionBy('uid').orderBy(F.col('date').asc_nulls_last())
res = df.select(
    'uid',
    F.datediff(F.max('date').over(w1), F.min('date').over(w2)).alias('datediff')
).distinct()
res.show()
#+--------+--------+
#| uid|datediff|
#+--------+--------+
#| 5972015| 591|
#| 5752541| 471|
#|11311226| 0|
#+--------+--------+
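If the per-row window is not needed, a plain aggregation gives the same result more directly; a minimal equivalent sketch:

res = df.groupBy('uid').agg(
    F.datediff(F.max('date'), F.min('date')).alias('datediff')
)
res.show()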