PySpark Pipeline Performance

Is there any performance difference between using 2 separate pipelines and using 1 combined pipeline?

For example, 2 separate pipelines:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline1.fit(df).transform(df)
df = pipeline2.fit(df).transform(df)

1 combined pipeline:

pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df = pipeline.fit(df).transform(df)

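I converted the DataFrame to an RDD and looked at the job lineage with toDebugString(). It suggests that 2 separate pipelines need more steps to complete the job than 1 combined pipeline. See below: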

2 separate pipelines:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline1.fit(df).transform(df)
print(df.rdd.toDebugString())

df = pipeline2.fit(df).transform(df)
print(df.rdd.toDebugString())
b'(8) MapPartitionsRDD[124] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[123] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  SQLExecutionRDD[122] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[121] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []\n |  MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []\n |  PythonRDD[117] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []'
b'(8) MapPartitionsRDD[128] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[127] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  SQLExecutionRDD[126] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[125] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []\n |  MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []\n |  PythonRDD[117] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []'

1 pipeline:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df2 = pipeline.fit(df).transform(df)
print(df2.rdd.toDebugString())
b'(8) MapPartitionsRDD[79] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[78] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  SQLExecutionRDD[77] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[76] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[75] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[74] at map at SerDeUtil.scala:69 []\n |  MapPartitionsRDD[73] at mapPartitions at SerDeUtil.scala:117 []\n |  PythonRDD[72] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[71] at readRDDFromFile at PythonRDD.scala:274 []'
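
The debug strings above are printed as raw byte strings with embedded \n, which makes the lineages hard to compare by eye. As a minimal sketch (the helper name lineage_steps is hypothetical, and the sample is an abbreviated copy of the single-pipeline output above), the output can be decoded and reduced to a list of RDD types and ids:

```python
import re

def lineage_steps(debug_string):
    """Return (rdd_type, rdd_id) pairs in the order they appear, outermost RDD first."""
    # toDebugString() returned bytes in the output shown above; accept str as well.
    if isinstance(debug_string, bytes):
        debug_string = debug_string.decode("utf-8")
    # Matches entries such as "MapPartitionsRDD[79]"; the trailing "[]" has no id and is skipped.
    return [(name, int(num)) for name, num in re.findall(r"(\w+)\[(\d+)\]", debug_string)]

# Abbreviated sample taken from the single-pipeline output (three of the nine entries).
sample = (
    b"(8) MapPartitionsRDD[79] at javaToPython at NativeMethodAccessorImpl.java:0 []\n"
    b" |  PythonRDD[72] at RDD at PythonRDD.scala:53 []\n"
    b" |  ParallelCollectionRDD[71] at readRDDFromFile at PythonRDD.scala:274 []"
)

steps = lineage_steps(sample)
for name, rdd_id in steps:
    print(rdd_id, name)
```

Passing df.rdd.toDebugString() from each variant to the same helper gives two lists that can be compared entry by entry. Note that this only describes the RDD lineage; it does not measure run time.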

I hope this answers your question.