gpt4 book ai didi

java - 设计输出多个文件(包括二进制输出)的 Apache Beam 转换的理想方法是什么?

转载 作者:行者123 更新时间:2023-12-02 06:15:41 25 4
gpt4 key购买 nike

我正在尝试在 Beam 管道中处理来自输入存储桶的 PDF 文件,并将结果、输入和中间文件全部输出到单独的输出存储桶。

My pipeline

所有三个输出的文件名均来自最后一步,并且输入文件到输出文件名存在 1:1 映射,因此我不想在输出文件名中包含分片模板(我的 UniquePrefixFileNaming 类是做与 TextIO.withoutSharding()) 相同的事情

由于文件名仅在最后一步中已知,因此我认为我无法在前面的每个处理步骤中设置标记输出和输出文件 - 我必须在整个管道中携带数据。

实现这一目标的最佳方法是什么?下面是我对这个问题的尝试 - 文本输出工作正常,但我没有 PDF 输出的解决方案(没有可用的二进制输出接收器,没有二进制数据通过)。 FileIO.writeDynamic 是最好的方法吗?

Pipeline p = Pipeline.create();

PCollection<MyProcessorTransformResult> transformCollection = p.apply(FileIO.match().filepattern("Z:\\Inputs\\en_us\\**.pdf"))
.apply(FileIO.readMatches())
.apply(TikaIO.parseFiles())
.apply(ParDo.of(new MyProcessorTransform()));

// Write output PDF
transformCollection.apply(FileIO.match().filepattern())
transformCollection.apply(FileIO.<String, MyProcessorTransformResult>writeDynamic()
.withTempDirectory("Z:\\Temp\\vbeam")
.by(input -> input.data.getResourceKey())
.via(
Contextful.fn((SerializableFunction<MyProcessorTransformResult, byte[]>) input -> new byte[] {})
)
.withNaming(d -> new UniquePrefixFileNaming(d, ".pdf"))
.withNumShards(1)
.withDestinationCoder(ByteArrayCoder.of())
.to("Z:\\Outputs"));

// Write output TXT
transformCollection.apply(FileIO.<String, MyProcessorTransformResult>writeDynamic()
.withTempDirectory("Z:\\Temp\\vbeam")
.by(input -> input.data.getResourceKey())
.via(
Contextful.fn((SerializableFunction<MyProcessorTransformResult, String>) input -> input.originalContent),
TextIO.sink()
)
.withNaming(d -> new UniquePrefixFileNaming(d, ".pdf.txt"))
.withNumShards(1)
.withDestinationCoder(StringUtf8Coder.of())
.to("Z:\\Outputs"));

// Write output JSON
transformCollection.apply(FileIO.<String, MyProcessorTransformResult>writeDynamic()
.withTempDirectory("Z:\\Temp\\vbeam")
.by(input -> input.data.getResourceKey())
.via(
Contextful.fn((SerializableFunction<MyProcessorTransformResult, String>) input -> SerializationHelpers.toJSON(input.data)),
TextIO.sink()
)
.withNaming(d -> new UniquePrefixFileNaming(d, ".pdf.json"))
.withNumShards(1)
.withDestinationCoder(StringUtf8Coder.of())
.to("Z:\\Outputs"));

p.run();

最佳答案

我最终编写了自己的文件接收器来保存所有 3 个输出。 FileIO 非常适合流式处理,让 Windows 和 Panes 来分割数据,- 我的接收器步骤一直耗尽内存,因为它会在进行任何实际写入之前尝试聚合所有内容,因为批处理作业在 Beam 的单个窗口中运行。我的自定义 DoFn 没有遇到此类问题。

我对任何研究这个问题的人的建议是做同样的事情 - 您可以尝试连接到 Beam 的文件系统类或查看 jclouds 以实现与文件系统无关的存储。

关于java - 设计输出多个文件(包括二进制输出)的 Apache Beam 转换的理想方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55873011/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com