gpt4 book ai didi

java - 一个数据流作业内的并行管道

转载 作者:行者123 更新时间:2023-12-01 19:36:53 25 4
gpt4 key购买 nike

我想在 GCP 上的一个数据流作业中运行两个并行管道。我已经创建了一个管道,它工作得很好,但我想要另一个管道而不创建另一项作业。

我已经搜索了很多答案,但找不到任何代码示例:(

如果我这样运行它就不起作用:

pipe1.run();
pipe2.run();

它给我“已经有一个 Activity 的作业名称...如果您想提交第二个作业,请再次尝试使用 --jobName 设置不同的名称”

最佳答案

您可以将其他输入应用于管道,这将在一项作业中产生一个单独的管道。例如:

public class ExamplePipeline {

public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectRunner.class);

Pipeline pipeline = Pipeline.create(options);

PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
ParDo.of(new DoFn<String, String>() {

@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline one:" + c.element());
c.output(c.element() + " extra message.");
}

}));
linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));

PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
linesForPipelineTwo.apply("Pipeline 2 transoform",
ParDo.of(new DoFn<String, String>() {

@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline two:" + c.element());
}

}));

pipeline.run();
}

正如您所看到的,您也可以将两个(或更多)单独的 PBegin 应用于具有多个 PDone/Sink 的管道。在此示例中,“pipeline 1” 转储输出并将其写入文件,“pipeline 2” 仅将其转储到屏幕。

如果您在 GCP 上使用 DataflowRunner 运行此程序,GUI 将向您显示 2 个未连接的“管道”。

关于java - 一个数据流作业内的并行管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57108773/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com