- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Alpakka Kafka connector (Akka Stream Kafka) 创建一个简单的原型(prototype).
运行应用程序时出现以下错误:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
我在 ./src/main/scala/App.scala
中有以下代码:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")
implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config // ConfigFactory.load()
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))
println("Done")
}
}
以下build.sbt
:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
我使用 sbt run
运行应用程序。我没有配置任何 uber/assembly jar。
我可能遗漏了一些明显的东西,但我看不到它......我怀疑 akka 依赖项存在一些问题。
正如@terminally-chill 所建议的,调用 ProducerSettings(system, new StringSerializer, new StringSerializer)
(传递 ActorSystem
而不是配置)解决了问题。我只是不明白这是设计使然还是错误。
我创建了一个 github issue那已经被修复了。现在文档更加准确,并解释了创建 ProducerSettings
/ConsumerSettings
的正确方法。
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
或者您可以按照上面的解释传递 ActorSystem
。
最佳答案
感谢@terminally-chill 和@murray-todd-williams 的回答。我做了一些进一步的研究,我试着在这里总结一下:
ConsumerSettings
和 ProducerSettings
都具有 apply
函数,这些函数采用 Config
(参见 here )或ActorSystem
(参见 here)。
问题是当使用 ActorSystem
时,代码是:
val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload
当使用 Config
时,代码是:
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
因此,当直接传递配置时,代码会搜索 kafka-clients
属性,而当传递 ActorSystem
时,代码会检查 akka.kafka。消费者/akka.kafka.producer
.
最后考虑一下,在默认情况下创建 ActorSystem
实例时,大部分设置都是从嵌入的 reference.conf
文件中加载的,并与您的 application.conf 合并
文件(如果存在)。更多信息 here .所以基本上唯一需要设置的属性通常是 bootstrap.servers
。
现在您可以理解为什么在使用 system.settings.config
时代码不起作用。此配置实例已加载 reference.conf
(具有所有默认值,请参阅 here)和自定义 application.conf
。 kafka-clients
属性位于 akka.kafka.consumer/akka.kafka.producer
中,但代码直接检查 kafka-clients
。
一些可能的解决方案:
ActorSystem
system.settings.config.getConfig("akka.kafka.consumer")
传递正确的部分kafka-clients
部分手动构造一个 Config
实例对我来说问题是官方文档提供了here没有提及这些差异,并且提供的示例不完整和/或不准确。对于 Akka 专家来说,这可能很清楚,但对于新开发人员来说,这可能会非常困惑。
我创建了一个更“随时可用”的示例 in this gist并打开 issue .
关于scala - Akka 流卡夫卡: No configuration setting found for key 'kafka-clients' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50914596/
我是一名优秀的程序员,十分优秀!