gpt4 book ai didi

scala - 如何透明地将输入元素与输出元素相关联

转载 作者:行者123 更新时间:2023-12-01 04:31:48 25 4
gpt4 key购买 nike

我得到一个 Flow<A, B> (这是一个花哨的流/图形,见 https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html )来自一些我无法控制的外部代码。我需要包装该流程并对每个输入元素和每个输出元素进行一些处理。我可以通过放置 BidiFlow 轻松实现这一点。像这样:

Flow<I, O, Unused> flow = ...; // external source
BidiFlow<I, I, O, O, Unused> bidi = BidiFlow.fromFunctions(i -> preprocess(i), o -> postprocess(o)); // do something on every input and every output
Flow<I, O, Unused> newFlow = bidi.join(flow);

所以这里有一个转折点:正确地对输出元素进行后处理 o ,我需要生成该输出元素的输入。由于我无法控制底层流,因此我无法重构它以返回,例如,输入和输出的元组。而且由于 Akka 的异步和并行特性,我不能做任何技巧,例如将输入存储在线程本地或静态字段或类似的东西上。

所以我的问题是:我可以应用一些 Akka Streams 魔法以某种方式获取生成输出的输入元素吗?

最佳答案

这是使用 GraphDsl、Broadcast 和 Zip 阶段的解决方案。

  val externalFlow: Flow[Int, String, NotUsed] = Flow[Int].map(i => i.toString + "-external")

def zipInAndOut[I, O](flow: Flow[I, O, NotUsed]): Flow[I, (I, O), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[I](2))
val zip = b.add(Zip[I, O])
val theFlow = b.add(flow)
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> theFlow ~> zip.in1
new FlowShape(broadcast.in, zip.out)
})
}
Source
.fromIterator(() => (1 until 10).iterator)
.via(zipInAndOut(externalFlow))
.runWith(Sink.foreach(println))

打印
(1,1-external)
(2,2-external)
(3,3-external)
(4,4-external)
(5,5-external)
(6,6-external)
(7,7-external)
(8,8-external)
(9,9-external)

关于scala - 如何透明地将输入元素与输出元素相关联,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53538005/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com