Pyspark - 使用字典中的值映射和应用计算

Pyspark - Map and apply calculation using value from dictionaries

我有两本字典。其中 UserID 是键,它们的位置是值。 第一项如下所示:

{'U1001': ('22.139997', '-100.978803'),
 'U1002': ('22.150087', '-100.983325')}

还有另一个字典,其中 PlaceID 是键,位置是值。 第一项如下所示:

{'134999': ('18.915421', '-99.184871'),
 '132825': ('22.1473922', '-100.983092')}

现在我得到了一个 RDD,其中 UserIDPlaceID 和用户对该地点的评分:

[('U1077', '135085', 2),
 ('U1077', '135038', 2)]

我想计算用户和地点之间的距离,并使用 geodesicgeopy.distance

保持评分

我可以(转换和)加入(值)字典并将它们替换为 UserIDPlaceID,但我正在寻找使用 pyspark 语言的解决方案。

我遇到了 .mapValues 但这对我来说并没有什么用。

所以,最终,我想获得给出的距离和评分:

[('2', 693.4067254748844),
 ('2', 806.8757681276663)]

您可以从 users_dictplaces_dict 创建 RDD,然后加入 ratings_rdd 以获取用户坐标和评分地点。然后使用地图,调用geodesic计算距离。

这是一个例子:

from geopy.distance import geodesic

users_dict = {'U1077': ('22.139997', '-100.978803'), 'U1002': ('22.150087', '-100.983325')}
places_dict = {'135085': ('18.915421', '-99.184871'), '135038': ('22.1473922', '-100.983092')}

users_rdd = sc.parallelize(list(users_dict.items()))
places_rdd = sc.parallelize(list(places_dict.items()))
ratings_rdd = sc.parallelize([('U1077', '135085', 2), ('U1077', '135038', 2)])

# RDD(UserId, (PlaceId, Rating))
ratings_rdd = ratings_rdd.map(lambda x: (x[0], list(x[1:])))

# RDD(PlaceId, (UserId, UserCoordinates, Rating)) 
joined1 = ratings_rdd.join(users_rdd).map(lambda x: (x[1][0][0], (x[0], x[1][1], x[1][0][1]))) 

# RDD(UserId, PlaceId, Rating, Distance)
result = joined1.join(places_rdd).map(
    lambda x: (x[1][0][0], x[0], x[1][0][2], geodesic(x[1][0][1], x[1][1]).kilometers)
)

print(result.collect())
#[('U1077', '135085', 2, 403.0361166435645), ('U1077', '135038', 2, 0.9307697045815713)]

你可以打印中间的RDD来理解逻辑。简而言之,我们需要 rdds 由 UserId 键控以加入 users_rdd 然后由 PlaceID 键控以加入 places_rdd