如何根据另一列的值填充 Spark DataFrame 列?

How to populate a Spark DataFrame column based on another column's value?

我有一个用例,我需要 select 包含至少 30 列和数百万行的 dataframe 中的某些列。

我正在使用 scalaapache-sparkcassandra table 加载此数据。

我select编辑了所需的列使用:df.select("col1","col2","col3","col4")

现在我必须执行一个基本的 groupBy 操作来根据 src_ipsrc_portdst_ipdst_port 对数据进行分组,我也想要从原始 dataframereceived_time 列中获取最新值。

我想要一个 dataframedistinct src_ip 值及其 count 和最新的 received_time 在新列中作为 last_seen

我知道如何使用.withColumn,而且我认为.map()可以用在这里。 由于我在这个领域相对较新,我真的不知道如何进一步进行。我真的需要你的帮助才能完成这项任务。

假设你有一个数据帧 df src_ip,src_port,dst_ip,dst_port and received_time,你可以尝试:

val mydf = df.groupBy(col("src_ip"),col("src_port"),col("dst_ip"),col("dst_port")).agg(count("received_time").as("row_count"),max(col("received_time")).as("max_received_time"))

以上行计算了针对按列分组收到的时间戳的计数以及该分组按列的最大时间戳。