gpt4 book ai didi

java - 调用 deleteRecords Kafka 管理客户端 Java API 时,消息不会从文件系统中删除

转载 作者:行者123 更新时间:2023-12-02 03:53:42 25 4
gpt4 key购买 nike

我尝试使用 Java 管理客户端 API 的删除记录方法从我的 kafka 主题中删除消息。以下是我尝试过的步骤

    1. I pushed 20000 records to my TEST-DELETE topic    2. Started a console consumer and consumed all the messages    3. Invoked my java program to delete all those 20k messages    4. Started another console consumer with a different group id. This consumer is not receiving any of the deleted messages

When I checked the file system, I could still see all those 20k records occupying the disk space. My intention is to delete those records forever from file system too.

My Topic configuration is given below along with server.properties settings

Topic:TEST-DELETE       PartitionCount:4        ReplicationFactor:1     Configs:cleanup.policy=delete        Topic: TEST-DELETE    Partition: 0      Leader: 0     Replicas: 0       Isr: 0        Topic: TEST-DELETE    Partition: 1      Leader: 0     Replicas: 0       Isr: 0        Topic: TEST-DELETE    Partition: 2      Leader: 0     Replicas: 0       Isr: 0        Topic: TEST-DELETE    Partition: 3      Leader: 0     Replicas: 0       Isr: 0
    log.retention.hours=24    log.retention.check.interval.ms=60000    log.cleaner.delete.retention.ms=60000    file.delete.delay.ms=60000    delete.retention.ms=60000    offsets.retention.minutes=5    offsets.retention.check.interval.ms=60000    log.cleaner.enable=true    log.cleanup.policy=compact,delete

My delete code is given below


public void deleteRecords(Map<String, Map<Integer, Long>> allTopicPartions) {

Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();

allTopicPartions.entrySet().forEach(topicDetails -> {

String topicName = topicDetails.getKey();
Map<Integer, Long> value = topicDetails.getValue();

value.entrySet().forEach(partitionDetails -> {

if (partitionDetails.getValue() != 0) {
recordsToDelete.put(new TopicPartition(topicName, partitionDetails.getKey()),
RecordsToDelete.beforeOffset(partitionDetails.getValue()));
}
});
});

DeleteRecordsResult deleteRecords = this.client.deleteRecords(recordsToDelete);

Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = deleteRecords.lowWatermarks();

lowWatermarks.entrySet().forEach(entry -> {
try {
logger.info(entry.getKey().topic() + " " + entry.getKey().partition() + " "
+ entry.getValue().get().lowWatermark());
} catch (Exception ex) {

}
});

}

我的java程序的输出如下所示

2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 1 50002019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 0 50002019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 3 50002019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 2 5000

我的目的是从文件系统中删除消耗的记录,因为我的 kafka 代理的存储空间有限。

我想获得一些帮助来解决我的以下疑问

  1. 我的印象是删除记录也会从文件系统中删除消息,但看起来我错了!
  2. 这些已删除的记录在日志目录中存在多长时间?
  3. 调用删除记录 API 后,我是否需要使用任何特定配置才能从文件系统中删除记录?

感谢您的帮助

谢谢

最佳答案

处理此问题的推荐方法是为您感兴趣的主题设置 retention.ms 和相关配置值。这样,您就可以定义 Kafka 将存储您的数据多长时间,直到它完成为止。删除它,确保所有下游消费者都有机会在数据从 Kafk 集群中删除之前拉取数据。

但是,如果您仍然想强制 Kafka 根据字节进行删除,可以使用 log.retention.bytesretention.bytes 配置值。第一个是集群范围的设置,第二个是特定于主题的设置,默认情况下采用第一个设置的设置,但您仍然可以按主题覆盖它。 retention.bytes 数字是按分区强制执行的,因此您应该将其乘以主题分区的总数。

但是请注意,如果您有一个失控的生产者突然开始生成大量数据,并且您将其设置为硬字节限制,则可能会清除集群中一整天的数据,并且只留下最后几分钟的数据,也许在有效的消费者可以从集群中提取数据之前。这就是为什么将您的 kafka 主题设置为基于时间而不是基于字节的保留会更好。

您可以在 Kafka 官方文档中找到配置属性及其说明:https://kafka.apache.org/documentation/

关于java - 调用 deleteRecords Kafka 管理客户端 Java API 时,消息不会从文件系统中删除,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56761871/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com