Spark: Group RDD by id
I have 2 RDDs. In Spark Scala, how do I join event1001RDD and event2009RDD when they have the same id?
val event1001RDD: schemaRDD = [eventtype,id,location,date1]
[1001,4929102,LOC01,2015-01-20 10:44:39]
[1001,4929103,LOC02,2015-01-20 10:44:39]
[1001,4929104,LOC03,2015-01-20 10:44:39]
val event2009RDD: schemaRDD = [eventtype,id,date1,date2]
[2009,4929101,2015-01-20 20:44:39,2015-01-20 20:44:39]
[2009,4929102,2015-01-20 15:44:39,2015-01-20 21:44:39]
[2009,4929103,2015-01-20 14:44:39,2015-01-20 14:44:39]
[2009,4929105,2015-01-20 20:44:39,2015-01-20 20:44:39]
The expected result would be (unique, sorted by id):
[eventtype,id,location from 1001,date1 from 1001,date1 from 2009,date2 from 2009]
2009,4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39
1001,4929102,LOC01,2015-01-20 10:44:39,2015-01-20 15:44:39,2015-01-20 21:44:39
1001,4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,2015-01-20 14:44:39
1001,4929104,LOC03,2015-01-20 10:44:39,NULL,NULL
2009,4929105,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39
Note that for id 4929102, 1001 is used as the eventtype; the 2009 eventtype is used only when there is no matching id in 1001.
The result can be a flat RDD[String], or an RDD of tuples via aggregateByKey. I just need to iterate over the RDD.
This is a case for a full outer join. Here you go...
d1=[[1001,4929102,"LOC01","2015-01-20 10:44:39"],[1001,4929103,"LOC02","2015-01-20 10:44:39"],[1001,4929104,"LOC03","2015-01-20 10:44:39"]]
d2=[[2009,4929101,"2015-01-20 20:44:39","2015-01-20 20:44:39"],[2009,4929102,"2015-01-20 15:44:39","2015-01-20 21:44:39"],
[2009,4929103,"2015-01-20 14:44:39","2015-01-20 14:44:39"],[2009,4929105,"2015-01-20 20:44:39","2015-01-20 20:44:39"]]
from pyspark.sql import Row  # Row is needed for the records built below
d1RDD = sc.parallelize(d1).map(lambda t: Row(d1_eventtype=t[0],d1_id=t[1],d1_location=t[2],d1_date1=t[3]))
d2RDD = sc.parallelize(d2).map(lambda t: Row(d2_eventtype=t[0],d2_id=t[1],d2_date1=t[2],d2_date2=t[3]))
d1DF = ssc.createDataFrame(d1RDD)  # ssc: the SQLContext used here (assumed from its usage below)
d2DF = ssc.createDataFrame(d2RDD)
print d1DF.printSchema()  # printSchema() prints the schema and returns None, hence the "None" lines in the output
print d2DF.printSchema()
d1DF.show()
d2DF.show()
d1DF.registerTempTable("d1")
d2DF.registerTempTable("d2")
res = ssc.sql("select case when d1.d1_eventtype is not null then d1.d1_eventtype else d2.d2_eventtype end et, \
case when d1.d1_id is not null then d1.d1_id else d2.d2_id end id, \
d1.d1_location loc, d1.d1_date1, d2.d2_date1, d2.d2_date2 \
from d1 full outer join d2 on d1.d1_id=d2.d2_id order by d1.d1_id")
res.show()
Result:
root
|-- d1_date1: string (nullable = true)
|-- d1_eventtype: long (nullable = true)
|-- d1_id: long (nullable = true)
|-- d1_location: string (nullable = true)
None
root
|-- d2_date1: string (nullable = true)
|-- d2_date2: string (nullable = true)
|-- d2_eventtype: long (nullable = true)
|-- d2_id: long (nullable = true)
None
d1_date1 d1_eventtype d1_id d1_location
2015-01-20 10:44:39 1001 4929102 LOC01
2015-01-20 10:44:39 1001 4929103 LOC02
2015-01-20 10:44:39 1001 4929104 LOC03
d2_date1 d2_date2 d2_eventtype d2_id
2015-01-20 20:44:39 2015-01-20 20:44:39 2009 4929101
2015-01-20 15:44:39 2015-01-20 21:44:39 2009 4929102
2015-01-20 14:44:39 2015-01-20 14:44:39 2009 4929103
2015-01-20 20:44:39 2015-01-20 20:44:39 2009 4929105
et id loc d1_date1 d2_date1 d2_date2
2009 4929101 null null 2015-01-20 20:44:39 2015-01-20 20:44:39
2009 4929105 null null 2015-01-20 20:44:39 2015-01-20 20:44:39
1001 4929102 LOC01 2015-01-20 10:44:39 2015-01-20 15:44:39 2015-01-20 21:44:39
1001 4929103 LOC02 2015-01-20 10:44:39 2015-01-20 14:44:39 2015-01-20 14:44:39
1001 4929104 LOC03 2015-01-20 10:44:39 null null
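For the Spark Scala / plain-RDD route the question asks about, here is a minimal sketch (an editorial addition, not part of the original answer): it keys both datasets by id and uses fullOuterJoin, preferring the 1001 eventtype whenever that side matches. It assumes the sample data above and an existing SparkContext sc; variable names are illustrative.

import org.apache.spark.SparkContext._   // pair-RDD implicits (needed for fullOuterJoin outside the shell on older Spark)

val event1001 = sc.parallelize(Seq(
  (1001L, 4929102L, "LOC01", "2015-01-20 10:44:39"),
  (1001L, 4929103L, "LOC02", "2015-01-20 10:44:39"),
  (1001L, 4929104L, "LOC03", "2015-01-20 10:44:39")))

val event2009 = sc.parallelize(Seq(
  (2009L, 4929101L, "2015-01-20 20:44:39", "2015-01-20 20:44:39"),
  (2009L, 4929102L, "2015-01-20 15:44:39", "2015-01-20 21:44:39"),
  (2009L, 4929103L, "2015-01-20 14:44:39", "2015-01-20 14:44:39"),
  (2009L, 4929105L, "2015-01-20 20:44:39", "2015-01-20 20:44:39")))

// Key both RDDs by id; fullOuterJoin yields (id, (Option[left], Option[right])).
val byId1001 = event1001.map { case (et, id, loc, d1) => (id, (et, loc, d1)) }
val byId2009 = event2009.map { case (et, id, d1, d2) => (id, (et, d1, d2)) }

val joined = byId1001.fullOuterJoin(byId2009).sortByKey().map {
  case (id, (left, right)) =>
    // Prefer the 1001 eventtype when present; fall back to 2009 otherwise.
    val eventtype = left.map(_._1).orElse(right.map(_._1)).get
    val loc     = left.map(_._2).getOrElse("NULL")
    val date1   = left.map(_._3).getOrElse("NULL")
    val d2date1 = right.map(_._2).getOrElse("NULL")
    val d2date2 = right.map(_._3).getOrElse("NULL")
    s"$eventtype,$id,$loc,$date1,$d2date1,$d2date2"   // flat RDD[String], sorted by id
}

joined.collect().foreach(println)

This produces one line per id in the same column order as the expected output. Also note that the SQL above orders by d1.d1_id, so the 2009-only rows (null d1_id) sort to the top of res.show(); ordering by the coalesced id instead would reproduce the sort order shown in the expected result.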