gpt4 book ai didi

scala - groupBy 的子流可以依赖于它们生成的键吗?

转载 作者:行者123 更新时间:2023-12-04 15:40:22 24 4
gpt4 key购买 nike

我有一个与用户相关的数据流。我也有每个用户的状态,我可以从数据库异步获取。

我想将我的流与每个用户一个子流分开,并在实现子流时为每个用户加载状态,以便可以根据此状态处理子流的元素。

如果我不想合并下游的子流,我可以用 groupBy 做一些事情和 Sink.lazyInit :

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
Int.MaxValue,
getUserId
).to(
Sink.lazyInit(
elt => getState(getUserId(elt)).map(treatUser),
??? // this is never called, since the subflow is created when an element comes
)
)

但是,如果 treatUser,这不起作用变成 Flow ,因为 Sink.lazyInit 没有等价物.

由于 groupBy 的子流仅在推送新元素时才具体化,应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码,以便此工作始终如一。同样, Sink.lazyInit似乎不容易翻译成 Flow案件。

关于如何解决这个问题的任何想法?

最佳答案

你要看的相关Akka issue是#20129: add Sink.dynamic and Flow.dynamic .

在相关的 PR #20579他们实际上实现了 LazySink东西。

他们正计划做LazyFlow下一个:

Will do next lazyFlow with similar signature.



不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑对 Akka 进行 PR)。

关于scala - groupBy 的子流可以依赖于它们生成的键吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42862590/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com