如何顺序迭代 Pyspark Dataframe 中的行
how to sequentially iterate rows in Pyspark Dataframe
我有一个这样的 Spark DataFrame:
+-------+------+-----+---------------+
|Account|nature|value| time|
+-------+------+-----+---------------+
| a| 1| 50|10:05:37:293084|
| a| 1| 50|10:06:46:806510|
| a| 0| 50|11:19:42:951479|
| a| 1| 40|19:14:50:479055|
| a| 0| 50|16:56:17:251624|
| a| 1| 40|16:33:12:133861|
| a| 1| 20|17:33:01:385710|
| b| 0| 30|12:54:49:483725|
| b| 0| 40|19:23:25:845489|
| b| 1| 30|10:58:02:276576|
| b| 1| 40|12:18:27:161290|
| b| 0| 50|12:01:50:698592|
| b| 0| 50|08:45:53:894441|
| b| 0| 40|17:36:55:827330|
| b| 1| 50|17:18:41:728486|
+-------+------+-----+---------------+
我想将一行的 nature 列与具有相同 Account 和 value[= 的其他行进行比较43=],我应该期待,并添加名为 Repeated 的新列。如果性质发生变化,新列的两行都会得到 true,从 1 到 0,反之亦然.例如,上面的数据框应该是这样的:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| true |
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| true |
| a| 0| 50|16:56:17:251624| true |
| b| 0| 50|08:45:53:894441| true |
| b| 0| 50|12:01:50:698592| false|
| b| 1| 50|17:18:41:728486| true |
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| true |
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| true |
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
我的解决方案是,我必须对 Account 和 value 进行 group by 或 window 列;然后在每组中,将每一行的 nature 与其他行的 nature 进行比较,作为比较的结果,Repeated 列已满。
我用 Spark Window 函数做了这个计算。像这样:
windowSpec = Window.partitionBy("Account","value").orderBy("time")
df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec)!=df['nature'],lit(True)).otherwise(False))).show()
结果是这样的,不是我想要的结果:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| false|
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| false|
| a| 0| 50|16:56:17:251624| false|
| b| 0| 50|08:45:53:894441| false|
| b| 0| 50|12:01:50:698592| true|
| b| 1| 50|17:18:41:728486| false|
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| false|
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| false|
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
更新:
进一步解释,如果我们假设第一个Spark Dataframe被命名为“df”,在下面,我在“Account”和“value”的每一组中写下到底想做什么:
a = df.withColumn('repeated',lit(False))
for i in range(len(group)):
j = i+1
for j in j<=len(group):
if a.loc[i,'nature']!=a.loc[j,'nature'] and a.loc[j,'repeated']==False:
a.loc[i,'repeated'] = True
a.loc[j,'repeated'] = True
能否指导我如何使用 Pyspark Window?
非常感谢任何帮助。
您实际上需要保证您在数据框中看到的顺序是实际顺序。你能做到吗?您需要一个列来对发生的事情按顺序进行排序。将新数据插入数据框并不能保证其顺序。
A window & Lag 将允许您查看前几行的值并进行所需的调整。
仅供参考:我在这里使用 coalesce 就好像它是第一行,没有任何价值可以与之比较。考虑使用第二个参数来合并您认为适合帐户中第一个值的情况。)
如果您需要它,请查看 monotonically increasing function。它可以帮助您创建我们确定性地查看此数据所需的按值排序。
from pyspark.sql.functions import lag
from pyspark.sql.functions import lit
from pyspark.sql.functions import coalesce
from pyspark.sql.window import Window
spark.sql("create table nature (Account string,nature int, value int, order int)");
spark.sql("insert into nature values ('a', 1, 50,1), ('a', 1, 40,2),('a',0,50,3),('b',0,30,4),('b',0,40,5),('b',1,30,6),('b',1,40,7)")
windowSpec = Window.partitionBy("Account").orderBy("order")
nature = spark.table("nature");
nature.withColumn("Repeated", coalesce( lead(nature['nature']).over(windowSpec) != nature['nature'], lit(True)) ).show()
|Account|nature|value|order|Repeated|
+-------+------+-----+-----+--------+
| b| 0| 30| 4| false|
| b| 0| 40| 5| true|
| b| 1| 30| 6| false|
| b| 1| 40| 7| true|
| a| 1| 50| 1| false|
| a| 1| 40| 2| true|
| a| 0| 50| 3| true|
+-------+------+-----+-----+--------+
编辑:
从你的描述中不清楚我应该向前看还是向后看。我已经更改了我的代码以期待一行,因为这与您输出中的帐户 'B' 一致。但是,在您的示例输出中,Account 'A' 的逻辑似乎与 'B' 的逻辑不同。 (或者我不明白从 '1' 开始而不是从 '0' 开始的微妙之处。)如果你想向前看一行,请使用 lead
,如果你想向后看一行,请使用 lag
.
问题已解决。
虽然这种方式成本很高,但是也还可以。
def check(part):
df = part
size = len(df)
for i in range(size):
if (df.loc[i,'repeated'] == True):
continue
else:
for j in range((i+1),size):
if (df.loc[i,'nature']!=df.loc[j,'nature']) & (df.loc[j,'repeated']==False):
df.loc[j,'repeated'] = True
df.loc[i,'repeated'] = True
break
return df
df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()
更新 1:
另一个没有任何迭代的解决方案。
def check(df):
df = df.sort_values('verified_time')
df['index'] = df.index
df['IS_REPEATED'] = 0
df1 = df.sort_values(['nature'],ascending=[True]).reset_index(drop=True)
df2 = df.sort_values(['nature'],ascending=[False]).reset_index(drop=True)
df1['IS_REPEATED']=df1['nature']^df2['nature']
df3 = df1.sort_values(['index'],ascending=[True])
df = df3.drop(['index'],axis=1)
return df
df = df.groupby("account_id", "amount").applyInPandas(gf.check2,schema=gf.get_schema('trx'))
我有一个这样的 Spark DataFrame:
+-------+------+-----+---------------+
|Account|nature|value| time|
+-------+------+-----+---------------+
| a| 1| 50|10:05:37:293084|
| a| 1| 50|10:06:46:806510|
| a| 0| 50|11:19:42:951479|
| a| 1| 40|19:14:50:479055|
| a| 0| 50|16:56:17:251624|
| a| 1| 40|16:33:12:133861|
| a| 1| 20|17:33:01:385710|
| b| 0| 30|12:54:49:483725|
| b| 0| 40|19:23:25:845489|
| b| 1| 30|10:58:02:276576|
| b| 1| 40|12:18:27:161290|
| b| 0| 50|12:01:50:698592|
| b| 0| 50|08:45:53:894441|
| b| 0| 40|17:36:55:827330|
| b| 1| 50|17:18:41:728486|
+-------+------+-----+---------------+
我想将一行的 nature 列与具有相同 Account 和 value[= 的其他行进行比较43=],我应该期待,并添加名为 Repeated 的新列。如果性质发生变化,新列的两行都会得到 true,从 1 到 0,反之亦然.例如,上面的数据框应该是这样的:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| true |
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| true |
| a| 0| 50|16:56:17:251624| true |
| b| 0| 50|08:45:53:894441| true |
| b| 0| 50|12:01:50:698592| false|
| b| 1| 50|17:18:41:728486| true |
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| true |
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| true |
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
我的解决方案是,我必须对 Account 和 value 进行 group by 或 window 列;然后在每组中,将每一行的 nature 与其他行的 nature 进行比较,作为比较的结果,Repeated 列已满。 我用 Spark Window 函数做了这个计算。像这样:
windowSpec = Window.partitionBy("Account","value").orderBy("time")
df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec)!=df['nature'],lit(True)).otherwise(False))).show()
结果是这样的,不是我想要的结果:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| false|
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| false|
| a| 0| 50|16:56:17:251624| false|
| b| 0| 50|08:45:53:894441| false|
| b| 0| 50|12:01:50:698592| true|
| b| 1| 50|17:18:41:728486| false|
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| false|
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| false|
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
更新: 进一步解释,如果我们假设第一个Spark Dataframe被命名为“df”,在下面,我在“Account”和“value”的每一组中写下到底想做什么:
a = df.withColumn('repeated',lit(False))
for i in range(len(group)):
j = i+1
for j in j<=len(group):
if a.loc[i,'nature']!=a.loc[j,'nature'] and a.loc[j,'repeated']==False:
a.loc[i,'repeated'] = True
a.loc[j,'repeated'] = True
能否指导我如何使用 Pyspark Window?
非常感谢任何帮助。
您实际上需要保证您在数据框中看到的顺序是实际顺序。你能做到吗?您需要一个列来对发生的事情按顺序进行排序。将新数据插入数据框并不能保证其顺序。
A window & Lag 将允许您查看前几行的值并进行所需的调整。
仅供参考:我在这里使用 coalesce 就好像它是第一行,没有任何价值可以与之比较。考虑使用第二个参数来合并您认为适合帐户中第一个值的情况。)
如果您需要它,请查看 monotonically increasing function。它可以帮助您创建我们确定性地查看此数据所需的按值排序。
from pyspark.sql.functions import lag
from pyspark.sql.functions import lit
from pyspark.sql.functions import coalesce
from pyspark.sql.window import Window
spark.sql("create table nature (Account string,nature int, value int, order int)");
spark.sql("insert into nature values ('a', 1, 50,1), ('a', 1, 40,2),('a',0,50,3),('b',0,30,4),('b',0,40,5),('b',1,30,6),('b',1,40,7)")
windowSpec = Window.partitionBy("Account").orderBy("order")
nature = spark.table("nature");
nature.withColumn("Repeated", coalesce( lead(nature['nature']).over(windowSpec) != nature['nature'], lit(True)) ).show()
|Account|nature|value|order|Repeated|
+-------+------+-----+-----+--------+
| b| 0| 30| 4| false|
| b| 0| 40| 5| true|
| b| 1| 30| 6| false|
| b| 1| 40| 7| true|
| a| 1| 50| 1| false|
| a| 1| 40| 2| true|
| a| 0| 50| 3| true|
+-------+------+-----+-----+--------+
编辑:
从你的描述中不清楚我应该向前看还是向后看。我已经更改了我的代码以期待一行,因为这与您输出中的帐户 'B' 一致。但是,在您的示例输出中,Account 'A' 的逻辑似乎与 'B' 的逻辑不同。 (或者我不明白从 '1' 开始而不是从 '0' 开始的微妙之处。)如果你想向前看一行,请使用 lead
,如果你想向后看一行,请使用 lag
.
问题已解决。 虽然这种方式成本很高,但是也还可以。
def check(part):
df = part
size = len(df)
for i in range(size):
if (df.loc[i,'repeated'] == True):
continue
else:
for j in range((i+1),size):
if (df.loc[i,'nature']!=df.loc[j,'nature']) & (df.loc[j,'repeated']==False):
df.loc[j,'repeated'] = True
df.loc[i,'repeated'] = True
break
return df
df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()
更新 1: 另一个没有任何迭代的解决方案。
def check(df):
df = df.sort_values('verified_time')
df['index'] = df.index
df['IS_REPEATED'] = 0
df1 = df.sort_values(['nature'],ascending=[True]).reset_index(drop=True)
df2 = df.sort_values(['nature'],ascending=[False]).reset_index(drop=True)
df1['IS_REPEATED']=df1['nature']^df2['nature']
df3 = df1.sort_values(['index'],ascending=[True])
df = df3.drop(['index'],axis=1)
return df
df = df.groupby("account_id", "amount").applyInPandas(gf.check2,schema=gf.get_schema('trx'))