
scala - NullPointerException when using the Elasticsearch 5.5 bulk ingest API with Spark Streaming


I am getting a NullPointerException:

java.lang.NullPointerException
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:604)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:46)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:59)
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:250)
at org.elasticsearch.client.transport.TransportProxyClient.execute(TransportProxyClient.java:59)
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:363)
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:408)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:80)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:54)

I have a scenario where multiple concurrent tasks run across the 4 executors of a Spark Streaming application; each executor reads data from Kafka, builds a bulk request, and ingests the batch of records into an ES index. On the first run I hit a few of these strange NullPointerExceptions, but the same batches were processed successfully on the second run.
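The stack trace comes from the TransportClient bulk path. A minimal sketch of that kind of write follows; the index and type names ("my-index", "my-type"), the client setup, and the docs collection are placeholders, not the actual application code:

// Hypothetical sketch of the executor-side write path implied by the
// stack trace; names are placeholders.
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.xcontent.XContentType

def bulkIndex(client: TransportClient, jsonDocs: Seq[String]): Unit = {
  val bulk: BulkRequestBuilder = client.prepareBulk()
  jsonDocs.foreach { json =>
    // BulkRequest.validate iterates over the requests added here; the NPE
    // in the stack trace is thrown inside that validation step.
    bulk.add(client.prepareIndex("my-index", "my-type").setSource(json, XContentType.JSON))
  }
  val response: BulkResponse = bulk.get() // the execute() path in the stack trace
  if (response.hasFailures) println(response.buildFailureMessage())
}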

Can someone explain why this happens?

Best Answer

Here is the code snippet I am using. The first lines are the dependency from the build.sbt file:

// lib dependency in build.sbt
"org.elasticsearch" %% "elasticsearch-spark-20" % "5.6.5"

// below are the connection settings required by the connector
val resources: String =
  s"${appConf.getString("es-index")}/${appConf.getString("es.type")}"
val esConfig: Map[String, String] = Map(
  "es.index.auto.create" -> s"${appConf.getString("es.index.auto.create")}",
  "es.nodes"             -> s"${appConf.getString("es-nodes")}",
  "es.port"              -> s"${appConf.getInt("es.port")}",
  "es.nodes.wan.only"    -> s"${appConf.getString("es.nodes.wan.only")}",
  "es.net.ssl"           -> s"${appConf.getString("es.net.ssl")}"
)

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.elasticsearch.spark._ // brings saveJsonToEs into scope on RDDs

val dstream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](conn.topic, conn.kafkaProps)
  )
// each micro-batch is written to ES by the connector, which batches internally
dstream.foreachRDD(rdd => rdd.map(_.value).saveJsonToEs(resources, esConfig))
ssc.checkpoint("/tmp/OACSpark")
ssc.start()
ssc.awaitTermination()
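As a side note, the connector also ships Spark Streaming support (since ES-Hadoop 5.0), so the foreachRDD above can be replaced with a direct call on the DStream. A sketch, assuming the same dstream, resources, and esConfig as above:

// Sketch using the connector's streaming API instead of foreachRDD;
// saveJsonToEs here comes from the org.elasticsearch.spark.streaming package object.
import org.elasticsearch.spark.streaming._

dstream.map(_.value).saveJsonToEs(resources, esConfig)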

I used Typesafe Config to read the configuration from a properties file.
I am publishing the data to Kafka as JSON, so I used the saveJsonToEs() API; you can find more details in the connector documentation on the Elasticsearch website.
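For reference, a hypothetical application.conf matching the keys read above via Typesafe Config; the hostnames, port, and index/type values are placeholders:

# hypothetical application.conf; all values are placeholders
es-index = "my-index"
es.type = "logs"
es.index.auto.create = "true"
es-nodes = "es-node1,es-node2"
es.port = 9200
es.nodes.wan.only = "false"
es.net.ssl = "false"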

Regarding scala - NullPointerException when using the Elasticsearch 5.5 bulk ingest API with Spark Streaming, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48781385/
