gpt4 book ai didi

java - @OnTimer 在窗口后不触发

转载 作者:行者123 更新时间:2023-12-02 03:34:17 42 4
gpt4 key购买 nike

我一直在尝试使用 Apache Beam 的计时器,但无法触发它们。

据我所知,您可以在 DoFn 中按以下方式定义计时器。

@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

我选择了 TimeDomain.PROCESSING_TIME,因为我的事件没有分配时间戳,并且希望在窗口完成后立即触发计时器的执行。

        .apply(
"FixedWindow",
Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()
)
.apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));

我期望下面的计时器,它位于 DoFn 内,基本上在缓冲区内累积对象,并在窗口完成后继续管道并处理事件集...

        @OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
@StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
flush(context, bufferedState, bufferedSizeState);
}

...执行成功。我是否遗漏了某些内容或不了解计时器在 Apache Beam 中的工作原理?

最佳答案

您可以检查[1],其中有计时器用法的示例。

您需要设置计时器何时触发[2],其中可能是错过的地方。

[1] https://beam.apache.org/blog/2017/08/28/timely-processing.html

[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java#L53

关于java - @OnTimer 在窗口后不触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56869241/

42 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com