gpt4 book ai didi

java - 使用一组压缩主题中的所有记录的最简单的 Spring Kafka @KafkaListener 配置是什么?

转载 作者:行者123 更新时间:2023-12-04 08:52:36 27 4
gpt4 key购买 nike

我在 spring application.yaml 文件中定义了几个压缩的 Kafka 主题的名称( topic1topic2 、...、 topicN )。我希望能够消耗每个主题分区上的所有记录 在启动时。每个主题上的分区数量是事先未知的。
官方 Spring Kafka 2.6.1 文档建议执行此操作的最简单方法是 implement a PartitionFinder and use it in a SpEL expresssion动态查找主题的分区数,然后使用 * @TopicPartition 的 partitions 属性中的通配符注释(参见 @KafkaListener Annotation documentation 中的显式分区分配):

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
partitions = "#{@finder.partitions('compacted')}"),
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
// process record
}
由于我有几个主题,因此生成的代码非常冗长:
@KafkaListener(topicPartitions = {
@TopicPartition(
topic = "${topic1}",
partitions = "#{@finder.partitions('${topic1}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
),
@TopicPartition(
topic = "${topic2}",
partitions = "#{@finder.partitions('${topic2}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
),
// and many more @TopicPartitions...
@TopicPartition(
topic = "${topicN}",
partitions = "#{@finder.partitions('${topicN}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
)
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
// process record
}
如何通过配置 topicPartitions 使这种重复配置更加简洁 @KafkaListener 的属性带有 @TopicPartion 动态生成数组的注释s(我的 N 个主题中的每个主题一个)?

最佳答案

当前无法使用 @KafkaListener - 请在 GitHub 上打开一个新功能问题。
我能想到的唯一解决方法是以编程方式从容器工厂创建一个监听器容器并创建一个监听器适配器。如果您需要,我可以提供一个示例。
编辑
下面是一个例子:

@SpringBootApplication
public class So64022266Application {

public static void main(String[] args) {
SpringApplication.run(So64022266Application.class, args);
}

@Bean
public NewTopic topic1() {
return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
}

@Bean
public NewTopic topic2() {
return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
}

@Bean
ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
PartitionFinder finder,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
MyListener listener) throws Exception {

MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
container.getContainerProperties().setGroupId("someGroup");
return container;
}

@Bean
MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
MyListener listener) throws NoSuchMethodException {

MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBean(listener);
endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
endpoint.setTopicPartitions(Arrays.stream(topics)
.flatMap(topic -> finder.partitions(topic))
.toArray(TopicPartitionOffset[]::new));
endpoint.setMessageHandlerMethodFactory(methodFactory());
return endpoint;
}

@Bean
DefaultMessageHandlerMethodFactory methodFactory() {
return new DefaultMessageHandlerMethodFactory();
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentMessageListenerContainer<String, String> container) {

return args -> {
System.out.println(container.getAssignedPartitions());
template.send("so64022266-1", "key1", "foo");
template.send("so64022266-2", "key2", "bar");
};
}

}

@Component
class MyListener {

public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
System.out.println(key + ":" + payload);
}

}

@Component
class PartitionFinder {

private final ConsumerFactory<String, String> consumerFactory;

public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}

public Stream<TopicPartitionOffset> partitions(String topic) {
System.out.println("+" + topic + "+");
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
}
}

}
topics=so64022266-1, so64022266-2
如果您需要处理墓碑记录( null 值),我们需要增强处理程序工厂;我们目前不公开框架的处理程序工厂。

关于java - 使用一组压缩主题中的所有记录的最简单的 Spring Kafka @KafkaListener 配置是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64022266/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com