如何顺序迭代 Pyspark Dataframe 中的行

how to sequentially iterate rows in Pyspark Dataframe

我有一个这样的 Spark DataFrame:

 +-------+------+-----+---------------+
 |Account|nature|value|           time|
 +-------+------+-----+---------------+
 |      a|     1|   50|10:05:37:293084|
 |      a|     1|   50|10:06:46:806510|
 |      a|     0|   50|11:19:42:951479|
 |      a|     1|   40|19:14:50:479055|
 |      a|     0|   50|16:56:17:251624|
 |      a|     1|   40|16:33:12:133861|
 |      a|     1|   20|17:33:01:385710|
 |      b|     0|   30|12:54:49:483725|
 |      b|     0|   40|19:23:25:845489|
 |      b|     1|   30|10:58:02:276576|
 |      b|     1|   40|12:18:27:161290|
 |      b|     0|   50|12:01:50:698592|
 |      b|     0|   50|08:45:53:894441|
 |      b|     0|   40|17:36:55:827330|
 |      b|     1|   50|17:18:41:728486|
 +-------+------+-----+---------------+

我想将一行的 nature 列与具有相同 Accountvalue[= 的其他行进行比较43=],我应该期待,并添加名为 Repeated 的新列。如果性质发生变化,新列的两行都会得到 true,从 10,反之亦然.例如,上面的数据框应该是这样的:

   +-------+------+-----+---------------+--------+
   |Account|nature|value|           time|Repeated|
   +-------+------+-----+---------------+--------+
   |      a|     1|   50|10:05:37:293084|   true |
   |      a|     1|   50|10:06:46:806510|    true|
   |      a|     0|   50|11:19:42:951479|   true |
   |      a|     0|   50|16:56:17:251624|   true |
   |      b|     0|   50|08:45:53:894441|   true |
   |      b|     0|   50|12:01:50:698592|   false|
   |      b|     1|   50|17:18:41:728486|   true |
   |      a|     1|   40|16:33:12:133861|   false|
   |      a|     1|   40|19:14:50:479055|   false|
   |      b|     1|   40|12:18:27:161290|    true|
   |      b|     0|   40|17:36:55:827330|   true |
   |      b|     0|   40|19:23:25:845489|   false|
   |      b|     1|   30|10:58:02:276576|    true|
   |      b|     0|   30|12:54:49:483725|   true |
   |      a|     1|   20|17:33:01:385710|   false|
   +-------+------+-----+---------------+--------+              
        

我的解决方案是,我必须对 Accountvalue 进行 group by 或 window 列;然后在每组中,将每一行的 nature 与其他行的 nature 进行比较,作为比较的结果,Repeated 列已满。 我用 Spark Window 函数做了这个计算。像这样:

windowSpec  = Window.partitionBy("Account","value").orderBy("time")

df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec)!=df['nature'],lit(True)).otherwise(False))).show()

结果是这样的,不是我想要的结果:

 +-------+------+-----+---------------+--------+
 |Account|nature|value|           time|Repeated|
 +-------+------+-----+---------------+--------+
 |      a|     1|   50|10:05:37:293084|   false|
 |      a|     1|   50|10:06:46:806510|    true|
 |      a|     0|   50|11:19:42:951479|   false|
 |      a|     0|   50|16:56:17:251624|   false|
 |      b|     0|   50|08:45:53:894441|   false|
 |      b|     0|   50|12:01:50:698592|    true|
 |      b|     1|   50|17:18:41:728486|   false|
 |      a|     1|   40|16:33:12:133861|   false|
 |      a|     1|   40|19:14:50:479055|   false|
 |      b|     1|   40|12:18:27:161290|    true|
 |      b|     0|   40|17:36:55:827330|   false|
 |      b|     0|   40|19:23:25:845489|   false|
 |      b|     1|   30|10:58:02:276576|    true|
 |      b|     0|   30|12:54:49:483725|   false|
 |      a|     1|   20|17:33:01:385710|   false|
 +-------+------+-----+---------------+--------+

更新: 进一步解释,如果我们假设第一个Spark Dataframe被命名为“df”,在下面,我在“Account”和“value”的每一组中写下到底想做什么:

a = df.withColumn('repeated',lit(False))
for i in range(len(group)):
    j = i+1
for j in j<=len(group):
    if a.loc[i,'nature']!=a.loc[j,'nature'] and  a.loc[j,'repeated']==False:
             a.loc[i,'repeated'] = True
             a.loc[j,'repeated'] = True

能否指导我如何使用 Pyspark Window?

非常感谢任何帮助。

您实际上需要保证您在数据框中看到的顺序是实际顺序。你能做到吗?您需要一个列来对发生的事情按顺序进行排序。将新数据插入数据框并不能保证其顺序。

A window & Lag 将允许您查看前几行的值并进行所需的调整。
仅供参考:我在这里使用 coalesce 就好像它是第一行,没有任何价值可以与之比较。考虑使用第二个参数来合并您认为适合帐户中第一个值的情况。)

如果您需要它,请查看 monotonically increasing function。它可以帮助您创建我们确定性地查看此数据所需的按值排序。

from pyspark.sql.functions import lag 
from pyspark.sql.functions import lit 
from pyspark.sql.functions import coalesce
from pyspark.sql.window import Window

spark.sql("create table nature (Account string,nature int, value int, order int)"); 
spark.sql("insert into nature values ('a', 1, 50,1), ('a', 1, 40,2),('a',0,50,3),('b',0,30,4),('b',0,40,5),('b',1,30,6),('b',1,40,7)")
windowSpec  = Window.partitionBy("Account").orderBy("order")
nature = spark.table("nature");
nature.withColumn("Repeated", coalesce( lead(nature['nature']).over(windowSpec) != nature['nature'], lit(True)) ).show()
|Account|nature|value|order|Repeated|
+-------+------+-----+-----+--------+
|      b|     0|   30|    4|   false|
|      b|     0|   40|    5|    true|
|      b|     1|   30|    6|   false|
|      b|     1|   40|    7|    true|
|      a|     1|   50|    1|   false|
|      a|     1|   40|    2|    true|
|      a|     0|   50|    3|    true|
+-------+------+-----+-----+--------+

编辑: 从你的描述中不清楚我应该向前看还是向后看。我已经更改了我的代码以期待一行,因为这与您输出中的帐户 'B' 一致。但是,在您的示例输出中,Account 'A' 的逻辑似乎与 'B' 的逻辑不同。 (或者我不明白从 '1' 开始而不是从 '0' 开始的微妙之处。)如果你想向前看一行,请使用 lead,如果你想向后看一行,请使用 lag.

问题已解决。 虽然这种方式成本很高,但是也还可以。

  def check(part):
    df = part
    size = len(df)
    for i in range(size):
      if (df.loc[i,'repeated'] == True):
          continue
      else:
          for j in range((i+1),size):
            if (df.loc[i,'nature']!=df.loc[j,'nature']) & (df.loc[j,'repeated']==False):
                df.loc[j,'repeated'] = True
                df.loc[i,'repeated'] = True
                break
  return df

df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()

更新 1: 另一个没有任何迭代的解决方案。

def check(df):
   df = df.sort_values('verified_time')
   df['index'] = df.index
   df['IS_REPEATED'] = 0
   df1 = df.sort_values(['nature'],ascending=[True]).reset_index(drop=True)
   df2 = df.sort_values(['nature'],ascending=[False]).reset_index(drop=True)
   df1['IS_REPEATED']=df1['nature']^df2['nature']
   df3 = df1.sort_values(['index'],ascending=[True])
   df = df3.drop(['index'],axis=1)
   return df

df = df.groupby("account_id", "amount").applyInPandas(gf.check2,schema=gf.get_schema('trx'))