How can I allocate a statistical frequency to the records/rows of a DataFrame in PySpark without using the .toPandas() hack?

I'm new to PySpark, and I want to translate a preprocessing script, including its Pythonic encoding and normalization parts, into PySpark for synthetic data. (Columns A and C are categorical.) To start with, I have a Spark DataFrame, called sdf, with 5 columns:

A sample looks like this:

#+----------+-----+---+-------+----+
#|A         |B    |C  |D      |E   |
#+----------+-----+---+-------+----+
#|Sentence  |92   |6  |False  |49  |
#|Sentence  |17   |3  |False  |15  |
#|Sentence  |17   |3  |False  |15  |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#+----------+-----+---+-------+----+

Now I want to allocate a statistical frequency, alongside the other features, and join the result back to sdf. So far, I can do it with the following Pythonic (pandas) script:

# import libs
import pandas as pd

from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler

#Statistical Preprocessing
def add_freq_to_features(df):
  frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
  frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalzing 0 & 1
  new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
  
  return new_df

# Encode and Normalize
def normalize_features(df):
  temp_df = df.copy()
  
  # label-encode the categorical columns (LabelEncoder is re-fit per column)
  le = preprocessing.LabelEncoder()
  temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform)
  
  for column in ["A", "B", "C", "D", "E"]:
    # reshape(-1, 1) turns the 1-D column values into the single-column
    # 2-D array that MinMaxScaler expects
    temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1))
    
  return temp_df

# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(features_df)

#Apply Encoding and Normalizing function
normalized_features_df = normalize_features(features_df)


to_numeric_columns = ["A", "B" , "C", "D", "E", "Freq"]
normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)
#normalized_features_df

Question: What is the best way to translate this preprocessing into PySpark, without converting the Spark DataFrame to a pandas DataFrame via toPandas(), so that the pipeline stays optimized and is handled 100% in Spark?

The expected output is a Spark DataFrame like the following:

#+----------+-----+---+-------+----+----------+
#|A         |B    |C  |D      |E   |Freq      |
#+----------+-----+---+-------+----+----------+
#|Sentence  |92   |6  |False  |49  |0.166667  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#+----------+-----+---+-------+----+----------+

Spark has the Spark MLlib package, designed specifically for feature-engineering and machine-learning purposes. That means you shouldn't build features manually the way you would with pandas. At the end of the day, you still have to use Spark to build your model, so why not start using Spark ML properly from the beginning? I strongly recommend reading through a few sections of the documentation, such as building features, building pipelines, then classification/regression, as well as some of the other algorithms.
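For a sense of where that leads, here is a minimal sketch (not from the original answer; the 'label' column, train_sdf, and the choice of LogisticRegression are illustrative assumptions) of a pipeline once a model stage is appended:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

# index a categorical column, assemble numeric features into a vector,
# then fit a classifier, all as stages of a single Pipeline
indexer = StringIndexer(inputCol='A', outputCol='A_idx')
assembler = VectorAssembler(inputCols=['A_idx', 'B', 'E'], outputCol='features')
lr = LogisticRegression(featuresCol='features', labelCol='label')  # assumes a 'label' column
pipe = Pipeline(stages=[indexer, assembler, lr])
# fitted = pipe.fit(train_sdf)  # train_sdf: hypothetical labeled training data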

Back to your original question: here is the Spark version of your sample code (I also ran it in your notebook, with minor changes to fit your variables).

# this is to build the "raw" Freq (per-group row counts)
from pyspark.sql import functions as F

sdf2 = (sdf
    .groupBy(sdf.columns)
    .agg(F.count('*').alias('Freq'))
    # cast the boolean flag to string so StringIndexer can index it
    .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
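Note that F.count('*') produces raw group counts (Freq is 1 in the output further below), whereas the expected output in the question has Freq normalized by the total number of rows. A minimal sketch of how to get the normalized version entirely in Spark (freq_sdf and sdf_with_freq are illustrative names):

from pyspark.sql import functions as F

total = sdf.count()  # total number of rows, used as the normalizer
freq_sdf = (sdf
    .groupBy(sdf.columns)
    .agg((F.count('*') / total).alias('Freq'))
)
# join the relative frequency back onto every original row
sdf_with_freq = sdf.join(freq_sdf, on=sdf.columns, how='left')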

# this is to normalize features using MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

type_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')
encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')
assembler = VectorAssembler(inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'], outputCol='features')
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])

# Compute summary statistics and generate model
model = pipeline.fit(sdf2)

# rescale each feature to the range [min, max] (defaults to [0, 1])
model.transform(sdf2).show(10, False)

# Output
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |Type  |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features                 |scaled_features          |
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |String|8     |0           |true         |7                |1   |0.0     |0.0              |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|
# |String|0     |0           |true         |0                |1   |0.0     |0.0              |(6,[5],[1.0])            |[0.5,0.0,0.5,0.5,0.0,0.5]|
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
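As an optional follow-up (Spark 3.0+; flat and the *_scaled names are illustrative), you can unpack scaled_features back into plain numeric columns with vector_to_array, mirroring the assembler's input order:

from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F

feature_cols = ['Type_Cat', 'Length', 'Token_number',
                'Encoding_Type_Cat', 'Character_feature', 'Freq']

# expand the scaled vector into one column per original feature
flat = (model.transform(sdf2)
    .withColumn('arr', vector_to_array('scaled_features'))
    .select(*sdf2.columns,
            *[F.col('arr')[i].alias(c + '_scaled') for i, c in enumerate(feature_cols)]))
flat.show(truncate=False)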