gpt4 book ai didi

elasticsearch - Spark Streaming 和 ElasticSearch - 无法写入所有条目

转载 作者:行者123 更新时间:2023-11-29 02:54:00 26 4
gpt4 key购买 nike

我目前正在编写一个由生产者和消费者组成的 Scala 应用程序。生产者从外部源获取一些数据并将它们写入 Kafka。消费者从 Kafka 读取并写入 Elasticsearch。

消费者基于 Spark Streaming,每 5 秒从 Kafka 获取新消息并将它们写入 ElasticSearch。问题是我无法写入 ES,因为我收到很多错误,如下所示:

ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: Error in TaskCompletionListener org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [3/26560] (maybe ES was overloaded?). Bailing out... at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

考虑到生产者每 15 秒写 6 条消息,所以我真的不明白这种“重载”是如何发生的(我什至清理了主题并刷新了所有旧消息,我认为这与偏移量问题有关). Spark Streaming 每 5 秒执行一次的任务可以用下面的代码来概括:

  val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))



//TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***")


convertedResult.foreachRDD(rdd => {
rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))

})

如果我尝试打印消息而不是发送到 ES,一切都很好,我实际上只看到 6 条消息。为什么我不能写入 ES?

为了完整起见,我使用这个库来写入 ES:带有最新测试版的 elasticsearch-spark_2.10。

最佳答案

在多次重试之后,我发现了一种写入 ElasticSearch 而不会出现任何错误的方法。基本上将参数 "es.batch.size.entries"-> "1" 传递给 saveToES 方法解决了这个问题。我不明白为什么使用默认或任何其他批处理大小会导致上述错误,因为如果我尝试编写的内容超过允许的最大批处理大小,我会收到一条错误消息,而不是更少。

此外,我注意到实际上我是在写 ES 但不是我所有的消息,我每批丢失 1 到 3 条消息。

关于elasticsearch - Spark Streaming 和 ElasticSearch - 无法写入所有条目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29843898/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com