gpt4 book ai didi

java - spring-kafka监听并发

转载 作者:行者123 更新时间:2023-11-29 08:33:04 24 4
gpt4 key购买 nike

我已经使用 spring-kafka lib 实现了 Kafka 消费者.我有一个带有 2 个分区的 Kafka 主题,我还使用并发级别设置为 2 的 ConcurrentKafkaListenerContainerFactory 作为结果,每个容器实例都应该根据 spring-kafka documentation 从单个分区中使用.

The KafkaMessageListenerContainer receives all message from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to 1 or more KafkaMessageListenerContainer s to provide multi-threaded consumption.

有我的消费类:

@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();

@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}

如您所见,我有“hashMap”集合,我根据消息“customer_id”属性将我的事件放入相应队列。在多线程访问的情况下,此类功能需要额外的同步,正如我所见,spring-kafka 只为所有容器创建一个 bean 实例,而不是为每个容器创建一个单独的 bean 实例,以避免并发问题。

如何以编程方式更改此逻辑?

我看到解决此问题的唯一奇怪方法是使用两个 JVM 运行一个单独的应用程序,其中包含单线程消费者,因此使用 #receive 方法访问 KafkaConsumer 类将是单线程的。

最佳答案

没错。这就是它的工作原理。该框架实际上不依赖于 bean,而仅依赖于它向函数传递消息的方法。

您可以考虑为主题中的每个分区设置两个 @KafkaListener 方法。的确,来自一个分区的记录在单个线程中传送到 @KafkaListener。所以,如果您真的无法忍受那种状态,您可以为每个线程使用两个 HashMap

监听器抽象背后的总体思想正是关于无状态 行为。 KafkaConsumer 是常规的 Spring 单例 bean。您必须接受这个事实并根据这种情况重新设计您的解决方案。

关于java - spring-kafka监听并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46350587/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com