gpt4 book ai didi

java - 如何在 Kafka 中使用多线程

转载 作者:行者123 更新时间:2023-12-03 12:51:49 26 4
gpt4 key购买 nike

我正在尝试使用 读取来自生产者的 kafka 消息Java 多线程 .

假设,卡夫卡制作人 发送多条消息卡夫卡消费者 .那么如何使用 分别读取这些多条消息JAVA中的ExecutorService

最佳答案

我已经实现了您提到的案例,我将分享您应该遵循的步骤。

创建一个必须实现 Runnable 接口(interface)的消费者类,它必须有一个 Kafkaconsumer 实例作为类成员。您可以在构造函数方法中配置消费者属性。

public LogConsumer(List<String> topics, String group, String brokerList) {

Properties propsConsumer = new Properties();
propsConsumer.put("bootstrap.servers", brokerList);
propsConsumer.put("group.id", group);
propsConsumer.put("enable.auto.commit", "false");
propsConsumer.put("key.deserializer", StringDeserializer.class);
propsConsumer.put("value.deserializer", ByteArrayDeserializer.class);
propsConsumer.put("auto.offset.reset", "latest");

this.consumer = new KafkaConsumer(propsConsumer);
this.consumer.subscribe(topics);
}

然后,在 run 方法中,您可以使用下面共享的每个 kafka 消息。
public void run() {
try {
while (!flagOfThread) {
ConsumerRecords<String, byte []> records = consumer.poll(10000);
for (ConsumerRecord<String, byte []> record : records) {
handleRecord(record);
}
// At least one
consumer.commitSync();
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!flagOfThread){
LOG.error("Log Consumer is shutting down.",e);
}

} finally {
consumer.close();
}
}

在您的应用程序运行器类中,您应该创建一个线程池,这 线程池计数必须等于主题分区计数 .
 ExecutorService executorService = Executors.newFixedThreadPool(parallelismCount);
for (int i = 0; i < parallelismCount; i++) {
ExecutionLogConsumer bean = new LogConsumer(/*parameters*/);
executorService.execute(bean);
}

现在您可以开始使用来自 kafka 的消息 :)

关于java - 如何在 Kafka 中使用多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50406204/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com