gpt4 book ai didi

java - 如何实现在 X 分钟内未收到任何事件后发出的 Flink 事件时间触发器

转载 作者:行者123 更新时间:2023-12-04 09:47:58 24 4
gpt4 key购买 nike

我有点难以理解 Flink Triggers 的工作原理。我的数据流包含具有 sessionId 的事件,我根据该 sessionId 聚合了这些事件。每个 session 将包含一个 Started 和一个 Ended 事件,但有时 Ended 事件会丢失。

为了处理这个问题,我设置了一个触发器,它会在处理结束事件时发出聚合 session 。但是,如果该 session 在 2 分钟内没有事件到达,我想发出我们迄今为止聚合的任何内容(我们发送事件的应用程序每分钟发送一次心跳,因此如果我们没有收到任何事件, session 将被视为丢失) .

我已经设置了以下触发功能:

public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
private final long sessionTimeout;
private long lastSetTimer;

// Max session length set to 1 day
public static final long MAX_SESSION_LENGTH = 1000l * 86400l;

// End session events
private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
.add("Playback.Aborted")
.add("Playback.Completed")
.add("Playback.Error")
.add("Playback.StartAirplay")
.add("Playback.StartCasting")
.build();

public EventTimeProcessingTimeTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
ctx.registerProcessingTimeTimer(lastSetTimer);

if(endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}

return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(lastSetTimer);
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
}
}

为了为事件设置水印,我使用应用程序设置的水印,因为 appEventTime 可能与服务器上的 wallClock 不同。我像这样提取水印:
DataStream<HashMap> playerEvents = env
.addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
.name("Read player events from Kafka")
.uid("Read player events from Kafka")
.map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
.name("Map Json to HashMap")
.uid("Map Json to HashMap")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
{
@Override
public long extractTimestamp(HashMap element)
{
long timestamp = 0L;
Object timestampAsObject = (Object) element.get("CanonicalTime");
timestamp = (long)timestampAsObject;
return timestamp;
}
})
.name("Add CanonicalTime as timestamp")
.uid("Add CanonicalTime as timestamp");

现在我觉得奇怪的是,当我在调试中运行代码并在 Trigger 的 clear 函数中设置断点时,它会不断被调用。即使在触发器中没有达到 FIRE_AND_PURGE 点。所以感觉就像我完全误解了触发器应该如何工作。而且我的实现根本没有做我认为它在做的事情。

我想我的问题是,触发器何时应该调用 clear ?这是实现组合 EventTimeTrigger 和 ProcessingTimeTrigger 的正确方法吗?

感谢我能得到的所有帮助。

更新 1: (2020-05-29)

为了提供有关如何设置的更多信息。
我的环境设置如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

所以我对整个流使用 EventTime。
然后我创建这样的窗口:
DataStream<PlayerSession> playerSessions = sideEvents
.keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
.window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
.trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
.aggregate(new SessionAggregator())
.name("Aggregate events into sessions")
.uid("Aggregate events into sessions");

最佳答案

这种情况很复杂。我不敢准确预测这段代码会做什么,但我可以解释一些正在发生的事情。

Point 1:您已经将时间特征设置为事件时间,安排了时间戳和水印,并实现了onEventTime触发器中的回调。但是您无处可创建事件时间计时器。除非我错过了什么,否则实际上没有使用事件时间或水印。您还没有实现事件时间触发器,我不希望 onEventTime将永远被调用。

第 2 点:您的触发器不需要调用 clear。作为清除窗口的一部分,Flink 负责在触发器上调用 clear。

第 3 点:您的触发器试图反复触发和清除窗口,这似乎不正确。我这样说是因为您正在为每个元素创建一个新的处理时间计时器,并且当每个计时器触发时,您正在触发和清除窗口。您可以随心所欲地触发该窗口,但您只能清除该窗口一次,之后它就会消失。

第4点: session 窗口是一种特殊的窗口,称为合并窗口。当 session 合并时(这种情况一直发生,随着事件的到来),它们的触发器被合并,其中一个被清除。这就是为什么你会看到 clear 如此频繁地被调用。

建议:由于您有每分钟一次的 keepalive,并且打算在 2 分钟不 Activity 后关闭 session ,似乎您可以将 session 间隔设置为 2 分钟,这样可以避免一些让事情变得如此复杂的事情.让 session 窗口做他们设计要做的事情。

假设这行得通,那么您可以简单地扩展 Flink 的 ProcessingTimeTrigger并覆盖其 onElement方法来做到这一点:

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

if (endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}

return super(element, timestamp, window, ctx);
}

以这种方式,窗口将在两分钟不 Activity 后或由显式 session 结束事件触发。

您应该能够简单地继承 ProcessingTimeTrigger 的其余部分。的行为。

如果要使用事件时间,则使用 EventTimeTrigger作为父类(super class),您必须找到一种方法来确保您的水印即使在流空闲时也能取得进展。见 this answer如何处理。

关于java - 如何实现在 X 分钟内未收到任何事件后发出的 Flink 事件时间触发器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62059837/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com