SCALA:如何使用收集功能从数据框中获取最新修改的条目?

SCALA: How to use collect function to get the latest modified entry from a dataframe?

我有一个包含两列的 scala 数据框

从这个数据框中我只想得到最新的日期,我现在使用以下代码:

df.agg(max("updated")).head()
// returns a row

I've just read about the collect() function, which I'm told to be safer to use for such a problem - when it runs as a job, it appears it is not aggregating the max on the whole dataset, it looks perfectly fine when it is running in a notebook -, but I don't understand how it should be used.

我找到了如下所示的实现,但我不知道应该如何使用它...

df1.agg({"x": "max"}).collect()[0]

我试过如下:

df.agg(max("updated")).collect()(0)

没有 (0) 它 returns 一个数组,实际上看起来不错。所以想法是,我们应该对驱动器中加载的整个数据集应用聚合,而不仅仅是分区版本,否则似乎无法检索所有时间戳。我现在的问题是,在这种情况下 collect() 实际上应该如何工作?

提前致谢!

听起来很奇怪。首先,您不需要收集数据帧来获取已排序数据帧的最后一个元素。这个话题有很多答案:

我假设您在谈论 spark 数据框(不是 scala)。 如果您只想要最新日期(仅该列),您可以这样做:

df.select(max("updated"))

您可以使用 df.show() 查看数据框内的内容。由于 df 是不可变的,因此您需要将 select 的结果分配给另一个变量或在 select() 之后添加显示。 这将 return 一个数据框只有一行,最大值在 "updated" 列中。 回答你的问题:

So idea is, we should apply the aggregation on the whole dataset loaded in the drive, not just the partitioned version, otherwise it seems to not retrieve all the timestamp

当你在一个dataframe上select时,spark会select从整个dataset中获取数据,没有partitioned version和driver version。 Spark 会将您的数据分片到您的集群中,您定义的所有操作都将在整个数据集上完成。

My question now is, how is collect() actually supposed to work in such a situation?

收集操作正在将 spark 数据帧转换为数组(未分发)并且该数组将位于驱动程序节点中,请记住,如果您的数据帧大小超过驱动程序中的可用内存,您将有一个 outOfMemoryError。

在这种情况下,如果您这样做:

df.select(max("Timestamp")).collect().head

您的 DF(仅包含一行和一列是您的日期)将被转换为 scala 数组。在这种情况下是安全的,因为 select(max()) 将 return 只有一行。

花点时间阅读更多关于 spark dataframe/rdd 以及转换和动作之间的区别。