gpt4 book ai didi

apache-spark - 使用Spark Structured Streaming从Kafka读取数据,总是出现超时问题

转载 作者:行者123 更新时间:2023-12-04 00:32:28 26 4
gpt4 key购买 nike

这是我使用 Spark Structured Streaming 从 Kafka 读取数据的代码,

//ss:SparkSession is defined before. 
import ss.implicits._
val df = ss
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_server)
.option("subscribe", topic_input)
.option("startingOffsets", "latest")
.option("kafkaConsumer.pollTimeoutMs", "5000")
.option("failOnDataLoss", "false")
.load()

这是错误代码,
  Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds

如果我将 5000 放大到 10000,仍然会发生此错误。
我用谷歌搜索了这个问题。似乎没有太多关于此问题的相关信息。

这是与此问题相关的 sbt 文件部分。
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

最佳答案

我也收到了这个错误。

我查看了KafkaSourceRDD的源代码,一无所获。

我猜kafka连接器有问题,因此我在“spark-sql-kafka-0-10_2.11”包中排除了kafka-client,并添加了一个新的依赖项,如下所示:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>

现在它起作用了。希望能帮助到你。

我创建了一个 jira 问题来报告这个问题:
https://issues.apache.org/jira/browse/SPARK-23829

2018 年 12 月 17 日更新:Spark 2.4 和 Kafka2.0 解决了问题。

关于apache-spark - 使用Spark Structured Streaming从Kafka读取数据,总是出现超时问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49526632/

26 4 0