gpt4 book ai didi

google-cloud-dataflow - 阅读前在数据流中解压缩文件

转载 作者:行者123 更新时间:2023-12-04 09:15:50 34 4
gpt4 key购买 nike

我们的客户正在将文件上传到 GCS,但它们已被压缩。有没有什么办法,使用Java Dataflow SDK,我们可以运行所有压缩文件,解压文件,将所有生成的.csv文件合并成一个文件,然后只做TextIO转变?

编辑

回答jkffs的问题,

  • 好吧,我真的不需要将它们全部合并到一个文件中,从阅读的角度来看会容易得多。
  • 它们是 ZIP 文件,而不是 GZ 或 BZ 或其他任何文件。每个 ZIP 包含多个文件。文件名并不是很重要,是的,我实际上更喜欢 TextIO 在每个存档的基础上透明地解压缩和连接所有文件。

  • 希望有帮助!

    最佳答案

    因为我遇到了同样的问题,只找到了这个 1 岁且非常不完整的解决方案。以下是有关如何在 google 数据流上解压缩文件的完整示例:

    public class SimpleUnzip {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleUnzip.class);

    public static void main(String[] args){
    Pipeline p = Pipeline.create(
    PipelineOptionsFactory.fromArgs(args).withValidation().create());

    GcsUtilFactory factory = new GcsUtilFactory();
    GcsUtil util = factory.create(p.getOptions());
    try{
    List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri("gs://tlogdataflow/test/*.zip"));
    List<String> paths = new ArrayList<String>();

    for(GcsPath gcsp: gcsPaths){
    paths.add(gcsp.toString());
    }
    p.apply(Create.of(paths))
    .apply(ParDo.of(new UnzipFN()));
    p.run();

    }
    catch(Exception e){
    LOG.error(e.getMessage());
    }


    }

    public static class UnzipFN extends DoFn<String,Long>{
    private static final long serialVersionUID = 2015166770614756341L;
    private long filesUnzipped=0;
    @Override
    public void processElement(ProcessContext c){
    String p = c.element();
    GcsUtilFactory factory = new GcsUtilFactory();
    GcsUtil u = factory.create(c.getPipelineOptions());
    byte[] buffer = new byte[100000000];
    try{
    SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
    InputStream is = Channels.newInputStream(sek);
    BufferedInputStream bis = new BufferedInputStream(is);
    ZipInputStream zis = new ZipInputStream(bis);
    ZipEntry ze = zis.getNextEntry();
    while(ze!=null){
    LOG.info("Unzipping File {}",ze.getName());
    WritableByteChannel wri = u.create(GcsPath.fromUri("gs://tlogdataflow/test/" + ze.getName()), getType(ze.getName()));
    OutputStream os = Channels.newOutputStream(wri);
    int len;
    while((len=zis.read(buffer))>0){
    os.write(buffer,0,len);
    }
    os.close();
    filesUnzipped++;
    ze=zis.getNextEntry();

    }
    zis.closeEntry();
    zis.close();

    }
    catch(Exception e){
    e.printStackTrace();
    }
    c.output(filesUnzipped);
    }

    private String getType(String fName){
    if(fName.endsWith(".zip")){
    return "application/x-zip-compressed";
    }
    else {
    return "text/plain";
    }
    }
    }

    }

    关于google-cloud-dataflow - 阅读前在数据流中解压缩文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32964657/

    34 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com