gpt4 book ai didi

java - 通用 Spring Kafka 监听器

转载 作者:搜寻专家 更新时间:2023-11-01 02:58:58 27 4
gpt4 key购买 nike

我在一个maven项目中做了一个kafka生产者和消费者。

我想在另一个maven项目中使用它,所以我添加了上面kafka项目的依赖。现在的问题是生产者很好,但如何使监听器通用,可以被添加该项目的所有其他项目覆盖。

目前我在一个项目中有Listener

public class Listener {
public CountDownLatch countDownLatch0 = new CountDownLatch(3);
public CountDownLatch countDownLatch1 = new CountDownLatch(3);
public CountDownLatch countDownLatch2 = new CountDownLatch(3);

@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "0" }) })
public void listenPartition0(ConsumerRecord<?, ?> record) {
System.out.println("Listener Id0, Thread ID: " + Thread.currentThread().getId());
System.out.println("Received: " + record);
countDownLatch0.countDown();
}

@KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "1" }) })
public void listenPartition1(ConsumerRecord<?, ?> record) {
System.out.println("Listener Id1, Thread ID: " + Thread.currentThread().getId());
System.out.println("Received: " + record);
countDownLatch1.countDown();
}

@KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "2" }) })
public void listenPartition2(ConsumerRecord<?, ?> record) {
System.out.println("Listener Id2, Thread ID: " + Thread.currentThread().getId());
System.out.println("Received: " + record);
countDownLatch2.countDown();
}

我如何制作可以被所有其他项目覆盖的通用监听器,所有其他项目都将此项目添加为依赖项并可以收听各自的主题

最佳答案

如果我正确理解你的问题,我认为你需要做这样的事情:

在共享的.jar ...

public abstract class Listener {
public CountDownLatch countDownLatch0 = new CountDownLatch(3);
public CountDownLatch countDownLatch1 = new CountDownLatch(3);
public CountDownLatch countDownLatch2 = new CountDownLatch(3);


abstract void handlePartition0(record);
abstract void handlePartition1(record);
abstract void handlePartition2(record);

@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "0" }) })
public void listenPartition0(ConsumerRecord<?, ?> record) {
handlePartition0(record);
countDownLatch0.countDown();
}

@KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "1" }) })
public void listenPartition1(ConsumerRecord<?, ?> record) {
handlePartition1(record);
countDownLatch1.countDown();
}

@KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "2" }) })
public void listenPartition2(ConsumerRecord<?, ?> record) {
handlePartition2(record);
countDownLatch2.countDown();
}
}

然后在子项目中,导入共享.jar 并导入handleXXX 方法。

public class MyChildListener extends Listener {
public void handlePartition0(Record<?,?> r) {
// do something useful
}
...
}

这可能对你有用。

关于java - 通用 Spring Kafka 监听器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42045033/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com