- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我有以下 flume 代理配置来从 kafka 源读取消息并将它们写回 HDFS 接收器
tier1.sources = source1
tier 1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.0.100:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = 192.168.0.100:9092
tier1.channels.channel1.topic = test
tier1.channels.channel1.zookeeperConnect = 192.168.0.100:2181/kafka
tier1.channels.channel1.parseAsFlumeEvent = false
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.filePrefix = test-kafka
tier1.sinks.sink1.hdfs.fileSufix = .avro
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%y-%m-%d
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.rollSize=0
如果每个轮询周期只有一条 kafka 消息到达,kafka 消息内容是 avro 数据,它被正确序列化到一个文件中。
当两个kafka消息到达同一个批处理时,它们被分组到同一个HDFS文件中,由于avro消息同时包含schema + data,结果文件包含schema + data + schema + data,导致无效.avro 文件。
我如何拆分 avro 事件以将不同的 kafka 消息拆分为将每个消息写入不同的文件
谢谢
最佳答案
一种方法:假设您将源 kafka 传入数据称为“SourceTopic”。您可以将自定义接收器注册到此“SourceTopic”。
<FlumeNodeRole>.sinks.<your-sink>.type =net.my.package.CustomSink
在您的 CustomSink 中,您可以编写一个方法来区分传入的消息、将其拆分并重新发送到不同的“DestinationTopic”。这个“DestinationTopic”现在可以充当文件序列化的新水槽源。
管道衬里水槽引用以下链接: https://flume.apache.org/FlumeUserGuide.html
关于hadoop - 水槽+卡夫卡+HDFS : Split messages,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40101469/
我已经从“https://github.com/apache/flume/downloads”下载了水槽..但我无法构建它..我需要先安装节俭才能构建水槽吗?如果是这样,原因是什么..我得到了当我运行
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我有以下 flume 代理配置来从 kafka 源读取消息并将它们写回 HDFS 接收器 tier1.sources = source1 tier 1.channels = channel1 tie
我想获取由 AVRO 反序列化器创建的记录并将其发送到 ElasticSearch。我意识到我必须编写自定义代码来执行此操作。 使用 LITERAL 选项,我得到了 JSON 模式,这是使用 Gene
我是 spark 的新手,我们正在运行 spark on yarn。我可以很好地运行我的测试应用程序。我正在尝试收集 Graphite 中的 Spark 指标。我知道要对 metrics.proper
我正在尝试使用 Flume-ng 将数据写入 Hdfs 作为 exec 源。但它总是以退出代码 127 结束。它还显示类似警告 无法从 VM 获取 maxDirectMemory:NoSuchMeth
我是一名优秀的程序员,十分优秀!