如何评估包装在单一方法中的 pyspark 转换?
How are pyspark transformations wrapped in single method evaluated?
我正在尝试组织几个在 pyspark 中执行的数据转换。我有类似于下面的代码。
def main():
spark_session = SparkSession\
.builder\
.appName(config.SPARK_CONFIG['AppName']) \
.getOrCreate()
data = getData(spark_session)
analytics = Analytics(data)
analytics.execute_and_save_analytics()
spark_session.stop()
def getData(spark_session):
sqlContext = pyspark.SQLContext(spark_session.sparkContext)
return sqlContext.read.option('user', user).option('password', pswd)\
.jdbc('jdbc:sqlserver://' + sqlserver + ':' + port\
+ ';database=' + database, table)
class Analytics():
def __init__(self, df):
self.df = df
def _execute(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
# df0.persist()
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df1, df2
def execute_and_save_analytics(self):
output_df1, output_df2 = self._execute()
output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file.csv', header='true')
我怎样才能以这种方式重新组织代码,使 df0 只被评估一次?我尝试像评论行中那样使用 persist() ,但没有任何性能改进。有什么想法吗?
另一个类似的问题,如果您没有一个 _execute(),而是许多类似的方法 _execute1()、_execute2() 等,您将如何组织您的管道。
我想如果我分别调用 _execute() 方法,那么 PySpark 将分别评估每个转换管道(?),因此我会降低性能。
编辑:鉴于转换(过滤器、groupBy、计数)只是示例,我正在寻找适用于任何类型转换或 col3 定义的解决方案。
edit2:看来在init中调用cache()是这里最好的优化。
原样(注释掉 persist
)df0
无论如何都会被计算两次。你的代码结构不会有任何影响。
将您的代码拆分为
def _execute_1(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
return df1
def _execute_2(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df2
不会有任何区别。无需深入了解 cache
保证,您可以:
def __init__(self, df):
self.df = df.withColumn('col3', df.col31 + df.col32).cache()
def _execute_1(self):
return df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
def _execute_2(self):
return df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
def execute_and_save_analytics(self):
output_df1 = self._execute_1()
output_df2 = self._execute_2()
output_df1.coalesce(1).write.csv('/path/file1.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file2.csv', header='true')
self.df.unpersist()
但更简单的方法是:
(self.df
.withColumn('col3', df.col31 + df.col32 > 10)
.repartition("col3")
.write.partitionBy("col3")
.write.csv('/path/file.csv', header='true'))
我正在尝试组织几个在 pyspark 中执行的数据转换。我有类似于下面的代码。
def main():
spark_session = SparkSession\
.builder\
.appName(config.SPARK_CONFIG['AppName']) \
.getOrCreate()
data = getData(spark_session)
analytics = Analytics(data)
analytics.execute_and_save_analytics()
spark_session.stop()
def getData(spark_session):
sqlContext = pyspark.SQLContext(spark_session.sparkContext)
return sqlContext.read.option('user', user).option('password', pswd)\
.jdbc('jdbc:sqlserver://' + sqlserver + ':' + port\
+ ';database=' + database, table)
class Analytics():
def __init__(self, df):
self.df = df
def _execute(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
# df0.persist()
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df1, df2
def execute_and_save_analytics(self):
output_df1, output_df2 = self._execute()
output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file.csv', header='true')
我怎样才能以这种方式重新组织代码,使 df0 只被评估一次?我尝试像评论行中那样使用 persist() ,但没有任何性能改进。有什么想法吗?
另一个类似的问题,如果您没有一个 _execute(),而是许多类似的方法 _execute1()、_execute2() 等,您将如何组织您的管道。 我想如果我分别调用 _execute() 方法,那么 PySpark 将分别评估每个转换管道(?),因此我会降低性能。
编辑:鉴于转换(过滤器、groupBy、计数)只是示例,我正在寻找适用于任何类型转换或 col3 定义的解决方案。
edit2:看来在init中调用cache()是这里最好的优化。
原样(注释掉 persist
)df0
无论如何都会被计算两次。你的代码结构不会有任何影响。
将您的代码拆分为
def _execute_1(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
return df1
def _execute_2(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df2
不会有任何区别。无需深入了解 cache
保证,您可以:
def __init__(self, df):
self.df = df.withColumn('col3', df.col31 + df.col32).cache()
def _execute_1(self):
return df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
def _execute_2(self):
return df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
def execute_and_save_analytics(self):
output_df1 = self._execute_1()
output_df2 = self._execute_2()
output_df1.coalesce(1).write.csv('/path/file1.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file2.csv', header='true')
self.df.unpersist()
但更简单的方法是:
(self.df
.withColumn('col3', df.col31 + df.col32 > 10)
.repartition("col3")
.write.partitionBy("col3")
.write.csv('/path/file.csv', header='true'))