对两个 Spark RDD(在 PySpark 中)进行半连接的正确方法是什么?

What is the right way to do a semi-join on two Spark RDDs (in PySpark)?

在我的 PySpark 应用程序中,我有两个 RDD:

我想丢弃 attributeTable RDD 中与项目 RDD 中的有效项目 ID(或名称)不对应的所有行。换句话说,通过项目 ID 进行半连接。例如,如果这些是 R 数据框,我会做 semi_join(attributeTable, items, by="itemID")

我首先尝试了以下方法,但发现这需要永远 return(在我的本地 Spark 安装上 运行 在我的 PC 上的虚拟机上)。可以理解,因为涉及的比较数量如此之多:

# Create a broadcast variable of all valid item IDs for doing filter in the drivers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))

经过一番折腾,我发现下面的方法工作得相当快(在我的系统上大约需要一分钟)。

# Create a broadcast variable for item ID to item name mapping (dictionary) 
itemIdToNameMap = sc.broadcast(items.collectAsMap())

# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
                  .map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
                  .filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
                  .map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))

虽然这对我的应用程序来说效果很好,但感觉更像是一个肮脏的解决方法,我很确定在 Spark 中必须有另一种更清洁或惯用正确(并且可能更有效)的方法或方法来执行此操作。你有什么建议?我是 Python 和 Spark 的新手,所以如果您能为我指出正确的资源,任何 RTFM 建议也会有所帮助。

我的Spark版本是1.3.1.

正如其他人所指出的,这可能最容易通过利用 DataFrame 来实现。但是,您也许可以通过使用 leftOuterJoinfilter 函数来实现您的预​​期目标。像下面这样有点骇人听闻的东西可能就足够了:

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])
sorted(items.leftOuterJoin(attributeTable.keyBy(lambda x: x[1]))
       .filter(lambda x: x[1][1] is not None)
       .map(lambda x: (x[0], x[1][0])).collect())

returns

[(123, 'Item A')]

只需进行常规连接,然后丢弃 "lookup" 关系(在您的情况下为 items rdd)。

如果这些是你的 RDDs (取自另一个答案的例子):

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])

那么你会做:

attributeTable.keyBy(lambda x: x[1])
  .join(items)
  .map(lambda (key, (attribute, item)): attribute)

因此,您只有来自 attributeTable RDD 的元组,这些元组在 items RDD 中有相应的条目:

[(123456, 123, 'Attribute for A')]

按照另一个答案中的建议通过 leftOuterJoin 进行操作也可以完成这项工作,但效率较低。此外,另一个答案半加入 itemsattributeTable 而不是 attributeTableitems.