gpt4 book ai didi

apache-spark - 从spark写入elasticsearch非常慢

转载 作者:行者123 更新时间:2023-12-02 22:43:33 31 4
gpt4 key购买 nike

我正在处理文本文件并从Spark应用程序将转换后的行写入到 flex 搜索中,如下所示

input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + dir).save()

这运行非常慢,大约需要8分钟才能写入287.9 MB / 1513789记录。
enter image description here

考虑到网络延迟始终存在,我如何调整spark和elasticsearch设置以使其更快。

我在本地模式下使用Spark,并且具有16个内核和64GB RAM。
我的Elasticsearch集群有一个主节点和3个数据节点,每个节点有16个核心,每个核心64GB。

我正在阅读如下文本文件
 val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"inferSchema" -> "false",
"header" -> "false",
"delimiter" -> "\t",
"comment" -> "#",
"mode" -> "PERMISSIVE")

....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)

最佳答案

首先,让我们从应用程序中发生的事情开始。 Apache Spark正在读取1个(不是很大)被压缩的csv文件。因此,第一个spark将花费时间解压缩数据并对其进行扫描,然后再将其写入elasticsearch

这将创建一个带有一个分区的Dataset / DataFrame (由注释中提到的df.rdd.getNumPartitions的结果确认)。

一种简单的解决方案是在读取数据时对数据进行repartition并将其缓存,然后再将其写入elasticsearch。现在我不确定您的数据是什么样的,因此从您的 Angular 确定分区的数量是基准。

val input = sqlContext.read.options(readOptions)
.csv(inputFile.getAbsolutePath)
.repartition(100) // 100 is just an example
.cache

我不确定您的应用程序能带来多少好处,因为我相信可能还会存在其他瓶颈(网络IO,ES的磁盘类型)。

PS:我应该先将csv转换为 Parquet 文件,然后在其上构建ETL。在这里可以真正获得性能。 (个人意见和基准)

另一种可能的优化方法是调整elasticsearch-spark连接器的es.batch.size.entries设置。默认值为1000

设置此参数时需要小心,因为您可能会使Elasticsearch重载。我强烈建议您看一下可用的配置here

我希望这有帮助 !

关于apache-spark - 从spark写入elasticsearch非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47453244/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com