- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
无法使用 Flume 推特代理读取和解析流推特数据创建的文件,既不使用 Java 也不使用 Avro 工具。我的需求是将avro格式转换成JSON格式。
当使用任何一种方法时,我得到异常:org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
我在伪节点集群中使用 Hadoop vanilla 配置,hadoop 版本是 2.7.1
Flume版本为1.6.0
twitter 代理的 flume 配置文件和解析 avro 文件的 java 代码附在下面:
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=xxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret=xxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken=xxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret=xxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.keywords=Modi,PMO,Narendra Modi,BJP
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost:9000/user/ashish/Twitter_Data
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeformat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=100
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10
TwitterAgent.sinks.HDFS.hdfs.rollInterval=30
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
public class AvroReader {
public static void main(String[] args) throws IOException {
Path path = new Path("hdfs://localhost:9000/user/ashish/Twitter_Data/FlumeData.1449656815028");
Configuration config = new Configuration();
SeekableInput input = new FsInput(path, config);
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
for (GenericRecord datum : fileReader) {
System.out.println("value = " + datum);
}
fileReader.close();
}
}
我得到的异常堆栈跟踪是:
2015-12-09 17:48:19,291 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
value = {"id": "674535686809120768", "user_friends_count": 1260, "user_location": "ユウサリ", "user_description": "「テガミバチ」に登場するザジのbotです。追加してほしい言葉などの希望があればDMでお願いします。リムーブする際はブロックでお願いします。", "user_statuses_count": 47762, "user_followers_count": 1153, "user_name": "ザジ", "user_screen_name": "zazie_bot", "created_at": "2015-12-09T15:56:54Z", "text": "@ill_akane_bot お前、なんか、\u2026すっげー楽しそうだな\u2026", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 204695477, "source": "<a href=\"http:\/\/twittbot.net\/\" rel=\"nofollow\">twittbot.net<\/a>", "in_reply_to_status_id": 674535430423887872, "media_url_https": null, "expanded_url": null}
Exception in thread "main" org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:275)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at avro.AvroReader.main(AvroReader.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.IOException: Block size invalid or too large for this implementation: -40
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:266)
... 7 more
我是否还需要提供 Avro 架构才能正确读取 Avro 文件,如果是的话在哪里?
最佳答案
我也遇到了这个问题。虽然我可以看到你的数据文件已经不存在了。我检查了这个我的数据文件,应该和你的一样。
我发现我的数据文件已经是一个 avro 容器文件,这意味着它有它的模式和数据。
我得到的 avro 文件非常错误,因为它应该只包含一个包含 avro 模式的头,但实际上它的文件中有多个头。
另外,tweets 已经是 JSON 格式,为什么 flume 将它们转换成 avro 格式?
关于java - Flume Twitter Agent 生成的 Avro 文本文件未在 Java 中读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34194377/
Closed. This question does not meet Stack Overflow guidelines。它当前不接受答案。 想要改善这个问题吗?更新问题,以便将其作为on-topi
spooldir 选项用于流式传输特定目录的所有文件。完成整个目录读取后,作业将暂停/停止。但是,如果我想将新文件添加到同一目录中,会发生什么?? 我的要求是在任何新文件添加到该特定 spooldir
我正在尝试从/home/cloudera/Documents/flume/读取日志文件并使用 apache flume 将其写入 hdfs。我使用以下命令在 hdfs 中创建 flumeLogTest
我正在尝试使用 MemChannel 和 HDFS 处理一些 Twitter 关键字。但是,在控制台上的 HDFS 启动 状态后,flume-ng 没有显示进一步的进度。 这是/etc/flume-n
我正在尝试为flume-ng编写一个自定义接收器。我查看了现有的接收器和文档并对其进行了编码。但是,应该接收事件的“process()”方法总是以 null 结束。 我正在做 Event event
我正在测试 Flume NG (1.2.0) 以收集日志。 Flume收集日志文件flume_test.log的简单测试并将收集到的日志作为 sysout 打印到控制台。 conf/flume.con
我在 flume.con 文件中声明了一个 flume agent。来源是 RabbitMQ,尽管这不是很相关。问题是我需要从那里取出凭证到另一个文件。我看到这样做的方法是在 flume-env.sh
我有一个用例,我需要将文件从目录提取到 HDFS。作为 POC,我在 Flume 中使用了简单的目录假脱机,我在其中指定了源、接收器和 channel ,它工作正常。缺点是我必须为进入不同文件夹的多种
我正在使用 Flume 1.3.1 ng,我正在将文件从 spoolDir 传输到 HDFS Sink,并且我需要与输入文件相同的输出文件名称。例如,如果输入文件名为sample.gz,则输出也需要为
kafka_2.10-0.8.2.0 水槽1.6 这是我的水槽配置: a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1
我正在使用以下配置详细信息使用 Flume 将 Twitter 提要推送到 HDFS,但在 Flume 事件 header 中获得预期时间戳,但它为空 twitter.conf TwitterAgen
我们将一个 150 mb 的 csv 文件复制到水槽的 spool 目录中,当它被加载到 hdfs 中时,该文件被拆分成更小的文件,例如 80 kb 的文件。有没有办法加载文件而不会使用水槽拆分成更小
我们将推文保存在目录顺序中,例如/user/flume/2016/06/28/13/FlumeData...。但每小时它会创建超过 100 个 FlumeData 文件。我更改了 TwitterAge
有大量关于在 CDH3 中以故障转移模式配置 Flume (0,9x) 节点的信息。 但是CDH4中Flume(1.x)配置的配置格式完全不同。如何在故障转移模式下配置 Flume 1.x (flum
我正在使用 cloudera CDH 4.4。当我运行 flume cmd 时 - "bin/flume-ng agent -n agentA -f conf/MultipleFlumes.prope
我在 CentOS(cloudera VM)中安装了 Flume 1.4.0-cdh4.7.0 我运行以下命令来启动水槽 Flume-ng agent -n agent-name -c conf -f
我正在尝试运行典型的 Flume 第一个示例来获取推文并使用 Apache FLume 将它们存储在 HDFS 中。 [Hadoop version 3.1.3; Apache Flume 1.9.0
我正在尝试使用 Flume 进行 Twitter 分析。为了从 twitter 获取推文,我在 flume.conf 文件中设置了所有必需的参数(consumerKey、consumerSecret、
我正在尝试使用 Flume 和 Hive 进行 Twitter 分析。为了从 twitter 获取推文,我在 flume.conf 文件中设置了所有必需的参数(consumerKey、consumer
我搭建了一个hadoop集群,其中一个是master-slave节点,另一个是slave。现在,我想建立一个水槽来获取主机上集群的所有日志。但是,当我尝试从 tarball 安装 flume 时,我总
我是一名优秀的程序员,十分优秀!