Pyspark: how to extract subcolumns and re-transform them to categorical variables
I'm having trouble with a Spark DataFrame produced by a RandomForestRegressor, which I need to join with another DataFrame (the original data).
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

# Spark 1.x entry points
sc = SparkContext(conf=SparkConf())
sqlContext = HiveContext(sc)
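(As an aside: on Spark 2.0 and later the same setup is usually written with a SparkSession instead of HiveContext. A minimal sketch, assuming Spark >= 2.0:)
from pyspark.sql import SparkSession

# Spark 2.x+ equivalent of the setup above; SparkSession exposes
# createDataFrame() directly, so it can stand in for sqlContext
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sqlContext = spark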
Here is some sample data:
columns = ['pays', 'zol', 'group_cont_typ', 'id_periode_gestion', 'target']
vals = [('AE', 'AFRIC', 'DR', 201601, 34.67),
('AE', 'AFRIC', 'DR', 201602, 59.38),
('AE', 'ASIA', 'RF', 201601, 123.45),
('AE', 'ASIA', 'RF', 201602, 186.32)]
df = sqlContext.createDataFrame(vals, columns)
df.show()
+----+-----+--------------+------------------+------+
|pays| zol|group_cont_typ|id_periode_gestion|target|
+----+-----+--------------+------------------+------+
| AE|AFRIC| DR| 201601| 34.67|
| AE|AFRIC| DR| 201602| 59.38|
| AE| ASIA| RF| 201601|123.45|
| AE| ASIA| RF| 201602|186.32|
+----+-----+--------------+------------------+------+
I converted the three categorical variables to numeric ones so that they can be used in the prediction:
from pyspark.ml.feature import StringIndexer

si_pays = StringIndexer(inputCol='pays', outputCol='pays_encode')
si_zol = StringIndexer(inputCol='zol', outputCol='zol_encode')
si_type = StringIndexer(inputCol='group_cont_typ', outputCol='group_cont_typ_encode')

df = si_pays.fit(df).transform(df)
df = si_zol.fit(df).transform(df)
df = si_type.fit(df).transform(df)
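(A detail that matters later: fit() returns a StringIndexerModel whose index-to-label mapping can be read back. A minimal sketch, assuming a Spark version where the Python StringIndexerModel exposes .labels, i.e. 2.0+:)
# Keep the fitted model instead of discarding it inside the chained call
model_pays = si_pays.fit(df)
df = model_pays.transform(df)
print(model_pays.labels)  # labels[i] is the original string encoded as index i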
Then I changed the DataFrame's format to the one RandomForestRegressor requires:
# On Spark 1.x the ML pipeline uses mllib vectors; on 2.x+ import
# Vectors from pyspark.ml.linalg instead
from pyspark.mllib.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

input_cols = ['pays_encode', 'zol_encode', 'group_cont_typ_encode', 'id_periode_gestion']
df = df.rdd.map(lambda x: (x['target'],
                           Vectors.dense([x[col] for col in input_cols]))) \
    .toDF(["label", "features"]) \
    .select([F.col('label').cast(DoubleType()).alias('label'), 'features'])
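(For what it's worth, the same reshaping can also be done without dropping to the RDD API: VectorAssembler builds the features vector directly and keeps the remaining columns alongside. A sketch of that alternative:)
from pyspark.ml.feature import VectorAssembler

# Assemble the encoded columns into a single 'features' vector column
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')
df_assembled = assembler.transform(df).withColumnRenamed('target', 'label')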
That lets me run the prediction (I usually train and test on different subsets, but for simplicity's sake I do both on the same DataFrame here):
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=120, maxDepth=8, maxBins=64)
model = rf.fit(df)
forecasts = model.transform(df)
forecasts.show()
+------+--------------------+-----------------+
| label| features| prediction|
+------+--------------------+-----------------+
| 34.67|[57.0,0.0,0.0,201...|38.58532881795905|
| 59.38|[57.0,0.0,0.0,201...|69.21916671695188|
|123.45|[57.0,8.0,1.0,201...|94.17987290587061|
|186.32|[57.0,8.0,1.0,201...| 91.3936760453811|
+------+--------------------+-----------------+
Now that we have got this far, here is my problem: I need to join these results back to the original data in order to present them, but the columns I would join on have been transformed.
So what I did was extract the subcolumns of features:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

# UDF that unpacks an ML vector column into an array of doubles
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

forecasts = forecasts.withColumn("feature", to_array(col("features"))) \
    .select(["prediction", "label"] + [col("feature")[i] for i in range(4)])
Then I wanted to bring the variables back to their original categorical values with IndexToString, but it does not work:
from pyspark.ml.feature import IndexToString

back_to_pays = IndexToString().setInputCol("feature[0]").setOutputCol("pays")
back_to_zol = IndexToString().setInputCol("feature[1]").setOutputCol("zol")
back_to_type = IndexToString().setInputCol("feature[2]").setOutputCol("group_cont_typ")

forecasts = back_to_pays.transform(forecasts)
forecasts = back_to_zol.transform(forecasts)
forecasts = back_to_type.transform(forecasts)
forecasts.show()
Traceback (most recent call last):
"/runner.py", line 109, in _run_spark
forecasts = back_to_pays.transform(forecasts)
File "/usr/hdp/2.6.2.0-205/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 114, in transform
File "/usr/hdp/2.6.2.0-205/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 149, in _transform
File "/usr/hdp/2.6.2.0-205/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/2.6.2.0-205/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/usr/hdp/2.6.2.0-205/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o354.transform.
: java.lang.ClassCastException: org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to org.apache.spark.ml.attribute.NominalAttribute
at org.apache.spark.ml.feature.IndexToString.transform(StringIndexer.scala:292)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
How can I re-transform back to categorical data when the values are extracted as subcolumns of a vector column?
The join would be easier if you had an id column, but you can create one with a row number and then join on that, since the position of the rows does not change during these transformations.
from pyspark.sql.window import Window
# rowNumber() was renamed row_number() in Spark 1.6 and removed in 2.0
from pyspark.sql.functions import rowNumber

# An unordered, unpartitioned window: this pulls every row into a single
# partition and relies on the row order being preserved, as described above
w = Window().orderBy()
df = df.withColumn("rowid", rowNumber().over(w))
forecasts = forecasts.withColumn("rowid", rowNumber().over(w))

mergedDF = df.join(forecasts, "rowid").drop("rowid")
mergedDF.show()
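As for the IndexToString error itself: IndexToString reads the index-to-label mapping from the column's ML attribute metadata, and a column extracted from a plain array carries no such metadata, hence the ClassCastException on NominalAttribute. One way around this is to pass the labels explicitly, taken from the fitted StringIndexerModel objects. A sketch, assuming those models were kept (as model_pays etc. in the sketch in the question; fit() was chained away in the original code) and a Spark version where .labels is exposed in Python:
from pyspark.ml.feature import IndexToString

# Explicit labels make IndexToString independent of column metadata
back_to_pays = IndexToString(inputCol="feature[0]", outputCol="pays",
                             labels=model_pays.labels)
back_to_zol = IndexToString(inputCol="feature[1]", outputCol="zol",
                            labels=model_zol.labels)
back_to_type = IndexToString(inputCol="feature[2]", outputCol="group_cont_typ",
                             labels=model_type.labels)

forecasts = back_to_pays.transform(forecasts)
forecasts = back_to_zol.transform(forecasts)
forecasts = back_to_type.transform(forecasts)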