gpt4 book ai didi

apache-kafka - Kafka Streams - 从 KTable 中消失的值

转载 作者:行者123 更新时间:2023-12-03 21:19:28 24 4
gpt4 key购买 nike

问题背景

目前我们正在使用:Kafka Streams API(版本 1.1.0)来处理来自 Kafka 集群的消息(3 个代理,每个主题 3 个分区,复制因子为 2)。安装的 Kafka 版本为 1.1.1 。

最终用户向我们报告了数据消失的问题。他们报告说他们突然看不到任何数据(例如,昨天他们可以在 UI 中看到 n 条记录,而第二天早上的 table 是空的)。我们检查了这个特定用户的变更日志主题,它看起来很奇怪,看起来在几天不活动之后(给定键值对可能几天不变)变更日志主题中的聚合值丢失了。

代码

KTable 流水线:(消息按事件中的“用户名”分组)

@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
.groupBy((key, event) -> event.getUsername(),
Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
.aggregate(
UserItems::none,
(key, event, userItems) ->
userItems.after(event),
Materialized
.<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
.withKeySerde(serdes.forA(UsernameVO.class))
.withValueSerde(serdes.forA(UserItems.class)));
}

聚合对象(KTable 值):
public class UserItems {

private final Map<String, Item> items;

public static UserItems none() {
return new UserItems();
}

private UserItems() {
this(emptyMap());
}

@JsonCreator
private UserItems(Map<String, Item> userItems) {
this.userItems = userItems;
}

@JsonValue
@SuppressWarnings("unused")
Map<String, Item> getUserItems() {
return Collections.unmodifiableMap(items);
}

...
public UserItems after(ItemAddedEvent itemEvent) {
Item item = Item.from(itemEvent);

Map<String, Item> newItems = new HashMap<>(items);
newItems.put(itemEvent.getItemName(), item);
return new UserItems(newItems);
}

卡夫卡主题

应用程序用户用户项

这个源主题没有问题。它已将保留设置为最大值,所有消息始终存在。

应用程序用户用户项存储更改日志 (压缩。具有默认配置 - 没有更改保留,也没有任何内容)

这是奇怪的部分。我们可以在更新日志中观察到,对于一些用户来说,这些值正在丢失:
Offset | Partition |   Key   |  Value  
...........................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
...

我们可以在上面看到,起初消息被正确聚合:有 Item1 被处理,然后 Item2 被应用于聚合。但一段时间后 - 可能是几天 - 正在处理另一个事件 - 底层“User1”键下的值似乎丢失了,只有 Item3 存在。

在应用程序中,用户不可能删除所有项目并在一个操作中添加另一个项目 - 用户只能添加或删除一个项目。所以如果他删除 ItemName1 和 ItemName2 然后添加 ItemName3 我们 期待 更改日志中的类似内容:
Offset | Partition |   Key   |  Value   
..............................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1054 0 "User1" : {"ItemName2":{"param":"bar"}}
1055 0 "User1" : {}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}

结论

起初我们认为它与更改日志主题保留有关(但我们检查了它,它只启用了压缩)。
application-user-UserItems-store-changelog  PartitionCount:3    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600   
Topic: application-user-UserItems-store-changelog Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: application-user-UserItems-store-changelog Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: application-user-UserItems-store-changelog Partition: 2 Leader: 1 Replicas: 1 Isr:

任何想法或提示将不胜感激。干杯

最佳答案

我遇到了与您描述的相同的问题,似乎该问题与您的 kafka-streams 配置有关。
您提到您的“源”主题具有以下配置:

3 brokers, 3 partitions per topic, with replication factor 2


确保将以下属性放入您的 kafka 流配置(replication.factor)至少为 2(默认设置为 1)
StreamsConfig.REPLICATION_FACTOR_CONFIG [replication.factor]
这也对应于您编写的内容(更改日志主题的复制因子设置为 1)

application-user-UserItems-store-changelog PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600


因此,我的假设是您由于代理中断而丢失数据(由于复制因子 2,数据应该保留在源主题中,因此您可以重新处理和填充变更日志主题)

关于apache-kafka - Kafka Streams - 从 KTable 中消失的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52402883/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com