Scala RDD flatMap: generating multiple rows from one row to fill gaps between rows
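I am trying to solve the following problem. Suppose a person borrowed money from someone else, and we have every transaction of the interest-free installment repayment. For the months in which no installment was paid, I want to fill in a row that carries the previous amount forward.
Input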
name,date_of_borrow/return,Amount-Principal
Ashish,2018-03-01,20000
Ashish,2018-04-01,19000
Ashish,2018-05-01,18000
Ashish,2018-06-01,17000
Ashish,2018-07-01,16000
Ashish,2018-08-01,15000
Ashish,2018-12-01,14000
Ashish,2019-02-01,13000
Expected output
name,date_of_borrow/return,Amount-principal
Ashish,2018-03-01,20000
Ashish,2018-04-01,19000
Ashish,2018-05-01,18000
Ashish,2018-06-01,17000
Ashish,2018-07-01,16000
Ashish,2018-08-01,15000
**Ashish,2018-09-01,15000** --- copy previous amount as installment not paid
**Ashish,2018-10-01,15000**
**Ashish,2018-11-01,15000**
Ashish,2018-12-01,14000
**Ashish,2019-01-01,14000**
Ashish,2019-02-01,13000
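The gap-filling rule described above can be sketched on plain Scala collections before bringing Spark into it. This is only a minimal sketch under stated assumptions: the names `GapFill`, `Txn`, and `fillMonthlyGaps` are hypothetical, the rows all belong to one person, and every date falls on the first of a month as in the sample data.

```scala
import java.time.LocalDate

object GapFill {
  // Hypothetical record type: borrower name, transaction date, outstanding principal.
  final case class Txn(name: String, date: LocalDate, amount: Int)

  // For each pair of consecutive rows, repeat the earlier row once per month
  // up to (but not including) the later row's date, then append the last row.
  def fillMonthlyGaps(rows: Seq[Txn]): Seq[Txn] = {
    val sorted = rows.sortBy(_.date.toEpochDay)
    if (sorted.isEmpty) Seq.empty
    else {
      val filled = sorted.zip(sorted.tail).flatMap { case (cur, next) =>
        Iterator.iterate(cur.date)(_.plusMonths(1))
          .takeWhile(_.isBefore(next.date))
          .map(d => cur.copy(date = d))
          .toList
      }
      filled :+ sorted.last
    }
  }
}
```

In a Spark job, a function like this could presumably be applied once per name after grouping, so that each person's rows are filled independently.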
I want to write this with Scala RDDs.
val tr = spark.sparkContext.textFile("/tmp/data.txt")
// quick look at the parsed rows: (name, (date, amount))
tr.map(x=>x.split(',')).map(x=>(x(0),(x(1),x(2)))).collect()
val sm= tr.map(x=>(x.split(',')(0),(x))).groupByKey().flatMap(rec=>{rec._2.toList.sortBy(x=>(-x.split(",")(2).toFloat)).zipWithIndex})
val part1 = sm.map(x=>((x._1.split(',')(0),x._2.toInt),(x._1.split(',')(1),x._1.split(',')(2))))
val part2 = sm.map(x=>((x._1.split(',')(0),x._2.toInt-1),(x._1.split(',')(1),x._1.split(',')(2))))
val data = part1.leftOuterJoin(part2).sortByKey()
// read the data and join each row with the next row on (name, index)
// the last row has no next row, so it falls back to its own date
// (equivalently: keep the whole tuple first, then map with x._4._1)
val oo = data.map(x=>(x._1._1,x._2._1._1,x._2._1._2,x._2._2.getOrElse((x._2._1._1,0))._1))
// after the mapping, the final data is
scala> oo.filter(x=>x._1=="Ashish").collect().foreach(println)
(Ashish,2018-03-01,20000,2018-04-01)
(Ashish,2018-04-01,19000,2018-05-01)
(Ashish,2018-05-01,18000,2018-06-01)
(Ashish,2018-06-01,17000,2018-07-01)
(Ashish,2018-07-01,16000,2018-08-01)
(Ashish,2018-08-01,15000,2018-12-01)
(Ashish,2018-12-01,14000,2019-02-01)
(Ashish,2019-02-01,13000,2019-02-01)
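The same pairing of each row with the next row's date can also be produced without the index self-join, by sorting one person's rows and zipping them with their successors. The following is a rough sketch on plain collections; `NextDate` and `withNextDate` are hypothetical names, and the dates are assumed to be ISO `yyyy-MM-dd` strings, which sort correctly as text.

```scala
object NextDate {
  // rows: (date, amount) pairs for a single name.
  // Returns (date, amount, nextDate); the last row is paired with its own date,
  // matching the output shown above.
  def withNextDate(rows: Seq[(String, String)]): Seq[(String, String, String)] = {
    val sorted = rows.sortBy(_._1)
    val nextDates = sorted.drop(1).map(_._1) ++ sorted.lastOption.map(_._1).toList
    sorted.zip(nextDates).map { case ((date, amount), next) => (date, amount, next) }
  }
}
```

On a pair RDD keyed by name, something like `groupByKey().flatMapValues(rows => NextDate.withNextDate(rows.toSeq))` might replace the two-sided join, although that wiring is untested here.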
The remaining task is to find the date difference and generate the rows with flatMap.
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
format.format(new java.util.Date()) // test date
def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  var dateList = new ListBuffer[String]()
  var calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  // one entry per month, from startdate up to (not including) enddate
  while (calendar.getTime().before(enddate)) {
    dateList += (calendar.get(Calendar.YEAR)) + "-" + (calendar.get(Calendar.MONTH)+1) + "-" + (calendar.get(Calendar.DAY_OF_MONTH))
    calendar.add(Calendar.MONTH, 1)
  }
  // startdate == enddate (the last row): emit that single date
  if (dateList.isEmpty) {
    dateList += (calendar.get(Calendar.YEAR)) + "-" + (calendar.get(Calendar.MONTH)+1) + "-" + (calendar.get(Calendar.DAY_OF_MONTH))
  }
  println("\n" + dateList + "\n")
  dateList
}
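For comparison, the same month-stepping logic could be written with `java.time`, which avoids the mutable `Calendar` and yields zero-padded dates such as `2018-09-01` directly. This is a sketch rather than a drop-in replacement: `MonthDates` is a hypothetical wrapper name, and it takes `LocalDate` arguments instead of `java.util.Date`.

```scala
import java.time.LocalDate

object MonthDates {
  // One ISO-formatted date per month from start (inclusive) to end (exclusive).
  // When start is not before end, return just the start date, mirroring the
  // isEmpty fallback in the Calendar-based version.
  def generateDates(start: LocalDate, end: LocalDate): List[String] = {
    val months = Iterator.iterate(start)(_.plusMonths(1))
      .takeWhile(_.isBefore(end))
      .map(_.toString)
      .toList
    if (months.isEmpty) List(start.toString) else months
  }
}
```

Because `LocalDate` is immutable and `LocalDate.parse` needs no shared formatter object, this form may also be easier to use safely inside an RDD closure.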
This is where it goes wrong, and I am struggling to understand or fix it: for each end date I get an extra row that should not be there.
scala> oo.filter(x=>x._1=="Ashish").flatMap(pp=> {
| var allDates = new ListBuffer[(String,String,Integer)]()
| for (x <- generateDates(format.parse(pp._2),format.parse(pp._4))) {
| allDates += ((pp._1, x , pp._3.toInt))}
| allDates
| }).collect().foreach(println)
(Ashish,Thu Mar 01,1,2,20000)
(Ashish,Sun Apr 01 00:00:00 IST 2018,20000) --- unwanted row, and I don't know why the date format is wrong
(Ashish,Sun Apr 01,1,3,19000)
(Ashish,Tue May 01 00:00:00 IST 2018,19000) --- unwanted row
(Ashish,Tue May 01,1,4,18000)
(Ashish,Fri Jun 01 00:00:00 IST 2018,18000)
(Ashish,Fri Jun 01,1,5,17000)
(Ashish,Sun Jul 01 00:00:00 IST 2018,17000)
(Ashish,Sun Jul 01,1,6,16000)
(Ashish,Wed Aug 01 00:00:00 IST 2018,16000)
(Ashish,Wed Aug 01,1,7,15000)
(Ashish,Sat Sep 01,1,8,15000)
(Ashish,Mon Oct 01,1,9,15000)
(Ashish,Thu Nov 01,1,10,15000)
(Ashish,Sat Dec 01 00:00:00 IST 2018,15000)
(Ashish,Sat Dec 01,1,11,14000)
(Ashish,Tue Jan 01,1,0,14000)
(Ashish,Fri Feb 01 00:00:00 IST 2019,14000)
(Ashish,Fri Feb 01 00:00:00 IST 2019,13000)
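I fully agree that this may be a poor way to write the code, but could someone please help me understand it? I need to know where I went wrong, and I would also like to know the best approach.
I can see that the date function works correctly.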
scala> for(x<-generateDates(format.parse("2018-01-01"),format.parse("2018-11-01")))
| {
| println("\n" + x + "\n")
| }
ListBuffer(2018-1-1, 2018-2-1, 2018-3-1, 2018-4-1, 2018-5-1, 2018-6-1, 2018-7-1, 2018-8-1, 2018-9-1, 2018-10-1)
2018-1-1
2018-2-1
2018-3-1
2018-4-1
2018-5-1
2018-6-1
2018-7-1
2018-8-1
2018-9-1
2018-10-1
I am still trying to find out why that code did not work, but I rewrote it in a different way, and this gives me the correct result.
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
import java.util.{GregorianCalendar, Date}
import java.util.Calendar
val ipt = spark.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load("/tmp/data.csv")
// rebuild each row as a comma-separated string, group by name, and index the rows by descending amount
val sm = ipt.rdd.map(x=>(x(0).toString(),(x.toString().replace("[","").replace("]","")))).groupByKey().flatMap(rec=>{rec._2.toList.sortBy(x=>(-x.split(",")(2).toFloat)).zipWithIndex})
val part1 = sm.map(x=>((x._1.split(',')(0),x._2.toInt),(x._1.split(',')(1),x._1.split(',')(2))))
val part2 = sm.map(x=>((x._1.split(',')(0),x._2.toInt-1),(x._1.split(',')(1),x._1.split(',')(2))))
val data = part1.leftOuterJoin(part2).sortByKey()
// pair each row with the next row's date; the last row falls back to its own date
val oo = data.map(x=>(x._1._1,x._2._1._1,x._2._1._2,x._2._2.getOrElse((x._2._1._1,0))._1))
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
format.format(new java.util.Date()) // test date
def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  var dateList = new ListBuffer[String]()
  var calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  // one entry per month, from startdate up to (not including) enddate
  while (calendar.getTime().before(enddate)) {
    dateList += (calendar.get(Calendar.YEAR)) + "-" + (calendar.get(Calendar.MONTH)+1) + "-" + (calendar.get(Calendar.DAY_OF_MONTH))
    calendar.add(Calendar.MONTH, 1)
  }
  // startdate == enddate (the last row): emit that single date
  if (dateList.isEmpty) {
    dateList += (calendar.get(Calendar.YEAR)) + "-" + (calendar.get(Calendar.MONTH)+1) + "-" + (calendar.get(Calendar.DAY_OF_MONTH))
  }
  println("\n" + dateList + "\n")
  dateList
}
oo.flatMap(pp=> {
var allDates = new ListBuffer[(String,String,Integer)]()
for (x <- generateDates(format.parse(pp._2),format.parse(pp._4))) {
allDates += ((pp._1, x , pp._3.toInt))}
allDates
}).collect().foreach(println)
(Ashish,2018-3-1,20000)
(Ashish,2018-4-1,19000)
(Ashish,2018-5-1,18000)
(Ashish,2018-6-1,17000)
(Ashish,2018-7-1,16000)
(Ashish,2018-8-1,15000)
(Ashish,2018-9-1,15000)
(Ashish,2018-10-1,15000)
(Ashish,2018-11-1,15000)
(Ashish,2018-12-1,14000)
(Ashish,2019-1-1,14000)
(Ashish,2019-2-1,13000)