gpt4 book ai didi

java - 为什么 Kafka KTable 缺少条目?

转载 作者:搜寻专家 更新时间:2023-11-01 03:32:56 26 4
gpt4 key购买 nike

我有一个使用 Kafka Streams 中的 KTable 的单实例 Java 应用程序。直到最近,当某些消息突然消失时,我才可以使用 KTable 检索所有数据。那里应该有大约 33k strip 有唯一键的消息。

当我想按键检索消息时,我没有收到一些消息。我使用 ReadOnlyKeyValueStore 来检索消息:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);

这些是我为 KafkaStreams 设置的配置设置。

final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

卡夫卡:0.10.2.0-cp1
合流:3.2.0

调查让我有了一些非常令人担忧的见解。使用 REST 代理我手动读取分区并发现一些偏移量返回错误。

要求:/topics/{topic}/partitions/{partition}/messages?offset={offset}

{
"error_code": 50002,
"message": "Kafka error: Fetch response contains an error code: 1"
}

没有客户端,java 和命令行都没有返回任何错误。他们只是跳过导致 KTables 中丢失数据的错误丢失消息。一切都很好,似乎有些消息不知何故损坏了。

我有两个代理,所有主题的复制因子都是 2,并且已完全复制。两个经纪人分别返回相同的。重启代理没有任何区别。

  • 可能是什么原因?
  • 如何在客户端检测到这种情况?

最佳答案

通过 default Kafka Broker配置键 cleanup.policy 设置为 delete。将其设置为 compact 以保留每个 key 的最新消息。 See compaction .

删除旧消息不会更改最小偏移量,因此尝试检索低于它的消息会导致错误。错误非常模糊。 Kafka Streams 客户端将从最小偏移量开始读取消息,因此没有错误。唯一可见的影响是 KTables 中缺少数据。

由于 caches,应用程序正在运行即使从 Kafka 本身删除消息后,所有数据可能仍然可用。它们将在清理后消失。

关于java - 为什么 Kafka KTable 缺少条目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43257970/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com