gpt4 book ai didi

apache-spark - 对两个 Spark RDD(在 PySpark 中)进行半连接的正确方法是什么?

转载 作者:行者123 更新时间:2023-12-01 05:03:32 27 4
gpt4 key购买 nike

在我的 PySpark 应用程序中,我有两个 RDD:

  • 项目 - 这包含所有有效元素的元素 ID 和元素名称。约 100000 项。
  • 属性表 - 这包含字段用户 ID、项目 ID 和按该顺序组合的属性值。这些是系统中每个用户-项目组合的特定属性。这个 RDD 有数百个 1000 行。

  • 我想丢弃 attributeTable RDD 中所有与项目 RDD 中的有效项目 ID(或名称)不对应的行。换句话说,按项目 ID 进行半连接。例如,如果这些是 R 数据帧,我会做 semi_join(attributeTable, items, by="itemID")

    我首先尝试了以下方法,但发现这需要很长时间才能返回(在我的 PC 上的 VM 上运行的本地 Spark 安装上)。可以理解,因为涉及的比较数量如此之多:
    # Create a broadcast variable of all valid item IDs for doing filter in the drivers
    validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
    attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))

    经过一番摆弄之后,我发现以下方法工作得非常快(在我的系统上大约一分钟)。
    # Create a broadcast variable for item ID to item name mapping (dictionary) 
    itemIdToNameMap = sc.broadcast(items.collectAsMap())

    # From the attribute table, remove records that don't correspond to a valid item name.
    # First go over all records in the table and add a dummy field indicating whether the item name is valid
    # Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
    attributeTable = (attributeTable
    .map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
    .filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
    .map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))

    尽管这对我的应用程序来说效果很好,但感觉更像是一种肮脏的解决方法,我很确定必须有另一种更清洁或惯用正确(并且可能更有效)的方法来在 Spark 中执行此操作。你有什么建议?我是 Python 和 Spark 的新手,因此如果您能指出正确的资源,任何 RTFM 建议也会有所帮助。

    我的 Spark 版本是 1.3.1。

    最佳答案

    只需进行常规连接,然后丢弃“查找”关系(在您的情况下为 items rdd)。

    如果这些是您的 RDD(示例取自另一个答案):

    items = sc.parallelize([(123, "Item A"), (456, "Item B")])
    attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])

    那么你会这样做:
    attributeTable.keyBy(lambda x: x[1])
    .join(items)
    .map(lambda (key, (attribute, item)): attribute)

    结果,您只有来自 attributeTable RDD 的元组,它们在 items RDD 中有相应的条目:
    [(123456, 123, 'Attribute for A')]

    按照另一个答案中的建议通过 leftOuterJoin 进行操作也可以完成这项工作,但效率较低。此外,另一个答案将 itemsattributeTable 而不是 attributeTableitems 半连接。

    关于apache-spark - 对两个 Spark RDD(在 PySpark 中)进行半连接的正确方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31131997/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com