如何根据另一列的值填充 Spark DataFrame 列?
How to populate a Spark DataFrame column based on another column's value?
我有一个用例,我需要 select 包含至少 30 列和数百万行的 dataframe
中的某些列。
我正在使用 scala
和 apache-spark
从 cassandra
table 加载此数据。
我select编辑了所需的列使用:df.select("col1","col2","col3","col4")
现在我必须执行一个基本的 groupBy
操作来根据 src_ip
、src_port
、dst_ip
、dst_port
对数据进行分组,我也想要从原始 dataframe
的 received_time
列中获取最新值。
我想要一个 dataframe
和 distinct
src_ip
值及其 count
和最新的 received_time
在新列中作为 last_seen
。
我知道如何使用.withColumn
,而且我认为.map()
可以用在这里。
由于我在这个领域相对较新,我真的不知道如何进一步进行。我真的需要你的帮助才能完成这项任务。
假设你有一个数据帧 df src_ip,src_port,dst_ip,dst_port and received_time
,你可以尝试:
val mydf = df.groupBy(col("src_ip"),col("src_port"),col("dst_ip"),col("dst_port")).agg(count("received_time").as("row_count"),max(col("received_time")).as("max_received_time"))
以上行计算了针对按列分组收到的时间戳的计数以及该分组按列的最大时间戳。
我有一个用例,我需要 select 包含至少 30 列和数百万行的 dataframe
中的某些列。
我正在使用 scala
和 apache-spark
从 cassandra
table 加载此数据。
我select编辑了所需的列使用:df.select("col1","col2","col3","col4")
现在我必须执行一个基本的 groupBy
操作来根据 src_ip
、src_port
、dst_ip
、dst_port
对数据进行分组,我也想要从原始 dataframe
的 received_time
列中获取最新值。
我想要一个 dataframe
和 distinct
src_ip
值及其 count
和最新的 received_time
在新列中作为 last_seen
。
我知道如何使用.withColumn
,而且我认为.map()
可以用在这里。
由于我在这个领域相对较新,我真的不知道如何进一步进行。我真的需要你的帮助才能完成这项任务。
假设你有一个数据帧 df src_ip,src_port,dst_ip,dst_port and received_time
,你可以尝试:
val mydf = df.groupBy(col("src_ip"),col("src_port"),col("dst_ip"),col("dst_port")).agg(count("received_time").as("row_count"),max(col("received_time")).as("max_received_time"))
以上行计算了针对按列分组收到的时间戳的计数以及该分组按列的最大时间戳。