gpt4 book ai didi

java - 重播实时收集的数据以模拟真实的流量延迟和消息排序

转载 作者:行者123 更新时间:2023-11-30 05:36:47 26 4
gpt4 key购买 nike

有两个输入流,都生成定义为的对象实例

case class ReplayData(timestamp:Long, payload:Any)

流 1

1,有效负载1

1000,有效负载3

信息流 2

400,有效负载2

1500,有效负载4

我想实现重播机制将将元素按每个元素上的时间戳排序向下游推送

它将模拟生产中的实时场景。

此机制需要遵守消息之间的延迟,例如第一条消息发送是有效负载 1(其起点),来自 Stream2 的有效负载 2 应在 400 毫秒后发送(下一条消息时间戳与初始消息时间戳之间的差异),依此类推。

我可以使用 DelayedQueue 轻松做到这一点SO thread 中解释了哪些用法

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.

The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.

Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.

For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements. does not permit null elements.

我正在尝试弄清楚如何在 Akka 流中做到这一点,但很难找到可以为我解决这个问题的东西。

我正在查看mergeSorted作为合并两个流并根据某些函数对它们进行排序的一种方式。

它似乎或多或少符合基于某些自定义函数进行排序的目的。

我不确定如何处理基于时间戳属性的元素之间的延迟

使用普通的旧 AKKA,我可以使用调度程序来读取数据,对它们进行排序并安排每个元素在时间过去时发送。

最佳答案

我不记得 akka-streams 中的任何内容可以通过每条消息的自定义延迟来延迟开箱即用的消息。毕竟 akka-streams 背后的想法是响应式(Reactive)编程。我只知道如何解决您的问题的两种选择(假设您已经合并了 2 个源)

  1. Flow.mapAsync - 在这种情况下,在延迟一段时间后返回 Future 完全是你的事。例如:

    import java.time.LocalDateTime
    import java.util.concurrent.Executors

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.pattern.after
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{ExecutionContext, Future}

    object Application extends App {

    implicit val sys: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    case class SomeEntity(time: Int, value: Int)
    val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))

    val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
    val scheduler = sys.scheduler

    val f = source
    .mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec))
    .runForeach(se => println(s"${LocalDateTime.now()} -> $se"))

    f.onComplete(_ => sys.terminate())
    }
  2. 您的用例(毕竟是模拟)实际上可能并不那么严格,因此您可能会使用 Flow.throttle 。它不像第一个解决方案那么简单和精确,但它的性能要高得多,因为它使用一些轻量级的桶模型来控制项目输出率。

    import java.time.LocalDateTime

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object Application extends App {

    implicit val sys: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    case class SomeEntity(time: Int, value: Int)

    val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))


    val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => {
    println(s"${LocalDateTime.now()} -> $se")
    })

    future.onComplete(_ => sys.terminate())
    }

关于java - 重播实时收集的数据以模拟真实的流量延迟和消息排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56426658/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com