将自定义函数应用于 spark 数据框组

Apply a custom function to a spark dataframe group

我有一个非常大的 table 时间序列数据,其中包含以下列:

每个 LicensePlate/UberRide 数据集的处理都应考虑到整个数据集。换句话说,我不需要逐行处理数据,而是将所有按(LicensePlate/UberRide)分组的行放在一起。

我打算将 spark 与数据框一起使用 api,但我对如何对 spark 分组数据框执行自定义计算感到困惑。

我需要做的是:

  1. 获取所有数据
  2. 按某些列分组
  3. 为每个 spark 数据帧组应用一个 f(x)。 Return 每个组的自定义对象
  4. 通过应用 g(x) 并返回单个自定义对象获得结果

第 3 步和第 4 步如何进行?关于我应该使用哪个 spark API(dataframe、dataset、rdd,也许 pandas...)的任何提示?

整个工作流程如下:

  • 虽然 Spark 提供了一些与 Pandas 集成的方法,但它并没有使 Pandas 分布式。因此,无论您在 Spark 中使用 Pandas 做什么,都只是本地操作(在转换内部使用时对驱动程序或执行程序)操作。

    如果您正在寻找类似 Pandas 的分布式系统 API,您应该看看 dask.

  • to process grouped Datasets but this part of the API is directly accessible only in Scala. It is not that hard to write a Python wrapper 当你创建一个。
  • RDD API提供了很多函数,可以用来进行以低级repartition/repartitionAndSortWithinPartitions开始,以数字结束的分组操作*byKey 方法(combineByKeygroupByKeyreduceByKey 等)。

    哪一个适用于您的情况取决于您要应用的函数的属性(它是关联的和可交换的,它可以在流上工作吗,它是否需要特定的顺序)。

    最通用但效率低下的方法可以总结如下:

    h(rdd.keyBy(f).groupByKey().mapValues(g).collect())
    

    其中 f 从值映射到 keyg 对应于每组聚合,而 h 是最终合并。大多数时候你可以做得更好,所以它应该只作为最后的手段使用。

  • 相对复杂的逻辑可以用DataFrames/Spark SQL和window functions.

  • 来表达
  • 另见 Applying UDFs on GroupedData in PySpark (with functioning python example)

自 Spark 2.3 以来就存在您正在寻找的内容:Pandas 矢量化 UDF。它允许对 DataFrame 进行分组并使用 pandas 应用自定义转换,分布在每个组上:

df.groupBy("groupColumn").apply(myCustomPandasTransformation)

它很容易使用所以我就放 a link to Databricks' presentation of pandas UDF.

但是,我还不知道在 Scala 中进行分组转换的实用方法,因此欢迎任何其他建议。

编辑:在 Scala 中,您可以使用 Dataset 的 groupByKey + mapGroups/flatMapGroups.

实现与早期版本的 Spark 相同的功能