gpt4 book ai didi

spark-streaming - spark-streaming-kafka-0-10_2.10 中的一些 kafka 参数被固定为无

转载 作者:行者123 更新时间:2023-12-01 06:00:02 30 4
gpt4 key购买 nike

我正在使用 2.0.2 版的 spark-streaming-kafka-0-10_2.10 进行 Spark 流作业。我收到了这样的警告:
17/10/10 16:42:25 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/10/10 16:42:25 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/10/10 16:42:25 WARN KafkaUtils: overriding executor group.id to spark-executor-dump_user_profile
17/10/10 16:42:25 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135

当我查看源代码时,有一段代码修复了 KafkaUtils 中名为 fixKafkaParams(...) 的参数,如下所示:

``

logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

// possible workaround for KAFKA-3135
val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
}

}
``
我怎样才能度过难关?非常感谢

最佳答案

“KafkaUtils: overriding auto.offset.reset to none for executor”是 KafkaUtils 中的常见行为,这不会产生任何问题。这个可以忽略。

这仅在 KafkaUtils 中以这种方式编写,以调整 kafka 参数以防止执行程序出现问题,但是您可以在驱动程序上检查它,它不会更改 auto.offset.reset 的值并保留您在 kafkaParams 中定义的值.下面是 kafkaUtils 的链接供引用。

https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala

最初我什至认为这可能是问题,但在执行我的 kafka 代码后,我没有遇到任何问题。

关于spark-streaming - spark-streaming-kafka-0-10_2.10 中的一些 kafka 参数被固定为无,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46666406/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com