gpt4 book ai didi

scala - Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

转载 作者:行者123 更新时间:2023-12-01 08:53:47 25 4
gpt4 key购买 nike

我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach

现在我想要这样的东西:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach

但 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 中的所有 100 个元素)并将这个新元素提供给 Sink。

我做了一些研究,发现 Explicit user defined buffers但我不明白如何从 flow2 中的 flow1 访问所有 100 个元素并对其进行一些转换。有人可以解释一下吗?或者更好地发布一个简单的小例子?还是两者兼有?

最佳答案

Akka 定义集合

如果您不介意使用 akka 确定的集合类型,那么您可以使用 grouped 函数:

//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
.runWith(Sink foreach println)

用户定义的集合

如果您想控制用于缓冲区的集合类型,例如一个 SeqArray:

type MyCollectionType[X] = Array[X]

def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]

然后您可以使用两个 Flows 执行此操作。第一个流程执行 scan 以构建一系列元素:

val bufferSize = 10

def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
(if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i

val buffer : Flow[Int, MyCollectionType[Int], _] =
Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
(coll, i) => appendToMyCollection(coll, i)
}

第二个流程是一个过滤器,用于一个大小刚好合适的序列(即“goldiLocks”):

val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
Flow[MyCollectionType[Int]].filter(_.size == bufferSize)

这两个 Flow 可以组合起来生成一个 Stream,它将生成所需的集合类型:

val stream = Source(1 to 100).via(buffer)
.via(goldiLocks)
.runWith(Sink foreach println)

关于scala - Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33280709/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com