Spark Scala: Join two DataFrames by near position and time range
I have two DataFrames:
a DataFrame DF1 with the following structure: (ID, StartDate, EndDate, Position)
a DataFrame DF2 that looks like: (DateTime, Position)
I want to use these DataFrames to build a new DataFrame containing, for each DF1(ID), the number of rows of DF2 where DF2(DateTime) is between DF1(StartDate) and DF1(EndDate) and DF2(Position) is near DF1(Position).
We can assume I have a UDF isNearUDF(pos1, pos2)
that does the job of comparing the positions.
I am currently trying to do this with a join between my DataFrames, but it doesn't seem to be the right solution.
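For reference, the join-then-aggregate approach could be sketched like this (a sketch only: the column names, the `isNearUDF` signature, and an existing SparkSession are all assumed from the question, not verified):

```scala
import org.apache.spark.sql.functions.count

// Sketch: count, per DF1 ID, the DF2 rows that fall inside the time
// window and pass the (assumed) position check. A left outer join keeps
// IDs that match zero rows, so they get a count of 0.
val counts = DF1
  .join(DF2,
    DF2("DateTime") >= DF1("StartDate") &&
    DF2("DateTime") <= DF1("EndDate") &&
    isNearUDF(DF1("Position"), DF2("Position")),
    "left_outer")
  .groupBy(DF1("ID"))
  .agg(count(DF2("DateTime")).as("nbRows"))
```

Note that a join whose condition has no equality part cannot use a hash join, so Spark falls back to a (broadcast) nested-loop join, which is why this pattern tends to be slow on large inputs.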
Edit 2:
Here is an MVCE:
def isInRadius(lat1: Double, lon1: Double, lat2: Double, lon2: Double, dist: Double): Boolean = {
  val distance = 0.0 // calculate distance between lat/lon positions
  distance <= dist
}
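The distance calculation is stubbed out above; a minimal stand-in (plain Scala, using the haversine formula and an assumed mean Earth radius of 6371 km) could look like:

```scala
import scala.math.{sin, cos, atan2, sqrt, toRadians}

// Great-circle distance in kilometres via the haversine formula.
def haversineKm(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val r = 6371.0 // mean Earth radius in km (assumption)
  val dLat = toRadians(lat2 - lat1)
  val dLon = toRadians(lon2 - lon1)
  val a = sin(dLat / 2) * sin(dLat / 2) +
    cos(toRadians(lat1)) * cos(toRadians(lat2)) * sin(dLon / 2) * sin(dLon / 2)
  2 * r * atan2(sqrt(a), sqrt(1 - a))
}

def isInRadius(lat1: Double, lon1: Double, lat2: Double, lon2: Double, dist: Double): Boolean =
  haversineKm(lat1, lon1, lat2, lon2) <= dist
```

Note that the MVCE stores Lat/Lon as strings, so the columns would also need a cast to double before a UDF with this signature can be applied.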
val DF1 = sc.parallelize(Array(
  ("ID1", "2018-02-27T13:47:59.416+01:00", "2018-03-01T16:02:00.632+01:00", "25.13297154663", "55.13297154663"),
  ("ID2", "2018-02-25T13:47:59.416+01:00", "2018-02-07T16:02:00.632+01:00", "26.13297154663", "55.13297154663"),
  ("ID3", "2018-02-24T13:47:59.416+01:00", "2018-02-02T16:02:00.632+01:00", "25.13297154663", "55.13297154663")
  // ...
)).toDF("ID", "CreationDate", "EndDate", "Lat1", "Lon1")
val DF2 = sc.parallelize(Array(
  ("2018-02-27T13:47:59.416+01:00", "25.13297154663", "55.13297154663"),
  ("2018-02-27T13:47:59.416+01:00", "25.1304663", "54.10663"),
  ("2018-02-27T13:47:59.416+01:00", "25.1354663", "55.132904663")
  // ...
)).toDF("DateTime", "Lat2", "Lon2")
val isInRadiusUdf = udf(isInRadius _)
val DF3 = DF1.join(DF2, $"DateTime" >= $"CreationDate" && $"DateTime" <= $"EndDate" /*&& isInRadiusUdf($"Lat1", $"Lon1", $"Lat2", $"Lon2", lit(10))*/)
display(DF3)
This works for the dates, but it takes a very long time.
When I add the isInRadius condition, I get the error:
SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader
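On the performance side, one common mitigation (an assumption, not something from the original post) is to add an equality key, such as a calendar-day bucket, so Spark can use a hash join instead of a nested-loop join over the full cross product:

```scala
import org.apache.spark.sql.functions.to_date

// Hypothetical pre-bucketing: join on calendar day first, then apply the
// range predicate. This only finds matches whose window starts the same
// day as the DF2 timestamp; for multi-day windows, DF1 would need to be
// exploded into one row per covered day.
val df1Bucketed = DF1.withColumn("day", to_date($"CreationDate"))
val df2Bucketed = DF2.withColumn("day", to_date($"DateTime"))

val DF3 = df1Bucketed.join(df2Bucketed, Seq("day"))
  .where($"DateTime" >= $"CreationDate" && $"DateTime" <= $"EndDate")
```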
Try changing your function definition to:
def isInRadius: Double => Double => Double => Double => Double => Boolean =
  lat1 => lon1 => lat2 => lon2 => dist => {
    val distance = 0.0 // calculate distance between lon/lat positions
    distance <= dist
  }
After trying all kinds of possible solutions and getting strange results, I finally solved my problem by simply restarting my Spark cluster (Databricks notebook).
I have absolutely no idea what the problem was, but the code of the MVCE now works.
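For the record, a NotSerializableException on a UDF in a notebook is often caused by the enclosing cell object dragging a non-serializable field (here apparently a DataFrameReader) into the serialized closure. A common workaround, assuming that was the cause (the restart masked it), is to define the function in a standalone serializable object:

```scala
// Keeping the UDF body in a top-level object avoids capturing the
// notebook cell's outer instance in the serialized closure.
object GeoFunctions extends Serializable {
  def isInRadius(lat1: Double, lon1: Double, lat2: Double, lon2: Double, dist: Double): Boolean = {
    val distance = 0.0 // calculate distance between lat/lon positions
    distance <= dist
  }
}

val isInRadiusUdf = udf(GeoFunctions.isInRadius _)
```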