- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我对 Java、Spring 和 Kafka 总体来说还是新手。情况如下:
我使用 @KafkaListener 注释来创建一个如下所示的 Kafka Consumer:
public class Listener {
private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record, ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
System.out.println(record);
arrayBlockingQueue.add(record);
}
}));
}
}
我向监听器添加了一个参数 ArrayBlockingQueue,我希望它能够将来自 Kafka 的消息添加到其中。
我遇到的问题是我无法弄清楚如何将 ArrayBlockingQueue 传递到监听器中,因为 Spring 正在幕后处理监听器的实例化和运行。
我需要这个阻塞队列,以便监听器之外的另一个对象可以访问消息并对其进行一些处理。例如,在我的 main 中:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
Properties appProps = new AppProperties().get();
ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
//TODO: This starts my listener. How do I pass the queue to it?
SpringApplication.run(SourceAccountListenerApp.class, args);
}
}
最佳答案
有很多方法可以将阻塞队列声明为 bean。
一个例子,主要:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
}
@Bean
public ArrayBlockingQueue arrayBlockingQueue() {
Properties appProps = new AppProperties().get();
ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
return arrayBlockingQueue;
}
}
听众:
public class Listener {
@Autowired
ArrayBlockingQueue arrayBlockingQueue;
关于java - 将 BlockingQueue 传递给 Spring KafkaListener,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52373624/
我需要为监听器工厂生成的每个监听器定义自定义过滤策略。 目前,我正在使用 RecordFilterStrategy要做到这一点: @Bean ConcurrentKafkaListenerContai
我在 Spring Boot 项目中有一个 Spring KafkaListener: @KafkaListener(topics = "topic-one", groupId = "groupone
我将 spring-kafka 用于 springboot 2.0.4.RELEASE。 并使用KafkaListener获取消息 现在我想为我的组重置偏移量 但我不知道如何为该组获取消费者
需要什么 我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明 @KafkaListener 时设置了 autoStartup = "true",那么该应
试图弄清楚我是否可以使用 spring-kafka 和 spring-kafka-test 为 @KafkaListener 编写单元测试。 我的听众课。 public class MyKaf
我要创建并发@KafkaListener它可以处理多个主题,每个主题具有不同数量的分区。 我注意到 Spring-Kafka 只为大多数分区的主题为每个分区初始化一个使用者。 示例:我将并发设置为 8
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个 SpringBootTest使用 EmbeddedKafka测试这一切。 主要问题是:有时测试失
我正在使用具有以下配置的 spring-kafka: package com.danigu.fancypants.infrastructure; import com.fasterxml.jackso
我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和
我正在编写一个 Kafka 消费者。我需要将环境变量主题名称传递给 @KafkaListener(topics = ...)。这是我到目前为止所尝试过的: import org.springfram
我真的很难编写一个测试来检查当消息发送到指定主题时我的 Kafka Consumer 是否被正确调用。 我的消费者: @Service @Slf4j @AllArgsConstructor(onCon
使用 Spring 云流的 Kafka 绑定(bind)的消息生产者 @Component public static class PageViewEventSource implements App
我在我的 spring boot 应用程序中使用了 @KafkaListener 注释,而没有创建自定义 KafkaListenerContainerFactory bean。我目前正在我的 appl
我为主题数组创建了一个 bean,在运行时我向这个主题数组添加了一些主题,但消费者没有更新主题并且仍然从主题数组中的第一个主题消费。我希望消费者添加这些新主题并开始消费 @Autowired priv
我正在尝试使用来自 kafka 服务器的数据,我正在使用 @Kafkalistener注解。 问题是每次重新启动应用程序时我都会收到所有消息。 如何在我的应用程序中保存上次使用的偏移量并使用它来使用下
我编写了一个 JUnit 测试用例来测试 Spring Kafka 文档中“使用 Java 配置”类(class)中的代码。 ( https://docs.spring.io/spring-kafka
来自 Twitter 的问题: 只是想找到一个使用 spring-kafka 2.1.7 的简单示例,该示例与 KafkaListener 和 AckMode.MANUAL_IMMEDIATE 一起使
在我的项目中,我有很多 spring 管理的组件,它们做同样的事情。我想创建一个通用的 Util 类,它可以为我的所有组件执行所有通用操作。由于这个 Util 类需要访问环境变量和 beans,它的实
我是一名优秀的程序员,十分优秀!