作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 读取来自生产者的 kafka 消息Java 多线程 .
假设,卡夫卡制作人 向 发送多条消息卡夫卡消费者 .那么如何使用 分别读取这些多条消息JAVA中的ExecutorService
最佳答案
我已经实现了您提到的案例,我将分享您应该遵循的步骤。
创建一个必须实现 Runnable 接口(interface)的消费者类,它必须有一个 Kafkaconsumer 实例作为类成员。您可以在构造函数方法中配置消费者属性。
public LogConsumer(List<String> topics, String group, String brokerList) {
Properties propsConsumer = new Properties();
propsConsumer.put("bootstrap.servers", brokerList);
propsConsumer.put("group.id", group);
propsConsumer.put("enable.auto.commit", "false");
propsConsumer.put("key.deserializer", StringDeserializer.class);
propsConsumer.put("value.deserializer", ByteArrayDeserializer.class);
propsConsumer.put("auto.offset.reset", "latest");
this.consumer = new KafkaConsumer(propsConsumer);
this.consumer.subscribe(topics);
}
public void run() {
try {
while (!flagOfThread) {
ConsumerRecords<String, byte []> records = consumer.poll(10000);
for (ConsumerRecord<String, byte []> record : records) {
handleRecord(record);
}
// At least one
consumer.commitSync();
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!flagOfThread){
LOG.error("Log Consumer is shutting down.",e);
}
} finally {
consumer.close();
}
}
ExecutorService executorService = Executors.newFixedThreadPool(parallelismCount);
for (int i = 0; i < parallelismCount; i++) {
ExecutionLogConsumer bean = new LogConsumer(/*parameters*/);
executorService.execute(bean);
}
关于java - 如何在 Kafka 中使用多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50406204/
我是一名优秀的程序员,十分优秀!