转换 Spark 数据集 - 按 ID 计算和合并多行
Transform Spark Datset - count and merge multiple rows by ID
经过一些数据处理后,我得到了这个数据集:
Dataset<Row> counts //ID,COUNT,DAY_OF_WEEK
现在我想将其转换为这种格式并另存为 CSV:
ID,COUNT_DoW1, ID,COUNT_DoW2, ID,COUNT_DoW3,..ID,COUNT_DoW7
我可以想到一种方法:
JavaPairRDD<Long, Map<Integer, Integer>> r = counts.toJavaRDD().mapToPair(...)
JavaPairRDD<Long, Map<Integer, Integer>> merged = r.reduceByKey(...);
它是一对 "ID" 和大小为 7 的列表。
得到JavaPairRDD后,我可以将它存储在csv中。有没有更简单的方法来进行这种转换而不将其转换为 RDD?
您可以使用 struct 函数从 cnt 和 day 构造一对,然后使用 collect_list 进行 groupby。
像这样的东西(scala 但你可以很容易地转换为 java):
df.groupBy("ID").agg(collect_list(struct("COUNT","DAY")))
现在您可以编写一个提取相关列的 UDF。因此,您只需在循环中执行 withColumn 即可简单地复制 ID (df.withColumn("id2",col("id")))
然后您创建一个 UDF,它从位置 i 提取计数元素,运行 它在所有列上,最后在日期相同。
如果您保留所需的顺序并删除不相关的列,您将获得所需的内容。
您还可以使用 pivot 命令(同样在 scala 中,但您应该能够轻松转换为 java):
df.show()
>>+---+---+---+
>>| id|cnt|day|
>>+---+---+---+
>>|333| 31| 1|
>>|333| 32| 2|
>>|333|133| 3|
>>|333| 34| 4|
>>|333| 35| 5|
>>|333| 36| 6|
>>|333| 37| 7|
>>|222| 41| 4|
>>|111| 11| 1|
>>|111| 22| 2|
>>|111| 33| 3|
>>|111| 44| 4|
>>|111| 55| 5|
>>|111| 66| 6|
>>|111| 77| 7|
>>|222| 21| 1|
>>+---+---+---+
val df2 = df.withColumn("all",struct('id, 'cnt' 'day))
val res = .groupBy("id").pivot("day").agg(first('all).as("bla")).select("1.*","2.*","3.*", "4.*", "5.*", "6.*", "7.*")
res.show()
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
>>| id|cnt|day| id| cnt| day| id| cnt| day| id|cnt|day| id| cnt| day| id| cnt| day| id| cnt| day|
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
>>|333| 31| 1| 333| 32| 2| 333| 133| 3|333| 34| 4| 333| 35| 5| 333| 36| 6| 333| 37| 7|
>>|222| 21| 1|null|null|null|null|null|null|222| 41| 4|null|null|null|null|null|null|null|null|null|
>>|111| 11| 1| 111| 22| 2| 111| 33| 3|111| 44| 4| 111| 55| 5| 111| 66| 6| 111| 77| 7|
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
经过一些数据处理后,我得到了这个数据集:
Dataset<Row> counts //ID,COUNT,DAY_OF_WEEK
现在我想将其转换为这种格式并另存为 CSV:
ID,COUNT_DoW1, ID,COUNT_DoW2, ID,COUNT_DoW3,..ID,COUNT_DoW7
我可以想到一种方法:
JavaPairRDD<Long, Map<Integer, Integer>> r = counts.toJavaRDD().mapToPair(...)
JavaPairRDD<Long, Map<Integer, Integer>> merged = r.reduceByKey(...);
它是一对 "ID" 和大小为 7 的列表。 得到JavaPairRDD后,我可以将它存储在csv中。有没有更简单的方法来进行这种转换而不将其转换为 RDD?
您可以使用 struct 函数从 cnt 和 day 构造一对,然后使用 collect_list 进行 groupby。 像这样的东西(scala 但你可以很容易地转换为 java):
df.groupBy("ID").agg(collect_list(struct("COUNT","DAY")))
现在您可以编写一个提取相关列的 UDF。因此,您只需在循环中执行 withColumn 即可简单地复制 ID (df.withColumn("id2",col("id")))
然后您创建一个 UDF,它从位置 i 提取计数元素,运行 它在所有列上,最后在日期相同。
如果您保留所需的顺序并删除不相关的列,您将获得所需的内容。
您还可以使用 pivot 命令(同样在 scala 中,但您应该能够轻松转换为 java):
df.show()
>>+---+---+---+
>>| id|cnt|day|
>>+---+---+---+
>>|333| 31| 1|
>>|333| 32| 2|
>>|333|133| 3|
>>|333| 34| 4|
>>|333| 35| 5|
>>|333| 36| 6|
>>|333| 37| 7|
>>|222| 41| 4|
>>|111| 11| 1|
>>|111| 22| 2|
>>|111| 33| 3|
>>|111| 44| 4|
>>|111| 55| 5|
>>|111| 66| 6|
>>|111| 77| 7|
>>|222| 21| 1|
>>+---+---+---+
val df2 = df.withColumn("all",struct('id, 'cnt' 'day))
val res = .groupBy("id").pivot("day").agg(first('all).as("bla")).select("1.*","2.*","3.*", "4.*", "5.*", "6.*", "7.*")
res.show()
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
>>| id|cnt|day| id| cnt| day| id| cnt| day| id|cnt|day| id| cnt| day| id| cnt| day| id| cnt| day|
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
>>|333| 31| 1| 333| 32| 2| 333| 133| 3|333| 34| 4| 333| 35| 5| 333| 36| 6| 333| 37| 7|
>>|222| 21| 1|null|null|null|null|null|null|222| 41| 4|null|null|null|null|null|null|null|null|null|
>>|111| 11| 1| 111| 22| 2| 111| 33| 3|111| 44| 4| 111| 55| 5| 111| 66| 6| 111| 77| 7|
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+