gpt4 book ai didi

scala - 通过 akka 流链接上下文

转载 作者:行者123 更新时间:2023-12-04 10:50:12 24 4
gpt4 key购买 nike

我正在将一些 C# 代码转换为 scala 和 akka 流。

我的 c# 代码如下所示:


Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...

async Task<Result> GetResultAsync(Request request)
{
var result1 = await GetPartialResult1Async(request);
var result2 = await GetPartialResult2Async(request);
return new Result(request, result1, result2);
}

现在是 akka 流。而不是拥有 Request 的函数到 Task结果,我有从请求到结果的流程。

所以我已经有以下两个流程:

val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...

但是我看不到如何将它们组合成一个完整的流程,因为通过在第一个流程上调用 via 我们丢失了原始请求,并且通过在第二个流程上调用 via 我们丢失了第一个流程的结果。

所以我创建了一个看起来像这样的 WithState monad:

case class WithState[+TState, +TValue](value: TValue, state: TState) {
def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
WithState(func(value), state)
}
... bunch more helper functions go here
}

然后我将我的原始流程重写为如下所示:

def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = ...
def partialResult2Flow: Flow[WithState[TState, Request], WithState[TState, Result2]] = ...

并像这样使用它们:

val flow = Flow[Request]
.map(x => WithState(x, x))
.via(partialResult1Flow)
.map(x => WithState(x.state, (x.state, x.value))
.via(partialResult2Flow)
.map(x => Result(x.state._1, x.state._2, x.value))

现在这可行,但我当然不能保证将如何使用流量。所以我真的应该让它接受一个状态参数:

def flow[TState] = Flow[WithState[TState, Request]]
.map(x => WithState(x.value, (x.state, x.value)))
.via(partialResult1Flow)
.map(x => WithState(x.state._2, (x.state, x.value))
.via(partialResult2Flow)
.map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))

现在在这个阶段,我的代码变得非常难以阅读。我可以通过命名函数来清理它,并使用案例类而不是元组等,但基本上这里有很多附带的复杂性,这是很难避免的。

我错过了什么吗?这不是 Akka 流的好用例吗?有没有一些内置的方法可以做到这一点?

最佳答案

我没有任何根本不同的方式来做到这一点,而不是我在问题中描述的。

然而,电流可以显着改善:

第 1 阶段:FlowWithContext

而不是使用自定义 WithState monad,可以使用内置的FlowWithContext .

这样做的好处是您可以在流上使用标准运算符,而无需担心转换 WithState单子(monad)。 Akka 会为您解决这个问题。

所以而不是

def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = 
Flow[WithState[TState, Request]].mapAsync(_ mapAsync {doRequest(_)})

我们可以写:

def partialResult1Flow[TState]: FlowWithContext[Request, TState, Result1, TState, NotUsed] = 
FlowWithContext[Request, TState].mapAsync(doRequest(_))

不幸的是,虽然当您不需要更改上下文时 FlowWithContext 很容易编写,但当您需要通过需要将一些当前数据移动到上下文中的流时使用它有点繁琐(如我们的有)。为此,您需要转换为 Flow (使用 asFlow ),然后返回到 FlowWithContext使用 asFlowWithContext .

我发现将整个内容写成 Flow 是最简单的。在这种情况下,并转换为 FlowWithContext在末尾。

例如:

def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] = 
Flow[(Request, TState)]
.map(x => (x._1, (x._1, x._2)))
.via(partialResult1Flow)
.map(x => (x._2._1, (x._2._1, x._1, x._2._2))
.via(partialResult2Flow)
.map(x => (Result(x._2._1, x._2._2, x._1), x._2._2))
.asFlowWithContext((a: Request, b: TState) => (a,b))(_._2)
.map(_._1)

这更好吗?

在这种特殊情况下,情况可能更糟。在其他情况下,您很少需要更改上下文,它会更好。但是无论哪种方式,我都建议使用它,因为它是内置的,而不是依赖于自定义 monad。

第 2 阶段:通过使用

为了使它对用户更友好,我创建了一个 viaUsing Flow 和 FlowWithContext 的扩展方法:

import akka.stream.{FlowShape, Graph}
import akka.stream.scaladsl.{Flow, FlowWithContext}

object FlowExtensions {
implicit class FlowViaUsingOps[In, Out, Mat](val f: Flow[In, Out, Mat]) extends AnyVal {
def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, Out), (Out2, Out)], Mat2]) : Flow[In, (Out2, Out), Mat] =
f.map(x => (func(x), x)).via(flow)
}

implicit class FlowWithContextViaUsingOps[In, CtxIn, Out, CtxOut, Mat](val f: FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends AnyVal {
def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, (Out, CtxOut)), (Out2, (Out, CtxOut))], Mat2]):
FlowWithContext[In, CtxIn, (Out2, Out), CtxOut, Mat] =
f
.asFlow
.map(x => (func(x._1), (x._1, x._2)))
.via(flow)
.asFlowWithContext((a: In, b: CtxIn) => (a,b))(_._2._2)
.map(x => (x._1, x._2._1))
}
}
viaUsing的目的| , 是为 FlowWithContext 创建输入来自当前输出,同时通过上下文传递来保留当前输出。结果是 Flow其输出是嵌套流的输出和原始流的元组。

viaUsing我们的示例简化为:

  def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] =
FlowWithContext[Request, TState]
.viaUsing(x => x)(partialResult1Flow)
.viaUsing(x => x._2)(partialResult2Flow)
.map(x => Result(x._2._2, x._2._1, x._1))

我认为这是一个重大的改进。我已请求将 viaUsing 添加到 Akka,而不是依赖扩展方法 here .

关于scala - 通过 akka 流链接上下文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59518343/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com