明智地创建一个新的列条件

Create a new column condition-wisely

我正在尝试弄清楚如何将我的 Pandas-utilising 函数转换为 PySpark。

我有一个这样的 Pandas DataFrame:

+---+----+
|num| val|
+---+----+
|  1| 0.0|
|  2| 0.0|
|  3|48.6|
|  4|49.0|
|  5|48.7|
|  6|49.1|
|  7|74.5|
|  8|48.7|
|  9| 0.0|
| 10|49.0|
| 11| 0.0|
| 12| 0.0|
+---+----+

下面代码片段中的代码非常简单。它继续前进,直到找到一个非零值。如果有 none 个,它会出于同样的目的倒退

def next_non_zero(data,i,column):
    for j in range(i+1,len(data[column])):
        res = data[column].iloc[j]
        if res !=0:
            return res
    for j in range(i-1,0,-1):
        res = data[column].iloc[j]
        if res !=0:
            return res

def fix_zero(data, column):
    for i, row in data.iterrows():
        if (row[column] == 0):
            data.at[i,column] = next_non_zero(data,i,column)

因此我希望看到

+---+----+
|num| val|
+---+----+
|  1|48.6|
|  2|48.6|
|  3|48.6|
|  4|49.0|
|  5|48.7|
|  6|49.1|
|  7|74.5|
|  8|48.7|
|  9|49.0|
| 10|49.0|
| 11|49.0|
| 12|49.0|
+---+----+

所以我明白,在 PySpark 中,我必须创建一个具有所需结果的新列,并使用 withColumn() 替换现有列,例如。但是,我不明白如何正确地遍历 DataFrame。

我正在尝试在 Window:

上使用函数
my_window = Window.partitionBy().orderBy('num')
df = df.withColumn('new_val', F.when(df.val==0,F.lead(df.val).over(my_window)).
                                 otherwise(F.lag(df.val).over(my_window))

显然,它没有提供我想要的结果,因为它只迭代了一次。 所以我试着写一些像

这样的 udf 递归
def fix_zero(param):

    return F.when(F.lead(param).over(my_window)!=0,F.lead(param).over(my_window)).
                   otherwise(fix_zero(F.lead(param).over(my_window)))

spark_udf = udf(fix_zero, DoubleType())
df = df.withColumn('new_val', F.when(df.val!=0, df.val).otherwise(fix_zero('val')))

我得到了

RecursionError: maximum recursion depth exceeded in comparison

我怀疑这是因为我传递给递归的不是一行,而是 lead() 的结果 无论如何,此刻我完全陷入了这个障碍,非常感谢任何建议

Window 有一种方法可以遍历前面的所有行(或后面的所有行),直到达到非空值。

所以我的第一步是将所有 0 值替换为 null

正在重新创建您的数据框:

values = [
    (1, 0.0),
    (2,0.0),
    (3,48.6),
    (4,49.0),
    (5,48.7),
    (6,49.1),
    (7, 74.5),
    (8,48.7),
    (9,0.0),
   (10,49.0),
   (11,0.0),
   (12,0.0)
]  

df = spark.createDataFrame(values, ['num','val'])

用 null 替换 0

from pyspark.sql.functions import when, lit, col
df= df.withColumn('val_null', when(col('val') != 0.0,col('val')))

然后定义 windows,它与 first 和 null 相结合,将允许我们获得行前的最后一个非空值和行后的第一个非空值

from pyspark.sql import Window
from pyspark.sql.functions import last,first,coalesce


windowForward = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
ffilled_column = last(df['val_null'], ignorenulls=True).over(windowForward)

windowBackward = Window.rowsBetween(Window.currentRow,Window.unboundedFollowing)
bfilled_column = first(df['val_null'], ignorenulls=True).over(windowBackward)

# creating new columns in df
df =df.withColumn('ffill',ffilled_column).withColumn('bfill',bfilled_column)

# replace null with bfill if bfill is not null otherwise fill with ffill
df =df.withColumn('val_full',coalesce('bfill','ffill'))

使用此技术,我们在 'val_full'

列中得出您的预期输出
+---+----+--------+-----+-----+--------+
|num| val|val_null|ffill|bfill|val_full|
+---+----+--------+-----+-----+--------+
|  1| 0.0|    null| null| 48.6|    48.6|
|  2| 0.0|    null| null| 48.6|    48.6|
|  3|48.6|    48.6| 48.6| 48.6|    48.6|
|  4|49.0|    49.0| 49.0| 49.0|    49.0|
|  5|48.7|    48.7| 48.7| 48.7|    48.7|
|  6|49.1|    49.1| 49.1| 49.1|    49.1|
|  7|74.5|    74.5| 74.5| 74.5|    74.5|
|  8|48.7|    48.7| 48.7| 48.7|    48.7|
|  9| 0.0|    null| 48.7| 49.0|    49.0|
| 10|49.0|    49.0| 49.0| 49.0|    49.0|
| 11| 0.0|    null| 49.0| null|    49.0|
| 12| 0.0|    null| 49.0| null|    49.0|
+---+----+--------+-----+-----+--------+