gpt4 book ai didi

java - 如何仅在第一个元素上启动自定义触发器中的处理时间计时器?

转载 作者:行者123 更新时间:2023-12-02 01:41:18 26 4
gpt4 key购买 nike

我正在为我的应用程序使用 GlobalWindow 和自定义触发器。根据要求,在触发器函数中,我只需要在窗口中的第一个元素上启动处理时间计时器。

我尝试使用变量firstEventflag来实现它。就像这样。

.window(GlobalWindows.create())
.trigger(new Trigger<ImpactEventObject, GlobalWindow>() {
Boolean firstEventflag = false;

@Override
public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
if (!firstEventflag) {
firstEventflag = true;
triggerContext.registerProcessingTimeTimer(
triggerContext.getCurrentProcessingTime() + 20000);
}
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE;
}

但是这失败了,因为今天我发现变量 firstEventflag 并不是每次创建新窗口时都初始化的,它取决于正在处理窗口的子任务,这意味着不同的窗口可以共享相同的内容变量 firstEventflag 使这个逻辑实际上毫无用处。鉴于此,我该如何解决我的问题?

最佳答案

通过查看CountTrigger的源代码找到了一种方法来做到这一点here .

我们可以使用 ReducingStateDescriptor 来记录 GlobalWindow 中的元素数量。当计数为 1 时启动计时器,即仅在第一个元素上启动计时器。

public class CustomTrigger extends Trigger<GenericObject, GlobalWindow> {

private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

@Override
public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);
count.add(1L);

if (count.get() == 1) {
triggerContext.registerProcessingTimeTimer(
triggerContext.getCurrentProcessingTime() + 20000);
}
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE;
}

@Override
public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
return null;
}

@Override
public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
triggerContext.deleteProcessingTimeTimer(triggerContext.getCurrentProcessingTime());
}

private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}

}
}

关于java - 如何仅在第一个元素上启动自定义触发器中的处理时间计时器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54421409/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com