gpt4 book ai didi

java - 如何从kafka avro记录生成pojo?

转载 作者:行者123 更新时间:2023-11-30 10:16:02 24 4
gpt4 key购买 nike

我有一个 User 类,我将它序列化为 avro,(使用 Confluent avro 序列化程序和模式注册表)并将其发布到 Kafka 主题。我让消费者​​打印数据到控制台,它工作正常。我现在正在尝试的是根据这些数据创建原始对象。例如,我将“用户”对象作为 avro 发布到 Kafka 主题。我正在尝试在使用该用户对象后重新创建该用户对象(而不是控制台输出)。这可能吗?

下面是我的代码

用户类

public class User { 
int id;
String name;
public User(){}
public User(int id, String name) {
super();
this.id = id;
this.name = name;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

消费者代码

User user = new User();



Properties props = new Properties();

props.put("bootstrap.servers", "127.0.0.1:9092");

props.put("group.id", "avro-consumer-group");

props.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

props.put("schema.registry.url","http://127.0.0.1:8081");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);



consumer.subscribe(Arrays.asList("avrotesttopic"));

System.out.println("Subscribed to topic " + "avrotesttopic");



while (true) {

ConsumerRecords<String, GenericRecord> records = consumer.poll(100);

for (org.apache.kafka.clients.consumer.ConsumerRecord<String, GenericRecord> record : records){

System.out.printf("value = %sn",record.value());

//output-> value = {"id": 10, "name": "testName"}

}

}

谢谢

最佳答案

考虑到您正在使用 KafkaAvroDeserializer,您需要将以下属性设置为消费者配置的一部分

    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

它将允许您获取 SpecificRecord 而不是 GenericRecord 已经由消费者处理。这是一个例子

https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry

关于java - 如何从kafka avro记录生成pojo?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50169389/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com