gpt4 book ai didi

java - 如何在Spring Kafka客户端中寻求偏移量?

转载 作者:行者123 更新时间:2023-12-02 09:00:40 25 4
gpt4 key购买 nike

在开发使用来自 feed 的数据的微服务时,我们使用 ConsumerSeekAwareseekToBeginning。当我们准备好时,我们希望消费者能够使用偏移量。由于文档的原因,我无法真正理解 seekToEnd 代表什么。这是否意味着,它将寻找偏移量(最后一条 ACKed 消息),还是将寻找主题的最后一条消息(无论偏移量如何)并等待新消息?

@Slf4j
public class KafkaSeekOffsetAwareListener implements ConsumerSeekAware {
protected final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
log.info("Registering seek callback");
this.seekCallBack.set(callback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("Incoming Topic Partitions {}", assignments);
----> assignments.forEach((assignment, assignmentKey) -> assignments.forEach((t, o) -> callback.seekToBeginning(t.topic(), t.partition())));
}

public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("Container idle");
}
}

最佳答案

请参阅 JavaDocs:

/**
* Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
* final offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
* If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
* <p>
* If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
* of the first message with an open transaction.
*
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {

如您所见,它是纯 Apache Kafka 客户端。 Spring for Apache Kafka 只是委派该功能。我什至会说,说 Spring for Apache Kafka 是“Kafka 的客户端”是错误的。它只是标准 Apache Kafka 客户端的包装器和高级 Spring API。

关于java - 如何在Spring Kafka客户端中寻求偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60184928/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com