
apache-spark - Extending DefaultCodec to support Zip compression for Hadoop files


I have some Spark code that reads two files from HDFS (a header file and a body file), reduces the RDD[String] to a single partition, and then writes the result as a compressed file using the GZip codec:

spark.sparkContext.textFile("path_to_header.txt,path_to_body.txt")
  .coalesce(1)
  .saveAsTextFile("output_path", classOf[GzipCodec])

This works 100% as expected. We have now been asked to also support zip compression for Windows users who cannot natively decompress *.gzip files. The zip format obviously isn't supported out of the box, so I am trying to roll my own compression codec.

When running the code I hit a "ZipException: no current ZIP entry" exception:
Exception occured while exporting org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 2 times, most recent failure: Lost task 0.1 in stage 16.0 (TID 675, xxxxxxx.xxxxx.xxx, executor 16): java.util.zip.ZipException: no current ZIP entry
at java.util.zip.ZipOutputStream.write(Unknown Source)
at io.ZipCompressorStream.write(ZipCompressorStream.java:23)
at java.io.DataOutputStream.write(Unknown Source)
at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:81)
at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:102)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:95)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

I created a ZipCodec class that extends DefaultCodec:
public class ZipCodec extends DefaultCodec {

    @Override
    public CompressionOutputStream createOutputStream(final OutputStream out, final Compressor compressor) throws IOException {
        return new ZipCompressorStream(new ZipOutputStream(out));
    }
}

and a ZipCompressorStream that extends CompressorStream:
public class ZipCompressorStream extends CompressorStream {

    public ZipCompressorStream(final ZipOutputStream out) {
        super(out);
    }

    @Override
    public void write(final int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(final byte[] data, final int offset, final int length) throws IOException {
        out.write(data, offset, length);
    }
}

We are currently using Spark 1.6.0 and Hadoop 2.6.0-cdh5.8.2.

Any ideas?

Thanks in advance!

Best Answer

ZIP is a container format, whereas GZip is just a stream-like format (used to store a single file). That is why, when creating a new ZIP file, you need to start an entry first (giving it a name), then close that entry before closing the container. See the examples here: https://www.programcreek.com/java-api-examples/?class=java.util.zip.ZipOutputStream&method=putNextEntry
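Building on that, here is a minimal sketch of how the ZipCompressorStream from the question could be adapted so that an entry is opened before any bytes are written and the ZIP central directory is written before the stream is closed. The entry name "part.txt" is an arbitrary placeholder (not something prescribed by Hadoop or Spark), the constructor now throws IOException because putNextEntry can fail, and this is illustrative rather than the answer's actual code:

import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.hadoop.io.compress.CompressorStream;

public class ZipCompressorStream extends CompressorStream {

    private final ZipOutputStream zipOut;

    public ZipCompressorStream(final ZipOutputStream out) throws IOException {
        super(out);
        this.zipOut = out;
        // A ZipOutputStream rejects writes until an entry has been opened,
        // which is exactly what "ZipException: no current ZIP entry" reports.
        // "part.txt" is a placeholder entry name.
        zipOut.putNextEntry(new ZipEntry("part.txt"));
    }

    @Override
    public void write(final int b) throws IOException {
        zipOut.write(b);
    }

    @Override
    public void write(final byte[] data, final int offset, final int length) throws IOException {
        zipOut.write(data, offset, length);
    }

    @Override
    public void finish() throws IOException {
        // Close the single entry and write the ZIP central directory before
        // the underlying stream is closed by CompressionOutputStream.close().
        zipOut.closeEntry();
        zipOut.finish();
    }
}

For a complete codec you would likely also want to override getDefaultExtension() in ZipCodec so that output files end in ".zip" rather than DefaultCodec's ".deflate".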

Regarding apache-spark - Extending DefaultCodec to support Zip compression for Hadoop files, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51332893/
