gpt4 book ai didi

scala - Scala 与 Python 的 Spark 性能

转载 作者:行者123 更新时间:2023-12-03 04:17:42 25 4
gpt4 key购买 nike

相比 Scala,我更喜欢 Python。但是,由于 Spark 本身是用 Scala 编写的,我希望我的代码在 Scala 中运行得比 Python 版本更快,原因很明显。

有了这个假设,我想为大约 1 GB 的数据学习和编写一些非常常见的预处理代码的 Scala 版本。数据取自 SpringLeaf 竞赛 Kaggle .只是为了概述数据(它包含 1936 个维度和 145232 行)。数据由各种类型组成,例如整数,浮点数,字符串, bool 值。我使用 8 个内核中的 6 个进行 Spark 处理;这就是为什么我使用 minPartitions=6这样每个核心都有一些东西要处理。

Scala 代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")

for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python 代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala 性能
阶段 0(38 分钟),阶段 1(18 秒)
enter image description here

Python 性能
第 0 阶段(11 分钟),第 1 阶段(7 秒)
enter image description here

两者都产生不同的 DAG 可视化图(由于这两张图显示了 Scala ( map ) 和 Python ( reduceByKey ) 的不同阶段 0 函数)

但是,本质上这两个代码都试图将数据转换为(dimension_id,值列表字符串)RDD 并保存到磁盘。输出将用于计算每个维度的各种统计数据。

在性能方面,像这样的真实数据的 Scala 代码似乎可以运行 慢 4 倍 比 Python 版本。
对我来说好消息是它给了我继续使用 Python 的良好动力。坏消息是我不太明白为什么?

最佳答案

可以在下面找到讨论代码的原始答​​案。

首先,您必须区分不同类型的 API,每种 API 都有自己的性能考虑。

RDD API

(具有基于 JVM 的编排的纯 Python 结构)

这是受 Python 代码性能和 PySpark 实现细节影响最大的组件。虽然 Python 性能不太可能成为问题,但您至少需要考虑几个因素:

  • JVM 通信的开销。实际上,所有进出 Python 执行器的数据都必须通过套接字和 JVM 工作线程传递。虽然这是一种相对有效的本地通信,但它仍然不是免费的。
  • 基于进程的执行程序 (Python) 与基于线程的(单 JVM 多线程)执行程序 (Scala)。每个 Python 执行器都在自己的进程中运行。作为一个副作用,它提供了比 JVM 更强的隔离和对执行程序生命周期的一些控制,但可能会显着提高内存使用量:
  • 解释器内存占用
  • 已加载库的占用空间
  • 广播效率较低(每个进程都需要自己的广播副本)
  • Python 代码本身的性能。一般来说,Scala 比 Python 快,但它会因任务而异。此外,您有多种选择,包括 JIT,如 Numba 、C 扩展( Cython )或专门的库,如 Theano .最后,如果您不使用 ML/MLlib(或仅使用 NumPy 堆栈),请考虑使用 PyPy作为替代翻译。见 SPARK-3094 .
  • PySpark 配置提供了 spark.python.worker.reuse选项可用于在为每个任务 fork Python 进程和重用现有进程之间进行选择。后一个选项似乎有助于避免昂贵的垃圾收集(它更像是一种印象而不是系统测试的结果),而前一个(默认)对于昂贵的广播和导入来说是最佳的。
  • 引用计数作为 CPython 中的第一行垃圾收集方法,在典型的 Spark 工作负载(类似流的处理,无引用周期)中工作得非常好,并降低了长时间 GC 暂停的风险。

  • MLlib

    (混合 Python 和 JVM 执行)

    基本注意事项与以前几乎相同,但有一些其他问题。虽然与 MLlib 一起使用的基本结构是普通的 Python RDD 对象,但所有算法都直接使用 Scala 执行。

    这意味着将 Python 对象转换为 Scala 对象会产生额外的成本,反之亦然,增加内存使用量以及我们稍后将介绍的一些额外限制。

    截至目前(Spark 2.x),基于 RDD 的 API 处于维护模式,为 scheduled to be removed in Spark 3.0 .

    DataFrame API 和 Spark ML

    (JVM 执行的 Python 代码仅限于驱动程序)

    这些可能是标准数据处理任务的最佳选择。由于 Python 代码主要限于驱动程序上的高级逻辑操作,因此 Python 和 Scala 之间应该没有性能差异。

    一个异常(exception)是使用行式 Python UDF,其效率明显低于 Scala 等效项。虽然有一些改进的机会(Spark 2.0.0 有了实质性的发展),但最大的限制是内部表示 (JVM) 和 Python 解释器之间的完整往返。如果可能,您应该支持内置表达式的组合 ( example 。Python UDF 行为在 Spark 2.0.0 中得到了改进,但与 native 执行相比,它仍然不是最佳的。

    随着 vectorized UDFs (SPARK-21190 and further extensions) 的引入,这可能会在 future 得到显着改善。 ,它使用 Arrow Streaming 进行零拷贝反序列化的高效数据交换。对于大多数应用程序,它们的次要开销可以忽略不计。

    另外一定要避免在 DataFrames之间传递不必要的数据。和 RDDs .这需要昂贵的序列化和反序列化,更不用说与 Python 解释器之间的数据传输了。

    值得注意的是,Py4J 调用具有相当高的延迟。这包括简单的调用,例如:
    from pyspark.sql.functions import col

    col("foo")

    通常,这无关紧要(开销是恒定的并且不取决于数据量),但是在软实时应用程序的情况下,您可以考虑缓存/重用 Java 包装器。

    GraphX 和 Spark 数据集

    至于现在(Spark 1.6 2.1)都没有提供 PySpark API,所以你可以说 PySpark 比 Scala 差很多。

    图X

    在实践中,GraphX 开发几乎完全停止,项目目前处于维护模式, related JIRA tickets closed as won't fix . GraphFrames库提供了一个带有 Python 绑定(bind)的替代图形处理库。

    数据集

    主观上来说静态类型没有太多位置 Datasets在 Python 中,即使有当前的 Scala 实现也太简单了,不能提供与 DataFrame 相同的性能优势。 .

    流媒体

    从我目前所见,我强烈建议使用 Scala 而不是 Python。如果 PySpark 获得对结构化流的支持, future 可能会发生变化,但现在 Scala API 似乎更加健壮、全面和高效。我的经验非常有限。

    Spark 2.x 中的结构化流似乎缩小了语言之间的差距,但目前仍处于早期阶段。尽管如此,基于 RDD 的 API 已经在 Databricks Documentation 中被称为“遗留流”。 (访问日期 2017-03-03))因此有理由期待进一步的统一努力。

    非绩效考虑

    功能奇偶校验

    并非所有 Spark 功能都通过 PySpark API 公开。请务必检查您需要的部分是否已经实现,并尝试了解可能的限制。

    当您使用 MLlib 和类似的混合上下文时,这一点尤为重要(请参阅 Calling Java/Scala function from a task )。公平地说,PySpark API 的某些部分,例如 mllib.linalg , 提供了比 Scala 更全面的方法集。

    API设计

    PySpark API 密切反射(reflect)了它的 Scala 对应物,因此不完全是 Pythonic。这意味着在语言之间进行映射非常容易,但与此同时,Python 代码可能更难理解。

    复杂的架构

    与纯 JVM 执行相比,PySpark 数据流相对复杂。推理 PySpark 程序或调试要困难得多。此外,至少对 Scala 和 JVM 的基本了解几乎是必须的。

    Spark 2.x 及更高版本

    持续转向 Dataset API,带有卡住的RDD API 给Python 用户带来了机遇和挑战。虽然 API 的高级部分在 Python 中更容易公开,但更高级的功能几乎不可能使用 直接 .

    此外, native Python 函数仍然是 SQL 世界中的二等公民。希望这将在 future 通过 Apache Arrow 序列化得到改善( current efforts target data collection 但 UDF serde 是 long term goal )。

    对于强烈依赖 Python 代码库的项目,纯 Python 替代方案(如 DaskRay )可能是一个有趣的替代方案。

    不一定非要一对一

    Spark DataFrame (SQL, Dataset) API 提供了一种在 PySpark 应用程序中集成 Scala/Java 代码的优雅方式。您可以使用 DataFrames将数据公开给 native JVM 代码并读回结果。我已经解释了一些选项 somewhere else您可以在 How to use a Scala class inside Pyspark 中找到 Python-Scala 往返的工作示例.

    它可以通过引入用户定义的类型来进一步增强(参见 How to define schema for custom type in Spark SQL?)。

    问题中提供的代码有什么问题

    (免责声明:Pythonista 的观点。很可能我错过了一些 Scala 技巧)

    首先,您的代码中有一个部分根本没有意义。如果您已经拥有 (key, value)使用 zipWithIndex 创建的对或 enumerate创建字符串只是为了在之后立即拆分它有什么意义? flatMap不能递归地工作,因此您可以简单地生成元组并跳过以下 map无论如何。

    我发现有问题的另一部分是 reduceByKey .一般来说, reduceByKey如果应用聚合函数可以减少必须混洗的数据量,则很有用。由于您只是连接字符串,因此这里没有任何好处。忽略低级的东西,比如引用的数量,你必须传输的数据量与 groupByKey 完全相同。 .

    通常我不会细说,但据我所知,这是 Scala 代码中的瓶颈。在 JVM 上连接字符串是一项相当昂贵的操作(参见例如: Is string concatenation in scala as costly as it is in Java? )。这意味着类似这样的事情 _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)相当于 input4.reduceByKey(valsConcat)在您的代码中不是一个好主意。

    如果你想避免 groupByKey您可以尝试使用 aggregateByKeyStringBuilder .与此类似的事情应该可以解决问题:
    rdd.aggregateByKey(new StringBuilder)(
    (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
    },
    (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
    }
    )

    但我怀疑这是否值得大惊小怪。

    牢记上述内容,我已将您的代码重写如下:

    斯卡拉 :
    val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
    (idx, iter) => if (idx == 0) iter.drop(1) else iter
    }

    val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
    case ("true", i) => (i, "1")
    case ("false", i) => (i, "0")
    case p => p.swap
    })

    val result = pairs.groupByKey.map{
    case (k, vals) => {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
    }
    }

    result.saveAsTextFile("scalaout")

    python :
    def drop_first_line(index, itr):
    if index == 0:
    return iter(list(itr)[1:])
    else:
    return itr

    def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
    yield (i, x)

    input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

    pairs = input.flatMap(separate_cols)

    result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

    result.saveAsTextFile("pythonout")

    结果

    local[6]模式(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器需要 4GB 内存(n = 3):
  • Scala - 平均值:250.00s,标准差:12.49
  • Python - 平均值:246.66s,标准差:1.15

  • 我很确定大部分时间都花在了改组、序列化、反序列化和其他次要任务上。只是为了好玩,这里是 Python 中的简单单线程代码,可以在不到一分钟的时间内在这台机器上执行相同的任务:
    def go():
    with open("train.csv") as fr:
    lines = [
    line.replace('true', '1').replace('false', '0').split(",")
    for line in fr]
    return zip(*lines[1:])

    关于scala - Scala 与 Python 的 Spark 性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32464122/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com