pyspark:来自 rdd 的数据框包含列表列表
pyspark: dataframe from rdd containing list of lists
我是 Spark 的新手(使用 Python),即使查看了相关帖子也无法弄清楚。
我有一个 RDD。 RDD的每条记录都是如下列表的列表
[[1073914607, 0, -1],[1073914607, 2, 7.88],[1073914607, 0, -1],[1073914607, 4, 40.0]]
[[1074079003, 0, -1],[1074079003, 2, 2.87],[1074079003, 0, -1],[1074079003, 4, 35.2]]
我想将 RDD 转换为 3 列的数据框,基本上堆叠所有元素列表。数据框应如下所示。
account_id product_id price
1073914607 0 -1
1073914607 2 7.88
1073914607 0 -1
1073914607 4 40
1074079003 0 -1
1074079003 2 2.87
1074079003 0 -1
1074079003 4 35.2
我试过my_rdd.toDF()
,但它给了我两行四列,每个元素列表在一列中。我还尝试了其他帖子中建议的一些可能相关的解决方案。由于我是 spark 的新手,所以我遇到了各种我可以弄清楚的错误。请帮忙。谢谢
添加于 2021 年 7 月 28 日。最后我做了以下循环遍历每个元素并生成一个长列表并将其转换为数据框。可能这不是最有效的方法,但它解决了我的问题。
result_lst=[]
for x in my_rdd.toLocalIterator():
for y in x:
result_lst.append(y)
result_df=spark.createDataFrame(result_lst, ['account_id','product_id','price'])
>>> data = ([[1,2],[1,4]],[[2,5],[2,6]])
>>> df = sc.parallelize(data).toDF(['c1','c2'])
>>> df.show()
+------+------+
| c1| c2|
+------+------+
|[1, 2]|[1, 4]|
|[2, 5]|[2, 6]|
+------+------+
>>> df1 = df.select(df.c1.alias('c3')).union(df.select(df.c2).alias('c3'))
>>> df1.show()
+------+
| c3|
+------+
|[1, 2]|
|[2, 5]|
|[1, 4]|
|[2, 6]|
+------+
>>> df1.select(df1.c3,df1.c3[0],df1.c3[1]).show()
+------+-----+-----+
| c3|c3[0]|c3[1]|
+------+-----+-----+
|[1, 2]| 1| 2|
|[2, 5]| 2| 5|
|[1, 4]| 1| 4|
|[2, 6]| 2| 6|
+------+-----+-----+
后来我用下面的另一种方式解决了这个问题,没有将rdd带入Localiterator()并循环遍历。我想这种新方法更有效。
from pyspark.sql.functions import explode
from pyspark.sql import Row
df_exploded=my_rdd.map(lambda x : Row(x)).toDF().withColumn('_1', explode('_1'))
result_df=df_exploded.select([df_exploded._1[i] for i in range(3)]).toDF('account_id','product_id','price')
我是 Spark 的新手(使用 Python),即使查看了相关帖子也无法弄清楚。
我有一个 RDD。 RDD的每条记录都是如下列表的列表
[[1073914607, 0, -1],[1073914607, 2, 7.88],[1073914607, 0, -1],[1073914607, 4, 40.0]]
[[1074079003, 0, -1],[1074079003, 2, 2.87],[1074079003, 0, -1],[1074079003, 4, 35.2]]
我想将 RDD 转换为 3 列的数据框,基本上堆叠所有元素列表。数据框应如下所示。
account_id product_id price
1073914607 0 -1
1073914607 2 7.88
1073914607 0 -1
1073914607 4 40
1074079003 0 -1
1074079003 2 2.87
1074079003 0 -1
1074079003 4 35.2
我试过my_rdd.toDF()
,但它给了我两行四列,每个元素列表在一列中。我还尝试了其他帖子中建议的一些可能相关的解决方案。由于我是 spark 的新手,所以我遇到了各种我可以弄清楚的错误。请帮忙。谢谢
添加于 2021 年 7 月 28 日。最后我做了以下循环遍历每个元素并生成一个长列表并将其转换为数据框。可能这不是最有效的方法,但它解决了我的问题。
result_lst=[]
for x in my_rdd.toLocalIterator():
for y in x:
result_lst.append(y)
result_df=spark.createDataFrame(result_lst, ['account_id','product_id','price'])
>>> data = ([[1,2],[1,4]],[[2,5],[2,6]])
>>> df = sc.parallelize(data).toDF(['c1','c2'])
>>> df.show()
+------+------+
| c1| c2|
+------+------+
|[1, 2]|[1, 4]|
|[2, 5]|[2, 6]|
+------+------+
>>> df1 = df.select(df.c1.alias('c3')).union(df.select(df.c2).alias('c3'))
>>> df1.show()
+------+
| c3|
+------+
|[1, 2]|
|[2, 5]|
|[1, 4]|
|[2, 6]|
+------+
>>> df1.select(df1.c3,df1.c3[0],df1.c3[1]).show()
+------+-----+-----+
| c3|c3[0]|c3[1]|
+------+-----+-----+
|[1, 2]| 1| 2|
|[2, 5]| 2| 5|
|[1, 4]| 1| 4|
|[2, 6]| 2| 6|
+------+-----+-----+
后来我用下面的另一种方式解决了这个问题,没有将rdd带入Localiterator()并循环遍历。我想这种新方法更有效。
from pyspark.sql.functions import explode
from pyspark.sql import Row
df_exploded=my_rdd.map(lambda x : Row(x)).toDF().withColumn('_1', explode('_1'))
result_df=df_exploded.select([df_exploded._1[i] for i in range(3)]).toDF('account_id','product_id','price')