gpt4 book ai didi

scala - 如何在Spark中的执行器之间同步功能以避免在写入Elastic时并发

转载 作者:行者123 更新时间:2023-12-02 23:12:27 29 4
gpt4 key购买 nike

我有一个函数将被调用以使用spark和scala将DataFrame写入Elastic搜索。 (DataFrame是在函数调用之前创建的)

def writeToES(dfForES: DataFrame, indexName: String, spark: SparkSession, conf: JSONObject) = {
import org.apache.spark.sql.functions.col
val doc_id_cols = Array("zip_id", "pattern_name", "row_index")
if (indexName == conf.getString("elkParserIndex")) {
println("Parser Index")
.withColumn("row_index", col("line_number").cast(IntegerType))
.write.format("org.elasticsearch.spark.sql")
.mode("append")
.save(conf.getString("elkParserIndex"))
}

我有5个执行者,每个执行者都有3个核心。他们在并行中调用此函数,并且 flex 搜索由于无法处理大量并行负载而给出了异常(exception)。

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes



由于我是Elastic的新手,所以无法在Elastic中处理此异常,并且希望通过避免并发来处理spark。
有什么办法可以解决这个问题?

最佳答案

在您的语句中适当使用.coalese(1)或.repartition(1),将导致所有数据都被改组到Worker上的单个执行器。

这意味着1个进程,没有并发问题。这也意味着较低的吞吐量。

关于scala - 如何在Spark中的执行器之间同步功能以避免在写入Elastic时并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58852156/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com