gpt4 book ai didi

hadoop - 带有 avro 记录的 Kafka

转载 作者:可可西里 更新时间:2023-11-01 16:21:58 29 4
gpt4 key购买 nike

我有以下内容:来源 - kafka主题(翻译) channel - 内存接收器 - Hdfs (avro_event)

kafka topic trans中的数据是用c# producer写的,有上千条avro记录。当我运行我的水槽消费者时,它开始将数据接收到 hdfs。问题是数据的格式是:schema数据图式数据

代替:

架构数据数据

我猜这是因为 flume 期望记录类型为 {header} {body} 而来自 kafka 的数据只是 {body}我知道有一种方法可以将写入主题的 avro 数据包装在 avroFlumeEvent 中,但它似乎不再是真正的 avro 记录,也许 spark 消费者或 Storm 会更喜欢真正的 avro 中的数据.有没有一种方法可以处理这个主题,以便每次 flume 将数据滚动到 hdfs 时都可以在没有多个模式的情况下写入数据?

最佳答案

我们实际上最终成功了。我们在 C# 生产者中使用的是 Microsoft .NET avro 库而不是 apache avro 库。这意味着 avro 记录已正确序列化。我还需要更改水槽接收器以使用“org.apache.flume.sink.hdfs.AvroEventSerializer$Builder”作为接收器序列化器而不是“avro_event”。我还需要包含一个连接到 kafka 源的水槽拦截器,它将变量“flume.avro.schema.url”推送到水槽 header 中,供 hdfs 接收器序列化程序稍后使用。

我看过 camus,但它似乎对我们试图实现的东西有点矫枉过正,一个连接到 kafka 主题的基本水槽 channel ,它将 avro 数据下沉到 hdfs。

我刚刚从构建 flume 配置的 java 应用程序中删除了拦截器位,希望它可以帮助遇到此问题的其他人:

                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId +".interceptors",_interceptorId);           
_flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".type","static");
_flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".key","flume.avro.schema.url");
_flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".value",_avroProdSchemaLocation +_databaseName + "/" + _topic + "/record/" + _schemaVersion + "/" + _topicName + ".avsc");

关于hadoop - 带有 avro 记录的 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28787835/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com