如何将字符串值编码为 Spark DataFrame 中的数值
How to encode string values into numeric values in Spark DataFrame
我有一个包含两列的 DataFrame:
df =
Col1 Col2
aaa bbb
ccc aaa
我想将字符串值编码为数值。我设法以这种方式做到了:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
// Fit an indexer on Col1 only: the label-to-index mapping reflects Col1's values.
val indexer1 = new StringIndexer()
.setInputCol("Col1")
.setOutputCol("Col1Index")
.fit(df)
// Fit a second, independent indexer on Col2: it builds its own mapping,
// which is why the same string (e.g. "aaa") can receive a different index here.
val indexer2 = new StringIndexer()
.setInputCol("Col2")
.setOutputCol("Col2Index")
.fit(df)
// Append the numeric index columns produced by each fitted model.
val indexed1 = indexer1.transform(df)
val indexed2 = indexer2.transform(df)
// One-hot encode each index column into a sparse vector column.
val encoder1 = new OneHotEncoder()
.setInputCol("Col1Index")
.setOutputCol("Col1Vec")
val encoder2 = new OneHotEncoder()
.setInputCol("Col2Index")
.setOutputCol("Col2Vec")
val encoded1 = encoder1.transform(indexed1)
encoded1.show()
val encoded2 = encoder2.transform(indexed2)
encoded2.show()
问题是 aaa
在两列中以不同的方式编码。
我如何编码我的 DataFrame 以获得正确编码的新 DataFrame,例如:
df_encoded =
Col1 Col2
1 2
3 1
在两列上训练单个 Indexer
:
val df = Seq(("aaa", "bbb"), ("ccc", "aaa")).toDF("col1", "col2")
// Stack both columns into one column named "col" and fit a SINGLE indexer on
// the union, so every distinct string gets the same index in both columns.
val indexer = new StringIndexer().setInputCol("col").fit(
df.select("col1").toDF("col").union(df.select("col2").toDF("col"))
)
并在每一列上应用副本
import org.apache.spark.ml.param.ParamMap
// Reuse the one fitted model on each column: copy() clones the fitted
// StringIndexerModel with overridden input/output column params, so no
// re-fitting happens and the shared label mapping is preserved.
val result = Seq("col1", "col2").foldLeft(df){
(df, col) => indexer
.copy(new ParamMap()
.put(indexer.inputCol, col)
.put(indexer.outputCol, s"${col}_idx"))
.transform(df)
}
result.show
// +----+----+--------+--------+
// |col1|col2|col1_idx|col2_idx|
// +----+----+--------+--------+
// | aaa| bbb| 0.0| 1.0|
// | ccc| aaa| 2.0| 0.0|
// +----+----+--------+--------+
你也可以自己实现这种转换;下面的示例是我的 PySpark 代码:
- 将转换模型训练为 clf
sindex_pro = StringIndexer(inputCol='StringCol',outputCol='StringCol_c',stringOrderType="frequencyDesc",handleInvalid="keep").fit(province_df)`
- 定义自定义 Transformer,用于加载上面训练好的 clf 模型
from pyspark.sql.functions import col
from pyspark.ml import Transformer
from pyspark.sql import DataFrame
class SelfSI(Transformer):
    """Applies a fitted StringIndexer model to an arbitrarily named column.

    The wrapped model was trained on a column called 'StringCol' (output
    'StringCol_c'), so this transformer temporarily renames the target
    column to match, runs the model, then renames everything back.
    """

    def __init__(self, clf, col_name):
        super(SelfSI, self).__init__()
        # clf: fitted StringIndexerModel expecting input 'StringCol'.
        self.clf = clf
        # col_name: the actual column this instance should index.
        self.col_name = col_name

    def rename_col(self, df, invers=False):
        """Rename between the caller's column names and the model's names.

        Forward: <col_name> -> 'StringCol'.
        Inverse: 'StringCol' -> <col_name> and 'StringCol_c' -> <col_name>_c.
        """
        src, dst = self.col_name, 'StringCol'
        if invers:
            df = df.withColumnRenamed(dst, src)  # restore the input column name
            src, dst = 'StringCol_c', self.col_name + '_c'  # map output back
        return df.withColumnRenamed(src, dst)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Index self.col_name, producing a new column <col_name>_c."""
        renamed = self.rename_col(df)
        indexed = self.clf.transform(renamed)
        return self.rename_col(indexed, invers=True)
- 按需要转换的列名创建模型实例
# Wrap the fitted model so it targets the 'pro_name' column.
pro_si = SelfSI(sindex_pro,'pro_name')
pro_si.transform(df_or)
# Or chain several wrappers in a Pipeline (requires: from pyspark.ml import Pipeline).
# NOTE(review): pro_si2 is presumably a second SelfSI for another column — not defined above.
model = Pipeline(stages=[pro_si,pro_si2]).fit(df_or)
model.transform(df_or)
#result like
province_name|city_name|province_name_c|city_name_c|
+-------------+---------+---------------+-----------+
| 河北| 保定| 23.0| 18.0|
| 河北| 张家| 23.0| 213.0|
| 河北| 承德| 23.0| 126.0|
| 河北| 沧州| 23.0| 6.0|
| 河北| 廊坊| 23.0| 26.0|
| 北京| 北京| 13.0| 107.0|
| 天津| 天津| 10.0| 85.0|
| 河北| 石家| 23.0| 185.0|
我有一个包含两列的 DataFrame:
df =
Col1 Col2
aaa bbb
ccc aaa
我想将字符串值编码为数值。我设法以这种方式做到了:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
// Fit an indexer on Col1 only: the label-to-index mapping reflects Col1's values.
val indexer1 = new StringIndexer()
.setInputCol("Col1")
.setOutputCol("Col1Index")
.fit(df)
// Fit a second, independent indexer on Col2: it builds its own mapping,
// which is why the same string (e.g. "aaa") can receive a different index here.
val indexer2 = new StringIndexer()
.setInputCol("Col2")
.setOutputCol("Col2Index")
.fit(df)
// Append the numeric index columns produced by each fitted model.
val indexed1 = indexer1.transform(df)
val indexed2 = indexer2.transform(df)
// One-hot encode each index column into a sparse vector column.
val encoder1 = new OneHotEncoder()
.setInputCol("Col1Index")
.setOutputCol("Col1Vec")
val encoder2 = new OneHotEncoder()
.setInputCol("Col2Index")
.setOutputCol("Col2Vec")
val encoded1 = encoder1.transform(indexed1)
encoded1.show()
val encoded2 = encoder2.transform(indexed2)
encoded2.show()
问题是 aaa
在两列中以不同的方式编码。
我如何编码我的 DataFrame 以获得正确编码的新 DataFrame,例如:
df_encoded =
Col1 Col2
1 2
3 1
在两列上训练单个 Indexer
:
val df = Seq(("aaa", "bbb"), ("ccc", "aaa")).toDF("col1", "col2")
// Stack both columns into one column named "col" and fit a SINGLE indexer on
// the union, so every distinct string gets the same index in both columns.
val indexer = new StringIndexer().setInputCol("col").fit(
df.select("col1").toDF("col").union(df.select("col2").toDF("col"))
)
并在每一列上应用副本
import org.apache.spark.ml.param.ParamMap
// Reuse the one fitted model on each column: copy() clones the fitted
// StringIndexerModel with overridden input/output column params, so no
// re-fitting happens and the shared label mapping is preserved.
val result = Seq("col1", "col2").foldLeft(df){
(df, col) => indexer
.copy(new ParamMap()
.put(indexer.inputCol, col)
.put(indexer.outputCol, s"${col}_idx"))
.transform(df)
}
result.show
// +----+----+--------+--------+
// |col1|col2|col1_idx|col2_idx|
// +----+----+--------+--------+
// | aaa| bbb| 0.0| 1.0|
// | ccc| aaa| 2.0| 0.0|
// +----+----+--------+--------+
你也可以自己实现这种转换;下面的示例是我的 PySpark 代码:
- 将转换模型训练为 clf
sindex_pro = StringIndexer(inputCol='StringCol',outputCol='StringCol_c',stringOrderType="frequencyDesc",handleInvalid="keep").fit(province_df)`
- 定义自定义 Transformer,用于加载上面训练好的 clf 模型
from pyspark.sql.functions import col
from pyspark.ml import Transformer
from pyspark.sql import DataFrame
class SelfSI(Transformer):
    """Applies a fitted StringIndexer model to an arbitrarily named column.

    The wrapped model was trained on a column called 'StringCol' (output
    'StringCol_c'), so this transformer temporarily renames the target
    column to match, runs the model, then renames everything back.
    """

    def __init__(self, clf, col_name):
        super(SelfSI, self).__init__()
        # clf: fitted StringIndexerModel expecting input 'StringCol'.
        self.clf = clf
        # col_name: the actual column this instance should index.
        self.col_name = col_name

    def rename_col(self, df, invers=False):
        """Rename between the caller's column names and the model's names.

        Forward: <col_name> -> 'StringCol'.
        Inverse: 'StringCol' -> <col_name> and 'StringCol_c' -> <col_name>_c.
        """
        src, dst = self.col_name, 'StringCol'
        if invers:
            df = df.withColumnRenamed(dst, src)  # restore the input column name
            src, dst = 'StringCol_c', self.col_name + '_c'  # map output back
        return df.withColumnRenamed(src, dst)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Index self.col_name, producing a new column <col_name>_c."""
        renamed = self.rename_col(df)
        indexed = self.clf.transform(renamed)
        return self.rename_col(indexed, invers=True)
- 按需要转换的列名创建模型实例
# Wrap the fitted model so it targets the 'pro_name' column.
pro_si = SelfSI(sindex_pro,'pro_name')
pro_si.transform(df_or)
# Or chain several wrappers in a Pipeline (requires: from pyspark.ml import Pipeline).
# NOTE(review): pro_si2 is presumably a second SelfSI for another column — not defined above.
model = Pipeline(stages=[pro_si,pro_si2]).fit(df_or)
model.transform(df_or)
#result like
province_name|city_name|province_name_c|city_name_c|
+-------------+---------+---------------+-----------+
| 河北| 保定| 23.0| 18.0|
| 河北| 张家| 23.0| 213.0|
| 河北| 承德| 23.0| 126.0|
| 河北| 沧州| 23.0| 6.0|
| 河北| 廊坊| 23.0| 26.0|
| 北京| 北京| 13.0| 107.0|
| 天津| 天津| 10.0| 85.0|
| 河北| 石家| 23.0| 185.0|