gpt4 book ai didi

scala - 如何引用范围之外的 Spark 广播变量

转载 作者:行者123 更新时间:2023-12-04 15:19:59 24 4
gpt4 key购买 nike

我看到的所有 Spark 广播变量示例都在使用它们的函数范围内定义它们( map()join() 等)。我想同时使用 map()功能和mapPartitions()引用广播变量的函数,但我想将它们模块化,以便可以将相同的函数用于单元测试。

  • 我怎样才能做到这一点?

  • 我的一个想法是对函数进行柯里化(Currying),以便在使用 map 时传递对广播变量的引用。或 mapPartitions称呼。
  • 在原始范围内定义函数时,传递对广播变量的引用是否会对性能产生影响?

  • 我有这样的想法(伪代码):
    // firstFile.scala
    // ---------------

    def mapper(bcast: Broadcast)(row: SomeRow): Int = {
    bcast.value(row._1)
    }

    def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
    val broadcastVariable = bcast.value

    for {
    i <- iter
    } yield broadcastVariable(i)
    })


    // secondFile.scala
    // ----------------

    import firstFile.{mapMyPartition, mapper}

    val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))

    rdd
    .map(mapper(bcastVariable))
    .mapPartitions(mapMyPartition(bcastVariable))

    最佳答案

    您的解决方案应该可以正常工作。在这两种情况下,函数都传递给 map{Partitions}序列化时将包含对广播变量本身的引用,但不包含对其值的引用,并且仅调用 bcast.value在节点上计算时。

    需要避免的是

    def mapper(bcast: Broadcast): SomeRow => Int = {
    val value = bcast.value
    row => value(row._1)
    }

    关于scala - 如何引用范围之外的 Spark 广播变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36849204/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com