
scala - EsHadoopIllegalArgumentException: Cannot detect ES version in Spark-ElasticSearch example


I am trying a simple example that writes data to ElasticSearch, but I keep getting this error:

EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
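
For reference, the setting named at the end of the message is an es-hadoop connector option set on the SparkConf. A minimal sketch of how those options are passed, assuming Elasticsearch is reachable on localhost:9200 (both values are placeholders, not taken from the question):

import org.apache.spark.SparkConf

// sketch only: placeholder connection settings for the es-hadoop connector
val conf = new SparkConf()
  .set("es.nodes", "localhost")     // assumption: host where ES is listening
  .set("es.port", "9200")           // assumption: ES HTTP port
  .set("es.nodes.wan.only", "true") // needed only when nodes sit behind a
                                    // proxy/cloud endpoint and node discovery
                                    // must be disabled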

My Spark and ElasticSearch dependencies:
scalaVersion := "2.11.5"

val sparkVersion = "2.3.0"

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "com.typesafe" % "config" % "1.3.0",
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"
)

Here is my sample code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark._

object App {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setMaster(args(0))
      .setAppName("KafkaSparkStreaming")
    sparkConf.set("es.index.auto.create", "true")

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
    val sparkContext = streamingContext.sparkContext
    sparkContext.setLogLevel("ERROR")

    val sqlContext = new SQLContext(sparkContext)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    sparkContext.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

I run ElasticSearch from a Docker image. Here is my docker-compose.yml file:
version: '3.3'
services:
  kafka:
    image: spotify/kafka
    ports:
      - "9092:9092"
    environment:
      - ADVERTISED_HOST=localhost
  elasticsearch:
    image: elasticsearch
  kibana:
    image: kibana
    ports:
      - "5601:5601"

What could be causing this exception? I would greatly appreciate any help.

Best Answer

I ran into a similar situation while experimenting with Spark and Elasticsearch. Replacing the "elasticsearch-spark" dependency with "elasticsearch-hadoop", at a version matching my Elasticsearch installation, solved the problem.

import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming._

val conf = new SparkConf().setAppName("Sample").setMaster("local[*]")
conf.set("es.index.auto.create", "true")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

// wrap the single RDD in a queue so it can be replayed as one DStream microbatch
val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)

ssc.queueStream(microbatches).saveToEs("spark/docs")

ssc.start()
ssc.awaitTermination()
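
Since the exception fires before anything is written, it can also help to confirm the cluster is reachable at all before debugging the Spark side. A minimal pre-flight sketch using only the standard library, assuming Elasticsearch answers on localhost:9200:

import scala.io.Source

// ES answers its root endpoint with a JSON body that includes the
// version number the connector tries to detect
val body = Source.fromURL("http://localhost:9200").mkString
println(body) // look for "number" : "6.x.y" in the output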

Dependency list:
"org.apache.spark" %% "spark-core" % "2.2.0",
"org.apache.spark" %% "spark-sql" % "2.2.0",
"org.apache.spark" %% "spark-streaming" % "2.2.0",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1",
"org.elasticsearch" %% "elasticsearch-hadoop" % "6.3.0",

Regarding scala - EsHadoopIllegalArgumentException: Cannot detect ES version in Spark-ElasticSearch example, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50236733/
