- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来使用服务中的事件流。为了处理事件处理错误,我们使用了 Kafka 自动提交和多个队列。例如,如果我们有主题 user_created
,我们想从产品服务中消费,我们还创建了 user_created_for_products_failed
和 user_created_for_products_dead_letter
.这两个额外的主题与特定的 Kafka 消费者群体相关联。如果一个事件处理失败,它会进入失败的队列,我们在五分钟后尝试再次消费——如果它再次失败,它就会进入死信。
在部署时,我们希望确保不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但尚未处理所有这些“飞行”事件。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。
阅读文档后,我们看到了 KillSwitch
特征。我们在其中看到的问题是 shutdown
方法返回 Unit
相反 Future[Unit]
正如我们所料。我们不确定使用它会不会丢失事件,因为在测试中它看起来太快而无法正常工作。
作为一种解决方法,我们创建了一个 ActorSystem
对于每个流并使用 terminate
方法(返回 Future[Terminate]
)。这个解决方案的问题是我们不认为创建 ActorSystem
每个流都会很好地扩展,并且 terminate
需要很多时间来解决(在测试中最多需要一分钟才能关闭)。
你遇到过这样的问题吗?是否有更快的方法(与 ActorSystem.terminate
相比)来停止流并确保所有事件发生在 Source
已发出已处理?
最佳答案
来自 documentation (强调我的):
When using external offset storage, a call to
Consumer.Control.shutdown()
suffices to complete theSource
, which starts the completion of the stream.
val (consumerControl, streamComplete) =
Consumer
.plainSource(consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0) -> offset
))
.via(businessFlow)
.toMat(Sink.ignore)(Keep.both)
.run()
consumerControl.shutdown()
Consumer.control.shutdown()
返回
Future[Done]
.从它的 Scaladoc 描述:
Shutdown the consumer
Source
. It will wait for outstanding offset commit requests to finish before shutting down.
Consumer.Control.drainAndShutdown
,它也返回一个
Future
.再次来自文档(其中包含有关
drainAndShutdown
在幕后做了什么的更多信息):
val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
drainAndShutdown
的 Scaladoc 描述:
Stop producing messages from the
Source
, wait for stream completion and shut down the consumerSource
so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.
关于scala - 以编程方式停止 Alpakka Kafka 流的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55046169/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!