Self join in Spark with the Scala API
I previously posted a question about a self join in Scala. I am trying to implement the same thing in Spark but can't work out the conversion. Here are the problem and my code.
Input dataset:
Property_ID, latitude, longitude, Address
123, 33.84, -118.39, null
234, 35.89, -119.48, null
345, 35.34, -119.39, null
Output dataset:
Property_ID1, Property_ID2, distance
123,123,0
123,234,0.1
123,345,0.6
234,234,0
234,123,0.1
234,345,0.7
345,345,0
345,123,0.6
345,234,0.7
Spark code:
```scala
import math._

object Haversine {
  val R = 6372.8 // Earth's radius in km

  def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = (lat2 - lat1).toRadians
    val dLon = (lon2 - lon1).toRadians
    val a = pow(sin(dLat / 2), 2) + pow(sin(dLon / 2), 2) * cos(lat1.toRadians) * cos(lat2.toRadians)
    val c = 2 * asin(sqrt(a))
    R * c
  }

  def main(args: Array[String]): Unit = {
    println(haversine(36.12, -86.67, 33.94, -118.40))
  }
}

class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

val csv = sc.textFile("geo.csv") // original file
val data = csv.map(line => line.split(",").map(_.trim)) // split lines into columns
val header = new SimpleCSVHeader(data.take(1)(0)) // build the header from the first line
val rows = data.filter(line => header(line, "latitude") != "latitude") // filter the header row out
// each row has four columns (id, latitude, longitude, address); the address is ignored
val typed = rows.map { case Array(id, lat, lon, _) => (id, lat.toDouble, lon.toDouble) }
```
After this, I need to self-join `typed` and pass each pair through the haversine method. The community gave me the Scala code below, and I need to convert it into Spark code that uses RDDs. The following code currently works on lists.
```scala
val combos = for {
  a <- typed
  b <- typed
} yield (a, b)

combos.map { case ((id1, lat1, lon1), (id2, lat2, lon2)) =>
  id1 + "," + id2 + "," + haversine(lat1, lon1, lat2, lon2)
}.foreach(println)
```
Can someone help? Thanks in advance.
The Spark operation you want is `cartesian`. You can learn more at Spark: produce RDD[(X, X)] of all possible combinations from RDD[X].
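A minimal sketch of how the list-based for-comprehension maps onto `cartesian`. Plain Scala collections stand in for the RDD here so the snippet runs without a Spark cluster; with Spark, the for-comprehension over `typed` would simply become `typed.cartesian(typed)` and the rest stays the same.

```scala
import math._

object CartesianSketch {
  val R = 6372.8 // Earth's radius in km

  // Haversine distance, as in the question's object
  def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = (lat2 - lat1).toRadians
    val dLon = (lon2 - lon1).toRadians
    val a = pow(sin(dLat / 2), 2) + pow(sin(dLon / 2), 2) * cos(lat1.toRadians) * cos(lat2.toRadians)
    2 * asin(sqrt(a)) * R
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the `typed` RDD, using the question's sample rows
    val typed = Seq(("123", 33.84, -118.39), ("234", 35.89, -119.48), ("345", 35.34, -119.39))

    // On an RDD this would be: val combos = typed.cartesian(typed)
    val combos = for { a <- typed; b <- typed } yield (a, b)

    combos.foreach { case ((id1, lat1, lon1), (id2, lat2, lon2)) =>
      println(s"$id1,$id2,${haversine(lat1, lon1, lat2, lon2)}")
    }
  }
}
```

Note that `cartesian` produces all n² pairs, including each property paired with itself (distance 0), which matches the desired output above.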