How can I assign a statistical frequency to the records/rows of a dataframe in PySpark without using the .toPandas() hack?
I'm new to PySpark, and I want to translate a preprocessing script (Python-based encoding and normalization) into PySpark for synthetic data. (Columns A and C are categorical.) To start with, I have a Spark dataframe called sdf with 5 columns.
An example looks like this:
#+----------+-----+---+-------+----+
#|A |B |C |D |E |
#+----------+-----+---+-------+----+
#|Sentence |92 |6 |False |49 |
#|Sentence |17 |3 |False |15 |
#|Sentence |17 |3 |False |15 |
#|- |0 |0 |False |0 |
#|- |0 |0 |False |0 |
#|- |0 |0 |False |0 |
#+----------+-----+---+-------+----+
Now I want to assign a statistical frequency, alongside the other features, and join the result back to sdf. So far I can do it with the following Python script:
#import libs
import copy
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import scale
from sklearn import preprocessing

# Statistical preprocessing
def add_freq_to_features(df):
    frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
    frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum()  # Normalizing to [0, 1]
    new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
    return new_df

# Encode and normalize
def normalize_features(df):
    temp_df = df.copy()
    le = preprocessing.LabelEncoder()
    #le.fit(temp_df)
    temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform)
    for column in ["A", "B", "C", "D", "E"]:
        # reshape(-1, 1) turns the column into a single-feature 2D array, as MinMaxScaler expects
        temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1))
    return temp_df

# Apply frequency allocation and merge with the extracted features df
features_df = add_freq_to_features(features_df)

# Apply the encoding and normalizing function
normalized_features_df = normalize_features(features_df)

to_numeric_columns = ["A", "B", "C", "D", "E", "Freq"]
normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)
#normalized_features_df
Question: What is the best way to translate this preprocessing so that the pipeline is optimized and runs 100% in Spark, without converting the Spark dataframe to a Pandas dataframe via toPandas()?
The expected output, as a Spark dataframe, looks like this:
#+----------+-----+---+-------+----+----------+
#|A |B |C |D |E |Freq |
#+----------+-----+---+-------+----+----------+
#|Sentence |92 |6 |False |49 |0.166667 |
#|Sentence |17 |3 |False |15 |0.333333 |
#|Sentence |17 |3 |False |15 |0.333333 |
#|- |0 |0 |False |0 |0.500000 |
#|- |0 |0 |False |0 |0.500000 |
#|- |0 |0 |False |0 |0.500000 |
#+----------+-----+---+-------+----+----------+
Spark has the Spark MLlib package, which is designed exactly for feature engineering and machine learning. That is to say, you should not build features by hand the way you would with Pandas. At the end of the day you still have to use Spark to build the model, so why not start using Spark ML properly? I strongly recommend reading through a few sections of the documentation, such as building features, building pipelines, then classification/regression, and a few of the other algorithms.
Back to your original question, here is the Spark version of your sample code (I also ran it in your notebook, with a few minor changes to fit your variable names).
# this is to build the "raw" Freq
from pyspark.sql import functions as F

sdf2 = (sdf
    .groupBy(sdf.columns)
    .agg(F.count('*').alias('Freq'))
    .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
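Note that Freq here is a raw row count per unique combination, while your expected output shows a relative frequency attached to every original row. A minimal sketch of that variant is below; it is my own addition rather than part of the answer above, and it assumes sdf has the schema shown in the question.
# Sketch (assumption): per-row relative frequency without collapsing duplicate rows
from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy(sdf.columns)   # identical rows fall into the same partition
total_rows = sdf.count()              # denominator: total number of rows
sdf_freq = sdf.withColumn('Freq', F.count(F.lit(1)).over(w) / F.lit(total_rows))
sdf_freq.show()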
# this is to normalize features using MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
type_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')
encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')
assembler = VectorAssembler(inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'], outputCol='features')
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])
# Compute summary statistics and generate model
model = pipeline.fit(sdf2)
# rescale each feature to range [min, max].
model.transform(sdf2).show(10, False)
# Output
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |Type |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features |scaled_features |
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |String|8 |0 |true |7 |1 |0.0 |0.0 |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|
# |String|0 |0 |true |0 |1 |0.0 |0.0 |(6,[5],[1.0]) |[0.5,0.0,0.5,0.5,0.0,0.5]|
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
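If you need the scaled values back as plain numeric columns instead of the single scaled_features vector, one short sketch is shown below. This is an assumption on my part (it is not part of the original answer) and it relies on pyspark.ml.functions.vector_to_array, which is available from Spark 3.0 onwards.
# Sketch (assumes Spark >= 3.0): expand the scaled_features vector into one column per feature
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array

feature_names = ['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq']
scaled = model.transform(sdf2).withColumn('arr', vector_to_array(F.col('scaled_features')))
scaled = scaled.select(*sdf2.columns,
                       *[F.col('arr')[i].alias(name + '_scaled') for i, name in enumerate(feature_names)])
scaled.show(10, False)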