gpt4 book ai didi

java - 为什么我的处理时间窗口触发器触发但事件时间窗口触发器不会

转载 作者:搜寻专家 更新时间:2023-11-01 03:45:44 25 4
gpt4 key购买 nike

我正在努力让基于事件时间的触发器为我的 Apache Beam 管道触发,但似乎确实能够通过处理时间触发窗口触发。

我的管道相当基础:

  1. 我收到了一批数据点,其中包括来自 pubsub 读入的毫秒级时间戳,时间戳比最早的批处理数据点稍早。对数据进行批处理以减少客户端工作量和发布订阅费用。

  2. 我提取二级时间戳并将时间戳应用于各个数据点

  3. 我对数据进行窗口处理并避免使用全局窗口。

  4. 我按秒对数据进行分组,以便稍后按流数据的秒进行分类。

  5. 我最终在分类秒数上使用滑动窗口有条件地每秒向 pubsub 发送两条消息之一。

我的问题似乎出在第 3 步。

我正在尝试在第 3 阶段使用我最终将在第 5 阶段使用的相同窗口策略,以对分类秒数运行滑动平均计算。

我试过弄乱 withTimestampCombiner(TimestampCombiner.EARLIEST) 选项,但这似乎无法解决问题。

我读过关于事件时间使用的 .withEarlyFirings 方法,但这似乎会模仿我现有的解决方法。理想情况下,我能够依赖通过窗口末尾的水印并包括延迟触发。

// De-Batching The Pubsub Message

static public class UnpackDataPoints extends DoFn<String,String>{
@ProcessElement
public void processElement(@Element String c, OutputReceiver<String> out) {
JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
for (JsonElement acDataPoint: packedData){
String hereData = acDataPoint.toString();
DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
Instant eventTimeStamp = date.toInstant();
out.outputWithTimestamp(hereData,eventTimeStamp);
}
}
}
// Extracting The Second
static public class ExtractTimeStamp extends DoFn<String,KV<String,String>> {
@ProcessElement
public void processElement(ProcessContext ctx ,@Element String c, OutputReceiver<KV<String,String>> out) {
JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
String milliString = accDataObject.get("Timestamp").getAsString();
String secondString = StringUtils.left(milliString,24);
accDataObject.addProperty("noMiliTimeStamp", secondString);
String updatedAccData = accDataObject.toString();
KV<String,String> outputKV = KV.of(secondString,updatedAccData);
out.output(outputKV);
}
}
// The Pipeline & Windowing
Pipeline pipeline = Pipeline.create(options);

PCollection<String> dataPoints = pipeline
.apply("Read from Pubsub", PubsubIO.readStrings()
.fromTopic("projects/????/topics/???")
.withTimestampAttribute("messageTimestamp"))
.apply("Extract Individual Data Points",ParDo.of(new UnpackDataPoints()));


/// This is the event time window that doesn't fire for some reason
/*
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
// .triggering(AfterWatermark.pastEndOfWindow())
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
//.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
*/
///// Temporary Work Around, this does fire but data is out of order

PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
.triggering(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));

PCollection<KV<String, String>> TimeStamped = windowedDataPoints
.apply( "Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));

PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped.apply("Group By Key",GroupByKey.create());

PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped.apply("testingIsh", ParDo.of(new LogKVIterable()));

当我使用第一个被注释掉的窗口策略时,我的管道无限期地运行,它接收数据并且 LogKVIterable ParDo 永远不会返回任何东西,当我使用处理时间工作时,LogKVIterable 会触发并记录到控制台。

最佳答案

这看起来确实像是您添加到数据中的时间戳可能是错误的/损坏的。我鼓励您验证以下内容:

  1. 已正确添加元素中的时间戳。在转换之前/之后添加一些日志记录,并广泛测试该代码。

  2. 您的管道中的数据新鲜度和系统滞后指标正在按预期进行。 如果数据新鲜度没有按预期移动,则表明您的时间戳设置不当。

关于java - 为什么我的处理时间窗口触发器触发但事件时间窗口触发器不会,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57368360/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com