
scala - Elasticsearch + Spark: writing JSON with a custom document _id


I am trying to write a collection of objects from Spark into Elasticsearch. I have to satisfy two requirements:

  • the documents are already serialized to JSON and should be written as is
  • an Elasticsearch document _id should be provided

    So far, this is what I have tried.
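    In the snippets below, job refers to an RDD of records that carry their own ID and can serialize themselves to JSON. The original post does not show that type, so here is a purely hypothetical sketch of its shape (names are illustrative only), just so the snippets have something concrete to refer to:
    import org.apache.spark.rdd.RDD

    // Hypothetical record type (not from the original post): each record knows its
    // Elasticsearch ID and can render itself as a JSON string.
    case class Record(_id: String, foo: String) {
      def toJson(): String = s"""{"_id":"${_id}","foo":"$foo"}"""
    }

    // `job` in the snippets below is assumed to be an RDD of such records,
    // produced by some earlier stage of the pipeline.
    val job: RDD[Record] = ??? // placeholder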
    saveJsonToEs()
    I tried to use saveJsonToEs() like this (the serialized documents contain the _id field with the desired Elasticsearch ID):
    val rdd: RDD[String] = job.map{ r => r.toJson() }

    val cfg = Map(
      ("es.resource", "myindex/mytype"),
      ("es.mapping.id", "_id"),
      ("es.mapping.exclude", "_id")
    )

    EsSpark.saveJsonToEs(rdd, cfg)

    But the elasticsearch-hadoop library throws this exception:
    Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)

    If I remove es.mapping.exclude but keep es.mapping.id and send a JSON with the _id inside (like {"_id":"blah",...}):
    val cfg = Map(
      ("es.resource", "myindex/mytype"),
      ("es.mapping.id", "_id")
    )

    EsSpark.saveJsonToEs(rdd, cfg)

    I get this error:
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    ...

    When I try to send this ID as a different field (like {"superID":"blah",...}):
    val cfg = Map(
      ("es.resource", "myindex/mytype"),
      ("es.mapping.id", "superID")
    )

    EsSpark.saveJsonToEs(rdd, cfg)

    it fails to extract the field:
    17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

    When I remove es.mapping.id and es.mapping.exclude from the configuration, it works, but the document id is generated by Elasticsearch (which violates requirement 2):
    val rdd: RDD[String] = job.map{ r => r.toJson() }

    val cfg = Map(
      ("es.resource", "myindex/mytype")
    )

    EsSpark.saveJsonToEs(rdd, cfg)
    saveToEsWithMeta()
    There is another function, saveToEsWithMeta(), that allows providing the _id and other metadata for insertion. It solves requirement 2, but fails on requirement 1.
    val rdd: RDD[(String, String)] = job.map {
      r => r._id -> r.toJson()
    }

    val cfg = Map(
      ("es.resource", "myindex/mytype")
    )

    EsSpark.saveToEsWithMeta(rdd, cfg)

    In fact, Elasticsearch fails to even parse what elasticsearch-hadoop sends:
    Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)

    The question

    Is it possible to write a collection of (documentID, serializedDocument) from Spark into Elasticsearch (using elasticsearch-hadoop)?

    P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.

    Best answer

    Eventually I found the problem: it was a typo in the configuration.

    [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

    It was looking for the field superId, but the documents only contained superID (note the case). The question is also a bit misleading on this point, because in the code shown above it appears as "es.mapping.id", "superID" (which was not what the actual code had).

    The actual solution is just what Levi Ramsey suggested:
    val json = """{"foo":"bar","superID":"deadbeef"}"""

    val rdd = spark.makeRDD(Seq(json))
    val cfg = Map(
      ("es.mapping.id", "superID"),
      ("es.resource", "myindex/mytype")
    )
    EsSpark.saveJsonToEs(rdd, cfg = cfg)

    The difference is that es.mapping.id cannot be _id (as was pointed out in the original post, _id is metadata and Elasticsearch does not accept it inside a document).
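    For completeness: another route that keeps _id out of the document body entirely is to pair each pre-serialized JSON string with its ID, as in the saveToEsWithMeta() attempt above, and additionally tell the connector that the values are already JSON via the documented es.input.json setting. The following is only a hedged sketch built from the question's snippets and the elasticsearch-hadoop configuration options; it was not verified in this thread:
    import org.apache.spark.rdd.RDD
    import org.elasticsearch.spark.rdd.EsSpark

    // Sketch only: (documentID, serializedJson) pairs. es.input.json tells
    // elasticsearch-hadoop that the values are already JSON, so they are sent
    // as-is instead of being re-serialized as quoted strings.
    // Assumes r.toJson() here produces a body *without* the reserved `_id`
    // field (Elasticsearch 5.x rejects `_id` inside the document).
    val pairs: RDD[(String, String)] = job.map { r => r._id -> r.toJson() }

    val cfg = Map(
      ("es.resource", "myindex/mytype"),
      ("es.input.json", "true")
    )

    EsSpark.saveToEsWithMeta(pairs, cfg)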

    Naturally, this means that the new field superID should be added to the mapping (unless the mapping is dynamic). If storing the extra field in the index is a burden, one should also (as sketched below):
  • exclude it from the _source
  • and disable its indexing
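    For illustration, here is a hedged sketch of what such a mapping could look like on Elasticsearch 5.x, kept as a Scala string (the index/type/field names are the ones used above; this is not part of the original answer). It would be sent as the body of PUT /myindex before writing:
    // Sketch of an ES 5.x mapping: `superID` stays available for `es.mapping.id`
    // extraction (which happens on the connector side), but is excluded from
    // `_source` and not indexed, so it adds no storage or search overhead.
    val indexMapping: String =
      """{
        |  "mappings": {
        |    "mytype": {
        |      "_source": { "excludes": ["superID"] },
        |      "properties": {
        |        "superID": { "type": "keyword", "index": false }
        |      }
        |    }
        |  }
        |}""".stripMargin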

    Many thanks to Alex Savitsky for pointing in the right direction.

    Regarding scala - Elasticsearch + Spark: writing JSON with a custom document _id, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47892705/
