gpt4 book ai didi

scala - KTable 应该发出的事件

转载 作者:行者123 更新时间:2023-12-02 20:02:00 27 4
gpt4 key购买 nike

我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我使用TopologyTestDriver

我的拓扑具有键值类型 String -> Customer 的输入和 String -> CustomerMapped 的输出。 Serdes、模式以及与模式注册表的集成都按预期工作。

我正在使用 Scala、Kafka 2.2.0、Confluence Platform 5.2.1 和 kafka-streams-scala。我的拓扑尽可能简化,如下所示:

val otherBuilder = new StreamsBuilder()

otherBuilder
.table[String,Customer](source)
.mapValues(c => CustomerMapped(c.surname, c.age))
.toStream.to(target)

(所有隐式 Serdes、ProducedConsumed 等都是默认值,并且可以正确找到)

我的测试包括将一些记录(数据)同步且不间断地发送到主题,并从目标读回> 主题,我将结果与预期进行比较:

val data: Seq[(String, Customer)] = Vector(
"key1" -> Customer(0, "Obsolete", "To be overridden", 0),
"key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
"key1" -> Customer(1, "Billy", "The Man", 32),
"key2" -> Customer(2, "Tommy", "The Guy", 31),
"key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)

我构建了 Kafka Stream 应用程序,在其他设置之间进行设置,以下两个:

p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)

所以我希望 KTable 使用缓存,提交之间的间隔为 5 秒,缓存大小为 50MB(对于我的场景来说绰绰有余)。

我的问题是,我从 target 主题读回的结果始终包含 key1 的多个条目。我预计不会为带有 Obsolete 和 `Obsolete1 的记录发出任何事件。实际输出为:

Vector(
"key1" -> CustomerMapped("To be overridden", 0),
"key1" -> CustomerMapped("To be overridden2", 0),
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)

最后要提的是:这个测试过去一直按预期工作,直到我将 Kafka 从 2.1.0 更新到 2.2.0。我再次验证了我的应用程序降级。

我很困惑,谁能指出 2.2.x 版本中 KTables 的行为是否发生了变化?或者也许现在我必须设置新的设置来控制事件的发出?

最佳答案

在 Kafka 2.2 中,引入了一项优化来减少 Kafka Streams 的资源占用。如果计算不需要KTable,则不一定会具体化它。这适用于您的情况,因为 mapValues() 可以即时计算。由于 KTable 未具体化,因此没有缓存,因此每个输入记录都会生成一个输出记录。

比较:https://issues.apache.org/jira/browse/KAFKA-6036

如果您想强制执行 KTable 物化,可以将 Materilized.as("someStoreName") 传入 StreamsBuilder#table()方法。

关于scala - KTable 应该发出的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55687101/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com