如何在 Scala 中附加每个更改的最后一条记录

How to append the last record of each change in Scala

我是 Scala 的新手,目前我正在做的是从一个大数据集中过滤数据并将它们打印为 csv。所以我以这种格式打印的 csv:

id         time                              status
___        _____                            _________
1        2016-10-09 00:09:10                    100
1        2016-10-09 00:09:30                    100
1        2016-10-09 00:09:50                    100
1        2016-10-09 00:10:10                    900
2        2016-10-09 00:09:18                    100
2        2016-10-09 00:09:20                    100
2        2016-10-09 00:10:24                    900
3        2016-10-09 00:09:30                    100
3        2016-10-09 00:09:33                    100
3        2016-10-09 00:09:36                    100
3        2016-10-09 00:09:39                    100
3        2016-10-09 00:09:51                    900

我正在使用以下代码打印数据:

      var count=0;

      val StatusList = ListBuffer[String]();
       for (currentRow <- sortedRow) {
              if (currentRow.status==100){
                   StatusList.+=(currentRow.id+","+currentRow.time+","+currentRow.status)
                }
              if((count+1) <  sortedRow.size && sortedRow(count+1).status==900)   {
                   StatusList.+=(sortedRow(count+1).id+","+sortedRow(count+1).time+","+sortedRow(count+1).status)
                }
     count+=1;

    }

相反,我想打印状态为 100 的行并在它们更改时附加记录。基本上我想按如下方式打印数据:

 id       time                status    id     change_time         status
___      _____               _________  __    ______________       _______
1    2016-10-09 00:09:10      100       1      2016-10-09 00:10:10    900
1    2016-10-09 00:09:30      100       1      2016-10-09 00:10:10    900
1    2016-10-09 00:09:50      100       1      2016-10-09 00:10:10    900
2    2016-10-09 00:09:18      100       2      2016-10-09 00:10:24    900
2    2016-10-09 00:09:20      100       2      2016-10-09 00:10:24    900
3    2016-10-09 00:09:30      100       3      2016-10-09 00:09:51    900
3    2016-10-09 00:09:33      100       3      2016-10-09 00:09:51    900
3    2016-10-09 00:09:36      100       3      2016-10-09 00:09:51    900
3    2016-10-09 00:09:39      100       3      2016-10-09 00:09:51    900

我建议您使用 dataframes 解决方案,这是针对 RDD 所做的优化和改进工作。

我假设数据格式如下 header 行

id,time,status
1,2016-10-0900:09:10,100
1,2016-10-0900:09:30,100
1,2016-10-0900:09:50,100
1,2016-10-0900:10:10,900

第一步是使用 sqlContext

将文件读入 dataframe
 val sqlContext = sparkSession.sqlContext
 val dataframe = sqlContext.read.format("csv").option("header", "true").load("absolute path to the input file")

你应该 dataframe 作为

+---+------------------+------+
|id |time              |status|
+---+------------------+------+
|1  |2016-10-0900:09:10|100   |
|1  |2016-10-0900:09:30|100   |
|1  |2016-10-0900:09:50|100   |
|1  |2016-10-0900:10:10|900   |
|2  |2016-10-0900:09:18|100   |
|2  |2016-10-0900:09:20|100   |
|2  |2016-10-0900:10:24|900   |
|3  |2016-10-0900:09:30|100   |
|3  |2016-10-0900:09:33|100   |
|3  |2016-10-0900:09:36|100   |
|3  |2016-10-0900:09:39|100   |
|3  |2016-10-0900:09:51|900   |
+---+------------------+------+

下一步是将 dataframe 过滤为两个 status 差异

val df1 = dataframe.filter(dataframe("status") === "100")

输出为

+---+------------------+------+
|id |time              |status|
+---+------------------+------+
|1  |2016-10-0900:09:10|100   |
|1  |2016-10-0900:09:30|100   |
|1  |2016-10-0900:09:50|100   |
|2  |2016-10-0900:09:18|100   |
|2  |2016-10-0900:09:20|100   |
|3  |2016-10-0900:09:30|100   |
|3  |2016-10-0900:09:33|100   |
|3  |2016-10-0900:09:36|100   |
|3  |2016-10-0900:09:39|100   |
+---+------------------+------+

900 的状态与 df2 相同,但 column 名称已重命名

val df2 = dataframe.filter(dataframe("status") === "900")
  .withColumnRenamed("id", "id2")
  .withColumnRenamed("time", "changed_time")
  .withColumnRenamed("status", "status2")

输出应该是

+---+------------------+-------+
|id2|changed_time      |status2|
+---+------------------+-------+
|1  |2016-10-0900:10:10|900    |
|2  |2016-10-0900:10:24|900    |
|3  |2016-10-0900:09:51|900    |
+---+------------------+-------+

最后一步是 join 那两个 dataframes

val finalDF = df1.join(df2, df1("id") === df2("id2"), "left")

最终输出为

+---+------------------+------+---+------------------+-------+
|id |time              |status|id2|changed_time      |status2|
+---+------------------+------+---+------------------+-------+
|1  |2016-10-0900:09:10|100   |1  |2016-10-0900:10:10|900    |
|1  |2016-10-0900:09:30|100   |1  |2016-10-0900:10:10|900    |
|1  |2016-10-0900:09:50|100   |1  |2016-10-0900:10:10|900    |
|2  |2016-10-0900:09:18|100   |2  |2016-10-0900:10:24|900    |
|2  |2016-10-0900:09:20|100   |2  |2016-10-0900:10:24|900    |
|3  |2016-10-0900:09:30|100   |3  |2016-10-0900:09:51|900    |
|3  |2016-10-0900:09:33|100   |3  |2016-10-0900:09:51|900    |
|3  |2016-10-0900:09:36|100   |3  |2016-10-0900:09:51|900    |
|3  |2016-10-0900:09:39|100   |3  |2016-10-0900:09:51|900    |
+---+------------------+------+---+------------------+-------+

将最终的 dataframe 保存到 csv 文件也很容易

finalDF.write.format("csv").save("absolute path to output filename ")