明智地创建一个新的列条件
Create a new column condition-wisely
我正在尝试弄清楚如何将我的 Pandas-utilising 函数转换为 PySpark。
我有一个这样的 Pandas DataFrame:
+---+----+
|num| val|
+---+----+
| 1| 0.0|
| 2| 0.0|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9| 0.0|
| 10|49.0|
| 11| 0.0|
| 12| 0.0|
+---+----+
下面代码片段中的代码非常简单。它继续前进,直到找到一个非零值。如果有 none 个,它会出于同样的目的倒退
def next_non_zero(data,i,column):
for j in range(i+1,len(data[column])):
res = data[column].iloc[j]
if res !=0:
return res
for j in range(i-1,0,-1):
res = data[column].iloc[j]
if res !=0:
return res
def fix_zero(data, column):
for i, row in data.iterrows():
if (row[column] == 0):
data.at[i,column] = next_non_zero(data,i,column)
因此我希望看到
+---+----+
|num| val|
+---+----+
| 1|48.6|
| 2|48.6|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9|49.0|
| 10|49.0|
| 11|49.0|
| 12|49.0|
+---+----+
所以我明白,在 PySpark 中,我必须创建一个具有所需结果的新列,并使用 withColumn() 替换现有列,例如。但是,我不明白如何正确地遍历 DataFrame。
我正在尝试在 Window:
上使用函数
my_window = Window.partitionBy().orderBy('num')
df = df.withColumn('new_val', F.when(df.val==0,F.lead(df.val).over(my_window)).
otherwise(F.lag(df.val).over(my_window))
显然,它没有提供我想要的结果,因为它只迭代了一次。
所以我试着写一些像
这样的 udf 递归
def fix_zero(param):
return F.when(F.lead(param).over(my_window)!=0,F.lead(param).over(my_window)).
otherwise(fix_zero(F.lead(param).over(my_window)))
spark_udf = udf(fix_zero, DoubleType())
df = df.withColumn('new_val', F.when(df.val!=0, df.val).otherwise(fix_zero('val')))
我得到了
RecursionError: maximum recursion depth exceeded in comparison
我怀疑这是因为我传递给递归的不是一行,而是 lead() 的结果
无论如何,此刻我完全陷入了这个障碍,非常感谢任何建议
Window 有一种方法可以遍历前面的所有行(或后面的所有行),直到达到非空值。
所以我的第一步是将所有 0 值替换为 null
正在重新创建您的数据框:
values = [
(1, 0.0),
(2,0.0),
(3,48.6),
(4,49.0),
(5,48.7),
(6,49.1),
(7, 74.5),
(8,48.7),
(9,0.0),
(10,49.0),
(11,0.0),
(12,0.0)
]
df = spark.createDataFrame(values, ['num','val'])
用 null 替换 0
from pyspark.sql.functions import when, lit, col
df= df.withColumn('val_null', when(col('val') != 0.0,col('val')))
然后定义 windows,它与 first 和 null 相结合,将允许我们获得行前的最后一个非空值和行后的第一个非空值
from pyspark.sql import Window
from pyspark.sql.functions import last,first,coalesce
windowForward = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
ffilled_column = last(df['val_null'], ignorenulls=True).over(windowForward)
windowBackward = Window.rowsBetween(Window.currentRow,Window.unboundedFollowing)
bfilled_column = first(df['val_null'], ignorenulls=True).over(windowBackward)
# creating new columns in df
df =df.withColumn('ffill',ffilled_column).withColumn('bfill',bfilled_column)
# replace null with bfill if bfill is not null otherwise fill with ffill
df =df.withColumn('val_full',coalesce('bfill','ffill'))
使用此技术,我们在 'val_full'
列中得出您的预期输出
+---+----+--------+-----+-----+--------+
|num| val|val_null|ffill|bfill|val_full|
+---+----+--------+-----+-----+--------+
| 1| 0.0| null| null| 48.6| 48.6|
| 2| 0.0| null| null| 48.6| 48.6|
| 3|48.6| 48.6| 48.6| 48.6| 48.6|
| 4|49.0| 49.0| 49.0| 49.0| 49.0|
| 5|48.7| 48.7| 48.7| 48.7| 48.7|
| 6|49.1| 49.1| 49.1| 49.1| 49.1|
| 7|74.5| 74.5| 74.5| 74.5| 74.5|
| 8|48.7| 48.7| 48.7| 48.7| 48.7|
| 9| 0.0| null| 48.7| 49.0| 49.0|
| 10|49.0| 49.0| 49.0| 49.0| 49.0|
| 11| 0.0| null| 49.0| null| 49.0|
| 12| 0.0| null| 49.0| null| 49.0|
+---+----+--------+-----+-----+--------+
我正在尝试弄清楚如何将我的 Pandas-utilising 函数转换为 PySpark。
我有一个这样的 Pandas DataFrame:
+---+----+
|num| val|
+---+----+
| 1| 0.0|
| 2| 0.0|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9| 0.0|
| 10|49.0|
| 11| 0.0|
| 12| 0.0|
+---+----+
下面代码片段中的代码非常简单。它继续前进,直到找到一个非零值。如果有 none 个,它会出于同样的目的倒退
def next_non_zero(data,i,column):
for j in range(i+1,len(data[column])):
res = data[column].iloc[j]
if res !=0:
return res
for j in range(i-1,0,-1):
res = data[column].iloc[j]
if res !=0:
return res
def fix_zero(data, column):
for i, row in data.iterrows():
if (row[column] == 0):
data.at[i,column] = next_non_zero(data,i,column)
因此我希望看到
+---+----+
|num| val|
+---+----+
| 1|48.6|
| 2|48.6|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9|49.0|
| 10|49.0|
| 11|49.0|
| 12|49.0|
+---+----+
所以我明白,在 PySpark 中,我必须创建一个具有所需结果的新列,并使用 withColumn() 替换现有列,例如。但是,我不明白如何正确地遍历 DataFrame。
我正在尝试在 Window:
上使用函数my_window = Window.partitionBy().orderBy('num')
df = df.withColumn('new_val', F.when(df.val==0,F.lead(df.val).over(my_window)).
otherwise(F.lag(df.val).over(my_window))
显然,它没有提供我想要的结果,因为它只迭代了一次。 所以我试着写一些像
这样的 udf 递归def fix_zero(param):
return F.when(F.lead(param).over(my_window)!=0,F.lead(param).over(my_window)).
otherwise(fix_zero(F.lead(param).over(my_window)))
spark_udf = udf(fix_zero, DoubleType())
df = df.withColumn('new_val', F.when(df.val!=0, df.val).otherwise(fix_zero('val')))
我得到了
RecursionError: maximum recursion depth exceeded in comparison
我怀疑这是因为我传递给递归的不是一行,而是 lead() 的结果 无论如何,此刻我完全陷入了这个障碍,非常感谢任何建议
Window 有一种方法可以遍历前面的所有行(或后面的所有行),直到达到非空值。
所以我的第一步是将所有 0 值替换为 null
正在重新创建您的数据框:
values = [
(1, 0.0),
(2,0.0),
(3,48.6),
(4,49.0),
(5,48.7),
(6,49.1),
(7, 74.5),
(8,48.7),
(9,0.0),
(10,49.0),
(11,0.0),
(12,0.0)
]
df = spark.createDataFrame(values, ['num','val'])
用 null 替换 0
from pyspark.sql.functions import when, lit, col
df= df.withColumn('val_null', when(col('val') != 0.0,col('val')))
然后定义 windows,它与 first 和 null 相结合,将允许我们获得行前的最后一个非空值和行后的第一个非空值
from pyspark.sql import Window
from pyspark.sql.functions import last,first,coalesce
windowForward = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
ffilled_column = last(df['val_null'], ignorenulls=True).over(windowForward)
windowBackward = Window.rowsBetween(Window.currentRow,Window.unboundedFollowing)
bfilled_column = first(df['val_null'], ignorenulls=True).over(windowBackward)
# creating new columns in df
df =df.withColumn('ffill',ffilled_column).withColumn('bfill',bfilled_column)
# replace null with bfill if bfill is not null otherwise fill with ffill
df =df.withColumn('val_full',coalesce('bfill','ffill'))
使用此技术,我们在 'val_full'
列中得出您的预期输出+---+----+--------+-----+-----+--------+
|num| val|val_null|ffill|bfill|val_full|
+---+----+--------+-----+-----+--------+
| 1| 0.0| null| null| 48.6| 48.6|
| 2| 0.0| null| null| 48.6| 48.6|
| 3|48.6| 48.6| 48.6| 48.6| 48.6|
| 4|49.0| 49.0| 49.0| 49.0| 49.0|
| 5|48.7| 48.7| 48.7| 48.7| 48.7|
| 6|49.1| 49.1| 49.1| 49.1| 49.1|
| 7|74.5| 74.5| 74.5| 74.5| 74.5|
| 8|48.7| 48.7| 48.7| 48.7| 48.7|
| 9| 0.0| null| 48.7| 49.0| 49.0|
| 10|49.0| 49.0| 49.0| 49.0| 49.0|
| 11| 0.0| null| 49.0| null| 49.0|
| 12| 0.0| null| 49.0| null| 49.0|
+---+----+--------+-----+-----+--------+