gpt4 book ai didi

scala - 如何使用kafka流为kafka主题中的 key 保留N个最新值

转载 作者:行者123 更新时间:2023-12-01 13:14:56 25 4
gpt4 key购买 nike

假设我使用 kafka 流(kafka-streams-scala 库,版本 2.2.0)。

我需要使用 kafka 流为 kafka 主题中的键保留一些最近的值。我用它来丰富另一个流。所以我需要像 KTable 或 GlobalKTable 这样的东西,但它们只保留一个值。

我想出了一种可能的方法来做到这一点:创建流和可变 Map,然后使用 stream.foreach跟踪每个键的 N 个最近值。

val stream: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topicName")

val map = scala.collection.mutable.Map[String, List[MyObject]]

stream.foreach((k, v) => {
//update map
})

我的问题是是否有更好的方法来实现这一点 - 使用流 API 或至少不使用可变映射。

最佳答案

So I need something like KTable or GlobalKTable, but they keep only one value.



继续使用 KTable (或 GlobalKTable ),但使用结构化值和/或集合作为“值”。 Kafka 中没有任何强制要求您将消息值限制为仅原始数据类型(如 IntegerString )。

想想: KStream<UserId, List<ClickEvent>> .这里,每条消息都属于一个特定的用户(由 key UserId 标识),每条消息都有零、一或多个列表 ClickEvent s 与该用户相关联。这“有效”,您只需要为要使用的数据类型提供适当的 serdes(串行器/解串器)。

例如, CustomStreamTableJoin示例在 https://github.com/confluentinc/kafka-streams-examples ( direct link to example for v5.2.1 ,适用于 Apache Kafka v2.2)使用 Pair 用于在 Kafka 的消息值中存储元组的类,它有其附带的 PairSerde .同样可以(并且正在由开发人员完成)来存储值的集合,例如 List<ClickEvent> ,正如您在自己的用例中提到的那样。

I need to keep a few recent values for key in kafka topic using kafka streams. [...] I figured out one possible way to do this: creating stream and mutable Map, [...]



您不需要使用 Map .该键已经在 Kafka 消息中可用,因此您只需要为消息值提供一个类似 List 的数据类型。

or at least without a mutable map.



您不需要(也不应该)使用可变数据结构,除非有特定原因,我认为您的用例中没有。当正在处理新消息并且相应的输出存储在 KTable 中时,那么为该键存储在表中的任何内容都将被覆盖——因此使用不可变数据结构作为消息值是完全没问题的。

关于scala - 如何使用kafka流为kafka主题中的 key 保留N个最新值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56002612/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com