
apache-spark - Spark Streaming crashes with Kafka "Ran out of messages before reaching ending offset" exception


In one of our streaming use cases, one of our sensor applications sends five ~20 MB JSON messages per second to a Kafka topic with 50 partitions. When Spark Streaming tries to read the messages from Kafka, it crashes with the exception below. To understand the situation better, we arranged for the sensor software to send only one 20 MB message per second, but the Spark application still crashes with the same error. Please let me know if I am missing anything needed to handle this kind of case.

We have the following configuration:

- Kafka 0.9.0 server.properties

message.max.bytes=60000000 
replica.fetch.max.bytes=120000000

- Spark 1.6.1 Direct API configuration on YARN

val kafkaParams = Map[String, String](
  "security.protocol" -> "SASL_PLAINTEXT",
  "group.id" -> groupid,
  "metadata.broker.list" -> kafkaBrokerList,
  "max.partition.fetch.bytes" -> "60000000")

- Spark submit

spark-submit \
--verbose \
--master yarn-cluster \
--num-executors 3 \
--executor-memory 7g \
--executor-cores 3 \
--conf spark.driver.memory=1024m \
--conf spark.streaming.backpressure.enabled=false \
--conf spark.streaming.kafka.maxRatePerPartition=3 \
--conf spark.streaming.concurrentJobs=3 \
--conf spark.speculation=true \
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
--files kafka_jaas.conf#kafka_jaas.conf,user.headless.keytab#user.headless.keytab \
--driver-java-options "-Djava.security.auth.login.config=./kafka_jaas.conf -Dhttp.proxyHost=PROXY_IP -Dhttp.proxyPort=8080 -Dhttps.proxyHost=PROXY_IP -Dhttps.proxyPort=8080 -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-driver.properties" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-executor.properties" \
--class com.spark.demo.StreamProcessor /home/user/demo.jar /tmp/data/out 60 KAFKA_BROKER:6667 "groupid" topic_name

- Exception:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, IP_HOST): java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 197 for topic x_topic_3 partition 24 start 196. This should not happen, and indicates that messages may have been lost
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:

Best Answer

Add ("fetch.message.max.bytes" -> "20971520") to kafkaParams; see ConsumerConfig.scala#114 in the source (Spark 1.6.2).
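The likely mechanism: the old 0.8-style SimpleConsumer used by the direct API defaults fetch.message.max.bytes to about 1 MB, so a ~20 MB message can never fit in a single fetch, and the KafkaRDD iterator exhausts the fetch response before reaching the ending offset, tripping the assertion above. A minimal sketch of the adjusted kafkaParams, keeping the original settings and adding the fetch size from the answer:

// Same kafkaParams as in the question, plus the consumer-side fetch limit from the answer.
// 20971520 bytes = 20 MiB; it should be at least as large as the biggest expected message.
val kafkaParams = Map[String, String](
  "security.protocol" -> "SASL_PLAINTEXT",
  "group.id" -> groupid,
  "metadata.broker.list" -> kafkaBrokerList,
  "max.partition.fetch.bytes" -> "60000000",
  "fetch.message.max.bytes" -> "20971520")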

Regarding apache-spark - Spark Streaming crashes with Kafka "Ran out of messages before reaching ending offset" exception, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41053930/
