- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
有两个输入流,都生成定义为的对象实例
case class ReplayData(timestamp:Long, payload:Any)
流 1
1,有效负载1
1000,有效负载3
信息流 2
400,有效负载2
1500,有效负载4
我想实现重播机制将将元素按每个元素上的时间戳排序向下游推送
它将模拟生产中的实时场景。
此机制需要遵守消息之间的延迟,例如第一条消息发送是有效负载 1(其起点),来自 Stream2 的有效负载 2 应在 400 毫秒后发送(下一条消息时间戳与初始消息时间戳之间的差异),依此类推。
我可以使用 DelayedQueue 轻松做到这一点SO thread 中解释了哪些用法
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.
Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements. does not permit null elements.
我正在尝试弄清楚如何在 Akka 流中做到这一点,但很难找到可以为我解决这个问题的东西。
我正在查看mergeSorted作为合并两个流并根据某些函数对它们进行排序的一种方式。
它似乎或多或少符合基于某些自定义函数进行排序的目的。
我不确定如何处理基于时间戳属性的元素之间的延迟。
使用普通的旧 AKKA,我可以使用调度程序来读取数据,对它们进行排序并安排每个元素在时间过去时发送。
最佳答案
我不记得 akka-streams 中的任何内容可以通过每条消息的自定义延迟来延迟开箱即用的消息。毕竟 akka-streams 背后的想法是响应式(Reactive)编程。我只知道如何解决您的问题的两种选择(假设您已经合并了 2 个源)
Flow.mapAsync - 在这种情况下,在延迟一段时间后返回 Future
完全是你的事。例如:
import java.time.LocalDateTime
import java.util.concurrent.Executors
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
object Application extends App {
implicit val sys: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
case class SomeEntity(time: Int, value: Int)
val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))
val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val scheduler = sys.scheduler
val f = source
.mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec))
.runForeach(se => println(s"${LocalDateTime.now()} -> $se"))
f.onComplete(_ => sys.terminate())
}
您的用例(毕竟是模拟)实际上可能并不那么严格,因此您可能会使用 Flow.throttle 。它不像第一个解决方案那么简单和精确,但它的性能要高得多,因为它使用一些轻量级的桶模型来控制项目输出率。
import java.time.LocalDateTime
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Application extends App {
implicit val sys: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
case class SomeEntity(time: Int, value: Int)
val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))
val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => {
println(s"${LocalDateTime.now()} -> $se")
})
future.onComplete(_ => sys.terminate())
}
关于java - 重播实时收集的数据以模拟真实的流量延迟和消息排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56426658/
引用网址 http://hi.baidu.com/quiteuniverse/blog/item/9f3f043d46ad1e07bba16716.html 以下函数调用方式:&nbs
我什至不确定如何描述我正在尝试做的事情,因为我对 cookie 了解不多,但就这样吧。 是否可以使用PHP从浏览器缓存中收集一个cookie(或cookie文件),将其保存到数据库中,然后清除缓存并重
我正在使用 Room(v. 2.2.1)和协程支持(v. 1.3.2)并进行以下设置 @Entity(tableName = "simple_table") data class SimpleEnti
我正在尝试编写一个基于时间运算符收集/累积值的规则。 rule "Zone6 Overlap" when $i1 : Instance ($e1 : event == " Vel : 20.9
我有一个简单的 BST,定义了节点结构: struct node { int key_value; struct node *left; struct node *right; }; ty
我有这个对象: public class MenuPriceByDay implements Serializable { private BigDecimal avgPrice; p
我正在开发一个应用程序,需要访问给定传感器的“最后 5 秒有值(value)的数据”。我的计划是以某种方式存储这些数据,然后当我请求数据时,它将返回最近 5 秒内获得的所有数据。鉴于以下情况,我不确定
在 Ruby 中,您可以对数组使用 map/collect 方法来修改它: a = [ "a", "b", "c", "d" ] a.collect! {|x| x + "!" } a
我即将开始实时收集大量数字数据(对于那些感兴趣的人,各种股票和 future 的出价/要价/最后或“磁带”)。稍后将检索数据以进行分析和模拟。这一点都不难,但我想高效地做到这一点,这会带来很多问题。我
我提出这个问题是为了寻求有关如何设计系统的实用建议。 像 amazon.com 和 pandora 这样的网站拥有并维护着庞大的数据集来运行他们的核心业务。例如,亚马逊(以及所有其他主要电子商务网站)
假设我们有一个数据数组和另一个带索引的数组。 data = [1, 2, 3, 4, 5, 7] index = [5, 1, 4, 0, 2, 3] 我们想从 index 的 data 元素创建一个
好的,我已经阅读了几个关于它的主题,但现在就开始吧。假设我有一个应用程序,基本上我会时不时地点击一个按钮,几分钟内会发生很多事情,然后它可能会再闲置一个小时,或者可能只是 1 分钟。难道不是在整个结束
我有一个数据框,例如 Seq Chrm start end length score 0 A C1 1 50 49 12 1 B
我正在考虑在 Object[] 数组中收集泛型方法的所有方法参数以进行记录。我知道使用方面可以更好地实现这一点,但是我不允许使用它,并且如果可能的话我正在寻找一种基于纯反射的方法 为了澄清, 假设一个
快速提问: 如果 Socket 对象(及其本地缓存的 InputStream 和 OutputStream 对象)超出范围并被垃圾收集,连接是否在 JVM 中保持打开状态? (即,不会在监听服务器上抛
是否有用于收集 facebook 公共(public)数据作为实时提要的 API。我阅读了关于用于收集数据的公共(public)提要 API,但我现在不能申请,而且它不是免费的,还有 Open str
摘要 :我使用自定义收集器收集给定搜索的所有命中的文档 ID(它使用 ID 填充 BitSet)。根据我的需要,搜索和获取文档 ID 的速度非常快,但是当涉及到从磁盘实际获取文档时,事情变得非常缓慢。
我正在寻找一种方法来从自定义 Gradle 插件收集给定项目的所有依赖约束(通过常规 platform 和/或 enforcedPlatform 和/或“手动”强制执行)。 在 Maven 世界中,您
我有一个 CSV 格式的用户列表,但我需要按广告中的名称从每个用户收集 SamAccount 属性。 CSV 模型 脚本 Get-ADObject -Filter 'ObjectClass -eq "
我得到了一个非常大的列表,其中包含大约 200 个带有文本和图像的项目。 ng-repeat 是一种缓慢渲染的方式。它尝试过这个 solution 。效果很好。但不适合重复收集。 我的网络服务返回此:
我是一名优秀的程序员,十分优秀!