gpt4 book ai didi

spring - 如何获取 KafkaListener 消费者

转载 作者:行者123 更新时间:2023-12-05 04:00:09 25 4
gpt4 key购买 nike

我将 spring-kafka 用于 springboot 2.0.4.RELEASE。

并使用KafkaListener获取消息

现在我想为我的组重置偏移量

但我不知道如何为该组获取消费者

    @KafkaListener(id="test",topics={"test"},groupId="group",containerFactory="batchContainerFactory")
public String listenTopic33(List<ConsumerRecord<Integer, String>> record, Acknowledgment ack){
// do something

}


@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public void test() {

MessageListenerContainer test3 = kafkaListenerEndpointRegistry.getListenerContainer("test3");
}

最佳答案

如果你想在监听器本身中寻找消费者,只需添加一个 Consumer<?, ?> consumer监听器方法的参数。

请记住,容器可能已经获取了更多消息,因此您将在搜索生效之前获取它们。你可以设置 max.poll.records=1避免这种情况。

您还可以添加自定义 RemainingRecordsErrorHandler到容器,在监听器中抛出异常,错误处理程序将代替监听器获取剩余的记录。

另见 Seeking to a Specific Offset .

In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first method is called when the container is started. You should use this callback when seeking at some arbitrary time after initialization. You should save a reference to the callback. If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

When using group management, the second method is called when assignments change. You can use this method, for example, for setting initial offsets for the partitions, by calling the callback. You must use the callback argument, not the one passed into registerSeekCallback. This method is never called if you explicitly assign partitions yourself. Use the TopicPartitionInitialOffset in that case.

The callback has the following methods:

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToEnd(String topic, int partition);

You can also perform seek operations from onIdleContainer() when an idle container is detected. See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.

To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.

这是一个例子;我们跟踪每个主题/分区的回调...

@SpringBootApplication
public class So56584233Application {

public static void main(String[] args) {
SpringApplication.run(So56584233Application.class, args);
}

@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(new ProducerRecord<>("so56584233", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}

@Bean
public NewTopic topic() {
return new NewTopic("so56584233", 3, (short) 1);
}

}

@Component
class Listener implements ConsumerSeekAware {


private static final Logger logger = LoggerFactory.getLogger(Listener.class);


private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

private static final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
callbackForThread.set(callback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, callbackForThread.get()));
}

@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}

@KafkaListener(id = "so56584233", topics = "so56584233", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}

public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}

}

关于spring - 如何获取 KafkaListener 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56584233/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com