作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们想通过Flink的BucketingSink或者StreamingFileSink将压缩后的数据写入HDFS。我已经编写了自己的 Writer,如果没有发生故障,它可以正常工作。然而,当它遇到故障并从检查点重新启动时,它会生成有效长度的文件(hadoop < 2.7)或截断文件。不幸的是,gzip 是二进制文件,在文件末尾有尾部。因此,简单截断在我的情况下不起作用。为压缩 hdfs 接收器启用精确一次语义的任何想法?
这是我编写器的代码:
public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {
private static final long serialVersionUID = 2L;
/**
* The {@code CompressFSDataOutputStream} for the current part file.
*/
private transient GZIPOutputStream compressionOutputStream;
public HdfsCompressStringWriter() {}
@Override
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
this.setSyncOnFlush(true);
compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
}
public void close() throws IOException {
if (compressionOutputStream != null) {
compressionOutputStream.close();
compressionOutputStream = null;
}
resetStream();
}
@Override
public void write(JSONObject element) throws IOException {
if (element == null || !element.containsKey("body")) {
return;
}
String content = element.getString("body") + "\n";
compressionOutputStream.write(content.getBytes());
compressionOutputStream.flush();
}
@Override
public Writer<JSONObject> duplicate() {
return new HdfsCompressStringWriter();
}
最佳答案
我建议为 StreamingFileSink
实现一个 BulkWriter
,它通过 GZIPOutputStream
压缩元素。代码可能如下所示:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
final DataStream<Integer> input = env.addSource(new InfinitySource());
final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink.<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>()).build();
input.addSink(streamingFileSink);
env.execute();
}
private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
@Override
public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
}
}
private static class GzipBulkWriter<T> implements BulkWriter<T> {
private final GZIPOutputStream gzipOutputStream;
private final ObjectOutputStream objectOutputStream;
public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
this.gzipOutputStream = gzipOutputStream;
this.objectOutputStream = objectOutputStream;
}
@Override
public void addElement(T t) throws IOException {
objectOutputStream.writeObject(t);
}
@Override
public void flush() throws IOException {
objectOutputStream.flush();
}
@Override
public void finish() throws IOException {
objectOutputStream.flush();
gzipOutputStream.finish();
}
}
关于gzip - Flink 如何容错将数据作为 gzip 压缩下沉到 hdfs?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56474706/
在 firefox 中遇到问题。 你可以在这里看到:link 点击并按住文本“Click Me”下降一个像素。如果您在按住鼠标按钮的同时移开光标,然后在空白处松开 - 文本“Click Me”不会在一
我是一名优秀的程序员,十分优秀!