向量化 Pyspark 地理距离计算

Vectorizing Pyspark Geodistance Calculation

我有 2 个数据集:

df: {'key':int, 'latitude':num, 'longitude':num}
分:{'latitude':num, 'longitude':num, 'income': num}

我基本上需要计算 df 中每一行与 cent 中每一行之间的距离,当距离 <= 5000 米时,对收入求和。

输出需要是每个距离 <= 5000 米的收入总和。

我编写了一个非矢量化的解决方案,如下所示,它似乎有效,但需要很长时间。我需要帮助对其进行矢量化。我什至不知道从哪里开始。

我正在使用 pyspark 在数据块中完成所有这些工作。

谢谢!

from math import sin, cos, sqrt, atan2, radians

def calcdist(lat1, lon1, lat2, lon2):  # funcao calcula distancia em metros entre dois pontos
  R = 6371
  lat1 = radians(lat1)
  lon1 = radians(lon1)
  lat2 = radians(lat2)
  lon2 = radians(lon2)
  dlon = lon2 - lon1
  dlat = lat2 - lat1
  a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
  c = 2 * atan2(sqrt(a), sqrt(1 - a))
  distance = R * c * 1000
  return(distance)

rdd = []
x = 0
y = 0
z = 0

for row in df.toLocalIterator():
  for row2 in cent.toLocalIterator():
    d = calcdist(lat1 = float(row.lat), lon1 = float(row.lon), lat2 = float(row2.latitude), lon2 = float(row2.longitude))
    if (d is not None and d <= 5000):
      x = x + row2.income * row2.pop
      y = y + row2.pop
      z = z + row2.houses
    else:
      x = x + 0
      y = y + 0
      z = z + 0
  rdd.append((row.key, x, y, z))
  x = 0
  y = 0
  z = 0

rdd = sc.parallelize(rdd)

df2=rdd.toDF(['id','soma_faixa_renda', 'total_pop', 'total_houses'])

df2.show()

我想你有 2 个包含列 [id, latitude1, longtitude1][latitude2, longtitude2, income] 的数据框(似乎 key 不是一个好的列名,可能会引发一些错误)。首先,你可以使 udf 函数来计算距离:

import pyspark.sql.functions as fn
from pyspark.sql.types import *
calcdist_udf = fn.udf(calcdist, returnType=DoubleType())

然后,您可以交叉连接数据框并应用创建的udf函数来计算距离

df_join = df.crossJoin(cent)
df_dist = df_join.select(fn.col('id'), fn.col('income'), 
                         calcdist_udf('latitude1', 'longtitude1', 'latitude2', 'longtitude2').alias('distance'))
df_dist.cache() # cache first because of bug in Spark

现在,您有一个包含计算距离的数据框。只需过滤低距离的行并根据需要聚合

df_agg = df_dist.where(df_dist['distance'] <= 5000).groupby('id').agg(fn.sum('income').alias('sum_income'), fn.count('income').alias('n_house'))
df_agg_pandas = df_agg.toPandas() # save to pandas dataframe