- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTestDriver
。
我的拓扑具有键值类型 String -> Customer
的输入和 String -> CustomerMapped
的输出。 Serdes、模式以及与模式注册表的集成都按预期工作。
我正在使用 Scala、Kafka 2.2.0、Confluence Platform 5.2.1 和 kafka-streams-scala
。我的拓扑尽可能简化,如下所示:
val otherBuilder = new StreamsBuilder()
otherBuilder
.table[String,Customer](source)
.mapValues(c => CustomerMapped(c.surname, c.age))
.toStream.to(target)
(所有隐式 Serdes、Produced
、Consumed
等都是默认值,并且可以正确找到)
我的测试包括将一些记录(数据
)同步且不间断地发送到源
主题,并从目标
读回> 主题,我将结果与预期
进行比较:
val data: Seq[(String, Customer)] = Vector(
"key1" -> Customer(0, "Obsolete", "To be overridden", 0),
"key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
"key1" -> Customer(1, "Billy", "The Man", 32),
"key2" -> Customer(2, "Tommy", "The Guy", 31),
"key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
我构建了 Kafka Stream 应用程序,在其他设置之间进行设置,以下两个:
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)
所以我希望 KTable 使用缓存,提交之间的间隔为 5 秒,缓存大小为 50MB(对于我的场景来说绰绰有余)。
我的问题是,我从 target
主题读回的结果始终包含 key1
的多个条目。我预计不会为带有 Obsolete
和 `Obsolete1 的记录发出任何事件。实际输出为:
Vector(
"key1" -> CustomerMapped("To be overridden", 0),
"key1" -> CustomerMapped("To be overridden2", 0),
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
最后要提的是:这个测试过去一直按预期工作,直到我将 Kafka 从 2.1.0 更新到 2.2.0。我再次验证了我的应用程序降级。
我很困惑,谁能指出 2.2.x 版本中 KTables 的行为是否发生了变化?或者也许现在我必须设置新的设置来控制事件的发出?
最佳答案
在 Kafka 2.2 中,引入了一项优化来减少 Kafka Streams 的资源占用。如果计算不需要KTable
,则不一定会具体化它。这适用于您的情况,因为 mapValues()
可以即时计算。由于 KTable
未具体化,因此没有缓存,因此每个输入记录都会生成一个输出记录。
比较:https://issues.apache.org/jira/browse/KAFKA-6036
如果您想强制执行 KTable
物化,可以将 Materilized.as("someStoreName")
传入 StreamsBuilder#table()
方法。
关于scala - KTable 应该发出的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55687101/
我有两个流来自两个主题 orders 和 fsource。主要订单是静态的,很少更新,而 fsource 的更新速度为每秒 1000 个。这里我使用了 KTable-KTabke join 因为它们具
我对以下拓扑的行为方式有一些问题: String topic = config.topic(); KTable myTable = topology.builder().table(UUIDSerde
documentation来自 Confluent 网站提到以下内容: The left KTable can have multiple records which map to the same
KTable table1 KTable table2 我正在尝试加入两个 KTables (无窗口) 键并将结果写为 到输出主题。 谁能帮我提供一些如何执行此操作的示例。 最佳答案 因为在 KTab
请参阅下面的更新以显示潜在的解决方法 我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增
我最近开始试验 kafka 流。我有一个场景需要加入 KStream与 KTable . KTable 可能是这种情况不包含某些键。在那种情况下,我得到一个 NullPointerException
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我有一个 kafka 主题和一个听它的 KTable。 我想写一个 http POST 请求,它将遍历 ktable 中的当前项目,对它们执行一些操作并写回主题 所以基本上我有: private va
我们有以下高级 DSL 处理拓扑: TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceM
用例 - 有一个包含消息的主题(空,元数据)。我需要从主题创建一个 Ktable,其中键 (metadata.entity_id) 和值作为元数据。该表稍后将用于与具有相同键的流进行连接。 p
我在 Kafka 中创建带有时间窗口的 KTable 时遇到了一些问题。 我想创建一个表来计算流中 ID 的数量,就像这样。 ID (String) | Count (Long) X
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。 假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例都在单独的
我正在尝试在 Kafka Streams 中加入一个类似于许多文章的简单外键(例如: https://www.confluent.io/blog/data-enrichment-with-kafka-
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我有一个窗口KTable按预期工作,但每次收到新值时它都会输出。我找到了 .suppress 运算符,它完全符合我的要求:仅在时间窗口结束时输出结果。我已向 TimeWindow 添加了 grace
我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,它是一个json格式的消息,如下所示。 { "current_ts":
我想编写一个小型 Kafka Streams 应用程序,它会减少输入流的时间窗口,对值进行一些映射,然后将生成的 toStream() 更改日志发送到另一个主题。使用我的代码,我在 toStream(
有没有办法计算 Java/scala 应用程序中 KTable/KStream 随着时间的推移大约将使用多少堆(或任何其他)内存? 我有一些具体的假设,我想知道它们是否正确: Kafka 流仅使用内部
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我是一名优秀的程序员,十分优秀!