gpt4 book ai didi

hadoop - 将 rdd 从 spark 写入 Elastic Search 失败

转载 作者:可可西里 更新时间:2023-11-01 14:32:47 27 4
gpt4 key购买 nike

我正在尝试在版本 2.4.0 上将一对 rdd 写入 Elastic Cloud 上的 Elastic Search。我正在使用 elasticsearch-spark_2.10-2.4.0 插件写入 ES。这是我用来写入 ES 的代码:

def predict_imgs(r):  
import json
out_d = {}
out_d["pid"] = r["pid"]
out_d["other_stuff"] = r["other_stuff"]

return (r["pid"], json.dumps(out_d))

res2 = res1.map(predict_imgs)

es_write_conf = {
"es.nodes" : image_es,
#"es.port" : "9243",
"es.resource" : "index/type",
"es.nodes.wan.only":"True",
"es.write.operation":"upsert",
"es.mapping.id":"product_id",
"es.nodes.discovery" : "false",
"es.net.http.auth.user": "username",
"es.net.http.auth.pass": "pass",
"es.input.json": "true",
"es.http.timeout":"1m",
"es.scroll.size":"10",
"es.batch.size.bytes":"1mb",
"es.http.retries":"1",
"es.batch.size.entries":"5",
"es.batch.write.refresh":"False",
"es.batch.write.retry.count":"1",
"es.batch.write.retry.wait":"10s"}

res2.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)

我得到的错误如下:

Py4JJavaError: An error occurred while calling     z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

有趣的是,当我对 rdd2 上的前几个元素进行处理,然后从中创建一个新的 rdd 并将其写入 ES 时,它可以完美地工作:

x = sc.parallelize([res2.take(1)])
x.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)

我正在使用 Elastic Cloud(Elastic Search 的云产品)和 Databricks(Apache Spark 的云产品)难道是 ES 跟不上 Spark 写入 ES 的吞吐量?我将 Elastic Cloud 的大小从 2GB RAM 增加到 8GB RAM。

我上面使用的 es_write_conf 有推荐的配置吗?您能想到的任何其他 confs 吗?更新到 ES 5.0 有帮助吗?

感谢任何帮助。几天来一直在努力解决这个问题。谢谢。

最佳答案

它看起来像是 pyspark 计算的问题,不一定是 elasticsearch 保存过程。通过以下方式确保您的 RDD 正常:

  1. 在 rdd1 上执行 count()(以“实现”结果)
  2. 在 rdd2 上执行 count()

如果计数没问题,在保存到 ES 之前尝试缓存结果:

res2.cache()
res2.count() # to fill the cache
res2.saveAsNewAPIHadoopFile(...

如果问题仍然存在,请尝试查看已死的执行程序 stderr 和 stdout(您可以在 SparkUI 的“执行程序”选项卡上找到它们)。

我还注意到 es_write_conf 中的批量大小非常小,请尝试将其增加到 500 或 1000 以获得更好的性能。

关于hadoop - 将 rdd 从 spark 写入 Elastic Search 失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40554193/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com