- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有点难以理解 Flink Triggers 的工作原理。我的数据流包含具有 sessionId 的事件,我根据该 sessionId 聚合了这些事件。每个 session 将包含一个 Started 和一个 Ended 事件,但有时 Ended 事件会丢失。
为了处理这个问题,我设置了一个触发器,它会在处理结束事件时发出聚合 session 。但是,如果该 session 在 2 分钟内没有事件到达,我想发出我们迄今为止聚合的任何内容(我们发送事件的应用程序每分钟发送一次心跳,因此如果我们没有收到任何事件, session 将被视为丢失) .
我已经设置了以下触发功能:
public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
private final long sessionTimeout;
private long lastSetTimer;
// Max session length set to 1 day
public static final long MAX_SESSION_LENGTH = 1000l * 86400l;
// End session events
private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
.add("Playback.Aborted")
.add("Playback.Completed")
.add("Playback.Error")
.add("Playback.StartAirplay")
.add("Playback.StartCasting")
.build();
public EventTimeProcessingTimeTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
ctx.registerProcessingTimeTimer(lastSetTimer);
if(endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(lastSetTimer);
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
}
}
DataStream<HashMap> playerEvents = env
.addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
.name("Read player events from Kafka")
.uid("Read player events from Kafka")
.map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
.name("Map Json to HashMap")
.uid("Map Json to HashMap")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
{
@Override
public long extractTimestamp(HashMap element)
{
long timestamp = 0L;
Object timestampAsObject = (Object) element.get("CanonicalTime");
timestamp = (long)timestampAsObject;
return timestamp;
}
})
.name("Add CanonicalTime as timestamp")
.uid("Add CanonicalTime as timestamp");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<PlayerSession> playerSessions = sideEvents
.keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
.window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
.trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
.aggregate(new SessionAggregator())
.name("Aggregate events into sessions")
.uid("Aggregate events into sessions");
最佳答案
这种情况很复杂。我不敢准确预测这段代码会做什么,但我可以解释一些正在发生的事情。
Point 1:您已经将时间特征设置为事件时间,安排了时间戳和水印,并实现了onEventTime
触发器中的回调。但是您无处可创建事件时间计时器。除非我错过了什么,否则实际上没有使用事件时间或水印。您还没有实现事件时间触发器,我不希望 onEventTime
将永远被调用。
第 2 点:您的触发器不需要调用 clear。作为清除窗口的一部分,Flink 负责在触发器上调用 clear。
第 3 点:您的触发器试图反复触发和清除窗口,这似乎不正确。我这样说是因为您正在为每个元素创建一个新的处理时间计时器,并且当每个计时器触发时,您正在触发和清除窗口。您可以随心所欲地触发该窗口,但您只能清除该窗口一次,之后它就会消失。
第4点: session 窗口是一种特殊的窗口,称为合并窗口。当 session 合并时(这种情况一直发生,随着事件的到来),它们的触发器被合并,其中一个被清除。这就是为什么你会看到 clear 如此频繁地被调用。
建议:由于您有每分钟一次的 keepalive,并且打算在 2 分钟不 Activity 后关闭 session ,似乎您可以将 session 间隔设置为 2 分钟,这样可以避免一些让事情变得如此复杂的事情.让 session 窗口做他们设计要做的事情。
假设这行得通,那么您可以简单地扩展 Flink 的 ProcessingTimeTrigger
并覆盖其 onElement
方法来做到这一点:
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}
return super(element, timestamp, window, ctx);
}
ProcessingTimeTrigger
的其余部分。的行为。
EventTimeTrigger
作为父类(super class),您必须找到一种方法来确保您的水印即使在流空闲时也能取得进展。见
this answer如何处理。
关于java - 如何实现在 X 分钟内未收到任何事件后发出的 Flink 事件时间触发器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62059837/
概述 触发器是 MySQL 的数据库对象之一,不需要程序调用或手工启动,而是由事件来触发、激活,从而实现执行,包括 INSERT 语句、UPDATE 语句和 DELETE 语句 创建触发器 1. 创建
当我为单元格获得的值是某种类型时,我试图设置一个触发器来显示文本块。 我已经成功地设法在相同的情况下显示图像,但在这种情况下我不需要图像,而是一些文本。 已注释掉行以进行测试。尝试使其工作。注释掉的代
我需要在 phpmyadmin 中为 2 个表创建一个触发器。 所以有表 stores 和 tbl_storefinder_stores。 我想从 stores 表中插入 4 个东西(名称、地址、经度
阅读目录 1、触发器 2、触发器类型 3、触发器语法 4、插入数据触发器案例 5、修改数据触发器案例
SQLite 触发器(Trigger) SQLite 的触发器是数据库的回调函数,它会在指定的数据库事件发生时自动执行/调用。以下是关于SQLite的触发器的要点:SQLite **触发器(Trig
请帮我写一个向表中添加新行的触发器。 我的数据库中有 3 个表: 地区(id,名字); id - 主要; 技术人员(身份证、姓名); id - 主要; 可用性(id、区域、技术、计数); id - p
我正在编写一个触发器来审核表中的更新和删除。我正在使用 SQL Server 2008 我的问题是, 有没有办法在不经过删除和插入表的选择阶段的情况下找出对记录采取的操作? 另一个问题是,如果记录被删
我的表: TableA (id number, state number) TableB (id number, tableAId number, state number) TableC (id n
我很少写触发器。我可以帮助设置这件事。 CREATE TRIGGER audit_tableName ON dbo.tableNameAudit AFTER CREATE, UPDATE, DELET
我之前从未在 Oracle 中创建过触发器,所以我正在寻找一些方向。 如果 ID 不在插入语句中,我想创建一个将 ID 增加 1 的触发器。 ID 应该从 10000 开始,当插入一条记录时,下一个
考虑以下两个(假设的)表 温度 * day * time * lake_name * station * temperature_f 温度_总结 * day * lake_name * station
如何在 SQL 触发器中获取更新记录的值 - 如下所示: CREATE TRIGGER TR_UpdateNew ON Users AFTER UPDATE AS BEGIN S
我是 Cassandra 新手,使用 Cassandra 3.10 并有类似的表格 create table db1.table1 (id text, trip_id text, event_time
在 MSSQL 中执行 TRUNCATE(而不是删除)时如何触发触发器 最佳答案 来自msdn : TRUNCATE TABLE cannot activate a trigger because t
我正在尝试在 sql developer 中创建一个简单的触发器,以在工资发生变化时显示工资的变化 CREATE OR REPLACE TRIGGER salary_changes BEFORE DE
我有三个表: Table1: Customers (CustomerId, Name, CustomerAddress) Table2: AccountManagers(ManagerId, Name
在 Sql Server 2005 触发器中有没有办法在执行期间获取触发器附加到的表的名称和架构? 最佳答案 SELECT OBJECT_NAME(parent_id) AS [Table],
使用 MySQL 5.5,以下触发器因错误而被拒绝: create trigger nodups before insert on `category-category` for each row b
我使用 fancybox 打开一个带有表单的弹出窗口。目前,当鼠标离开主页时,弹出窗口就会出现。为了完成这项工作,我有一个隐藏的链接标签,我用trigger()函数模拟它,单击该函数,以便该链接的hr
我的触发器触发 INSERT, UPDATE and DELETE .我需要根据触发触发器的操作从适当的内存表( inserted, deleted )插入。由于只有 inserted位于 INSER
我是一名优秀的程序员,十分优秀!