gpt4 book ai didi

java - 如何删除消费者已经消费过的数据?卡夫卡

转载 作者:行者123 更新时间:2023-11-30 02:00:40 28 4
gpt4 key购买 nike

我正在kafka中进行数据复制。但是,kafka 日志文件的大小增长得非常快。一天大小达到 5 GB。作为这个问题的解决方案,我想立即删除已处理的数据。我正在 AdminClient 中使用删除记录方法来删除偏移量。但是当我查看日志文件时,与该偏移量对应的数据并未被删除。

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

我不需要诸如(log.retention.hours、log.retention.bytes、log.segment.bytes、log.cleanup.policy=delete)之类的建议

因为我只想删除消费者消费的数据。在这个解决方案中,我还删除了未消费的数据。

您有什么建议?

最佳答案

你没有做错任何事。您提供的代码有效并且我已经测试过。以防万一我忽略了您代码中的某些内容,我的代码是:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
kafkaAdminClient.deleteRecords(deleteMap);
}

我使用了组:'org.apache.kafka',名称:'kafka-clients',版本:'2.0.0'

因此请检查您的目标分区是否正确(第一个分区为 0)

检查您的代理版本:https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html说:

This operation is supported by brokers with version 0.11.0.0

从同一应用程序生成消息,以确保连接正确。

您还可以考虑另一种选择。使用cleanup.policy=compact 如果您的消息键重复,您可以从中受益。不仅因为该键的较旧消息将被自动删除,而且您可以利用具有空负载的消息删除该键的所有消息这一事实。只是不要忘记将 delete.retention.msmin.compaction.lag.ms 设置为足够小的值。在这种情况下,您可以使用消息,然后为同一 key 生成空有效负载(但要谨慎使用这种方法,因为这样您可以删除未使用的消息(使用该 key ))

关于java - 如何删除消费者已经消费过的数据?卡夫卡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52961751/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com