在 PySpark groupBy 中,如何按组计算执行时间?

In PySpark groupBy, how do I calculate execution time by group?

我正在将 PySpark 用于一个大学项目,其中我有大型数据帧,并且我使用 groupBy 应用了 PandasUDF。基本上调用如下所示:

df.groupBy(col).apply(pandasUDF)

我在我的 Spark 配置中使用了 10 个内核 (SparkConf().setMaster('local[10]'))。

目标是能够报告每个小组花费在 运行 我的代码上的时间。我想要每组完成的时间,这样我就可以取平均数。我也对计算标准偏差感兴趣。

我现在正在测试清理过的数据,我知道这些数据将被分成 10 组,并且我让 UDF 使用 time.time() 打印 运行ning 时间。但是,如果我要使用更多的组,这是不可能的(对于上下文,我的所有数据将被分成 3000 多个组)。有没有办法衡量每组的执行时间?

如果不想将执行时间打印到标准输出,您可以 return 将其作为 Pandas UDF 的额外列,例如

@pandas_udf("my_col long, execution_time long", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    start = datetime.now()
    # Some business logic
    return pdf.assign(execution_time=datetime.now() - start)

或者,要计算驱动程序应用程序中的平均执行时间,您可以使用两个 Accumulators 累加执行时间和 UDF 中的 UDF 调用次数。例如

udf_count = sc.accumulator(0)
total_udf_execution_time = sc.accumulator(0)

@pandas_udf("my_col long", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    start = datetime.now()
    # Some business logic
    udf_count.add(1)
    total_udf_execution_time.add(datetime.now() - start)
    return pdf

# Some Spark action to run business logic

mean_udf_execution_time = total_udf_execution_time.value / udf_count.value