- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要一些有关 Flink Streaming 的帮助。我在下面生成了一个简单的 Hello-world 类型的代码。这从 RabbitMQ 流式传输 Avro 消息并将其持久化到 HDFS。我希望有人可以查看代码,也许它可以帮助其他人。
我发现的大多数 Flink 流示例都将结果发送到 std-out。我实际上想将数据保存到 Hadoop。我读到,理论上,您可以使用 Flink 将其流式传输到任何您喜欢的地方。实际上,我还没有找到任何将数据保存到 HDFS 的示例。但是,根据我确实找到的示例以及反复试验,我提供了以下代码。
这里的数据来源是RabbitMQ。我使用客户端应用程序将“MyAvroObjects”发送到 RabbitMQ。 MyAvroObject.java - 不包括 - 从 avro IDL 生成...可以是任何 avro 消息。
下面的代码使用 RabbitMQ 消息,并将其保存到 HDFS,作为 avro 文件......好吧,这就是我希望的。
package com.johanw.flink.stackoverflow;
import java.io.IOException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RMQToHadoop {
public class MyDeserializationSchema implements DeserializationSchema<MyAvroObject> {
private static final long serialVersionUID = 1L;
@Override
public TypeInformation<MyAvroObject> getProducedType() {
return TypeExtractor.getForClass(MyAvroObject.class);
}
@Override
public MyAvroObject deserialize(byte[] array) throws IOException {
SpecificDatumReader<MyAvroObject> reader = new SpecificDatumReader<MyAvroObject>(MyAvroObject.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(array, null);
MyAvroObject MyAvroObject = reader.read(null, decoder);
return MyAvroObject;
}
@Override
public boolean isEndOfStream(MyAvroObject arg0) {
return false;
}
}
private String hostName;
private String queueName;
public final static String path = "/hdfsroot";
private static Logger logger = LoggerFactory.getLogger(RMQToHadoop.class);
public RMQToHadoop(String hostName, String queueName) {
super();
this.hostName = hostName;
this.queueName = queueName;
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public void run() {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
logger.info("Running " + RMQToHadoop.class.getName());
DataStream<MyAvroObject> socketStockStream = env.addSource(new RMQSource<>(hostName, queueName, new MyDeserializationSchema()));
Job job;
try {
job = Job.getInstance();
AvroJob.setInputKeySchema(job, MyAvroObject.getClassSchema());
} catch (IOException e1) {
e1.printStackTrace();
}
try {
JobConf jobConf = new JobConf(Job.getInstance().getConfiguration());
jobConf.set("avro.output.schema", MyAvroObject.getClassSchema().toString());
org.apache.avro.mapred.AvroOutputFormat<MyAvroObject> akof = new AvroOutputFormat<MyAvroObject>();
HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable> hof = new HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable>(akof, jobConf);
FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>> fileSinkFunctionByMillis = new FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>(hof, 10000l);
org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(path));
socketStockStream.map(new MapFunction<MyAvroObject, Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<AvroWrapper<MyAvroObject>, NullWritable> map(MyAvroObject envelope) throws Exception {
logger.info("map");
AvroKey<MyAvroObject> key = new AvroKey<MyAvroObject>(envelope);
Tuple2<AvroWrapper<MyAvroObject>, NullWritable> tupple = new Tuple2<AvroWrapper<MyAvroObject>, NullWritable>(key, NullWritable.get());
return tupple;
}
}).addSink(fileSinkFunctionByMillis);
try {
env.execute();
} catch (Exception e) {
logger.error("Error while running " + RMQToHadoop.class + ".", e);
}
} catch (IOException e) {
logger.error("Error while running " + RMQToHadoop.class + ".", e);
}
}
public static void main(String[] args) throws IOException {
RMQToHadoop toHadoop = new RMQToHadoop("localhost", "rabbitTestQueue");
toHadoop.run();
}
}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
...
DataStreamSource<MyAvroObject> socketStockStream = env.addSource(new FlinkKafkaConsumer082<MyAvroObject>(topic, new MyDeserializationSchema(), sourceProperties));
Properties destProperties = new Properties();
destProperties.setProperty("bootstrap.servers", bootstrapServers);
FlinkKafkaProducer<MyAvroObject> kafkaProducer = new FlinkKafkaProducer<L3Result>("MyKafkaTopic", new MySerializationSchema(), destProperties);
最佳答案
我想 FileSinkFunctionByMillis
可以使用,但这意味着您的流媒体程序不具有容错能力。这意味着如果您的源或机器或写入失败,那么您的程序将崩溃而无法恢复。
建议你看看用RollingSink
( https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#hadoop-filesystem )。这可用于创建类似 Flum 的管道以将数据摄取到 HDFS(或其他文件系统)中。滚动接收器是一个可恢复接收器,这意味着您的程序将是容错的,因为 Kafka 消费者也是容错的。您也可以指定自定义 Writer
以您想要的任何格式写入数据,例如 Avro。
关于apache-flink - 请确认这是使用 Flink 将数据流式传输到 Hadoop 的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34500756/
我刚刚更新了 Ruby,现在我在尝试启动 compass 时遇到以下错误: Encoding::CompatibilityError on line ["28"] of /usr/local/Cell
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 6 年前。
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在尝试在我的 iOS 应用程序中开发可折叠/ Accordion 式的功能。这将是您可以在网站上找到的典型 FAQ 类型功能。我想点击标题,然后显示详细信息。 因为这是帮助部分,只有几个项目,我认
我正在尝试设计一个基于 REST 的 Web 服务来与我正在开发的农场动物管理系统进行交互。 为了详细说明问题,我收藏了动物 属于一个农场。每只动物都有自己的信息——例如姓名、身份证号、品种年龄等。因
我有 3 种不同的表单,其中复选框数量不同,每个部分基本上代表一个表单,因此当用户选择该部分中的复选框时,它会显示他们在该部分的总金额中 checkout 了多少 HTML
我有一份 32 页的 PDF 版家谱。与其将家谱全部放在一个非常大的 PDF 页面上(这是我想要的),不如将其格式化为一组 8 个单独的美国信纸大小的页面应该在整个宽度上缝合; 4 行这样就完成了树。
指SASS implementation for Java? : 在 Maven 目标编译包中自动编译 compass-style.org 样式表的最佳方法是什么? 我不想发送太多的自编译库,也不想通
鉴于以下 XAML... 我正在寻找一种绑定(bind) ComboBox、Button 和 Command 的方法,以便当 ComboBox 的值更改时,在 Command 上调用 CanExe
在玩具应用程序中,我有一个显示所有帖子标题的“帖子”模板。当您单击每个标题时,我不想直接进入“显示” View ,而是直接内联展开该帖子的其余内容。 我考虑过让 postRoute 重用 postsR
我需要一些使用 Twitter Bootstrap 或其他响应式框架的自定义 Swagger-UI 实现。需要在我的移动设备上使用这样的 UI 测试我的 API,但 swagger-ui 不能很好地扩
我正在做一个项目,我真的在尝试编写面向对象的 JavaScript 代码。我刚刚开始阅读Douglas Crockford's JavaScript: The Good Parts我很快开始意识到用
在 C# 中,我通过执行以下操作来加密文本数据(请注意我正在以 block ( block )的形式加密数据): public string EncryptData(string pu
我正在构建一个社交网站,该网站将向全世界公开 REST API (WCF WebAPI),以便任何开发人员都能够为该网站创建客户端应用程序、将其与其他服务集成等。 我想为 API 实现 Faceboo
我是一名优秀的程序员,十分优秀!