gpt4 book ai didi

google-cloud-dataflow - 谷歌数据流 : Request payload size exceeds the limit: 10485760 bytes

转载 作者:行者123 更新时间:2023-12-01 12:25:05 27 4
gpt4 key购买 nike

尝试对约 800.000 个文件运行大型转换时,我在尝试运行管道时收到上述错误消息。

代码如下:

public static void main(String[] args) {
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
GcsUtil u = getUtil(p.getOptions());

try{
List<GcsPath> paths = u.expand(GcsPath.fromUri("gs://tlogdataflow/stage/*.zip"));
List<String> strPaths = new ArrayList<String>();
for(GcsPath pa: paths){
strPaths.add(pa.toUri().toString());
}

p.apply(Create.of(strPaths))
.apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
p.run();
}
catch(IOException io){
//
}

我认为这正是谷歌数据流的用途?处理大量文件/数据?

有没有办法分担负载以使其正常工作?

谢谢&BR

菲尔

最佳答案

Dataflow 擅长处理大量数据,但在管道描述 的大小方面存在限制。传递给 Create.of() 的数据目前嵌入在管道描述中,因此您不能在那里传递大量数据 - 相反,应该从外部存储读取大量数据,并且管道应仅指定它们的位置。

将其视为程序可以处理的数据量与程序代码本身的大小之间的区别。

您可以通过在 ParDo 中进行扩展来解决这个问题:

p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
.apply(ParDo.of(new ExpandFn()))
.apply(...fusion break (see below)...)
.apply(Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")))

其中 ExpandFn 如下所示:

private static class ExpandFn extends DoFn<String, String> {
@ProcessElement
public void process(ProcessContext c) {
GcsUtil util = getUtil(c.getPipelineOptions());
for (String path : util.expand(GcsPath.fromUri(c.element()))) {
c.output(path);
}
}
}

融合断裂 我指的是 this (基本上,ParDo(添加唯一键) + 按键分组 + Flatten.iterables() + Values.create())。这不是很方便,并且正在讨论添加内置转换来执行此操作(请参阅 this PRthis thread)。

关于google-cloud-dataflow - 谷歌数据流 : Request payload size exceeds the limit: 10485760 bytes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40627585/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com