PySpark Pipeline Performance
Is there any performance difference between using 2 separate pipelines and using 1 combined pipeline?
For example, 2 separate pipelines:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df = pipeline1.fit(df).transform(df)
df = pipeline2.fit(df).transform(df)
1 combined pipeline:
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df = pipeline.fit(df).transform(df)
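For what it's worth, one quick way to check this empirically is to time both variants end to end. The sketch below is illustrative only (the data and timed names are made up for this example), and on a 2-row frame the numbers mostly measure driver overhead, so a realistically sized dataset is needed for a meaningful comparison:

import time
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Fresh copy of the toy frame, since df is overwritten above
data = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

def timed(label, build):
    # count() forces execution; transform() alone is lazy
    start = time.perf_counter()
    build().count()
    print(f"{label}: {time.perf_counter() - start:.3f}s")

def separate():
    out = Pipeline(stages=[
        VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
    ]).fit(data).transform(data)
    return Pipeline(stages=[
        VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
    ]).fit(out).transform(out)

def combined():
    return Pipeline(stages=[
        VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
        VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
    ]).fit(data).transform(data)

timed("2 separate pipelines", separate)
timed("1 combined pipeline", combined)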
I converted the DataFrame to an RDD and then inspected its lineage. It shows that the 2 separate pipelines take more steps to complete the job than the single pipeline. See below:
2 separate pipelines:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df = pipeline1.fit(df).transform(df)
print(df.rdd.toDebugString())

df = pipeline2.fit(df).transform(df)
print(df.rdd.toDebugString())
b'(8) MapPartitionsRDD[124] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[123] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[122] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[121] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []\n | MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []\n | PythonRDD[117] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []'
b'(8) MapPartitionsRDD[128] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[127] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[126] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[125] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []\n | MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []\n | PythonRDD[117] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []'
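Note that the second lineage reuses the same underlying RDDs as the first (IDs [116] through [120] appear in both) and only stacks four new entries ([125] to [128]) on top, so the two prints largely describe one shared graph rather than two independent computations.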
1 pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df2 = pipeline.fit(df).transform(df)
print(df2.rdd.toDebugString())
b'(8) MapPartitionsRDD[79] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[78] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[77] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[76] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[75] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[74] at map at SerDeUtil.scala:69 []\n | MapPartitionsRDD[73] at mapPartitions at SerDeUtil.scala:117 []\n | PythonRDD[72] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[71] at readRDDFromFile at PythonRDD.scala:274 []'
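Keep in mind that toDebugString() only describes the RDD lineage created when the DataFrame is converted back to an RDD. What Spark actually executes is the SQL physical plan, so comparing plans may be more telling. A small sketch, assuming df, pipeline1, pipeline2, and pipeline as defined above:

# Rebuild both outputs from the original two-row df
out_sep = pipeline1.fit(df).transform(df)
out_sep = pipeline2.fit(out_sep).transform(out_sep)
out_comb = pipeline.fit(df).transform(df)

# extended=True prints the parsed, analyzed, optimized, and physical plans
out_sep.explain(extended=True)
out_comb.explain(extended=True)

If the two physical plans come out equivalent, the extra lineage entries above reflect how the result was assembled rather than extra work at execution time, since transform() is lazy and Catalyst can collapse consecutive projections.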
Hope this answers your question.