gpt4 book ai didi

java - Apache Beam/Dataflow - PubSub 丢失消息

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:48:19 27 4
gpt4 key购买 nike

我有一个简单的工作(Java 2.2.0 的 Apache Beam SDK)从 PubSub 订阅读取消息,从侧输入读取配置,对消息应用转换并将结果发送到另一个 PubSub 主题

问题是传出消息的数量不等于传入消息的数量。我正在从另一项工作中快速插入 1500 万条消息(无需手动指定时间戳)。问题似乎伴随着侧面输入的存在,因为没有我就没有更多的损失。在 Dataflow 监控中,我们可以看到大约 20000 条丢失的消息。

DataflowRunner 上的作业 ID:2018-01-17_05_33_45-3290466857677892673

enter image description here

如果我重新启动同一个作业,丢失消息的数量是不一样的

我创建了简单的片段来说明我的问题

发布者

String PROJECT_ID = "...";

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);

p
.apply(GenerateSequence.from(0).to(15000000))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("projects/" + PROJECT_ID + "/topics/test_in"));

p.run();

监听器

String PROJECT_ID = "...";

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);

PCollectionView<Long> sideInput = p
.apply(GenerateSequence.from(0).to(10))
.apply(Count.globally())
.apply(View.asSingleton());

p
// 15,000,000 in input
.apply(PubsubIO.readMessages().fromSubscription("projects/" + PROJECT_ID + "/subscriptions/test_in"))
.apply(ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
}).withSideInputs(sideInput))
// 14,978,010 in output
.apply(PubsubIO.writeMessages().to("projects/" + PROJECT_ID + "/topics/test_out"));

p.run();

最佳答案

问题很可能是由于 late data dropping 引起的.您可以通过设置允许延迟无限大的窗口策略来解决这个问题。

关于java - Apache Beam/Dataflow - PubSub 丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48304427/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com