gpt4 book ai didi

java - Kafka Consumer.poll 调用不返回 kafka ConsumerRecords

转载 作者:太空宇宙 更新时间:2023-11-04 11:12:43 24 4
gpt4 key购买 nike

我是kafka技术的新手..我正在研究POC,我需要发送ProducerRecord<String, Paymnt>到 Kafka 主题,其中 Paymnt 是我的 POJO..我能够发布记录并且我可以看到消息被传递到 Kafka 主题..

D:\kafka\kafka_2.11-0.11.0.0\bin\windows>kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
test:2:0
test:1:0
test:0:4

但是在消费者方面,我无法检索相同的记录。当我调试消费者代码时,我看到 consumer.poll() 上的线程调用阻塞。

消费阶层

public class Consumer {



public static void main(String args[]) throws IOException {
Properties props = new Properties();
KafkaConsumer<String, Paymnt> consumer = null;
props.put("bootstrap.servers", "localhost:9092");
props.put("batch.size", 16384);
props.put("buffer.memory", 33554432);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.org.kafkaPro.PaymentDeSerializer");
props.put("enable.auto.commit", "false");
props.put("group.id", "test-consumer-group");
try {
consumer =new KafkaConsumer<String, Paymnt>(props);
consumer.subscribe(Arrays.asList("test"));
while(true){

ConsumerRecords<String, Paymnt> records = consumer.poll(200);
for (ConsumerRecord<String,Paymnt> record : records)
{
System.out.println(record.value().toString());
}

consumer.commitAsync();
}

}
catch(Exception ex){
ex.printStackTrace();
}finally{
consumer.commitSync();
consumer.close();
}
}
}

PaymentDeserliazer 类

 package com.org.kafkaPro;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

public class PaymentDeSerializer implements Deserializer<Paymnt> {

public PaymentDeSerializer(){

}

public void close() {
// TODO Auto-generated method stub

}

public void configure(Map<String, ?> arg0, boolean arg1) {
// TODO Auto-generated method stub

}

public Paymnt deserialize(String arg0, byte[] arg1) {
// TODO Auto-generated method stub
ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
ObjectInputStream in = null;
Paymnt h2 = null;
try {
in = new ObjectInputStream(bis);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
h2 = (Paymnt) in.readObject();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return h2;
}

}

支付类别

public class Paymnt  {

//fields,getters & setters
}

序列化器

    public class PaymentSerializer implements Serializer<Paymnt> {


public PaymentSerializer(){

}

public void close() {
// TODO Auto-generated method stub

}

public void configure(Map<String, ?> arg0, boolean arg1) {
// TODO Auto-generated method stub

}

public byte[] serialize(String arg0, Paymnt payment) {
// TODO Auto-generated method stub
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(payment);
oos.close();
byte[] b= baos.toByteArray();
return b;
} catch (IOException e) {
return new byte[0];
}

}

}

感谢您的帮助。谢谢

最佳答案

我遇到了同样的问题,并使用1.0.0 kafka安装版本解决了该问题。也许您正在使用不同版本的 kafka 安装?

关于java - Kafka Consumer.poll 调用不返回 kafka ConsumerRecords,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45814773/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com