Iterating over rows of pyspark dataframe but keep each row as a dataframe

I am new to PySpark. I want to iterate over each row of a DataFrame, and I want each row to be a DataFrame again.

Example:

First, this is my DataFrame:

+----------+---------------------+-------+
|account_id|transaction_timestamp|balance|
+----------+---------------------+-------+
|      8881|        1582047353000| 500.00|
|        45|        1582198671000| 500.00|
|         3|        1582047353000| 500.00|
+----------+---------------------+-------+

Now I want the output to look like this:

+----------+---------------------+-------+
|account_id|transaction_timestamp|balance|
+----------+---------------------+-------+
|      8881|        1582047353000| 500.00|
+----------+---------------------+-------+

+----------+---------------------+-------+
|account_id|transaction_timestamp|balance|
+----------+---------------------+-------+
|        45|        1582198671000| 500.00|
+----------+---------------------+-------+

+----------+---------------------+-------+
|account_id|transaction_timestamp|balance|
+----------+---------------------+-------+
|         3|        1582047353000| 500.00|
+----------+---------------------+-------+

We can add a row_number to each row, then loop over the row numbers and split the DataFrame by filtering on each row_number value.

Example:

df.show()
#+----------+---------------------+-------+
#|account_id|transaction_timestamp|balance|
#+----------+---------------------+-------+
#|      8881|        1582047353000| 500.00|
#|        45|        1582047353000| 500.00|
#|         3|        1582047353000| 500.00|
#+----------+---------------------+-------+
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

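# row_number() requires an ordered window. Ordering by monotonically_increasing_id()
# keeps the DataFrame's existing row order. A window with no partitionBy moves all
# rows into a single partition, so this only suits small DataFrames.
w = Window.orderBy(monotonically_increasing_id())

# Don't chain .show() onto the assignment: show() returns None, so df1 would be None
df1 = df.withColumn("rn", row_number().over(w))
df1.show()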

dict_df = {}

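# Get the largest row number. `max` here is pyspark.sql.functions.max, which
# shadows Python's built-in max because of the star import above.
max_val = df1.select(max("rn")).collect()[0][0]

# For each rn from 1 to max_val, keep only that row, drop the helper column,
# and store the resulting one-row DataFrame in dict_df
for x in range(max_val):
    dict_df[x+1] = df1.filter(col('rn')==x+1).drop(col('rn'))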

dict_df
#{1: DataFrame[account_id: string, transaction_timestamp: string, balance: string], 2: DataFrame[account_id: string, transaction_timestamp: string, balance: string], 3: DataFrame[account_id: string, transaction_timestamp: string, balance: string]}

#accessing dict_df

dict_df[1].show()
#+----------+---------------------+-------+
#|account_id|transaction_timestamp|balance|
#+----------+---------------------+-------+
#|      8881|        1582047353000| 500.00|
#+----------+---------------------+-------+

dict_df[2].show()
#+----------+---------------------+-------+
#|account_id|transaction_timestamp|balance|
#+----------+---------------------+-------+
#|        45|        1582047353000| 500.00|
#+----------+---------------------+-------+

dict_df[3].show()
#+----------+---------------------+-------+
#|account_id|transaction_timestamp|balance|
#+----------+---------------------+-------+
#|         3|        1582047353000| 500.00|
#+----------+---------------------+-------+

# Alternatively, use globals() to create variables df_1, df_2, ... (not recommended)
for x in range(max_val):
    globals()['df_{}'.format(x+1)] = df1.filter(col('rn')==x+1).drop(col('rn'))

df_1.show()
#+----------+---------------------+-------+
#|account_id|transaction_timestamp|balance|
#+----------+---------------------+-------+
#|      8881|        1582047353000| 500.00|
#+----------+---------------------+-------+