如何提高 pyspark 到 pandas 数据帧转换的效率,而不是 PyArrow 或使用它
How to increase the efficiency of pyspark to pandas dataframe conversion other than PyArrow or with it
我也尝试了 PyArrow,在我的示例中,我使用 spark.sql 语句获得了 spark datframe。之后我想转换为 pandas 数据帧。为了显示执行时间,我 运行 以下语句。
import time
startTime = time.time()
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
这给了 1021.55
我也试过了
import time
startTime = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
这给了 1008.71
简要了解数据框的形状是 (944,5)。以下是 spark dataframe
中的数据类型
import pandas as pd
pd.set_option('max_colwidth', -1) # to prevent truncating of columns in jupyter
def count_column_types(spark_df):
"""Count number of columns per type"""
return pd.DataFrame(spark_df.dtypes).groupby(1, as_index=False)[0].agg({'count':'count', 'names':lambda x: " | ".join(set(x))}).rename(columns={1:"type"})
count_column_types(df)
type count names
0 bigint 1 col4
1 date 1 col1
2 decimal(20,4) 1 col5
3 int 1 col2
4 string 1 col3
请告诉我是否有任何方法可以提高效率
如果您使用的是所谓的 Pandas UDF,则 spark.sql.execution.arrow.pyspark.enabled
有效,但您的情况无效。
你的问题是toPandas
需要将executor的所有数据收集到driver节点,但在此之前,它需要处理你的SQL查询,主要瓶颈可能在那里(你没有展示示例,所以很难说)。您可以尝试了解瓶颈在哪里 - 在 SQL 查询执行中,或者确实在 toPandas
中。为此,请尝试以下操作:
df = spark.sql(....)
import time
startTime = time.time()
df.write.format("noop").mode("overwrite").save()
executionTime = (time.time() - startTime)
executionTime
并将此执行时间与您从 toPandas
获得的时间进行比较。
我也尝试了 PyArrow,在我的示例中,我使用 spark.sql 语句获得了 spark datframe。之后我想转换为 pandas 数据帧。为了显示执行时间,我 运行 以下语句。
import time
startTime = time.time()
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
这给了 1021.55
我也试过了
import time
startTime = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
这给了 1008.71
简要了解数据框的形状是 (944,5)。以下是 spark dataframe
中的数据类型import pandas as pd
pd.set_option('max_colwidth', -1) # to prevent truncating of columns in jupyter
def count_column_types(spark_df):
"""Count number of columns per type"""
return pd.DataFrame(spark_df.dtypes).groupby(1, as_index=False)[0].agg({'count':'count', 'names':lambda x: " | ".join(set(x))}).rename(columns={1:"type"})
count_column_types(df)
type count names
0 bigint 1 col4
1 date 1 col1
2 decimal(20,4) 1 col5
3 int 1 col2
4 string 1 col3
请告诉我是否有任何方法可以提高效率
如果您使用的是所谓的 Pandas UDF,则 spark.sql.execution.arrow.pyspark.enabled
有效,但您的情况无效。
你的问题是toPandas
需要将executor的所有数据收集到driver节点,但在此之前,它需要处理你的SQL查询,主要瓶颈可能在那里(你没有展示示例,所以很难说)。您可以尝试了解瓶颈在哪里 - 在 SQL 查询执行中,或者确实在 toPandas
中。为此,请尝试以下操作:
df = spark.sql(....)
import time
startTime = time.time()
df.write.format("noop").mode("overwrite").save()
executionTime = (time.time() - startTime)
executionTime
并将此执行时间与您从 toPandas
获得的时间进行比较。