gpt4 book ai didi

scala - 如何从递归生成值的流创建 akka-stream 源?

转载 作者:行者123 更新时间:2023-12-03 11:29:49 27 4
gpt4 key购买 nike

我需要遍历一个形状像树的 API。例如,目录结构或讨论线程。它可以通过以下流程建模:

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))

我怎样才能遍历这些数据?我得到了以下工作:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))

val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))

val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast

FlowShape(source.in, data.out)
}

val flow = Flow.fromGraph(loop)


Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)

system.terminate()

但是,由于我使用的是带有缓冲区的流,因此 Stream 永远不会完成。

Completes when upstream completes and buffered elements have been drained



Flow.buffer

我读了 Graph cycles, liveness, and deadlocks部分多次,我仍在努力寻找答案。

这将创建一个活锁:
import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed

// should be > max loop(x)
val bufferSize = 10000

val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()

ref ! seed

Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}

编辑:我添加了一个 git repo 来测试您的解决方案 https://github.com/MasseGuillaume/source-unfold

最佳答案

未完成原因

我不认为流永远不会完成的原因是“使用带有缓冲区的流”。实际原因,类似this question , 是与默认参数 eagerClose=False 合并的事实正在等待sourcebuffer在它(合并)完成之前完成。但是缓冲区正在等待合并完成。所以合并正在等待缓冲区,缓冲区正在等待合并。

热切关闭合并

您可以设置 eagerClose=True创建合并时。但不幸的是,使用eager close 可能会导致一些 child ItemId永远不会查询值。

间接解决方案

如果为树的每个级别具体化一个新流,则可以将递归提取到流之外。

您可以使用 itemFlow 构造查询函数。 :

val itemQuery : Iterable[ItemId] => Future[Seq[Data]] = 
(itemIds) => Source.apply(itemIds)
.via(itemFlow)
.runWith(Sink.seq[Data])

此查询函数现在可以包含在递归辅助函数中:
val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] = 
(itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
val allNewKids = allNewData.flatMap(_.kids).toSet

if(allNewKids.isEmpty)
Future.successful(currentData ++ allNewData)
else
recQuery(allNewKids, currentData ++ data)
}

创建的流的数量将等于树的最大深度。

不幸的是,因为涉及到Futures,这个递归函数 不是尾递归 如果树太深,可能会导致“堆栈溢出”。

关于scala - 如何从递归生成值的流创建 akka-stream 源?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51579355/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com