gpt4 book ai didi

scala - 如何清理连续 Akka 流中的子流

转载 作者:行者123 更新时间:2023-12-04 12:17:13 25 4
gpt4 key购买 nike

鉴于我有一个很长的事件流,如下所示。当很长时间过去后,将创建大量不再需要的子流。

Is there a way to clean up a specific substream at a given time, for example the substream created by id 3 should be cleaned and the state in the scan method lost at 13Pm (expires property of Wid)?


case class Wid(id: Int, v: String, expires: LocalDateTime)
test("Substream with scan") {
val (pub, sub) = TestSource.probe[Wid]
.groupBy(Int.MaxValue, _.id)
.scan("")((a: String, b: Wid) => a + b.v)
.mergeSubstreams
.toMat(TestSink.probe[String])(Keep.both)
.run()
}

最佳答案

TL;博士 您可以在一段时间后关闭子流。但是,使用输入动态设置内置阶段的时间是另一回事。

关闭子流

要关闭流,您通常会完成它(从上游),但您也可以取消它(从下游)。例如,take(n: Int)流量将取消一次n元素已经通过。

现在,在 groupBy在这种情况下,您无法完成子流,因为所有子流共享上游,但您可以取消它。 How取决于你想把它放在什么条件下。

但是,请注意 groupBy删除已经关闭的子流的输入:如果一个新元素的 ID 为 3来自上游到 groupBy3 -substream 已关闭,它将被简单地忽略并拉入下一个元素。 原因可能是在关闭和重新打开子流之间的过程中可能会丢失某些元素。此外,如果您的流应该运行很长时间,这将影响性能,因为在转发到相关(实时)子流之前,每个元素都将根据关闭的子流列表进行检查。如果您对它的性能不满意,您可能想要实现自己的有状态过滤器(例如,使用布隆过滤器)。

要关闭子流,我通常使用 take (如果你只想要给定数量的元素,但在无限流中可能不是这种情况),或者某种超时:要么 completionTimeout如果您想要从实现到关闭的固定时间或 idleTimeout如果您想在一段时间内没有元素通过时关闭。请注意,这些流不会取消流而是使流失败,因此您必须使用 recover 捕获异常或 recoverWith阶段将失败更改为取消( recoverWith 允许您取消而不发送任何最后一个元素,通过恢复 Source.empty )。

动态设置超时时间

现在你想要的是根据第一个通过的元素动态设置关闭时间。这更复杂,因为流的实现与通过它们的元素无关。事实上,在通常的情况下(没有 groupBy ),流在任何元素通过它们之前被物化,所以使用元素来物化它们是没有意义的。

我在 that question 中遇到了类似的问题,并最终使用了 groupBy 的修改版本有签名

paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM])

这允许使用定义它的键来定义每个子流。这可以修改为将第一个元素(而不是键)作为参数。

另一种(在您的情况下可能更简单)方法是编写您自己的舞台,它完全符合您的要求:从第一个元素获取结束时间并在那时取消流。这是一个示例实现(我使用了调度程序而不是设置状态):
object CancelAfterTimer

class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("CancelAfter.in")
val out = Outlet[T]("CancelAfter.in")
override val shape: FlowShape[T, T] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (!isTimerActive(CancelAfterTimer))
scheduleOnce(CancelAfterTimer, getTimeout(elem))
push(out, elem)
}

override def onTimer(timerKey: Any): Unit =
completeStage() //this will cancel the upstream and close the downstrean

override def onPull(): Unit = pull(in)

setHandlers(in, out, this)
}
}

关于scala - 如何清理连续 Akka 流中的子流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44016410/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com