java - Hazelcast Jet: discarding empty aggregation results

Reposted. Author: 行者123. Updated: 2023-11-30 10:16:34

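I have a sliding window with a custom aggregate accumulator that may produce empty results. What is the correct way to discard such "empty" aggregation results so that they never reach the sink?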

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Long, Foo>map("map"))
            .map(Map.Entry::getValue)
            .addTimestamps(Foo::getTimeMillisecond, LIMIT)
            .window(WindowDefinition.sliding(100, 10))
            .aggregate(FooAggregateOperations.aggregateFoo(), (s, e, r) -> {
                return String.format("started: %s\n%s\nended: %s\n", s, r, e);
            })
            .drainTo(Sinks.files(sinkDirectory));

As you can see, the aggregate operation returns a String:

    public class FooAggregateOperations {

        public static AggregateOperation1<Foo, FooAccumulator, String> aggregateFoo() {
            return AggregateOperation
                    .withCreate(FooAccumulator::new)
                    .andAccumulate(FooAggregateOperations::accumulate)
                    .andCombine(FooAggregateOperations::combine)
                    .andDeduct(FooAggregateOperations::deduct)
                    .andFinish(FooAccumulator::getResult);
        }
    }
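The post does not show `FooAccumulator` or the static `accumulate`, `combine` and `deduct` helpers. Purely as an illustration, assuming the accumulator keeps a running count and sum, it might look roughly like the sketch below. The `Foo` fields, the `isEmpty()` method and the convention of returning an empty string for an empty window are all hypothetical, and the code is plain Java with no Hazelcast dependency.

```java
// Hypothetical stand-in for the Foo type; the original post does not show it.
final class Foo {
    final long timeMillisecond;
    final long value;

    Foo(long timeMillisecond, long value) {
        this.timeMillisecond = timeMillisecond;
        this.value = value;
    }

    long getTimeMillisecond() {
        return timeMillisecond;
    }
}

// Hypothetical accumulator: a running count and sum of Foo values.
final class FooAccumulator {
    private long count;
    private long sum;

    // Fold one item into this accumulator.
    void accumulate(Foo foo) {
        count++;
        sum += foo.value;
    }

    // Merge another partial result into this one.
    void combine(FooAccumulator other) {
        count += other.count;
        sum += other.sum;
    }

    // Undo an earlier combine; this lets a sliding window drop the frame
    // that has just left the window without recomputing everything.
    void deduct(FooAccumulator other) {
        count -= other.count;
        sum -= other.sum;
    }

    // Assumption: "empty" means no items currently contribute to the result.
    boolean isEmpty() {
        return count == 0;
    }

    // Finish step: an empty accumulator yields an empty string.
    String getResult() {
        return isEmpty() ? "" : "count=" + count + ", sum=" + sum;
    }
}
```

With an accumulator shaped like this, an "empty" window is simply one whose finished result is the empty string, which is something a downstream stage can test for.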

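In short: how can ignorable window/aggregation results be discarded before they are combined with or deducted from other results, or flushed to the sink?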

Best answer

To filter out empty aggregation results, you can use the following approach:

    // tuple3 is statically imported from com.hazelcast.jet.datamodel.Tuple3
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Long, Foo>map("map"))
            .map(Map.Entry::getValue)
            .addTimestamps(Foo::getTimeMillisecond, LIMIT)
            .window(WindowDefinition.sliding(100, 10))
            .aggregate(FooAggregateOperations.aggregateFoo(),
                    (s, e, r) -> tuple3(s, e, r))
            // isEmpty is a user-supplied predicate; it is not shown in the answer
            .filter(t -> !isEmpty(t.f2()))
            .map(t -> String.format("started: %s\n%s\nended: %s\n", t.f0(), t.f2(), t.f1()))
            .drainTo(Sinks.files("sinkDirectory"));

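This stores each aggregation result, together with the window start and end, in a temporary tuple, and then applies the filter and the final mapping. The filter runs on the finished window results, so empty ones are dropped before they reach the sink.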
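The same carry-filter-format pattern can be tried outside of Jet. The following is a minimal sketch using plain Java streams, not the Jet API: `WindowResult` is a hypothetical stand-in for the `tuple3(start, end, result)` value, and `isEmpty` assumes that an "empty" result is a null or blank string, which may not match your accumulator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for Jet's tuple3(start, end, result); not a Jet class.
final class WindowResult {
    final long start;
    final long end;
    final String result;

    WindowResult(long start, long end, String result) {
        this.start = start;
        this.end = end;
        this.result = result;
    }
}

final class EmptyResultFilterDemo {

    // Assumption: a result counts as empty when it is null or blank.
    static boolean isEmpty(String result) {
        return result == null || result.trim().isEmpty();
    }

    // Keep the raw result next to the window bounds, drop the empty ones,
    // and only then build the final string.
    static List<String> formatNonEmpty(List<WindowResult> windows) {
        return windows.stream()
                .filter(w -> !isEmpty(w.result))
                .map(w -> String.format("started: %s\n%s\nended: %s\n",
                        w.start, w.result, w.end))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<WindowResult> windows = Arrays.asList(
                new WindowResult(0, 100, "count=2, sum=12"),
                new WindowResult(10, 110, ""),    // empty window, dropped
                new WindowResult(20, 120, null)); // also treated as empty
        formatNonEmpty(windows).forEach(System.out::print);
    }
}
```

Deferring the `String.format` call until after the filter is what makes the check straightforward: once the result has been wrapped in the "started/ended" text, it can no longer be told apart from a non-empty one without parsing it.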

I have also created an issue on GitHub; we will consider supporting this behavior directly in the aggregate operation.

Source: a similar question on Stack Overflow, https://stackoverflow.com/questions/50013276/
