gpt4 book ai didi

akka - 使 Reactive Kafka 与 Confluent Schema Registry Avro Schema 一起工作

转载 作者:行者123 更新时间:2023-12-01 04:54:58 25 4
gpt4 key购买 nike

如何使 Reactive Kafka ( https://github.com/akka/reactive-kafka ) 与 Confluent Schema Registry Avro Schema 一起工作?这是我的示例代码:

def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = {

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrap)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty("schema.registry.url", schemaRegistryUrl)
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName)
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")

Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic))
}

最佳答案

您只需要将消费者配置为使用 Confluent 反序列化器:io.confluent.kafka.serializers.KafkaAvroDeserializer.class

设置以下属性:

  • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

此外,您必须添加属性“schema.registry.url”以指定至少一个指向您的模式注册表实例的地址。

最后,您必须将以下依赖项添加到您的项目中:

            <dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${io.confluent.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

Confluent Documentation

关于akka - 使 Reactive Kafka 与 Confluent Schema Registry Avro Schema 一起工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44627756/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com