gpt4 book ai didi

java - 将 BlockingQueue 传递给 Spring KafkaListener

转载 作者:行者123 更新时间:2023-12-02 01:55:30 25 4
gpt4 key购买 nike

我对 Java、Spring 和 Kafka 总体来说还是新手。情况如下:

我使用 @KafkaListener 注释来创建一个如下所示的 Kafka Consumer:

public class Listener {

private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record, ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
System.out.println(record);
arrayBlockingQueue.add(record);
}
}));
}

}

我向监听器添加了一个参数 ArrayBlockingQueue,我希望它能够将来自 Kafka 的消息添加到其中。

我遇到的问题是我无法弄清楚如何将 ArrayBlockingQueue 传递到监听器中,因为 Spring 正在幕后处理监听器的实例化和运行。

我需要这个阻塞队列,以便监听器之外的另一个对象可以访问消息并对其进行一些处理。例如,在我的 main 中:

@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
Properties appProps = new AppProperties().get();
ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
//TODO: This starts my listener. How do I pass the queue to it?
SpringApplication.run(SourceAccountListenerApp.class, args);
}
}

最佳答案

有很多方法可以将阻塞队列声明为 bean。

一个例子,主要:

@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
}

@Bean
public ArrayBlockingQueue arrayBlockingQueue() {
Properties appProps = new AppProperties().get();
ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
return arrayBlockingQueue;
}
}

听众:

public class Listener {

@Autowired
ArrayBlockingQueue arrayBlockingQueue;

关于java - 将 BlockingQueue 传递给 Spring KafkaListener,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52373624/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com