gpt4 book ai didi

scala - 如何使用 FS2 中的分类器函数对对象进行分组?

转载 作者:行者123 更新时间:2023-12-02 02:49:48 36 4
gpt4 key购买 nike

我有一个无序的测量流,我想将其分组为固定大小的批处理,以便以后可以有效地保留它们:

val measurements = for {
id <- Seq("foo", "bar", "baz")
value <- 1 to 5
} yield (id, value)

fs2.Stream.emits(scala.util.Random.shuffle(measurements)).toVector

也就是说,而不是:

(bar,4)
(foo,5)
(baz,3)
(baz,5)
(baz,4)
(foo,2)
(bar,2)
(foo,4)
(baz,1)
(foo,1)
(foo,3)
(bar,1)
(bar,5)
(bar,3)
(baz,2)

我希望批量大小等于 3 具有以下结构:

(bar,[4,2,1])
(foo,[5,2,4])
(baz,[3,5,4])
(baz,[1,2])
(foo,[1,3])
(bar,[5,3])

在 FS2 中是否有一种简单、惯用的方法可以实现此目的?我知道有一个groupAdjacentBy函数,但这只会考虑相邻的项目。

我现在使用的是 0.10.5

最佳答案

这可以通过 fs2 Pull 来实现:

import cats.data.{NonEmptyList => Nel}
import fs2._

object GroupingByKey {
def groupByKey[F[_], K, V](limit: Int): Pipe[F, (K, V), (K, Nel[V])] = {
require(limit >= 1)

def go(state: Map[K, List[V]]): Stream[F, (K, V)] => Pull[F, (K, Nel[V]), Unit] = _.pull.uncons1.flatMap {
case Some(((key, num), tail)) =>
val prev = state.getOrElse(key, Nil)
if (prev.size == limit - 1) {
val group = Nel.ofInitLast(prev.reverse, num)
Pull.output1(key -> group) >> go(state - key)(tail)
} else {
go(state.updated(key, num :: prev))(tail)
}
case None =>
val chunk = Chunk.vector {
state
.toVector
.collect { case (key, last :: revInit) =>
val group = Nel.ofInitLast(revInit.reverse, last)
key -> group
}
}
Pull.output(chunk) >> Pull.done
}

go(Map.empty)(_).stream
}
}

用法:

import cats.data.{NonEmptyList => Nel}
import cats.implicits._
import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object Answer extends IOApp {
type Key = String

override def run(args: List[String]): IO[ExitCode] = {
require {
Stream('a -> 1).through(groupByKey(2)).compile.toList ==
List('a -> Nel.one(1))
}

require {
Stream('a -> 1, 'a -> 2).through(groupByKey(2)).compile.toList ==
List('a -> Nel.of(1, 2))
}

require {
Stream('a -> 1, 'a -> 2, 'a -> 3).through(groupByKey(2)).compile.toList ==
List('a -> Nel.of(1, 2), 'a -> Nel.one(3))
}

val infinite = (for {
prng <- Stream.eval(IO { new scala.util.Random() })
keys <- Stream(Vector[Key]("a", "b", "c", "d", "e", "f", "g"))
key = Stream.eval(IO {
val i = prng.nextInt(keys.size)
keys(i)
})
num = Stream.eval(IO { 1 + prng.nextInt(9) })
} yield (key zip num).repeat).flatten

infinite
.through(groupByKey(3))
.showLinesStdOut
.compile
.drain
.as(ExitCode.Success)
}
}

关于scala - 如何使用 FS2 中的分类器函数对对象进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51003257/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com