gpt4 book ai didi

java - Kafka Java Consumer 已关闭

转载 作者:行者123 更新时间:2023-12-02 12:51:20 24 4
gpt4 key购买 nike

我刚刚开始使用 Kafka。我面临着消费者的一个小问题。我用Java写了一个消费者。

我收到此异常 - IllegalStateException 此使用者已关闭。

我在以下行中遇到异常:

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);

这种情况是在我的消费者因某些异常崩溃后开始发生的,当我尝试再次运行它时,它给了我这个异常。

这是完整的代码:

package StreamApplicationsTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class StreamAppConsumer {

public static void main(String[] args){
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");

Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);

while(true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
while(iterator.hasNext()){
i++;
ConsumerRecord<String,String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if(key=="exit" || value=="exit")
break;
System.out.println("Key="+key+"\tValue="+value);
}

System.out.println("Messages processed = "+Integer.toString(i));
consumer.close();

}
}
}

我只是被这个问题困扰,任何形式的帮助都会有用。

最佳答案

发生这种情况是因为您在无限循环结束时关闭消费者,因此当它第二次轮询时消费者已关闭。为了处理眼前的问题,我将整个 while(true) 循环包装在 try-catch 中,并在 catch 或 finally block 中处理消费者关闭。

但是,如果 Kafka 消费者没有仔细处理不同的关闭信号,您就会面临丢失数据的风险。我建议查看 Confluence 的优雅消费者关闭示例 here 。在你的情况下,因为你在主线程中运行,所以它看起来像这样......

public static void main(String[] args) {
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");

Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);

Runtime.getRuntime().addShutdownHook(new Thread()
{
public void run() {
consumer.wakeup();
}
});

try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
i++;
ConsumerRecord<String, String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if (key == "exit" || value == "exit")
break;
System.out.println("Key=" + key + "\tValue=" + value);
}
System.out.println("Messages processed = " + Integer.toString(i));
}
} catch (WakeupExection e) {
// Do Nothing
} finally {
consumer.close();
}
}
}

基本上,运行 consumer.wakeup() 是消费者中唯一的线程安全方法,因此它是唯一可以在 Java 的关闭 Hook 内运行的方法。由于调用唤醒时消费者并未 sleep ,因此它会触发唤醒执行,从而优雅地关闭消费者。

关于java - Kafka Java Consumer 已关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47123557/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com