gpt4 book ai didi

java - 使用ES Hadoop连接器将JavaRDD保存在Elastic Search中

转载 作者:行者123 更新时间:2023-12-02 20:58:15 25 4
gpt4 key购买 nike

目前正在一个转换项目中,我需要将数据提供给来自Oracle的 flex 搜索。所以我的工作就这样

1. Sqoop - From oracle
2. Java Spark - Dataframe Joins then saving them into elastic search repo's

我的 flex 文件看起来像
{
Field 1: Value
Field 2: value
Field 3: Value
Field 4: [ -- Array of Maps
{
Name: Value
Age: Value
},{
Name: Value
Age: Value
}
]
Field 5:{ -- Maps
Code :Value
Key : Value
}
}

因此想知道,如何为上述结构形成一个javaRDD。

我已经编码,直到加入数据框并卡住,无法从那里继续。
所以我想要我的数据以标准化形式

我的 Spark 代码
Dataframe esDF = df.select(
df.col("Field1") , df.col("Field2") ,df.col("Field3")
,df.col("Name") ,df.col("Age") ,
df.col("Code"),df.col("Key")
)

请帮忙。

最佳答案

几种选择:

1-在dataFrame本身中使用saveToES方法。 (较旧的版本可能不支持此功能,适用于elasticsearch-spark-20_2.11-5.1.1.jar

import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._

dataFrame.saveToEs("<index>/<type>",Map(("es.nodes" -> <ip:port>"))

2-创建案例类并使用RDD []方法进行保存。 (也适用于旧版本)
import org.elasticsearch.spark._
case class ESDoc(...)
val rdd = df.map( row => EsDoc(..))
rdd.saveToEs("<index>/<type>",Map(("es.nodes" -> <ip:port>"))

3-对于旧版本的scala( < 2.11),在case类中,您将受到22个字段限制的困扰。请注意,您可以使用Map而不是case类
import org.elasticsearch.spark._
val rdd = df.map( row => Map(<key>:<value>...) )
rdd.saveToEs("<index>/<type>",Map(("es.nodes" -> <ip:port>")) // saves RDD[Map<K,V>]

对于上述所有方法,您可能希望将es.batch.write.retry.count传递给适当的值,如果您有另一种控制EMR生命周期的方式(确保它永远不会运行),则可以传递-1(无限重试)。
   val esOptions = Map("es.nodes" -> <host>:<port>, "es.batch.write.retry.count" -> "-1")

关于java - 使用ES Hadoop连接器将JavaRDD保存在Elastic Search中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43924678/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com