
java - Is there a concept of virtual time in Apache Flink tests, like the one in Reactor and RxJava?

Reposted. Author: 行者123. Last updated: 2023-12-02 10:13:04

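RxJava and Reactor have a concept of virtual time for testing operators that depend on time. I can't figure out how to do the same in Flink. For example, I put together the following program because I want to play around with late-arriving events and understand how they are handled, but I don't understand what a test for this would look like. Is there a way to combine Flink and Reactor to make such tests better?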

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class PlayWithFlink {

    public static void main(String[] args) throws Exception {

        // Anonymous subclass so that the generic type survives erasure.
        final OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-data"){};

        // TODO understand how BoundedOutOfOrderness is related to allowedLateness
        BoundedOutOfOrdernessTimestampExtractor<MyEvent> eventTimeFunction =
                new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(MyEvent element) {
                        return element.getEventTime();
                    }
                };

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<MyEvent> events = env.fromCollection(MyEvent.examples())
                .assignTimestampsAndWatermarks(eventTimeFunction);

        // Collects the events of a session into one list per tracing id.
        AggregateFunction<MyEvent, MyAggregate, MyAggregate> aggregateFn =
                new AggregateFunction<MyEvent, MyAggregate, MyAggregate>() {
                    @Override
                    public MyAggregate createAccumulator() {
                        return new MyAggregate();
                    }

                    @Override
                    public MyAggregate add(MyEvent myEvent, MyAggregate myAggregate) {
                        if (myEvent.getTracingId().equals("trace1")) {
                            myAggregate.getTrace1().add(myEvent);
                            return myAggregate;
                        }
                        myAggregate.getTrace2().add(myEvent);
                        return myAggregate;
                    }

                    @Override
                    public MyAggregate getResult(MyAggregate myAggregate) {
                        return myAggregate;
                    }

                    @Override
                    public MyAggregate merge(MyAggregate myAggregate, MyAggregate acc1) {
                        acc1.getTrace1().addAll(myAggregate.getTrace1());
                        acc1.getTrace2().addAll(myAggregate.getTrace2());
                        return acc1;
                    }
                };

        KeySelector<MyEvent, String> keyFn = new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent myEvent) throws Exception {
                return myEvent.getTracingId();
            }
        };

        SingleOutputStreamOperator<MyAggregate> result = events
                .keyBy(keyFn)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .allowedLateness(Time.seconds(20))
                .sideOutputLateData(lateOutputTag)
                .aggregate(aggregateFn);

        // Events that arrive after window state has been cleared end up here.
        DataStream<MyEvent> lateStream = result.getSideOutput(lateOutputTag);

        result.print("SessionData");

        lateStream.print("LateData");

        env.execute();
    }
}

class MyEvent {
    private final String tracingId;
    private final Integer count;
    private final long eventTime;

    public MyEvent(String tracingId, Integer count, long eventTime) {
        this.tracingId = tracingId;
        this.count = count;
        this.eventTime = eventTime;
    }

    public String getTracingId() {
        return tracingId;
    }

    public Integer getCount() {
        return count;
    }

    public long getEventTime() {
        return eventTime;
    }

    public static List<MyEvent> examples() {
        long now = System.currentTimeMillis();
        MyEvent e1 = new MyEvent("trace1", 1, now);
        MyEvent e2 = new MyEvent("trace2", 1, now);
        MyEvent e3 = new MyEvent("trace2", 1, now - 1000);
        MyEvent e4 = new MyEvent("trace1", 1, now - 200);
        MyEvent e5 = new MyEvent("trace1", 1, now - 50000);
        return Arrays.asList(e1, e2, e3, e4, e5);
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "tracingId='" + tracingId + '\'' +
                ", count=" + count +
                ", eventTime=" + eventTime +
                '}';
    }
}

class MyAggregate {
    private final List<MyEvent> trace1 = new ArrayList<>();
    private final List<MyEvent> trace2 = new ArrayList<>();

    public List<MyEvent> getTrace1() {
        return trace1;
    }

    public List<MyEvent> getTrace2() {
        return trace2;
    }

    @Override
    public String toString() {
        return "MyAggregate{" +
                "trace1=" + trace1 +
                ", trace2=" + trace2 +
                '}';
    }
}

The output of running this is:

SessionData:1> MyAggregate{trace1=[], trace2=[MyEvent{tracingId='trace2', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace2', count=1, eventTime=1551034665081}]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034166081}], trace2=[]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace1', count=1, eventTime=1551034665881}], trace2=[]}

However, I expected the late stream to fire for event e5, whose timestamp is 50 seconds earlier than that of the first event.

Best answer

If you change your watermark assigner to something like this

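// Needs org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
// and org.apache.flink.streaming.api.watermark.Watermark.
AssignerWithPunctuatedWatermarks<MyEvent> eventTimeFunction = new AssignerWithPunctuatedWatermarks<MyEvent>() {
    // Largest event timestamp seen so far.
    long maxTs = 0;

    @Override
    public long extractTimestamp(MyEvent myEvent, long l) {
        long ts = myEvent.getEventTime();
        if (ts > maxTs) {
            maxTs = ts;
        }
        return ts;
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
        // Emit a watermark after every event, 10 seconds behind the largest timestamp.
        return new Watermark(maxTs - 10000);
    }
};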

then you will get the results you expect. I am not recommending this; I am only using it to illustrate what is going on.

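What is happening here is that BoundedOutOfOrdernessTimestampExtractor is a periodic watermark generator, which only inserts a watermark into the stream every 200 milliseconds (by default). Because your job finishes long before then, the only watermark your job ever sees is the one Flink injects at the end of every finite stream (with the value MAX_WATERMARK). Lateness is relative to watermarks, and the event you expected to be late manages to arrive before that watermark.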
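To make the effect concrete, here is a minimal plain-Java simulation; it does not use Flink at all. The class name `WatermarkLatenessSketch`, the fixed timestamps, and the simplified lateness rule (a single-event session window is dropped once the watermark reaches its end plus the allowed lateness) are assumptions made for illustration, not Flink's actual implementation. The constants mirror the 10 second gap, 20 second allowed lateness, and 10 second out-of-orderness bound from the question.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkLatenessSketch {
    static final long GAP_MS = 10_000;              // session gap from the question
    static final long ALLOWED_LATENESS_MS = 20_000; // allowedLateness from the question
    static final long OUT_OF_ORDERNESS_MS = 10_000; // watermark lag behind the max timestamp

    /**
     * Returns the timestamps of events that this simplified model treats as late.
     * If watermarkAfterEveryEvent is false, no watermark is seen until the input
     * is exhausted, which models a periodic generator in a very short job.
     */
    static List<Long> lateEvents(long[] timestamps, boolean watermarkAfterEveryEvent) {
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark seen yet
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // A single-event session window covers [ts, ts + gap); assume it is
            // dropped once the watermark reaches its last timestamp plus the lateness.
            long cleanupTime = ts + GAP_MS - 1 + ALLOWED_LATENESS_MS;
            if (cleanupTime <= watermark) {
                late.add(ts);
            }
            maxTs = Math.max(maxTs, ts);
            if (watermarkAfterEveryEvent) {
                // The watermark follows the event that produced it.
                watermark = Math.max(watermark, maxTs - OUT_OF_ORDERNESS_MS);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Same shape as MyEvent.examples(), with "now" fixed at 100_000 ms.
        long[] events = {100_000, 100_000, 99_000, 99_800, 50_000};
        System.out.println("watermark after every event: " + lateEvents(events, true));
        System.out.println("watermark only at end of input: " + lateEvents(events, false));
    }
}
```

With a watermark after every event, the event at 50 000 ms is classified as late; with no watermark before the end of the input, nothing is.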

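By switching to punctuated watermarks, you can force watermarks to be generated more often or, more precisely, at specific points in the stream. This is generally unnecessary (and overly frequent watermarks cause overhead), but it helps when you want strong control over the sequencing of watermarks.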

As for how to write tests, you could look at the test harnesses used in Flink's own tests, or at flink-spector.

Update:

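The time interval associated with a BoundedOutOfOrdernessTimestampExtractor is a specification of how out-of-order the stream is expected to be. Events that arrive within this bound are not considered late, and event-time timers will not fire until this delay has elapsed, which gives out-of-order events time to arrive. allowedLateness applies only to the window API, and describes how long past the normal window firing time the framework keeps window state, so that events can still be added to the window and cause late firings. After this additional interval, window state is cleared and subsequent events are sent to the side output (if one is configured).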
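The three phases described above can be sketched as a small classification function in plain Java. This is an illustrative model, not Flink code: the class `AllowedLatenessSketch`, the `Outcome` enum, and the exact boundary conditions are assumptions, chosen so that a window fires once the watermark reaches the window's last timestamp and is cleared once the watermark reaches that timestamp plus the allowed lateness.

```java
final class AllowedLatenessSketch {
    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    /**
     * Classifies an event belonging to the window that ends at windowEndMs
     * (exclusive), given the watermark at the moment the event arrives.
     */
    static Outcome classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that belongs to the window
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME;          // window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_FIRING;      // state still kept, event causes a late firing
        }
        return Outcome.SIDE_OUTPUT;          // state cleared, event goes to the side output
    }

    public static void main(String[] args) {
        long windowEnd = 60_000;
        long allowedLateness = 20_000;
        for (long watermark : new long[] {50_000, 59_999, 70_000, 79_999}) {
            System.out.println("watermark=" + watermark + " -> "
                    + classify(windowEnd, allowedLateness, watermark));
        }
    }
}
```

Note that the out-of-orderness bound does not appear in this function at all: it only influences how far the watermark lags behind the event timestamps, while allowedLateness decides how long after the firing the window state survives.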


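So when you use BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)), you are not saying "wait 10 seconds after every event in case earlier events might still arrive". You are saying that your events should be at most 10 seconds out of order. If you are processing a live, real-time event stream, this means you will wait at most 10 seconds in case earlier events arrive. (If you are processing historic data, you may be able to process 10 seconds of data in 1 second, or you may not. Knowing that you will wait for n seconds of event time to pass says nothing about how long that will actually take.)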
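A small plain-Java sketch shows why this behaves like virtual time: the watermark is derived only from the timestamps carried by the events, never from the wall clock. The class `BoundedOutOfOrdernessSketch` below is a hypothetical stand-in that imitates the idea of the extractor (watermark = largest timestamp seen minus the bound); it is not the Flink class and leaves out details such as periodic emission.

```java
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an event timestamp; out-of-order events never move the maximum back. */
    void onEvent(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    /** Event time is considered complete up to this point. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10_000);
        // 30 seconds of event time are replayed here with no sleeping at all.
        for (long ts : new long[] {0, 5_000, 3_000, 30_000}) {
            wm.onEvent(ts);
            System.out.println("event at " + ts + " ms, watermark now " + wm.currentWatermark() + " ms");
        }
    }
}
```

The event at 3 000 ms arrives after the one at 5 000 ms but still ahead of the watermark (which is -5 000 ms at that point), so it is within the bound. A test can therefore drive event time forward just by choosing timestamps.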

For more on this topic, see Event Time and Watermarks.

Regarding "java - Is there a concept of virtual time in Apache Flink tests, like the one in Reactor and RxJava?", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/54855358/
