Spark:在多个数据帧上使用相同的 OneHotEncoder
Spark: Use same OneHotEncoder on multiple dataframes
我有两个 DataFrames
具有相同的列,我想转换
使用 One-Hot-Encoding 将分类列转换为向量。问题是
例如,在训练集中可能出现 3 个唯一值,而在
您的测试集可能少于此。
Training Set: Test Set:
+------------+ +------------+
| Type | | Type |
+------------+ +------------+
| 0 | | 0 |
| 1 | | 1 |
| 1 | | 1 |
| 3 | | 1 |
+------------+ +------------+
在这种情况下,OneHotEncoder
在训练集和测试集上创建具有不同长度的向量(因为向量的每个元素代表一个唯一值的存在)。
是否可以在多个 DataFrames
上使用相同的 OneHotEncoder
?
没有 fit
函数,所以我不知道该怎么做。
谢谢。
Spark >= 3.0:
旧式 OneHotEncoder
已被删除,OneHotEncoderEstimator
已重命名为 OneHotEncoder
:
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel
encoder = (OneHotEncoder()
.setInputCols(["type"])
.setOutputCols(["encoded"])
.setDropLast(False))
Spark >= 2.3:
Spark 2.3 添加了新的 OneHotEncoderEstimator
和 OneHotEncoderModel
类,它们的工作方式与您期望的一样。
from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel
encoder = (OneHotEncoderEstimator()
.setInputCols(["type"])
.setOutputCols(["encoded"])
.setDropLast(False))
model = encoder.fit(training) # type: OneHotEncoderModel
model.transform(training).show()
# +----+-------------+
# |type| encoded|
# +----+-------------+
# | 0.0|(4,[0],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 3.0|(4,[3],[1.0])|
# +----+-------------+
model.transform(testing).show()
# +----+-------------+
# |type| encoded|
# +----+-------------+
# | 0.0|(4,[0],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# +----+-------------+
Spark < 2.3
OneHotEncoder
不能单独使用。相反,它应该是 Pipeline
的一部分,它可以在其中利用列元数据。考虑以下示例:
training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(["type"])
testing = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(["type"])
当您直接使用编码器时,它不知道上下文:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setOutputCol("encoded").setDropLast(False)
encoder.setInputCol("type").transform(training).show()
## +----+-------------+
## |type| encoded|
## +----+-------------+
## | 0.0|(4,[0],[1.0])|
## | 1.0|(4,[1],[1.0])|
## | 1.0|(4,[1],[1.0])|
## | 3.0|(4,[3],[1.0])|
## +----+-------------+
encoder.setInputCol("type").transform(testing).show()
## +----+-------------+
## |type| encoded|
## +----+-------------+
## | 0.0|(2,[0],[1.0])|
## | 1.0|(2,[1],[1.0])|
## | 1.0|(2,[1],[1.0])|
## | 1.0|(2,[1],[1.0])|
## +----+-------------+
现在让我们添加所需的元数据。例如,可以使用 StringIndexer
:
indexer = (StringIndexer()
.setInputCol("type")
.setOutputCol("type_idx")
.fit(training))
如果您在索引列上应用编码器,您将在两个数据集上获得一致的编码:
(encoder.setInputCol("type_idx")
.transform(indexer.transform(training))
.show())
## +----+--------+-------------+
## |type|type_idx| encoded|
## +----+--------+-------------+
## | 0.0| 1.0|(3,[1],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 3.0| 2.0|(3,[2],[1.0])|
## +----+--------+-------------+
(编码器
.setInputCol("type_idx")
.transform(indexer.transform(测试))
.show())
## +----+--------+-------------+
## |type|type_idx| encoded|
## +----+--------+-------------+
## | 0.0| 1.0|(3,[1],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## +----+--------+-------------+
请注意,您通过这种方式获得的标签不会反映输入数据中的值。如果一致的编码是一项硬性要求,您应该手动提供架构:
from pyspark.sql.types import StructType, StructField, DoubleType
meta = {"ml_attr": {
"name": "type",
"type": "nominal",
"vals": ["0.0", "1.0", "3.0"]
}}
schema = StructType([StructField("type", DoubleType(), False, meta)])
training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(schema)
testing = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(schema)
assert (
encoder.setInputCol("type").transform(training).first()[-1].size ==
encoder.setInputCol("type").transform(testing).first()[-1].size
)
我们可以通过创建元矩阵和创建多个 OneHotEncoder 将其扩展到多列数据集。
这些步骤可以在管道中进行。
我有两个 DataFrames
具有相同的列,我想转换
使用 One-Hot-Encoding 将分类列转换为向量。问题是
例如,在训练集中可能出现 3 个唯一值,而在
您的测试集可能少于此。
Training Set: Test Set:
+------------+ +------------+
| Type | | Type |
+------------+ +------------+
| 0 | | 0 |
| 1 | | 1 |
| 1 | | 1 |
| 3 | | 1 |
+------------+ +------------+
在这种情况下,OneHotEncoder
在训练集和测试集上创建具有不同长度的向量(因为向量的每个元素代表一个唯一值的存在)。
是否可以在多个 DataFrames
上使用相同的 OneHotEncoder
?
没有 fit
函数,所以我不知道该怎么做。
谢谢。
Spark >= 3.0:
旧式 OneHotEncoder
已被删除,OneHotEncoderEstimator
已重命名为 OneHotEncoder
:
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel
encoder = (OneHotEncoder()
.setInputCols(["type"])
.setOutputCols(["encoded"])
.setDropLast(False))
Spark >= 2.3:
Spark 2.3 添加了新的 OneHotEncoderEstimator
和 OneHotEncoderModel
类,它们的工作方式与您期望的一样。
from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel
encoder = (OneHotEncoderEstimator()
.setInputCols(["type"])
.setOutputCols(["encoded"])
.setDropLast(False))
model = encoder.fit(training) # type: OneHotEncoderModel
model.transform(training).show()
# +----+-------------+
# |type| encoded|
# +----+-------------+
# | 0.0|(4,[0],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 3.0|(4,[3],[1.0])|
# +----+-------------+
model.transform(testing).show()
# +----+-------------+
# |type| encoded|
# +----+-------------+
# | 0.0|(4,[0],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# +----+-------------+
Spark < 2.3
OneHotEncoder
不能单独使用。相反,它应该是 Pipeline
的一部分,它可以在其中利用列元数据。考虑以下示例:
training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(["type"])
testing = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(["type"])
当您直接使用编码器时,它不知道上下文:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setOutputCol("encoded").setDropLast(False)
encoder.setInputCol("type").transform(training).show()
## +----+-------------+
## |type| encoded|
## +----+-------------+
## | 0.0|(4,[0],[1.0])|
## | 1.0|(4,[1],[1.0])|
## | 1.0|(4,[1],[1.0])|
## | 3.0|(4,[3],[1.0])|
## +----+-------------+
encoder.setInputCol("type").transform(testing).show()
## +----+-------------+
## |type| encoded|
## +----+-------------+
## | 0.0|(2,[0],[1.0])|
## | 1.0|(2,[1],[1.0])|
## | 1.0|(2,[1],[1.0])|
## | 1.0|(2,[1],[1.0])|
## +----+-------------+
现在让我们添加所需的元数据。例如,可以使用 StringIndexer
:
indexer = (StringIndexer()
.setInputCol("type")
.setOutputCol("type_idx")
.fit(training))
如果您在索引列上应用编码器,您将在两个数据集上获得一致的编码:
(encoder.setInputCol("type_idx")
.transform(indexer.transform(training))
.show())
## +----+--------+-------------+
## |type|type_idx| encoded|
## +----+--------+-------------+
## | 0.0| 1.0|(3,[1],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 3.0| 2.0|(3,[2],[1.0])|
## +----+--------+-------------+
(编码器 .setInputCol("type_idx") .transform(indexer.transform(测试)) .show())
## +----+--------+-------------+
## |type|type_idx| encoded|
## +----+--------+-------------+
## | 0.0| 1.0|(3,[1],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## +----+--------+-------------+
请注意,您通过这种方式获得的标签不会反映输入数据中的值。如果一致的编码是一项硬性要求,您应该手动提供架构:
from pyspark.sql.types import StructType, StructField, DoubleType
meta = {"ml_attr": {
"name": "type",
"type": "nominal",
"vals": ["0.0", "1.0", "3.0"]
}}
schema = StructType([StructField("type", DoubleType(), False, meta)])
training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(schema)
testing = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(schema)
assert (
encoder.setInputCol("type").transform(training).first()[-1].size ==
encoder.setInputCol("type").transform(testing).first()[-1].size
)
我们可以通过创建元矩阵和创建多个 OneHotEncoder 将其扩展到多列数据集。 这些步骤可以在管道中进行。