Emit multiple pairs in map operation
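Suppose I have phone call records in the format:
[CallingUser, ReceivingUser, Duration]
I want to know the total time a given user has spent on the phone (the sum of Duration over every record where the user is either the CallingUser or the ReceivingUser).
In effect, for each record I want to create 2 pairs: (CallingUser, Duration) and (ReceivingUser, Duration).
What is the most efficient way to do this? I can add 2 RDDs together, but it is unclear to me whether this is a good approach: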
#Sample Data:
callData = sc.parallelize([["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8] ])
calls = callData.map(lambda record: (record[0], record[2]))
#The potentially inefficient map in question:
calls += callData.map(lambda record: (record[1], record[2]))
reduce = calls.reduceByKey(lambda a, b: a + b)
You want flatMap. If you write a function that returns the list [(record[0], record[2]), (record[1], record[2])], then you can flatMap it!
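As a rough sketch of what such a function might look like, here is a plain-Python version that runs without a Spark cluster. The name `emit_pairs` is hypothetical, and `itertools.chain.from_iterable` is only a local stand-in for the flattening that flatMap performs; with a real RDD you would pass `emit_pairs` to `callData.flatMap` instead.

```python
from itertools import chain


def emit_pairs(record):
    """Return one (user, duration) pair for each side of a call (hypothetical helper)."""
    caller, receiver, duration = record
    return [(caller, duration), (receiver, duration)]


records = [["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8]]

# map-like behaviour: one output element per record, each a list of two pairs
nested = [emit_pairs(r) for r in records]

# flatMap-like behaviour: the per-record lists are concatenated into one flat sequence
flat = list(chain.from_iterable(nested))
print(flat)
```

The point of the comparison is that a plain map would leave three nested lists, whereas the flattened form is a single sequence of six key/value pairs that a per-key reduction can consume.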
Use flatMap(), which is suited to taking a single input and producing multiple mapped outputs. Complete code:
callData = sc.parallelize([["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8]])
calls = callData.flatMap(lambda record: [(record[0], record[2]), (record[1], record[2])])
print(calls.collect())
# prints [('User1', 2), ('User2', 2), ('User1', 4), ('User3', 4), ('User2', 8), ('User1', 8)]
reduce = calls.reduceByKey(lambda a, b: a + b)
print(reduce.collect())
# prints [('User2', 10), ('User3', 4), ('User1', 14)] (the order of the keys may differ)
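If you want to sanity-check the totals without Spark, the same sums can be computed locally. This is only a sketch for verifying the expected result on the sample data; `total_time_per_user` is a hypothetical helper, not part of any Spark API, and it does not say anything about how Spark executes the job.

```python
from collections import defaultdict


def total_time_per_user(records):
    """Sum call durations per user, counting both the caller and the receiver."""
    totals = defaultdict(int)
    for caller, receiver, duration in records:
        totals[caller] += duration    # time spent as CallingUser
        totals[receiver] += duration  # time spent as ReceivingUser
    return dict(totals)


records = [["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8]]
totals = total_time_per_user(records)
print(totals)
```

On the sample data this gives 14 for User1, 10 for User2 and 4 for User3, matching the reduceByKey output above.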