计算pyspark中每个起点到目的地的最小距离
Calculate the minimum distance to destinations for each origin in pyspark
我有一个起点和终点列表及其地理坐标。我需要计算每个起点到终点的最小距离。
下面是我的代码:
import pyspark.sql.functions as F
from haversine import haversine_vector, Unit
data1 = [("A", (45.7597, 4.8422)), ("B", (46.7431, 5.8422))]
columns1 = ["Origin", "Origin_Geo"]
df1 = spark.createDataFrame(data=data1, schema=columns1)
data2 = [("Destin1", (48.8567, 2.3508)), ("Destin2", (40.7033962, -74.2351462))]
columns2 = ["Destination", "Destination_Geo"]
df2 = spark.createDataFrame(data=data2, schema=columns2)
df = df1.crossJoin(df2)
df.withColumn(
"Distance", haversine_vector(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
我遇到如下错误:
IndexError: too many indices for array: array is 0-dimensional, but 2 were indexed
我的问题是:
看来withColumn('Distance', haversine_vector(F.col('Origin_Geo'), F.col('Destination_Geo')))
有问题。我不知道为什么。 (我是 pyspark 的新手..)
我有一长串出发地和目的地(均超过 30K)。交叉联接生成大量的起点和终点组合。请问有没有更有效的方法来获取最小距离?
非常感谢。
您正在将 haversine
函数应用到一个列,而它应该应用到元组或数组。
如果你想使用这个库,你需要创建一个 UDF 并在你所有的 spark 节点上安装 haversine 包。
from haversine import haversine
from pyspark.sql import functions as F, types as T
haversine_udf = F.udf(haversine, T.FloatType())
df.withColumn(
"Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
如果你不能在每个节点上都安装包,那么你可以简单地使用函数的内置版本(cf. Haversine Formula in Python (Bearing and Distance between two GPS points))- 公式很大程度上取决于你选择的地球半径
from math import radians, cos, sin, asin, sqrt
from pyspark.sql import functions as F, types as T
@F.udf(T.FloatType())
def haversine_udf(point1, point2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1 = point1
lon2, lat2 = point2
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6372.8 # Radius of earth in kilometers. Use 3956 for miles
return c * r
df.withColumn(
"Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
+------+------------+
|Origin|Min_Distance|
+------+------------+
| B| 351.08905|
| A| 392.32755|
+------+------------+
我有一个起点和终点列表及其地理坐标。我需要计算每个起点到终点的最小距离。
下面是我的代码:
import pyspark.sql.functions as F
from haversine import haversine_vector, Unit
data1 = [("A", (45.7597, 4.8422)), ("B", (46.7431, 5.8422))]
columns1 = ["Origin", "Origin_Geo"]
df1 = spark.createDataFrame(data=data1, schema=columns1)
data2 = [("Destin1", (48.8567, 2.3508)), ("Destin2", (40.7033962, -74.2351462))]
columns2 = ["Destination", "Destination_Geo"]
df2 = spark.createDataFrame(data=data2, schema=columns2)
df = df1.crossJoin(df2)
df.withColumn(
"Distance", haversine_vector(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
我遇到如下错误:
IndexError: too many indices for array: array is 0-dimensional, but 2 were indexed
我的问题是:
看来
withColumn('Distance', haversine_vector(F.col('Origin_Geo'), F.col('Destination_Geo')))
有问题。我不知道为什么。 (我是 pyspark 的新手..)我有一长串出发地和目的地(均超过 30K)。交叉联接生成大量的起点和终点组合。请问有没有更有效的方法来获取最小距离?
非常感谢。
您正在将 haversine
函数应用到一个列,而它应该应用到元组或数组。
如果你想使用这个库,你需要创建一个 UDF 并在你所有的 spark 节点上安装 haversine 包。
from haversine import haversine
from pyspark.sql import functions as F, types as T
haversine_udf = F.udf(haversine, T.FloatType())
df.withColumn(
"Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
如果你不能在每个节点上都安装包,那么你可以简单地使用函数的内置版本(cf. Haversine Formula in Python (Bearing and Distance between two GPS points))- 公式很大程度上取决于你选择的地球半径
from math import radians, cos, sin, asin, sqrt
from pyspark.sql import functions as F, types as T
@F.udf(T.FloatType())
def haversine_udf(point1, point2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1 = point1
lon2, lat2 = point2
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6372.8 # Radius of earth in kilometers. Use 3956 for miles
return c * r
df.withColumn(
"Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
+------+------------+
|Origin|Min_Distance|
+------+------------+
| B| 351.08905|
| A| 392.32755|
+------+------------+