
java - How to write a Kafka consumer without using an infinite loop for polling?


How can I write a Kafka consumer in Java that doesn't use an infinite loop for polling?

I created my Kafka consumer using this link as a reference. There, the logic that handles incoming records sits inside a while(true) loop that keeps polling for new events. If I use this in my project, the thread can't do anything else. Is there a way to receive new events without this infinite loop?

public static void main(String[] str) throws InterruptedException {
    System.out.println("Starting AtMostOnceConsumer ...");
    execute();
}

private static void execute() throws InterruptedException {
    KafkaConsumer<String, Event> consumer = createConsumer();
    // Subscribe to all partitions in the topic. 'assign' could be used here
    // instead of 'subscribe' to subscribe to a specific partition.
    consumer.subscribe(Arrays.asList("topic"));
    processRecords(consumer);
}
private static KafkaConsumer<String, Event> createConsumer() {
    Properties props = new Properties();
    String consumeGroup = "group_id";
    props.put("group.id", consumeGroup);
    props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
    props.put("client.id", "clientId");
    props.put("security.protocol", "SASL_SSL");

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
    props.put("enable.auto.commit", "true");
    // Auto-commit interval; Kafka commits offsets at this interval.
    props.put("auto.commit.interval.ms", "101");
    // Caps the amount of data fetched per partition in each poll.
    props.put("max.partition.fetch.bytes", "135");
    // Set this if you want to always read from the beginning.
    // props.put("auto.offset.reset", "earliest");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "6001");
    props.put("schema.registry.url", "https://avroregistry.octanner.io");
    props.put("key.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    return new KafkaConsumer<String, Event>(props);
}
private static void processRecords(KafkaConsumer<String, Event> consumer) throws InterruptedException {
    while (true) {
        ConsumerRecords<String, Event> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
        long lastOffset = 0;
        for (ConsumerRecord<String, Event> record : records) {
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            lastOffset = record.offset();
        }
        System.out.println("lastOffset read: " + lastOffset);
        process();
    }
}

private static void process() throws InterruptedException {
    // Create some delay to simulate processing of the message.
    Thread.sleep(TimeUnit.MINUTES.toMillis(1));
}

Can anyone help me modify this so that I can avoid the while(true) loop and just listen for incoming events?

Best Answer

You can try something like this:

public class ConsumerDemoWithThread {
    private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
    private String bootstrapServers = "127.0.0.1:9092";
    private String groupId = "my-first-application";
    private String topic = "first-topic";

    KafkaConsumer<String, String> consumer = createConsumer(bootstrapServers, groupId, topic);

    private void pollForRecords() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> processRecords());
    }

    private KafkaConsumer<String, String> createConsumer(String bootstrapServers, String groupId, String topic) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // subscribe consumer to our topic(s)
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    private void processRecords() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // thrown when consumer.wakeup() is called from another thread
            logger.info("Received shutdown signal!");
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();
        consumerDemoWithThread.pollForRecords();
    }
}

Basically, as Joachim mentioned, the whole polling and processing logic needs to be delegated to a thread.
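The same pattern can be shown without any Kafka dependency: the blocking loop runs on its own thread, the calling thread stays free for other work, and a shutdown signal (the role consumer.wakeup() plays in the Kafka code) breaks the loop. This is a minimal stdlib-only sketch; the BlockingQueue stands in for the broker and all names here are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackgroundPollDemo {
    // Stands in for the broker: poll() blocks briefly, like KafkaConsumer.poll()
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final AtomicInteger processed = new AtomicInteger();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    // Delegate the infinite poll loop to a background thread
    void start() {
        executor.submit(() -> {
            while (running) {
                try {
                    // Poll with a timeout so the loop can notice the shutdown flag
                    String record = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        processed.incrementAndGet(); // "process" the record
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    // Plays the role of consumer.wakeup(): tell the loop to stop, then wait for it
    void shutdown() throws InterruptedException {
        running = false;
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundPollDemo demo = new BackgroundPollDemo();
        demo.start();                  // the main thread is now free to do other work
        demo.queue.put("event-1");
        demo.queue.put("event-2");
        Thread.sleep(500);             // give the background thread time to poll
        demo.shutdown();
        System.out.println("processed = " + demo.processed.get()); // prints "processed = 2"
    }
}
```

The key point is the same as in the answer above: the while loop itself doesn't go away, it just moves off the caller's thread, and a cooperative signal lets it exit cleanly instead of being killed mid-poll.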

Regarding "java - How to write a Kafka consumer without using an infinite loop for polling?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57924520/
