gpt4 book ai didi

json - 如何在指定了_id字段的情况下将所有数据从Elastic Search Index导出为JSON格式的文件?

转载 作者:行者123 更新时间:2023-12-02 23:52:05 25 4
gpt4 key购买 nike

我是Spark和Scala的新手。我试图将所有数据从Elastic Search中的特定索引读取到RDD中,并使用此数据写入Mongo DB。

我正在将Elastic搜索数据加载到esJsonRDD中,当我尝试打印RDD内容时,其格式如下,

(1765770532{"FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"})

预期格式
{_id:"1765770532","FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"}

如何实现 flex 搜索的输出以这种方式进行格式化?

任何帮助,将不胜感激。

从 flex 搜索中检索的数据具有以下格式,
(1765770532{"FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"})

预期格式为

{_id:“1765770532”,“名字”:ABC,“姓氏”:“DEF”,邮政编码”:“36905”,“城市”:“PortAdam”,“StateCode”:“AR”}
    object readFromES {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("readFromES")
.set("es.nodes", Config.ES_NODES)
.set("es.nodes.wan.only", Config.ES_NODES_WAN_ONLY)
.set("es.net.http.auth.user", Config.ES_NET_HTTP_AUTH_USER)
.set("es.net.http.auth.pass", Config.ES_NET_HTTP_AUTH_PASS)
.set("es.net.ssl", Config.ES_NET_SSL)
.set("es.output.json","true")

val sc = new SparkContext(conf)
val RDD = EsSpark.esJsonRDD(sc, "userdata/user")
//RDD.coalesce(1).saveAsTextFile(args(0))
RDD.take(5).foreach(println)
}
}

我希望将RDD输出以以下JSON格式(每个文档一行)写入文件,
{_id:"1765770532","FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"}
{_id:"1765770533","FirstName":DEF,"LastName":"DEF",Zipcode":"35525","City":"PortWinchestor","StateCode":"AI"}

最佳答案

"_id"是元数据的一部分,要访问它,您应该在配置中添加.config("es.read.metadata", true)

然后您可以通过两种方式访问​​它,可以使用

val RDD =  EsSpark.esJsonRDD(sc, "userdata/user") 

并在json中手动添加 _id字段

或更简单的方法是读取为数据框
val df = spark.read
.format("org.elasticsearch.spark.sql")
.load("userdata/user")
.withColumn("_id", $"_metadata".getItem("_id"))
.drop("_metadata")

//在文件中写为json
df.write.json("output folder ")

这里的 Spark 是创建为
val spark = SparkSession.builder().master("local[*]").appName("Test")
.config("spark.es.nodes","host")
.config("spark.es.port","ports")
.config("spark.es.nodes.wan.only","true")
.config("es.read.metadata", true) //for enabling metadata
.getOrCreate()

希望这可以帮助

关于json - 如何在指定了_id字段的情况下将所有数据从Elastic Search Index导出为JSON格式的文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56947211/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com