How to encode string values into numeric values in Spark DataFrame

I have a DataFrame with two columns:

df = 
  Col1   Col2
  aaa    bbb
  ccc    aaa

I want to encode the string values into numeric values. I managed to do it this way:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val indexer1 = new StringIndexer()
                    .setInputCol("Col1")
                    .setOutputCol("Col1Index")
                    .fit(df)

val indexer2 = new StringIndexer()
                    .setInputCol("Col2")
                    .setOutputCol("Col2Index")
                    .fit(df)

val indexed1 = indexer1.transform(df)
val indexed2 = indexer2.transform(df)

val encoder1 = new OneHotEncoder()
                    .setInputCol("Col1Index")
                    .setOutputCol("Col1Vec")

val encoder2 = new OneHotEncoder()
                    .setInputCol("Col2Index")
                    .setOutputCol("Col2Vec")

val encoded1 = encoder1.transform(indexed1)
encoded1.show()

val encoded2 = encoder2.transform(indexed2)
encoded2.show()

The problem is that aaa is encoded differently in the two columns: each StringIndexer is fit on its own column independently, so the label-to-index mappings do not line up. How can I encode my DataFrame so that shared string values get the same code in both columns, e.g.:

df_encoded = 
   Col1   Col2
   1      2
   3      1

Train a single indexer on both columns:

import org.apache.spark.ml.feature.StringIndexer
import spark.implicits._  // already in scope in spark-shell

val df = Seq(("aaa", "bbb"), ("ccc", "aaa")).toDF("col1", "col2")

// fit one StringIndexer on the union of both columns, so every label
// gets a single shared index
val indexer = new StringIndexer().setInputCol("col").fit(
   df.select("col1").toDF("col").union(df.select("col2").toDF("col"))
)

and apply a copy of it to each column:

import org.apache.spark.ml.param.ParamMap

// override only the input/output column params on each copy; the
// learned label-to-index mapping is shared across both columns
val result = Seq("col1", "col2").foldLeft(df){
  (df, col) => indexer
    .copy(new ParamMap()
      .put(indexer.inputCol, col)
      .put(indexer.outputCol, s"${col}_idx"))
    .transform(df)
}

result.show
// +----+----+--------+--------+
// |col1|col2|col1_idx|col2_idx|
// +----+----+--------+--------+
// | aaa| bbb|     0.0|     1.0|
// | ccc| aaa|     2.0|     0.0|
// +----+----+--------+--------+
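If you also need the one-hot step from the question, the same fold works on the shared index columns. A minimal sketch, assuming the Spark 2.x OneHotEncoder API used in the question (in Spark 3.x, OneHotEncoder is an Estimator and has to be fit first):

import org.apache.spark.ml.feature.OneHotEncoder

val encoded = Seq("col1", "col2").foldLeft(result){
  (df, col) => new OneHotEncoder()
    .setInputCol(s"${col}_idx")
    .setOutputCol(s"${col}_vec")
    .transform(df)
}

encoded.show
// since both *_idx columns share one mapping, "aaa" lands in the same
// vector slot in col1_vec and col2_vec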

You can also roll your own transformer; the example below is from my PySpark code.

  1. Train the StringIndexer model as clf:

from pyspark.ml.feature import StringIndexer

# handleInvalid="keep" gives labels unseen during fitting their own extra index
sindex_pro = StringIndexer(inputCol='StringCol', outputCol='StringCol_c',
                           stringOrderType='frequencyDesc',
                           handleInvalid='keep').fit(province_df)
  2. Define a custom Transformer that wraps clf:
from pyspark.ml import Transformer
from pyspark.sql import DataFrame

class SelfSI(Transformer):
    """Apply a fitted StringIndexer (clf) to an arbitrary column by
    temporarily renaming that column to the one the indexer was trained on."""

    def __init__(self, clf, col_name):
        super(SelfSI, self).__init__()
        self.clf = clf
        self.col_name = col_name

    def rename_col(self, df, invers=False):
        # forward: rename the target column to 'StringCol' so clf can find it;
        # inverse: restore the original name and rename the output column
        # 'StringCol_c' to '<col_name>_c'
        or_name = 'StringCol'
        col_name = self.col_name
        if invers:
            df = df.withColumnRenamed(or_name, col_name)
            or_name = col_name + '_c'
            col_name = 'StringCol_c'
        df = df.withColumnRenamed(col_name, or_name)
        return df

    def _transform(self, df: DataFrame) -> DataFrame:
        df = self.rename_col(df)
        df = self.clf.transform(df)
        df = self.rename_col(df, invers=True)
        return df
  3. Instantiate the transformer with the column name you want to encode and apply it:
from pyspark.ml import Pipeline

pro_si = SelfSI(sindex_pro, 'province_name')
pro_si.transform(df_or)

# or chain several transformers in a Pipeline (pro_si2 would be a second
# SelfSI built the same way for the city column)
model = Pipeline(stages=[pro_si, pro_si2]).fit(df_or)
model.transform(df_or)
# the result looks like:
+-------------+---------+---------------+-----------+
|province_name|city_name|province_name_c|city_name_c|
+-------------+---------+---------------+-----------+
|           河北|       保定|           23.0|       18.0|
|           河北|       张家|           23.0|      213.0|
|           河北|       承德|           23.0|      126.0|
|           河北|       沧州|           23.0|        6.0|
|           河北|       廊坊|           23.0|       26.0|
|           北京|       北京|           13.0|      107.0|
|           天津|       天津|           10.0|       85.0|
|           河北|       石家|           23.0|      185.0|
+-------------+---------+---------------+-----------+
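The trick in this answer is rename_col: one fitted StringIndexer can be reused on any column because the wrapper renames the target column to the name the model was trained on ('StringCol'), runs the transform, and then renames the input and output columns back. And since the indexer was fit with handleInvalid='keep', labels it never saw during fitting are mapped to an extra index instead of raising an error.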