gpt4 book ai didi

scala - 如何对来自无限流的传入事件进行分组?

转载 作者:行者123 更新时间:2023-12-04 17:55:25 24 4
gpt4 key购买 nike

我有无限的事件流:

(timestamp, session_uid, traffic)

IE。
...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...

这些事件我想按 session_uid 分组并计算每个 session 的流量总和。

我写了一个 akka-streams使用有限流时可以正常工作的流 groupBy (我的代码基于 this 食谱中的示例)。但是对于无限流,它不起作用,因为 groupBy函数应该处理所有传入的流,然后才准备好返回结果。

我想我应该实现超时分组,即如果我没有收到指定 stream_uid 超过 5 分钟的事件,我应该返回此 session_uid 的分组事件。但是如何实现它使用 akka-streams只要?

最佳答案

我想出了一个有点gnarly解决方案,但我认为它可以完成工作。

基本思想是使用 keepAlive Source 的方法作为将触发完成的计时器。

但是要做到这一点,我们首先必须对数据进行一些抽象。计时器需要从原始源发送触发器或另一个元组值,因此:

sealed trait Data

object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data

然后将我们的元组来源转换为值来源。我们仍将使用 groupBy进行类似于您的有限流案例的分组:
val originalSource : Source[(Long, String, Int), Unit] = ???

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup, Unit] =
originalSource.map(t => Value(t._1, t._2, t._3))
.groupBy(_.session_uid)

棘手的部分是处理只是元组的分组: (String, Source[Value,Unit]) .如果时间已经过去,我们需要计时器通知我们,因此我们需要另一个抽象来知道我们是否仍在计算或由于超时而完成计算:
sealed trait Sum {
val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)

现在我们可以排出每个组的源。 keepAlive将发送 TimerTrigger如果值的来源在 timeOut 之后没有产生任何东西. Data然后将来自 keepAlive 的模式与 TimerTrigger 或来自原始 Source 的新值进行模式匹配:
val evaluateSum : ((Sum , Data)) => Sum = {
case (runningSum, data) => {
data match {
case TimerTrigger => ComputedSum(runningSum.sum)
case v : Value => StillComputing(runningSum.sum + v.traffic)
}
}
}//end val evaluateSum

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
.scan(zeroSum)(evaluateSum)
.collect {case c : ComputedSum => c.sum}
.runWith(Sink.head)

该集合应用于仅匹配完成的总和的部分函数,​​因此只有在计时器触发后才能到达 Sink。

然后我们将此处理程序应用于每个出现的分组:
val timeOut = FiniteDuration(5, MINUTES)

val sumSource : Source[SumResult, Unit] =
groupedDataSource map handleGroup(timeOut)

我们现在有 (String,Future[Int]) 的来源这是 session_uid 和该 ID 的流量总和的 Future。

就像我说的,复杂但符合要求。另外,我不完全确定如果一个 uid 已经被分组并且已经超时,但是一个具有相同 uid 的新值出现了会发生什么。

关于scala - 如何对来自无限流的传入事件进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33840868/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com