gpt4 book ai didi

java - 使用 Apache Beam 进行窗口化 - 修复了 Windows 似乎没有关闭的问题?

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:17:03 24 4
gpt4 key购买 nike

我们正在尝试在 Apache Beam 管道上使用固定窗口(使用 DirectRunner )。我们的流程如下:

  1. 从发布/订阅中提取数据
  2. 将JSON反序列化为Java对象
  3. 具有 5 秒固定窗口的窗口事件
  4. 使用自定义 CombineFn , 组合 Event 的每个窗口进入 List<Event>
  5. 为了测试,简单地输出结果 List<Event>

管道代码:

    pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)

// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)

// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)

// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)

// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);

我们看到的结果是最后一步永远不会运行,因为我们没有得到任何日志记录。

此外,我们还添加了 System.out.println("***")在我们自定义的每种方法中CombineFn类,以便跟踪它们何时运行,但似乎它们也没有运行。

这里的窗口设置不正确吗?我们遵循了一个在 https://beam.apache.org/documentation/programming-guide/#windowing 中找到的示例它看起来相当简单,但显然缺少一些基本的东西。

感谢任何见解 - 提前致谢!

最佳答案

看起来主要问题确实是缺少触发器 - 窗口正在打开,但没有任何信息告诉它何时发出结果。我们只想根据处理时间(而不是事件时间)来设置窗口,因此做了以下操作:

.apply("Window", Window
.<Event>into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)

本质上,这会创建一个全局窗口,在处理第一个元素后 5 秒触发该窗口以发出事件。每次关闭窗口时,都会在收到元素后打开另一个窗口。当我们没有 withAllowedLateness 片段时,Beam 会提示 - 据我所知,这只是告诉它忽略任何迟到的数据。

我的理解可能有点离题,但上面的代码片段已经解决了我们的问题!

关于java - 使用 Apache Beam 进行窗口化 - 修复了 Windows 似乎没有关闭的问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44011806/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com