gpt4 book ai didi

java - Apache Flink 从 Kafka 读取 Avro byte[]

转载 作者:行者123 更新时间:2023-12-01 09:06:18 25 4
gpt4 key购买 nike

在查看示例时,我看到了很多这样的内容:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

我发现他们已经知道架构了。

I do not know the schema until I read the byte[] into a Generic Record then get the schema. (As it may change from record to record)

有人可以将我指向一个从 byte[] 读取到映射过滤器的 FlinkKafkaConsumer08 ,以便我可以删除一些前导位,然后加载该 字节[] 到通用记录中?

最佳答案

如果您使用 Confluence 的架构注册表,我相信首选解决方案是使用 Confluence 提供的 Avro serde。这样,我们只需调用 deserialize()并且要使用的最新版本的 Avro 模式的解析是在幕后自动完成的,不需要字节操作。

它归结为这样的东西(scala中的示例代码,java解决方案将非常相似):

import io.confluent.kafka.serializers.KafkaAvroDeserializer

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): KafkaKV = {

val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

KafkaKV(key, value)
}

...

此方法要求消息生成器也与架构注册表集成并在那里发布架构。这可以通过与上面非常相似的方式来完成,使用 Confluence 的 KafkaAvroSerializer

我在这里发布了详细的解释:How to integrate Flink with Confluent's schema registry

关于java - Apache Flink 从 Kafka 读取 Avro byte[],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41255017/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com