pyspark:将 DataFrame 的行组合成 DenseVector
pyspark: combine rows of DataFrame into DenseVector
我有一个包含两列的 DataFrame
:
df = sqlContext.createDataFrame([
(1, 'a'), (2, 'a'),
(3, 'b'), (4, 'b'),
(5, 'c'), (6, 'c'),
(7, 'd'), (8, 'd'),
], schema=['value', 'name'])
编辑 2017/01/13:
我根据实体-属性-值模型从 SQL table 派生此数据框。因此,每一行都会有一个额外的第三个实体列 "id"。
我想根据 ml
包的分类器的要求将其转换为 "features" DataFrame
。对于单个列,这可以使用 VectorAssembler
:
来实现
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['value'], outputCol="features")
selected_features = assembler.transform(df).select('features')
selected_features.collect()
[Row(features=DenseVector([1.0])),
Row(features=DenseVector([2.0])),
Row(features=DenseVector([3.0])),
Row(features=DenseVector([4.0])),
Row(features=DenseVector([5.0])),
Row(features=DenseVector([6.0])),
Row(features=DenseVector([7.0])),
Row(features=DenseVector([8.0]))]
我要的是这个:
[Row(features=DenseVector([1.0, 2.0])),
Row(features=DenseVector([3.0, 4.0])),
Row(features=DenseVector([5.0, 6.0])),
Row(features=DenseVector([7.0, 8.0]))]
根据第 name
列的值将第 value
列的值组合成 DenseVector
的最有效方法是什么?
我正在考虑 GroupedData 的自定义聚合函数的示例,它可以与 groupby
:
一起使用
df.groupby('name').vector_agg().collect()
类似于PostgreSQL array_agg函数:
SELECT array_agg(df.value) FROM table as df
GROUP BY df.name;
从您的数据结构中,您只需要使用相同的 table 和 filter
那些 values
相同(或反转)的行来执行 join
。
df = sqlContext.createDataFrame([
(1, 'a'), (2, 'a'),
(3, 'b'), (4, 'b'),
(5, 'c'), (6, 'c'),
(7, 'd'), (8, 'd'),
], schema=['value', 'name'])
xf = df.select(df["name"].alias("nam"), df["value"].alias("val"))
pf = df.join(xf, df["name"] == xf["nam"], "inner").where(xf["val"] < df["value"]).select(df["value"], xf["val"], df["name"])
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['value', "val"], outputCol="features")
selected_features = assembler.transform(pf).select('features')
selected_features.collect()
#[Row(features=DenseVector([2.0, 1.0])),
# Row(features=DenseVector([4.0, 3.0])),
# Row(features=DenseVector([6.0, 5.0])),
# Row(features=DenseVector([8.0, 7.0]))]
我认为你的问题是 ill-defined,因为对于一个固定的 name
,没有办法知道哪个 value
属于哪个列。 ml
包中的分类器都要求在训练样本之间一致地使用每一列。在您的示例中,列恰好按所需顺序提供,但实际上您不能依赖于此。
如果你能给出你的特征索引并从这样的事情开始,你的问题就可以解决:
df = sc.sql.createDataFrame([
('a', ('f1', 1)), ('a', ('f2', 2)),
('b', ('f1', 3)), ('b', ('f2', 4)),
('c', ('f1', 5)), ('c', ('f2', 6)),
('d', ('f1', 7)), ('d', ('f2', 8)),
], schema=['name', 'feature'])
首先,按 name
分组并将您的特征汇总为列表:
import pyspark.sql.functions as F
df.groupBy('name')\
.agg(F.collect_list('feature'))\
.show()
输出:
+----+---------------------+
|name|collect_list(feature)|
+----+---------------------+
| d| [[f1,7], [f2,8]]|
| c| [[f1,5], [f2,6]]|
| b| [[f1,3], [f2,4]]|
| a| [[f1,1], [f2,2]]|
+----+---------------------+
接下来,使用withColumn
中的udf将这个数组转换为DenseVector。把它们放在一起:
from pyspark.ml.linalg import Vectors, VectorUDT
import pyspark.sql.functions as F
list_to_dense = F.udf(lambda l: Vectors.dense([v for (k,v) in sorted(l)]), VectorUDT())
df.groupBy('name')\
.agg(F.collect_list('features'))\
.withColumn('features', list_to_dense('collect_list(features)'))\
.select('features')\
.collect()
输出:
[Row(features=DenseVector([7.0, 8.0])),
Row(features=DenseVector([5.0, 6.0])),
Row(features=DenseVector([3.0, 4.0])),
Row(features=DenseVector([1.0, 2.0]))]
我有一个包含两列的 DataFrame
:
df = sqlContext.createDataFrame([
(1, 'a'), (2, 'a'),
(3, 'b'), (4, 'b'),
(5, 'c'), (6, 'c'),
(7, 'd'), (8, 'd'),
], schema=['value', 'name'])
编辑 2017/01/13: 我根据实体-属性-值模型从 SQL table 派生此数据框。因此,每一行都会有一个额外的第三个实体列 "id"。
我想根据 ml
包的分类器的要求将其转换为 "features" DataFrame
。对于单个列,这可以使用 VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['value'], outputCol="features")
selected_features = assembler.transform(df).select('features')
selected_features.collect()
[Row(features=DenseVector([1.0])),
Row(features=DenseVector([2.0])),
Row(features=DenseVector([3.0])),
Row(features=DenseVector([4.0])),
Row(features=DenseVector([5.0])),
Row(features=DenseVector([6.0])),
Row(features=DenseVector([7.0])),
Row(features=DenseVector([8.0]))]
我要的是这个:
[Row(features=DenseVector([1.0, 2.0])),
Row(features=DenseVector([3.0, 4.0])),
Row(features=DenseVector([5.0, 6.0])),
Row(features=DenseVector([7.0, 8.0]))]
根据第 name
列的值将第 value
列的值组合成 DenseVector
的最有效方法是什么?
我正在考虑 GroupedData 的自定义聚合函数的示例,它可以与 groupby
:
df.groupby('name').vector_agg().collect()
类似于PostgreSQL array_agg函数:
SELECT array_agg(df.value) FROM table as df
GROUP BY df.name;
从您的数据结构中,您只需要使用相同的 table 和 filter
那些 values
相同(或反转)的行来执行 join
。
df = sqlContext.createDataFrame([
(1, 'a'), (2, 'a'),
(3, 'b'), (4, 'b'),
(5, 'c'), (6, 'c'),
(7, 'd'), (8, 'd'),
], schema=['value', 'name'])
xf = df.select(df["name"].alias("nam"), df["value"].alias("val"))
pf = df.join(xf, df["name"] == xf["nam"], "inner").where(xf["val"] < df["value"]).select(df["value"], xf["val"], df["name"])
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['value', "val"], outputCol="features")
selected_features = assembler.transform(pf).select('features')
selected_features.collect()
#[Row(features=DenseVector([2.0, 1.0])),
# Row(features=DenseVector([4.0, 3.0])),
# Row(features=DenseVector([6.0, 5.0])),
# Row(features=DenseVector([8.0, 7.0]))]
我认为你的问题是 ill-defined,因为对于一个固定的 name
,没有办法知道哪个 value
属于哪个列。 ml
包中的分类器都要求在训练样本之间一致地使用每一列。在您的示例中,列恰好按所需顺序提供,但实际上您不能依赖于此。
如果你能给出你的特征索引并从这样的事情开始,你的问题就可以解决:
df = sc.sql.createDataFrame([
('a', ('f1', 1)), ('a', ('f2', 2)),
('b', ('f1', 3)), ('b', ('f2', 4)),
('c', ('f1', 5)), ('c', ('f2', 6)),
('d', ('f1', 7)), ('d', ('f2', 8)),
], schema=['name', 'feature'])
首先,按 name
分组并将您的特征汇总为列表:
import pyspark.sql.functions as F
df.groupBy('name')\
.agg(F.collect_list('feature'))\
.show()
输出:
+----+---------------------+
|name|collect_list(feature)|
+----+---------------------+
| d| [[f1,7], [f2,8]]|
| c| [[f1,5], [f2,6]]|
| b| [[f1,3], [f2,4]]|
| a| [[f1,1], [f2,2]]|
+----+---------------------+
接下来,使用withColumn
中的udf将这个数组转换为DenseVector。把它们放在一起:
from pyspark.ml.linalg import Vectors, VectorUDT
import pyspark.sql.functions as F
list_to_dense = F.udf(lambda l: Vectors.dense([v for (k,v) in sorted(l)]), VectorUDT())
df.groupBy('name')\
.agg(F.collect_list('features'))\
.withColumn('features', list_to_dense('collect_list(features)'))\
.select('features')\
.collect()
输出:
[Row(features=DenseVector([7.0, 8.0])),
Row(features=DenseVector([5.0, 6.0])),
Row(features=DenseVector([3.0, 4.0])),
Row(features=DenseVector([1.0, 2.0]))]