gpt4 book ai didi

java - Apache Beam 没有将无限数据保存到文本文件

转载 作者:行者123 更新时间:2023-11-30 10:11:26 24 4
gpt4 key购买 nike

我创建了一个管道,使用 Apache Beam 和 Java 将 Google Cloud Pubsub 消息保存到文本文件中。每当我使用 --runner=DataflowRunner 在 Google Dataflow 中运行管道时,消息都会正确保存。

但是,当我使用 --runner=DirerctRunner 运行相同的管道时,消息不会被保存。

我可以看到通过管道的事件,但没有任何反应。

管道是下面的代码:

public static void main(String[] args) {
ExerciseOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);

Pipeline pipeline = Pipeline.create(options);

pipeline
.apply("Read Messages from Pubsub",
PubsubIO
.readStrings()
.fromTopic(options.getTopicName()))

.apply("Set event timestamp", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
context.outputWithTimestamp(context.element(), Instant.now());
}
}))

.apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(5))))

.apply("Write to File",
TextIO
.write()
.withWindowedWrites()
.withNumShards(1)
.to(options.getOutputPrefix()));

pipeline.run();
}

我做错了什么?是否可以在本地运行此管道?

最佳答案

我遇到了同样的问题。PubSubIO 无法与 DirectRunnerTextIO 一起正常工作。我找到了解决此问题的方法,即在写入之前触发窗口。对于某些运行者,这需要 --streaming 标签才能工作。

  pipeline
.apply("2 minutes window",
Window.<String>into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(2)))))
.withAllowedLateness(Duration.standardSeconds(10))
.discardingFiredPanes())

这样文件就按预期写入了。

关于java - Apache Beam 没有将无限数据保存到文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52445414/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com