- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 spring application.yaml 文件中定义了几个压缩的 Kafka 主题的名称( topic1
、 topic2
、...、 topicN
)。我希望能够消耗每个主题分区上的所有记录 在启动时。每个主题上的分区数量是事先未知的。
官方 Spring Kafka 2.6.1 文档建议执行此操作的最简单方法是 implement a PartitionFinder and use it in a SpEL expresssion动态查找主题的分区数,然后使用 *
@TopicPartition
的 partitions 属性中的通配符注释(参见 @KafkaListener Annotation documentation 中的显式分区分配):
@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
partitions = "#{@finder.partitions('compacted')}"),
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
// process record
}
由于我有几个主题,因此生成的代码非常冗长:
@KafkaListener(topicPartitions = {
@TopicPartition(
topic = "${topic1}",
partitions = "#{@finder.partitions('${topic1}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
),
@TopicPartition(
topic = "${topic2}",
partitions = "#{@finder.partitions('${topic2}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
),
// and many more @TopicPartitions...
@TopicPartition(
topic = "${topicN}",
partitions = "#{@finder.partitions('${topicN}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
)
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
// process record
}
如何通过配置
topicPartitions
使这种重复配置更加简洁
@KafkaListener
的属性带有
@TopicPartion
动态生成数组的注释s(我的 N 个主题中的每个主题一个)?
最佳答案
当前无法使用 @KafkaListener
- 请在 GitHub 上打开一个新功能问题。
我能想到的唯一解决方法是以编程方式从容器工厂创建一个监听器容器并创建一个监听器适配器。如果您需要,我可以提供一个示例。
编辑
下面是一个例子:
@SpringBootApplication
public class So64022266Application {
public static void main(String[] args) {
SpringApplication.run(So64022266Application.class, args);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
}
@Bean
ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
PartitionFinder finder,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
MyListener listener) throws Exception {
MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
container.getContainerProperties().setGroupId("someGroup");
return container;
}
@Bean
MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
MyListener listener) throws NoSuchMethodException {
MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBean(listener);
endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
endpoint.setTopicPartitions(Arrays.stream(topics)
.flatMap(topic -> finder.partitions(topic))
.toArray(TopicPartitionOffset[]::new));
endpoint.setMessageHandlerMethodFactory(methodFactory());
return endpoint;
}
@Bean
DefaultMessageHandlerMethodFactory methodFactory() {
return new DefaultMessageHandlerMethodFactory();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentMessageListenerContainer<String, String> container) {
return args -> {
System.out.println(container.getAssignedPartitions());
template.send("so64022266-1", "key1", "foo");
template.send("so64022266-2", "key2", "bar");
};
}
}
@Component
class MyListener {
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
System.out.println(key + ":" + payload);
}
}
@Component
class PartitionFinder {
private final ConsumerFactory<String, String> consumerFactory;
public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
public Stream<TopicPartitionOffset> partitions(String topic) {
System.out.println("+" + topic + "+");
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
}
}
}
topics=so64022266-1, so64022266-2
如果您需要处理墓碑记录(
null
值),我们需要增强处理程序工厂;我们目前不公开框架的处理程序工厂。
关于java - 使用一组压缩主题中的所有记录的最简单的 Spring Kafka @KafkaListener 配置是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64022266/
我需要为监听器工厂生成的每个监听器定义自定义过滤策略。 目前,我正在使用 RecordFilterStrategy要做到这一点: @Bean ConcurrentKafkaListenerContai
我在 Spring Boot 项目中有一个 Spring KafkaListener: @KafkaListener(topics = "topic-one", groupId = "groupone
我将 spring-kafka 用于 springboot 2.0.4.RELEASE。 并使用KafkaListener获取消息 现在我想为我的组重置偏移量 但我不知道如何为该组获取消费者
需要什么 我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明 @KafkaListener 时设置了 autoStartup = "true",那么该应
试图弄清楚我是否可以使用 spring-kafka 和 spring-kafka-test 为 @KafkaListener 编写单元测试。 我的听众课。 public class MyKaf
我要创建并发@KafkaListener它可以处理多个主题,每个主题具有不同数量的分区。 我注意到 Spring-Kafka 只为大多数分区的主题为每个分区初始化一个使用者。 示例:我将并发设置为 8
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个 SpringBootTest使用 EmbeddedKafka测试这一切。 主要问题是:有时测试失
我正在使用具有以下配置的 spring-kafka: package com.danigu.fancypants.infrastructure; import com.fasterxml.jackso
我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和
我正在编写一个 Kafka 消费者。我需要将环境变量主题名称传递给 @KafkaListener(topics = ...)。这是我到目前为止所尝试过的: import org.springfram
我真的很难编写一个测试来检查当消息发送到指定主题时我的 Kafka Consumer 是否被正确调用。 我的消费者: @Service @Slf4j @AllArgsConstructor(onCon
使用 Spring 云流的 Kafka 绑定(bind)的消息生产者 @Component public static class PageViewEventSource implements App
我在我的 spring boot 应用程序中使用了 @KafkaListener 注释,而没有创建自定义 KafkaListenerContainerFactory bean。我目前正在我的 appl
我为主题数组创建了一个 bean,在运行时我向这个主题数组添加了一些主题,但消费者没有更新主题并且仍然从主题数组中的第一个主题消费。我希望消费者添加这些新主题并开始消费 @Autowired priv
我正在尝试使用来自 kafka 服务器的数据,我正在使用 @Kafkalistener注解。 问题是每次重新启动应用程序时我都会收到所有消息。 如何在我的应用程序中保存上次使用的偏移量并使用它来使用下
我编写了一个 JUnit 测试用例来测试 Spring Kafka 文档中“使用 Java 配置”类(class)中的代码。 ( https://docs.spring.io/spring-kafka
来自 Twitter 的问题: 只是想找到一个使用 spring-kafka 2.1.7 的简单示例,该示例与 KafkaListener 和 AckMode.MANUAL_IMMEDIATE 一起使
在我的项目中,我有很多 spring 管理的组件,它们做同样的事情。我想创建一个通用的 Util 类,它可以为我的所有组件执行所有通用操作。由于这个 Util 类需要访问环境变量和 beans,它的实
我是一名优秀的程序员,十分优秀!