gpt4 book ai didi

apache-kafka - 使用 Avro 的单个 Kafka 主题中的多种消息类型

转载 作者:行者123 更新时间:2023-12-04 11:50:55 32 4
gpt4 key购买 nike

我有一个基于 Kafka 构建的事件源应用程序。目前我有一个包含多种消息类型的主题。全部使用 JSON 序列化/反序列化。

confluent 的架构注册表看起来是一种很好的消息类型维护方法,并且在 Avro 完全兼容模式下,它还提供了一种机制来在我的事件源应用程序中进行消息版本控制。

与最近 patch -- blog post至 4.1.1 汇合。您可以使用 Avro 序列化器/反序列化器在一个主题中包含多种不同类型的消息。

但是,我还没有看到任何工作示例。甚至一个都没有。

我的问题是:在不必使用 Avro 联合​​类型(将所有不同类型的消息放在一个单一模式中并利用联合)的情况下,上述补丁真的有效吗?

这种方法如何与需要指定 Key 和 Value Serde 的 Kafka Streaming 应用程序一起使用?

我应该忘记 Avro 而是使用 protobuff 吗?

最佳答案

这是消费者从发布不同类型事件的主题获取数据的示例:

package com.kafka.schema;

import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

private static Consumer<Long, GenericRecord> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
// Use Kafka Avro Deserializer.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Use Specific Record or else you get Avro GenericRecord.
// props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

// Schema registry location.
// Run Schema Registry on 8081
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
return new KafkaConsumer<>(props);
}

public static void main(String... args) {
final Consumer<Long, GenericRecord> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(Const.TOPIC));
IntStream.range(1, 100).forEach(index -> {
final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
if (records.count() == 0) {
System.out.println("None found");
} else {
records.forEach(record -> {
GenericRecord recValue = record.value();
System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
});
}
});
}
}

这里的重要部分是:
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());

关于apache-kafka - 使用 Avro 的单个 Kafka 主题中的多种消息类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51429759/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com