How to append the last record of each change in Scala
I am new to Scala. What I am currently doing is filtering data from a large dataset and printing it as a csv. The csv I print is in this format:
id  time                status
__  ___________________ ______
1   2016-10-09 00:09:10 100
1   2016-10-09 00:09:30 100
1   2016-10-09 00:09:50 100
1   2016-10-09 00:10:10 900
2   2016-10-09 00:09:18 100
2   2016-10-09 00:09:20 100
2   2016-10-09 00:10:24 900
3   2016-10-09 00:09:30 100
3   2016-10-09 00:09:33 100
3   2016-10-09 00:09:36 100
3   2016-10-09 00:09:39 100
3   2016-10-09 00:09:51 900
I am printing the data with the following code:
import scala.collection.mutable.ListBuffer

var count = 0
val statusList = ListBuffer[String]()
for (currentRow <- sortedRow) {
  // collect every row whose status is 100
  if (currentRow.status == 100) {
    statusList += s"${currentRow.id},${currentRow.time},${currentRow.status}"
  }
  // when the next row is the 900 "change" record, collect it as well
  if (count + 1 < sortedRow.size && sortedRow(count + 1).status == 900) {
    statusList += s"${sortedRow(count + 1).id},${sortedRow(count + 1).time},${sortedRow(count + 1).status}"
  }
  count += 1
}
Instead, I want to print the rows whose status is 100 and append the record where the status changes. Basically I want to print the data as follows:
id  time                status  id  change_time         status
__  ___________________ ______  __  ___________________ ______
1   2016-10-09 00:09:10 100     1   2016-10-09 00:10:10 900
1   2016-10-09 00:09:30 100     1   2016-10-09 00:10:10 900
1   2016-10-09 00:09:50 100     1   2016-10-09 00:10:10 900
2   2016-10-09 00:09:18 100     2   2016-10-09 00:10:24 900
2   2016-10-09 00:09:20 100     2   2016-10-09 00:10:24 900
3   2016-10-09 00:09:30 100     3   2016-10-09 00:09:51 900
3   2016-10-09 00:09:33 100     3   2016-10-09 00:09:51 900
3   2016-10-09 00:09:36 100     3   2016-10-09 00:09:51 900
3   2016-10-09 00:09:39 100     3   2016-10-09 00:09:51 900
I would suggest you use dataframes, which are optimized and improved compared to RDDs.
I am assuming the data file to be in the following format, with a header line:
id,time,status
1,2016-10-09 00:09:10,100
1,2016-10-09 00:09:30,100
1,2016-10-09 00:09:50,100
1,2016-10-09 00:10:10,900
The first step is to read the file into a dataframe using the sqlContext:
// assumes an existing SparkSession named sparkSession
val sqlContext = sparkSession.sqlContext
val dataframe = sqlContext.read
  .format("csv")
  .option("header", "true")
  .load("absolute path to the input file")
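As an aside, on Spark 2.x you don't need the sqlContext detour; the SparkSession can read the csv directly. A minimal sketch, assuming the same sparkSession is in scope:
// Spark 2.x style: read through the SparkSession directly
val dataframe = sparkSession.read
  .format("csv")
  .option("header", "true")
  .load("absolute path to the input file")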
You should have the dataframe as:
+---+-------------------+------+
|id |time               |status|
+---+-------------------+------+
|1  |2016-10-09 00:09:10|100   |
|1  |2016-10-09 00:09:30|100   |
|1  |2016-10-09 00:09:50|100   |
|1  |2016-10-09 00:10:10|900   |
|2  |2016-10-09 00:09:18|100   |
|2  |2016-10-09 00:09:20|100   |
|2  |2016-10-09 00:10:24|900   |
|3  |2016-10-09 00:09:30|100   |
|3  |2016-10-09 00:09:33|100   |
|3  |2016-10-09 00:09:36|100   |
|3  |2016-10-09 00:09:39|100   |
|3  |2016-10-09 00:09:51|900   |
+---+-------------------+------+
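Since no schema is supplied to the reader, every column comes back as a string, which is why the filters below compare status against the string "100" rather than a number. You can confirm this with:
// all csv columns are read as strings unless a schema is given
dataframe.printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- time: string (nullable = true)
//  |-- status: string (nullable = true)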
The next step is to filter the dataframe into two, one for each status value:
val df1 = dataframe.filter(dataframe("status") === "100")
The output is:
+---+-------------------+------+
|id |time               |status|
+---+-------------------+------+
|1  |2016-10-09 00:09:10|100   |
|1  |2016-10-09 00:09:30|100   |
|1  |2016-10-09 00:09:50|100   |
|2  |2016-10-09 00:09:18|100   |
|2  |2016-10-09 00:09:20|100   |
|3  |2016-10-09 00:09:30|100   |
|3  |2016-10-09 00:09:33|100   |
|3  |2016-10-09 00:09:36|100   |
|3  |2016-10-09 00:09:39|100   |
+---+-------------------+------+
Do the same for the rows with status 900 as df2, but with the columns renamed:
val df2 = dataframe.filter(dataframe("status") === "900")
  .withColumnRenamed("id", "id2")
  .withColumnRenamed("time", "changed_time")
  .withColumnRenamed("status", "status2")
The output should be:
+---+-------------------+-------+
|id2|changed_time       |status2|
+---+-------------------+-------+
|1  |2016-10-09 00:10:10|900    |
|2  |2016-10-09 00:10:24|900    |
|3  |2016-10-09 00:09:51|900    |
+---+-------------------+-------+
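If you prefer, the three withColumnRenamed calls can be collapsed into a single select with aliases; a sketch of the same df2, using col from org.apache.spark.sql.functions:
import org.apache.spark.sql.functions.col

// equivalent to the filter + three renames above
val df2 = dataframe.filter(col("status") === "900")
  .select(
    col("id").as("id2"),
    col("time").as("changed_time"),
    col("status").as("status2")
  )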
The final step is to join those two dataframes:
val finalDF = df1.join(df2, df1("id") === df2("id2"), "left")
The final output is:
+---+-------------------+------+---+-------------------+-------+
|id |time               |status|id2|changed_time       |status2|
+---+-------------------+------+---+-------------------+-------+
|1  |2016-10-09 00:09:10|100   |1  |2016-10-09 00:10:10|900    |
|1  |2016-10-09 00:09:30|100   |1  |2016-10-09 00:10:10|900    |
|1  |2016-10-09 00:09:50|100   |1  |2016-10-09 00:10:10|900    |
|2  |2016-10-09 00:09:18|100   |2  |2016-10-09 00:10:24|900    |
|2  |2016-10-09 00:09:20|100   |2  |2016-10-09 00:10:24|900    |
|3  |2016-10-09 00:09:30|100   |3  |2016-10-09 00:09:51|900    |
|3  |2016-10-09 00:09:33|100   |3  |2016-10-09 00:09:51|900    |
|3  |2016-10-09 00:09:36|100   |3  |2016-10-09 00:09:51|900    |
|3  |2016-10-09 00:09:39|100   |3  |2016-10-09 00:09:51|900    |
+---+-------------------+------+---+-------------------+-------+
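Note that the left join keeps every 100 row even for an id that never reaches 900 (the right-hand columns are simply null there). If an id could flip to 900 more than once, each of its 100 rows would be paired with every one of its 900 rows; a hedged sketch that keeps only the latest change per id before joining (names like latestChange and finalDF2 are mine, not part of the answer above):
import org.apache.spark.sql.functions.max

// keep only the latest changed_time per id; status2 is dropped here
// since it is the constant 900 anyway
val latestChange = df2.groupBy("id2")
  .agg(max("changed_time").as("changed_time"))

val finalDF2 = df1.join(latestChange, df1("id") === latestChange("id2"), "left")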
Saving the final dataframe to a csv file is easy as well:
finalDF.write.format("csv").save("absolute path to the output directory")
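Two options you may want on top of that, offered as a sketch rather than part of the answer: header writes the column names into the file, and coalesce(1) collapses the output into a single part file (fine for small results, since it funnels everything through one partition):
finalDF.coalesce(1)
  .write
  .format("csv")
  .option("header", "true")
  .save("absolute path to the output directory")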