gpt4 book ai didi

java - Kafka SpringBoot StreamListener - 如何按顺序消费多个主题?

转载 作者:行者123 更新时间:2023-12-02 09:13:42 28 4
gpt4 key购买 nike

我有多个使用不同主题的 StreamListener 注释方法。但是其中一些主题需要从“最早”的偏移量读取,以填充内存中的映射(类似于状态机),然后从其他主题中使用,这些主题中可能包含应针对“最新”执行的命令状态机。

当前代码类似于:

@Component
@AllArgsConstructor
@EnableBinding({InputChannel.class, OutputChannel.class})
@Slf4j
public class KafkaListener {

@StreamListener(target = InputChannel.EVENTS)
public void event(Event event) {
// do something with the event
}

@StreamListener(target = InputChannel.COMMANDS)
public void command(Command command) {
// do something with the command only after all events have been processed
}

}

我尝试添加一些可怕的代码,从传入的事件消息中获取 kafka 主题偏移元数据,然后使用信号量来阻止命令,直到事件达到总偏移量的一定百分比。它有点管用,但让我感到难过,一旦我们有 20 个左右的主题,而且所有主题都相互依赖,那么维护起来就会很糟糕。

SpringBoot/Spring Streams 是否有任何内置机制来执行此操作,或者是否存在人们使用的一些我不知道的常见模式?

TL;DR:如何在使用主题 B 中的任何消息之前处理来自主题 A 的所有消息,而不做一些肮脏的事情,例如在主题 B 的使用者中粘贴 Thread.sleep(60000) ?

最佳答案

请参阅kafka consumer binding property resetOffsets

resetOffsets

Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a KafkaRebalanceListener is provided; see Using a KafkaRebalanceListener.

Default: false.

startOffset

The starting offset for new groups. Allowed values: earliest and latest. If the consumer group is set explicitly for the consumer 'binding' (through spring.cloud.stream.bindings..group), 'startOffset' is set to earliest. Otherwise, it is set to latest for the anonymous consumer group. Also see resetOffsets (earlier in this list).

Default: null (equivalent to earliest).

您还可以添加 KafkaBindingRebalanceListener并对消费者执行搜索。

编辑

您还可以在第二个监听器上将 autoStartup 设置为 false,并在准备好时启动绑定(bind)。这是一个例子:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Gitter55Application {

public static void main(String[] args) {
SpringApplication.run(Gitter55Application.class, args);
}

@Bean
public ConsumerEndpointCustomizer<KafkaMessageDrivenChannelAdapter<?, ?>> customizer() {
return (endpoint, dest, group) -> {
endpoint.setOnPartitionsAssignedSeekCallback((assignments, callback) -> {
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
});
};
}

@StreamListener(Sink.INPUT)
public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
System.out.println(new String(key) + ":" + value);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template,
BindingsEndpoint bindings) {

return args -> {
while (true) {
template.send("gitter55", "foo".getBytes(), "bar".getBytes());

System.out.println("Hit enter to start");
System.in.read();
bindings.changeState("input", State.STARTED);
}
};

}

}
spring.cloud.stream.bindings.input.group=gitter55
spring.cloud.stream.bindings.input.destination=gitter55
spring.cloud.stream.bindings.input.content-type=text/plain

spring.cloud.stream.bindings.input.consumer.auto-startup=false

关于java - Kafka SpringBoot StreamListener - 如何按顺序消费多个主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59180386/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com