gpt4 book ai didi

scala - Akka 流卡夫卡: No configuration setting found for key 'kafka-clients'

转载 作者:行者123 更新时间:2023-12-01 23:07:49 29 4
gpt4 key购买 nike

我正在尝试使用 Alpakka Kafka connector (Akka Stream Kafka) 创建一个简单的原型(prototype).

运行应用程序时出现以下错误:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'

我在 ./src/main/scala/App.scala 中有以下代码:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")

implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()

val config = system.settings.config // ConfigFactory.load()

val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))


println("Done")
}
}

以下build.sbt:

name := "test-akka-stream"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

我使用 sbt run 运行应用程序。我没有配置任何 uber/assembly jar。

我可能遗漏了一些明显的东西,但我看不到它......我怀疑 akka 依赖项存在一些问题。

更新

正如@terminally-chill 所建议的,调用 ProducerSettings(system, new StringSerializer, new StringSerializer)(传递 ActorSystem 而不是配置)解决了问题。我只是不明白这是设计使然还是错误。

更新 2

我创建了一个 github issue那已经被修复了。现在文档更加准确,并解释了创建 ProducerSettings/ConsumerSettings 的正确方法。

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

或者您可以按照上面的解释传递 ActorSystem

最佳答案

感谢@terminally-chill 和@murray-todd-williams 的回答。我做了一些进一步的研究,我试着在这里总结一下:

ConsumerSettingsProducerSettings 都具有 apply 函数,这些函数采用 Config(参见 here )或ActorSystem(参见 here)。

问题是当使用 ActorSystem 时,代码是:

val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload

当使用 Config 时,代码是:

val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))

因此,当直接传递配置时,代码会搜索 kafka-clients 属性,而当传递 ActorSystem 时,代码会检查 akka.kafka。消费者/akka.kafka.producer.

最后考虑一下,在默认情况下创建 ActorSystem 实例时,大部分设置都是从​​嵌入的 reference.conf 文件中加载的,并与您的 application.conf 合并 文件(如果存在)。更多信息 here .所以基本上唯一需要设置的属性通常是 bootstrap.servers

现在您可以理解为什么在使用 system.settings.config 时代码不起作用。此配置实例已加载 reference.conf(具有所有默认值,请参阅 here)和自定义 application.confkafka-clients 属性位于 akka.kafka.consumer/akka.kafka.producer 中,但代码直接检查 kafka-clients

一些可能的解决方案:

  • 使用其他重载直接传递 ActorSystem
  • 使用 system.settings.config.getConfig("akka.kafka.consumer") 传递正确的部分
  • 使用 kafka-clients 部分手动构造一个 Config 实例

对我来说问题是官方文档提供了here没有提及这些差异,并且提供的示例不完整和/或不准确。对于 Akka 专家来说,这可能很清楚,但对于新开发人员来说,这可能会非常困惑。

我创建了一个更“随时可用”的示例 in this gist并打开 issue .

关于scala - Akka 流卡夫卡: No configuration setting found for key 'kafka-clients' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50914596/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com