gpt4 book ai didi

scala - Spark如何从多个Elastic Search集群读取

转载 作者:行者123 更新时间:2023-12-02 22:56:41 26 4
gpt4 key购买 nike

我需要从两个不同的Elastic Search集群读取数据。一个用于日志,一个用于产品数据,我尝试在创建sparkConf()时放入不同的SparkSession,但似乎仅与我创建的第一个SparkSession一起使用

val config1 = new SparkConf().setAppName("test")
.set("spark.driver.allowMultipleContexts", "true")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.nodes.wan.only", "true")
.set("es.nodes.client.only", "false")
.set("es.nodes", s"$esNode1:$esPort1")

val config2 = new SparkConf().setAppName("test")
.set("spark.driver.allowMultipleContexts", "true")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.nodes.wan.only", "true")
.set("es.nodes.client.only", "false")
.set("es.nodes", s"$esNode2:$esPort2")

val session1 = SparkSession.builder.master('local').config(config1).getOrCreate()
val session2 = SparkSession.builder.master('local').config(config2).getOrCreate()

session1.read.format("org.elasticsearch.spark.sql").load(path)
session2.read.format("org.elasticsearch.spark.sql").load(path)

似乎spark不支持使用相同格式的多个 session ,因为我也在Mysql(jdbc)中使用了相同的SparkSession,并且效果很好。有没有其他方法可以从多个ElasticSearch集群获取数据?

最佳答案

每个Spark应用程序仅创建一个 session 。然后以这种方式读取2个DataFrame:

  val config = new SparkConf().setAppName("test")
.set("spark.driver.allowMultipleContexts", "true")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.nodes.wan.only", "true")
.set("es.nodes.client.only", "false")

val session = SparkSession.builder.master("local").config(config).getOrCreate

val df1 = session.read.format("org.elasticsearch.spark.sql")
.option("es.nodes", s"$esNode1:$esPort1").load(path)

val df2 = session.read.format("org.elasticsearch.spark.sql")
.option("es.nodes", s"$esNode2:$esPort2").load(path)

关于scala - Spark如何从多个Elastic Search集群读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51134832/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com