- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
使用akka-streams 2.4.17 Scala API,我试图使用Source.groupedWithin(size, duration)
并指定持续时间。根据the documentation和我在source code中看到的内容,如果超过了组大小或超时,则分组应该向下进行;以先到者为准。
当我以模糊模式(非异步)运行简单的工作流时,持续时间似乎没有任何效果。但是,当我在.async
调用之前或之后放置groupedWithin
时,超时有效。
不起作用版本
Source.fromIterator(() => aFiniteIterator)
.map(aLongOperation(_))
.groupedWithin(1000, 5.seconds) // keeps waiting beyond 5 seconds
.map(somethingWithGroup(_))
.runWith(Sink.fold(0)(_ + _))
Source.fromIterator(() => aFiniteIterator)
.map(aLongOperation(_))
.async
.groupedWithin(1000, 5.seconds) // now respects 5 seconds without full batch
.map(somethingWithGroup(_))
.runWith(Sink.fold(0)(_ + _))
case class Foo(id: String, value: String)
object Main {
implicit val system = ActorSystem("akka-streams-oom")
implicit val materializer = ActorMaterializer()
def main(args: Array[String]): Unit = {
println("starting tests...")
val attempt = Try(forceOOM)
attempt match {
case Success(_) => println("all tests passed successfully")
case Failure(e) => println(s"exception: e.getMessage")
}
println("terminating system...")
system.terminate
println("system terminated")
println("done with tests...")
}
private def forceOOM: Unit = {
println("executing forceOOM...")
val sink = Sink.fold[Int, Int](0)(_ + _)
val future =
bigSource
.map(logEmit)
.via(slowSubscriber)
.runWith(sink)
val finalResult = Await.result(future, Duration.Inf)
println(s"forceOOM result: $finalResult")
}
private def bigSource = {
val largeIterator = () =>
Iterator
.from(0,1000000000)
.map(_ => generateLargeFoo)
Source.fromIterator(largeIterator)
}
private def slowSubscriber =
Flow[Foo]
.map { foo =>
println(s"allocating memory for ${foo.id} at ${time}")
Foo(foo.id, bloat)
}
.async // if i remove this, the 5 second window below doesn't seem to work
.groupedWithin(100, 5.seconds)
.map(foldFoos)
private def logEmit(x: Foo): Foo = {
println(s"emitting next record: ${x.id} at ${time}")
x
}
private def foldFoos(x: Seq[Foo]): Int = {
println(s"folding records at ${time}")
x.map(_.value.length).fold(0)(_ + _)
}
private def time: String = LocalDateTime.now.toLocalTime.toString
private def bloat: String = {
(0 to 10)
.map(_ => generateLargeFoo.value)
.fold("")(_ + _)
}
private def generateLargeFoo: Foo = {
Foo(java.util.UUID.randomUUID.toString, (0 to 1000000).mkString)
}
}
[info] emitting next record: 5016fea4-f076-45dd-b95b-1d24f71a25b4 at 09:34:25.826
[info] allocating memory for 5016fea4-f076-45dd-b95b-1d24f71a25b4 at 09:34:25.868
[info] emitting next record: ab6e298b-0152-4af5-b685-bb4ed6c5b9de at 09:34:27.572
[info] allocating memory for ab6e298b-0152-4af5-b685-bb4ed6c5b9de at 09:34:27.572
[info] emitting next record: 6f5c1b75-5aaf-44e6-ac62-a6074735c057 at 09:34:28.957
[info] allocating memory for 6f5c1b75-5aaf-44e6-ac62-a6074735c057 at 09:34:28.958
[info] emitting next record: 313ce2b5-f669-4c59-b2ec-eafdae85ded6 at 09:34:30.378
[info] allocating memory for 313ce2b5-f669-4c59-b2ec-eafdae85ded6 at 09:34:30.378
[info] emitting next record: 91a8a95b-b3cc-4e27-8d3f-3400fa9c7a9f at 09:34:31.802
[info] allocating memory for 91a8a95b-b3cc-4e27-8d3f-3400fa9c7a9f at 09:34:31.802
[info] emitting next record: 0220e75a-029b-4d35-8494-690bed6938aa at 09:34:33.173
[info] allocating memory for 0220e75a-029b-4d35-8494-690bed6938aa at 09:34:33.174
[info] emitting next record: faa16b80-cfb1-4ea4-b3ba-c1d270caf865 at 09:34:34.409
[info] allocating memory for faa16b80-cfb1-4ea4-b3ba-c1d270caf865 at 09:34:34.409
[info] emitting next record: 8956d710-ad55-4dee-b4f3-82b8cf313a85 at 09:34:35.656
[info] allocating memory for 8956d710-ad55-4dee-b4f3-82b8cf313a85 at 09:34:35.656
[info] emitting next record: 1b989c56-6580-44f0-b8d9-46d5241046cc at 09:34:36.944
[info] allocating memory for 1b989c56-6580-44f0-b8d9-46d5241046cc at 09:34:36.945
[info] emitting next record: 66a766c7-29e0-40ca-b997-54985aad75d6 at 09:34:38.272
[info] allocating memory for 66a766c7-29e0-40ca-b997-54985aad75d6 at 09:34:38.272
[info] emitting next record: b8d29dad-bd44-4843-936e-5eb5df3bb594 at 09:34:39.530
[info] allocating memory for b8d29dad-bd44-4843-936e-5eb5df3bb594 at 09:34:39.530
[info] emitting next record: 8c7999cf-7796-427e-a155-c28d7fc4a934 at 09:34:40.987
[info] allocating memory for 8c7999cf-7796-427e-a155-c28d7fc4a934 at 09:34:40.988
[info] emitting next record: eda79635-4559-4c92-a5b7-83bbfc2e85b2 at 09:34:42.382
[info] allocating memory for eda79635-4559-4c92-a5b7-83bbfc2e85b2 at 09:34:42.382
[info] emitting next record: 8fa5d744-70e8-4261-9c3f-427737233e13 at 09:34:43.593
[info] allocating memory for 8fa5d744-70e8-4261-9c3f-427737233e13 at 09:34:43.593
[info] emitting next record: cc621484-c70d-4092-8dc6-2e39acc1f0b3 at 09:34:44.983
[info] allocating memory for cc621484-c70d-4092-8dc6-2e39acc1f0b3 at 09:34:44.983
[info] emitting next record: fbc03c9c-1ea8-4d4d-9a80-13118324140d at 09:34:46.244
[info] allocating memory for fbc03c9c-1ea8-4d4d-9a80-13118324140d at 09:34:46.244
[info] emitting next record: 96374d33-e117-4f48-b3be-79b8cb1e0fda at 09:34:47.953
[info] allocating memory for 96374d33-e117-4f48-b3be-79b8cb1e0fda at 09:34:47.953
[info] emitting next record: 1c210d73-35d3-41b9-ade6-9310783589a3 at 09:34:49.303
[info] allocating memory for 1c210d73-35d3-41b9-ade6-9310783589a3 at 09:34:49.303
[info] emitting next record: 3872c382-17a9-484a-861c-6f66a0c7d0ca at 09:34:50.620
[info] allocating memory for 3872c382-17a9-484a-861c-6f66a0c7d0ca at 09:34:50.620
[info] emitting next record: c34ba954-a9ff-45d1-910c-316c6eb9c85d at 09:34:52.597
[info] allocating memory for c34ba954-a9ff-45d1-910c-316c6eb9c85d at 09:34:52.597
[info] emitting next record: 8e5f804e-5e75-4eac-937f-651d45e3745d at 09:34:54.145
[info] allocating memory for 8e5f804e-5e75-4eac-937f-651d45e3745d at 09:34:54.145
[info] emitting next record: 1caf82cc-7b41-4730-bcc1-ca61ee7780e0 at 09:34:56.454
[info] allocating memory for 1caf82cc-7b41-4730-bcc1-ca61ee7780e0 at 09:34:56.455
[info] emitting next record: 9364d386-408a-4b63-80b5-0ed34473ba45 at 09:34:58.706
[info] allocating memory for 9364d386-408a-4b63-80b5-0ed34473ba45 at 09:34:58.706
[info] emitting next record: c43baaba-961e-4877-9835-7eeee538f0af at 09:35:00.822
[info] allocating memory for c43baaba-961e-4877-9835-7eeee538f0af at 09:35:00.822
[info] #
[info] # java.lang.OutOfMemoryError: Java heap space
[info] # -XX:OnOutOfMemoryError="kill -9 %p"
[info] # Executing "kill -9 96871"...
java.lang.RuntimeException: Nonzero exit code returned from runner: 137
at scala.sys.package$.error(package.scala:27)
[info] emitting next record: 668d6f9f-43cc-45a6-99b3-d8e8ab2b9cae at 09:28:48.188
[info] allocating memory for 668d6f9f-43cc-45a6-99b3-d8e8ab2b9cae at 09:28:48.231
[info] emitting next record: 6c50b3e1-d3ec-422e-b41a-fe3d92df15a9 at 09:28:48.333
[info] emitting next record: 20b659f9-73e1-4c67-b251-2b224eec4d24 at 09:28:48.421
[info] emitting next record: 9af08f07-8246-498b-9f64-b56982cf3536 at 09:28:48.497
[info] emitting next record: 14cdf3b4-d14f-4953-8609-24c7a1996a12 at 09:28:48.569
[info] emitting next record: 571002f3-7301-4afa-8bc9-3fb8a9e84db2 at 09:28:48.665
[info] emitting next record: 5e88a51b-b56c-40fe-84a3-2fcf18b90e3f at 09:28:48.787
[info] emitting next record: e66b29f3-1690-4645-a048-19049e92303a at 09:28:48.846
[info] emitting next record: 66c16074-b200-4808-a990-13abadc66e43 at 09:28:48.943
[info] emitting next record: 1de8caca-fa48-4777-90a7-1449bd6722bb at 09:28:49.003
[info] emitting next record: bc3859b6-94ab-4262-b4cd-fa757e8f3f1f at 09:28:49.064
[info] emitting next record: 988216a7-5944-4aa5-98f6-b36542d8e7a8 at 09:28:49.172
[info] emitting next record: e6ab4ef6-1fd2-471b-8866-2f8422346df5 at 09:28:49.325
[info] emitting next record: c86b3116-70c8-453e-9ddf-bd8d9e144caf at 09:28:49.384
[info] emitting next record: 78c68185-cdd1-4fde-aa39-e03b37b5f449 at 09:28:49.603
[info] emitting next record: 7ed11952-ceba-47f5-9ba4-25d1e9dceea0 at 09:28:49.671
[info] allocating memory for 6c50b3e1-d3ec-422e-b41a-fe3d92df15a9 at 09:28:50.164
[info] allocating memory for 20b659f9-73e1-4c67-b251-2b224eec4d24 at 09:28:51.459
[info] allocating memory for 9af08f07-8246-498b-9f64-b56982cf3536 at 09:28:52.752
[info] folding records at 09:28:53.106
[info] allocating memory for 14cdf3b4-d14f-4953-8609-24c7a1996a12 at 09:28:53.969
[info] allocating memory for 571002f3-7301-4afa-8bc9-3fb8a9e84db2 at 09:28:55.234
[info] allocating memory for 5e88a51b-b56c-40fe-84a3-2fcf18b90e3f at 09:28:56.422
...
最佳答案
我怀疑您正在使用aLongOperation
或其他一些阻止操作来模拟Thread.sleep
。
如果是这种情况,在不强制使用async
边界的情况下,整个图形将共享相同的actor-从而共享相同的线程。阻塞该线程将导致基础调度基础设施匮乏(请参阅docs)。
尝试以非阻塞方式模拟您的长时间操作(例如,使用after模式)。
另请参见以下针对该主题提出的issue。
关于akka - 为什么akka-stream的Source.groupedWithin不考虑持续时间?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42845166/
如果我错了,但身份验证 session 有 30 天的最大限制,请纠正我?如果是这种情况,有没有办法让我的服务器节点应用程序永远监听经过身份验证的 dataRef? 干杯, 旅行。 最佳答案 自 on
我目前正在阅读 book Continuos Delivery由 Humble/Farley 撰写,虽然里面的很多东西都是有道理的,但有一件事让我烦恼: 似乎作者只针对基于服务器的(单客户端?)应用程
好吧,我非常了解每个人对自制密码管理器的看法,但我希望得到帮助。 不用于实际使用,仅供学习。 我想知道,在 C++ 中如何拥有长期变量。或者真的,有什么长期的。 长期是什么意思?在下次运行 .exe
我在文本文件中有以下三行(最后 3 行): } } } 我想做的是做这样的事情: } } blablabla blablabla blabla
在 iOS 中,有没有一种简单的方法可以在每天的同一时间发送 10 天的推送通知?我不想向所有用户发送推送通知。我的应用程序的工作方式是,用户可以选择连续十天推送通知的时间。您有推荐的 API 吗?或
我正在努力寻找一种当前最先进的方法来处理频繁更新的通知(例如每 3 分钟一次)。似乎在较新的 Android 版本中内置了如此多的电源效率调整(幸运的是!),我之前成功使用的方法(使用 Broadca
我不得不在一些糟糕的房地产网站上花费大量时间。我比较精通 CSS,并且可以(在 FireFox 中)“检查元素”并更改 CSS 以隐藏或缩小特定页面的华而不实的元素。但我想将此自定义 CSS 应用于特
目前正在研究如何使用 signalR 在处理文件时向用户呈现文件的进度报告。我正在使用 asp.net MVC 4。通过 Ajax 进行发布/获取时,我可以轻松获取状态更改。 因为我需要上传一个文件(
这个问题在这里已经有了答案: How can I round up the time to the nearest X minutes? (15 个答案) Is there a simple fun
我有一个 php 脚本,我想运行特定的时间(例如 5 分钟),但只能运行一次。对于 cron 作业,这将无限期地运行。还有别的办法吗? 最佳答案 处理这个问题的方法是: 当某些事件触发需要 cron
我弄乱了我的 apache 和 php.ini 文件,我网站的用户仍然提示该网站在很短的时间后或每次他们关闭并打开同一个浏览器时将他们注销。 我正在运行 Apache 和 PHP。 我应该进行哪些设置
如何查询今天的总和需要减去前一天的总和,每天持续一个月。 SELECT COUNT(DISTINCT member_profile.memberProfileNumber) FROM member_p
这个问题在这里已经有了答案: How do I add a delay in a JavaScript loop? (32 个答案) 关闭 8 年前。 我认为这个问题之前一定有人问过,但我找不到其他
用户在我的网站上注册后,我们会向他发送一封确认电子邮件。我想要的是 - 三天内每 24 小时为用户重新发送一次电子邮件。例如: user_table id , name, date_registere
最近我从 Codeigniter 换到了 Laravel,一切都很顺利,除了我遇到了 Session::flash 的问题。 当我创建新用户时,我收到成功消息,但它会持续 2 个请求,即使我没有通过验
如果有人能帮助我解决这个问题,我将非常感激。 我正在尝试针对 CPU 使用率 >= 80% 持续 30 分钟或更长时间创建 Azure 监视器警报 我已附上警报规则条件的屏幕截图。在“评估依据”下,聚
如果有人能帮助我解决这个问题,我将非常感激。 我正在尝试针对 CPU 使用率 >= 80% 持续 30 分钟或更长时间创建 Azure 监视器警报 我已附上警报规则条件的屏幕截图。在“评估依据”下,聚
希望大家平安 1。我的目标 我正在尝试模拟 3 天的真实情况。系统每天只能工作 8 小时。 我的目标是模型运行 8 小时,持续 3 天,以获得足够的数据进行分析。 2。我的问题 我有一个代理预约时间表
我需要在 8 小时内每 5 分钟调用一次函数。问题是它必须是同一天。例如,如果用户在 3/29 晚上 11:59 登录系统,而现在是 3/30 凌晨 12:01,则不应再调用该函数。 我知道如何每
我正在开发一个 React Native 应用程序,该应用程序使用 Firebase 的 Firestore 作为后端。现在,每次收到新消息时,我都会从 Firestore 获取所有消息并更新我的状态
我是一名优秀的程序员,十分优秀!