gpt4 book ai didi

java - Apache 弗林克 : Custom trigger behaves unexpectedly

转载 作者:行者123 更新时间:2023-11-30 05:52:02 26 4
gpt4 key购买 nike

我有一个由事件组成的数据流,其属性表示一批生成的元素。该属性(我们称之为“batchNumber”)在我从同一生产批处理中摄取的每个事件中都是恒定的。我每批处理收到多个事件。

我想在“batchNumber”更改时分析批处理内的机器性能。我的方法是使用全局流并使用“batchNumber”作为键对其进行分区。我希望这会将全局流分区到窗口中,其中每个事件都带有“batchNumber”。然后我定义一个触发器,当“batchNumber”更改时应该触发该触发器。然后我可以分析 ProcessWindowFunction 中的聚合数据。

我的问题是:

  • 当 prodnr 更改时,触发器并不总是触发
  • 即使它确实触发,也只有一个元素被聚合。我预计接近 200。

这是我正在使用的代码。

    public class batchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

private static final long serialVersionUID = 1L;

public batchnrTrigger() {}

private final ValueStateDescriptor<Integer> prevbatchnr = new ValueStateDescriptor<>("batchnr", Integer.class);

@Override
public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);

if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {

System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
batchnrState.update(element.batchnr);
return TriggerResult.FIRE;

}

System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
batchnrState.update(element.batchnr);
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

}

}

这就是我如何称呼这个触发器:

DataStream<String> imaginePaperDataStream = nifiStreamSource
.map(new ImaginePaperDataConverter())
.keyBy((ImaginePaperData event) -> event.lunum)
.window(GlobalWindows.create())
.trigger(new LunumTrigger())
.process(new ImaginePaperWindowReportFunction());

我知道这个问题类似于this问题。但我正在使用 ValueState 并且我认为我的触发条件根本不相似。

我怎样才能让它工作?

最佳答案

您确定要通过 event.lunum 为流设置 key 吗?如果您预计每个不同的 lunum 值大约有 200 个事件,那么这是有道理的。但是,如果每个 lunum 值的每批只有一个事件,那就可以解释您所看到的行为。

此外,您确定您的事件正在按顺序处理吗?如果批处理由于并行进程之间的竞争条件而在处理管道中的某个位置交错,这也可能有助于解释您所看到的内容。

此外,您应该在触发器的clear方法中清除状态。您需要实现一个 Evictor,以便在处理后从窗口中删除元素。

这部分窗口 API 相当复杂。我认为这个特定的应用程序可以更直接地实现为 RichFlatMap,它在 ListState 中收集项目,直到批处理号发生变化(您将其保留在 ValueState 中)。

关于java - Apache 弗林克 : Custom trigger behaves unexpectedly,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53746813/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com