gpt4 book ai didi

scala - 以编程方式停止 Alpakka Kafka 流的正确方法

转载 作者:行者123 更新时间:2023-12-04 15:17:25 31 4
gpt4 key购买 nike

我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来使用服务中的事件流。为了处理事件处理错误,我们使用了 Kafka 自动提交和多个队列。例如,如果我们有主题 user_created ,我们想从产品服务中消费,我们还创建了 user_created_for_products_faileduser_created_for_products_dead_letter .这两个额外的主题与特定的 Kafka 消费者群体相关联。如果一个事件处理失败,它会进入失败的队列,我们​​在五分钟后尝试再次消费——如果它再次失败,它就会进入死信。

在部署时,我们希望确保不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但尚未处理所有这些“飞行”事件。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。

阅读文档后,我们看到了 KillSwitch 特征。我们在其中看到的问题是 shutdown方法返回 Unit相反 Future[Unit]正如我们所料。我们不确定使用它会不会丢失事件,因为在测试中它看起来太快而无法正常工作。

作为一种解决方法,我们创建了一个 ActorSystem对于每个流并使用 terminate方法(返回 Future[Terminate] )。这个解决方案的问题是我们不认为创建 ActorSystem每个流都会很好地扩展,并且 terminate需要很多时间来解决(在测试中最多需要一分钟才能关闭)。

你遇到过这样的问题吗?是否有更快的方法(与 ActorSystem.terminate 相比)来停止流并确保所有事件发生在 Source已发出已处理?

最佳答案

来自 documentation (强调我的):

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.


val (consumerControl, streamComplete) =
Consumer
.plainSource(consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0) -> offset
))
.via(businessFlow)
.toMat(Sink.ignore)(Keep.both)
.run()

consumerControl.shutdown()
Consumer.control.shutdown()返回 Future[Done] .从它的 Scaladoc 描述:

Shutdown the consumer Source. It will wait for outstanding offset commit requests to finish before shutting down.



或者,如果您使用的是 Kafka 中的偏移存储 , 使用 Consumer.Control.drainAndShutdown ,它也返回一个 Future .再次来自文档(其中包含有关 drainAndShutdown 在幕后做了什么的更多信息):
val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()

val streamComplete = drainingControl.drainAndShutdown()
drainAndShutdown 的 Scaladoc 描述:

Stop producing messages from the Source, wait for stream completion and shut down the consumer Source so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.

关于scala - 以编程方式停止 Alpakka Kafka 流的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55046169/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com