将 PySpark 密集列向量转换为行
Convert PySpark Dense Column Vectors into Rows
我有一个包含 3 列的数据框,每个条目都是一个相同长度的密集向量。
如何融化 Vector 条目?
当前数据帧:
第 1 列 |第 2 列 |
[1.0,2.0,3.0]|[10.0,4.0,3.0]
[5.0,4.0,3.0]|[11.0,26.0,3.0]
[9.0,8.0,7.0]|[13.0,7.0,3.0]
预计:
第 1 列|第 2 列
1.0。 10.0
2.0。 4.0
3.0。 3.0
5.0。 11.0
4.0。 26.0
3.0。 3.0
9.0。 13.0
...
第 1 步: 让我们创建初始 DataFrame:
myValues = [([1.0,2.0,3.0],[10.0,4.0,3.0]),([5.0,4.0,3.0],[11.0,26.0,3.0]),([9.0,8.0,7.0],[13.0,7.0,3.0])]
df = sqlContext.createDataFrame(myValues,['column1','column2'])
df.show()
+---------------+-----------------+
| column1| column2|
+---------------+-----------------+
|[1.0, 2.0, 3.0]| [10.0, 4.0, 3.0]|
|[5.0, 4.0, 3.0]|[11.0, 26.0, 3.0]|
|[9.0, 8.0, 7.0]| [13.0, 7.0, 3.0]|
+---------------+-----------------+
第 2 步: 现在,explode
两个列,但在我们 zip
数组之后。这里我们事先知道 list/array
的长度是 3。
from pyspark.sql.functions import array, struct
tmp = explode(array(*[
struct(col("column1").getItem(i).alias("column1"), col("column2").getItem(i).alias("column2"))
for i in range(3)
]))
df=(df.withColumn("tmp", tmp).select(col("tmp").getItem("column1").alias('column1'), col("tmp").getItem("column2").alias('column2')))
df.show()
+-------+-------+
|column1|column2|
+-------+-------+
| 1.0| 10.0|
| 2.0| 4.0|
| 3.0| 3.0|
| 5.0| 11.0|
| 4.0| 26.0|
| 3.0| 3.0|
| 9.0| 13.0|
| 8.0| 7.0|
| 7.0| 3.0|
+-------+-------+
我有一个包含 3 列的数据框,每个条目都是一个相同长度的密集向量。 如何融化 Vector 条目?
当前数据帧:
第 1 列 |第 2 列 |
[1.0,2.0,3.0]|[10.0,4.0,3.0]
[5.0,4.0,3.0]|[11.0,26.0,3.0]
[9.0,8.0,7.0]|[13.0,7.0,3.0]
预计:
第 1 列|第 2 列
1.0。 10.0
2.0。 4.0
3.0。 3.0
5.0。 11.0
4.0。 26.0
3.0。 3.0
9.0。 13.0
...
第 1 步: 让我们创建初始 DataFrame:
myValues = [([1.0,2.0,3.0],[10.0,4.0,3.0]),([5.0,4.0,3.0],[11.0,26.0,3.0]),([9.0,8.0,7.0],[13.0,7.0,3.0])]
df = sqlContext.createDataFrame(myValues,['column1','column2'])
df.show()
+---------------+-----------------+
| column1| column2|
+---------------+-----------------+
|[1.0, 2.0, 3.0]| [10.0, 4.0, 3.0]|
|[5.0, 4.0, 3.0]|[11.0, 26.0, 3.0]|
|[9.0, 8.0, 7.0]| [13.0, 7.0, 3.0]|
+---------------+-----------------+
第 2 步: 现在,explode
两个列,但在我们 zip
数组之后。这里我们事先知道 list/array
的长度是 3。
from pyspark.sql.functions import array, struct
tmp = explode(array(*[
struct(col("column1").getItem(i).alias("column1"), col("column2").getItem(i).alias("column2"))
for i in range(3)
]))
df=(df.withColumn("tmp", tmp).select(col("tmp").getItem("column1").alias('column1'), col("tmp").getItem("column2").alias('column2')))
df.show()
+-------+-------+
|column1|column2|
+-------+-------+
| 1.0| 10.0|
| 2.0| 4.0|
| 3.0| 3.0|
| 5.0| 11.0|
| 4.0| 26.0|
| 3.0| 3.0|
| 9.0| 13.0|
| 8.0| 7.0|
| 7.0| 3.0|
+-------+-------+