gpt4 book ai didi

java - Akka流滑动窗口通过SourceQueue控制receive到sink

转载 作者:行者123 更新时间:2023-12-02 02:47:58 26 4
gpt4 key购买 nike

更新:我将问题放入 test project 详细解释我的意思

================================================== =====================

我有 Akka 源代码,可以继续从数据库表中读取,然后对某个键进行分组,然后减少它。然而,在我应用reduce函数后,数据似乎永远不会发送到sink,它将继续reduce,因为上游总是有数据到来。

我读了一些帖子,并尝试了 groupedWithin 和滑动,但它并不像我想象的那样工作,它只会将消息分组到较大的部分,但不会使上游暂停并发出到接收器。以下是Akka流2.5.2中的代码

源减少代码:

source = source
.groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
.sliding(3, 1)
.mapConcat(i -> i)
.mapConcat(i -> i)
.groupBy(2000000, i -> i.getEntityName())
.map(i -> new Pair<>(i.getEntityName(), i))
.reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
.map(i -> i.second())
.mergeSubstreams();

接收器并运行:

Sink<Object, CompletionStage<Done>> sink = 
Sink.foreach(i -> System.out.println(i))
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materIalizer);

我也尝试过 .takeWhile(predicated);我使用计时器来切换谓词值 true 和 false,但似乎只会将第一次切换为 false,当我切换回 true 时,它​​不会重新启动上游。

请帮助我提前致谢!

================================================== ==

更新

information about the type of elements

添加我想要的内容:我有类调用 SystemCodeTracking 包含 2 个属性 (id,entityName)

我将有对象列表:(1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3"”)

我想对entityName进行分组,然后对id进行求和,因此,我希望看到的结果如下

("table1" 1+4),("table3", 3+5),("table2", 2)

我现在正在做的代码如下

source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName)
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId()))
.scan(....)

我现在的问题更多是关于如何构建扫描初始状态我应该做什么?

scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))

最佳答案

所以,如果我理解一切的话,你想要的是:

  • 首先,按id分组
  • 然后按时间窗口分组,并在该时间窗口内对所有 systemCodeTracking.getId()

对于第一部分,您需要groupBy。对于第二部分groupedWithin。但是,它们的工作方式并不相同:第一个将为您提供子流,而第二个将为您提供列表流。

因此,我们必须以不同的方式处理它们。

首先,让我们为您的列表编写一个缩减器:

private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception {
if (list.isEmpty()) {
throw new Exception();
} else {
SystemCodeTracking building = list.get(0);
building.setId(0L);
list.forEach(next -> building.setId(building.getId() + next.getId()));
return building;
}
}

因此,对于列表中的每个元素,我们都会递增 building.id,以便在遍历整个列表后获取我们想要的值。

现在你只需要做

Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
.groupBy(20000, SystemCodeTracking::getEntityName) // group by name
.groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS) // for a given name, group by time window (or by packs of 100)
.filterNot(List::isEmpty) // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer)
.map(this::reduceList) // reduce each list to sum the ids
.log("====== doing reduceing ") // log each passing element using akka logger, rather than `System.out.println`
.mergeSubstreams() // merge back all elements with different names

关于java - Akka流滑动窗口通过SourceQueue控制receive到sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44316740/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com