gpt4 book ai didi

scala - 使用 fs2.Stream 分组事件

转载 作者:行者123 更新时间:2023-12-04 15:38:51 25 4
gpt4 key购买 nike

我有如下事件流:

sealed trait Event

val eventStream: fs2.Stream[IO, Event] = //...

我想对在一分钟内收到的这些事件进行分组(即每分钟从 0 秒到 59 秒)。这对 fs2 来说听起来很简单
val groupedEventsStream = eventStream groupAdjacentBy {event => 
TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}

问题在于 分组功能不纯 .它使用 currentTimeMillis .我可以解决这个问题如下:
stream.evalMap(t => IO(System.currentTimeMillis(), t))
.groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))

问题是添加了带有我想避免的元组的笨拙样板。还有其他解决方案吗?

或者,对于这种情况,使用不纯函数可能并没有那么糟糕?

最佳答案

您可以使用 cats.effect.Clock 删除一些样板文件。 :

def groupedEventsStream[A](stream: fs2.Stream[IO, A])
(implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
.groupAdjacentBy(_._1)

关于scala - 使用 fs2.Stream 分组事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55071247/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com