gpt4 book ai didi

java - 为什么Kafka的seekToBeginning和seekToEnd不能与assign一起使用?

转载 作者:行者123 更新时间:2023-12-02 07:44:43 25 4
gpt4 key购买 nike

假设,我想检查 Kafka 中特定分区的第一条和最后一条消息的偏移量。我的想法是使用 assign(...) 方法以及 seekToBeginning(...)seekToEnd(...)。不幸的是,这不起作用。

如果我将 AUTO_OFFSET_RESET_CONFIG 设置为 “latest”,则 seekToBeginning(…) 无效;如果我将其设置为“最早”,则 seekToEnd(…) 不起作用。似乎对我的消费者来说唯一重要的是 AUTO_OFFSET_RESET_CONFIG

我见过类似的主题,但问题涉及 subscribe(),而不是 assign() 方法。建议的解决方案是实现 ConsumerRebalanceListner 并将其作为参数传递给 subscribe() 方法。不幸的是,assign() 方法只有一个签名,并且只能获取主题分区列表。

问题是:是否可以将 seekToBeginning()seekToEnd()assign() 方法一起使用。如果是,怎么办?如果不是,为什么?

我的代码的相关片段:

KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);

consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...

记录器打印偏移量 n,这是所考虑主题的最大(最新)偏移量。

最佳答案

我注意到这种行为在 MockConsumer 中存在错误且不一致。文档说它们很懒,但会在调用position()后触发。但对于 MockConsumer 来说情况并非如此。特别是,我发现它在大约 1.0 到 2.2.2 之间适用于 MockConsumer,并且在 2.3.0 之后就被破坏了

取而代之的是,我选择执行以下操作,这在 MockConsumer 和真实的环境中一致工作:

// consistently working seed to beginning
consumer.beginningOffsets(partitions).forEach(consumer::seek);
// consistently working seed to end
consumer.endOffsets(partitions).forEach(consumer::seek);

如果有线程同时调用 poll,这会更危险,但在我的情况下效果很好,我只想在应用程序开始轮询时手动控制偏移位置。

关于java - 为什么Kafka的seekToBeginning和seekToEnd不能与assign一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58557198/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com