java - Apache Beam WithTimestamps: Output timestamps must be no earlier than timestamp of current input

Reposted. Author: 行者123. Updated: 2023-12-02 09:40:28

I am trying to group data from a Google Cloud Pub/Sub stream into 10-second windows, but I get this error:

java.lang.IllegalArgumentException: Cannot output with timestamp 2019-07-20T12:13:04.875Z. Output timestamps must be no earlier than the timestamp of the current input (2019-07-20T12:13:05.591Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:587)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:566)
    org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
    org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn.processElement(WithTimestamps.java:136)

This is the code that causes the error:

eventStream
    .apply("Add Event Timestamps",
        WithTimestamps.of((Event event) -> new Instant(event.getTime())))
    .apply("Window Events",
        Window.<Event>into(FixedWindows.of(Duration.parseDuration("10s"))));

What causes this, and what is a suitable solution?

Best answer

From the documentation:

If the input {@link PCollection} elements have timestamps, the output timestamp for each element must not be before the input element's timestamp minus the value of {@link getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform will throw an {@link IllegalArgumentException} when executed. Use {@link withAllowedTimestampSkew(Duration)} to update the allowed skew.

CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted behind the watermark. These elements are considered late, and if behind the {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may be silently dropped.

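In the error above, the timestamp computed from event.getTime() (2019-07-20T12:13:04.875Z) is earlier than the timestamp the element already carries (2019-07-20T12:13:05.591Z), and the allowed skew is 0 milliseconds. So, to fix this, you can use withAllowedTimestampSkew, keeping in mind the caution about late data quoted above.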
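To make the rule concrete, here is a minimal sketch of the check described in the error message, applied to the two timestamps from the question. It is a model of the rule only, not Beam's actual implementation: the class and method names are hypothetical, and it uses java.time instead of the Joda-Time types that Beam uses, so that it runs without any dependencies.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that models the rule from the error message.
final class TimestampSkewCheck {

    // An output timestamp is accepted only if it is no earlier than
    // (input timestamp - allowed skew).
    static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    // Smallest skew that would let this output timestamp pass the check.
    static Duration requiredSkew(Instant input, Instant output) {
        Duration gap = Duration.between(output, input);
        return gap.isNegative() ? Duration.ZERO : gap;
    }

    public static void main(String[] args) {
        // Values copied from the exception in the question.
        Instant input = Instant.parse("2019-07-20T12:13:05.591Z");
        Instant output = Instant.parse("2019-07-20T12:13:04.875Z");

        System.out.println(isAllowed(input, output, Duration.ZERO));         // false
        System.out.println(requiredSkew(input, output).toMillis());          // 716
        System.out.println(isAllowed(input, output, Duration.ofSeconds(1))); // true
    }
}
```

For this one element a skew of at least 716 ms would be enough, but a real stream will need a bound that covers the largest gap you expect between the event time and the element's existing timestamp.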

I used a different API instead: withTimestampAttribute. You can set an attribute on the JSON/Avro messages that carries the timestamp field.

This API is available when publishing:

.apply(PubsubIO.writeAvros(Someclass.class)
    .withIdAttribute("id")
    .withTimestampAttribute("myTime")
    .to(topic));

And when subscribing:

.apply(PubsubIO.readAvros(Someclass.class)
    .fromSubscription(...)
    .withIdAttribute("id")
    .withTimestampAttribute("myTime"))
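If some publisher other than Beam writes the "myTime" attribute, its value has to be in a form the reader can parse. As far as I understand the PubsubIO Javadoc, that is either milliseconds since the Unix epoch or an RFC 3339 string. The following is a small sketch of producing both forms with plain java.time; the class and method names are hypothetical and nothing here calls Beam or Pub/Sub.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for building the value of a timestamp attribute.
final class TimestampAttributeFormats {

    // Milliseconds since the Unix epoch, as a decimal string.
    static String asEpochMillis(Instant eventTime) {
        return Long.toString(eventTime.toEpochMilli());
    }

    // RFC 3339 style string in UTC, e.g. 2019-07-20T12:13:04.875Z.
    static String asRfc3339(Instant eventTime) {
        return DateTimeFormatter.ISO_INSTANT.format(eventTime);
    }

    public static void main(String[] args) {
        // Event time taken from the error message in the question.
        Instant eventTime = Instant.parse("2019-07-20T12:13:04.875Z");
        System.out.println("myTime=" + asEpochMillis(eventTime));
        System.out.println("myTime=" + asRfc3339(eventTime));
    }
}
```

Either string would be set as the value of the "myTime" message attribute by the publisher, so that the reader can take the event time from the attribute rather than reassigning it later with WithTimestamps.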

Regarding "java - Apache Beam WithTimestamps: Output timestamps must be no earlier than timestamp of current input", the original question is on Stack Overflow: https://stackoverflow.com/questions/57124674/
