向量化 Pyspark 地理距离计算
Vectorizing Pyspark Geodistance Calculation
我有 2 个数据集:
df: {'key':int, 'latitude':num, 'longitude':num}
分:{'latitude':num, 'longitude':num, 'income': num}
我基本上需要计算 df 中每一行与 cent 中每一行之间的距离,当距离 <= 5000 米时,对收入求和。
输出需要是每个距离 <= 5000 米的收入总和。
我编写了一个非矢量化的解决方案,如下所示,它似乎有效,但需要很长时间。我需要帮助对其进行矢量化。我什至不知道从哪里开始。
我正在使用 pyspark 在数据块中完成所有这些工作。
谢谢!
from math import sin, cos, sqrt, atan2, radians
def calcdist(lat1, lon1, lat2, lon2): # funcao calcula distancia em metros entre dois pontos
R = 6371
lat1 = radians(lat1)
lon1 = radians(lon1)
lat2 = radians(lat2)
lon2 = radians(lon2)
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
distance = R * c * 1000
return(distance)
rdd = []
x = 0
y = 0
z = 0
for row in df.toLocalIterator():
for row2 in cent.toLocalIterator():
d = calcdist(lat1 = float(row.lat), lon1 = float(row.lon), lat2 = float(row2.latitude), lon2 = float(row2.longitude))
if (d is not None and d <= 5000):
x = x + row2.income * row2.pop
y = y + row2.pop
z = z + row2.houses
else:
x = x + 0
y = y + 0
z = z + 0
rdd.append((row.key, x, y, z))
x = 0
y = 0
z = 0
rdd = sc.parallelize(rdd)
df2=rdd.toDF(['id','soma_faixa_renda', 'total_pop', 'total_houses'])
df2.show()
我想你有 2 个包含列 [id, latitude1, longtitude1]
和 [latitude2, longtitude2, income]
的数据框(似乎 key
不是一个好的列名,可能会引发一些错误)。首先,你可以使 udf
函数来计算距离:
import pyspark.sql.functions as fn
from pyspark.sql.types import *
calcdist_udf = fn.udf(calcdist, returnType=DoubleType())
然后,您可以交叉连接数据框并应用创建的udf
函数来计算距离
df_join = df.crossJoin(cent)
df_dist = df_join.select(fn.col('id'), fn.col('income'),
calcdist_udf('latitude1', 'longtitude1', 'latitude2', 'longtitude2').alias('distance'))
df_dist.cache() # cache first because of bug in Spark
现在,您有一个包含计算距离的数据框。只需过滤低距离的行并根据需要聚合
df_agg = df_dist.where(df_dist['distance'] <= 5000).groupby('id').agg(fn.sum('income').alias('sum_income'), fn.count('income').alias('n_house'))
df_agg_pandas = df_agg.toPandas() # save to pandas dataframe
我有 2 个数据集:
df: {'key':int, 'latitude':num, 'longitude':num}
分:{'latitude':num, 'longitude':num, 'income': num}
我基本上需要计算 df 中每一行与 cent 中每一行之间的距离,当距离 <= 5000 米时,对收入求和。
输出需要是每个距离 <= 5000 米的收入总和。
我编写了一个非矢量化的解决方案,如下所示,它似乎有效,但需要很长时间。我需要帮助对其进行矢量化。我什至不知道从哪里开始。
我正在使用 pyspark 在数据块中完成所有这些工作。
谢谢!
from math import sin, cos, sqrt, atan2, radians
def calcdist(lat1, lon1, lat2, lon2): # funcao calcula distancia em metros entre dois pontos
R = 6371
lat1 = radians(lat1)
lon1 = radians(lon1)
lat2 = radians(lat2)
lon2 = radians(lon2)
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
distance = R * c * 1000
return(distance)
rdd = []
x = 0
y = 0
z = 0
for row in df.toLocalIterator():
for row2 in cent.toLocalIterator():
d = calcdist(lat1 = float(row.lat), lon1 = float(row.lon), lat2 = float(row2.latitude), lon2 = float(row2.longitude))
if (d is not None and d <= 5000):
x = x + row2.income * row2.pop
y = y + row2.pop
z = z + row2.houses
else:
x = x + 0
y = y + 0
z = z + 0
rdd.append((row.key, x, y, z))
x = 0
y = 0
z = 0
rdd = sc.parallelize(rdd)
df2=rdd.toDF(['id','soma_faixa_renda', 'total_pop', 'total_houses'])
df2.show()
我想你有 2 个包含列 [id, latitude1, longtitude1]
和 [latitude2, longtitude2, income]
的数据框(似乎 key
不是一个好的列名,可能会引发一些错误)。首先,你可以使 udf
函数来计算距离:
import pyspark.sql.functions as fn
from pyspark.sql.types import *
calcdist_udf = fn.udf(calcdist, returnType=DoubleType())
然后,您可以交叉连接数据框并应用创建的udf
函数来计算距离
df_join = df.crossJoin(cent)
df_dist = df_join.select(fn.col('id'), fn.col('income'),
calcdist_udf('latitude1', 'longtitude1', 'latitude2', 'longtitude2').alias('distance'))
df_dist.cache() # cache first because of bug in Spark
现在,您有一个包含计算距离的数据框。只需过滤低距离的行并根据需要聚合
df_agg = df_dist.where(df_dist['distance'] <= 5000).groupby('id').agg(fn.sum('income').alias('sum_income'), fn.count('income').alias('n_house'))
df_agg_pandas = df_agg.toPandas() # save to pandas dataframe