gpt4 book ai didi

hadoop - 如何插入到spark中的elasticsearch?

转载 作者:可可西里 更新时间:2023-11-01 14:19:34 25 4
gpt4 key购买 nike

使用 HTTP POST,以下脚本可以插入新字段 createtime 或更新 lastupdatetime:

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
"lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
"createtime": "2015-09-16T18:00:00"
"lastupdatetime": "2015-09-16T18:00",
}
}'

但是在spark脚本中,设置了"es.write.operation": "upsert"之后,我根本不知道如何插入createtimeofficial document 中只有 es.update.script.* ...那么,谁能给我举个例子吗?

更新:在我的例子中,我想将 android 设备的信息从登录保存到 one elasticsearch 类型,并将其首次出现时间设置为 createtime 。如果设备再次出现,我只更新 lastupdatetime,但保持 createtime 不变。

所以文档id是android ID,如果id存在,更新lastupdatetime,否则插入createtimelastupdatetime。所以这里的设置是(在python中):

conf = {
"es.resource.write": "stats-device/activation",
"es.nodes": "NODE1:9200",
"es.write.operation": "upsert",
"es.mapping.id": "id"
# ???
}

rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf
)

如果 id 不存在,我不知道如何插入一个字段。

最佳答案

没有看到您的 Spark 脚本,很难给出详细的答案。但一般来说,你会想使用 elasticsearch-hadoop (所以你需要将该依赖项添加到你的 Build.sbt 文件中,例如)然后在你的脚本中你可以:

import org.elasticsearch.spark._ 
val documents = sc.parallelize(Seq(Map(
"id" -> 1,
"createtime" -> "2015-09-16T18:00:00"
"lastupdatetime" -> "2015-09-16T18:00"),
Map(<next document>), ...)
.saveToEs("test/type1", Map("es.mapping.id" -> "id"))

根据 official docs . saveToES 的第二个参数指定 map 的 RDD 中的哪个键用作 ElasticSearch 文档 ID。

当然,如果您使用 Spark 执行此操作,则意味着您的行数多于您想要手动输入的行数,因此对于您的情况,您需要将数据转换为 map 的 RDD从脚本中的键-> 值。但由于不知道数据来源,我无法深入了解更多细节。

关于hadoop - 如何插入到spark中的elasticsearch?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32605883/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com