gpt4 book ai didi

scala - 如何以编程方式从reactive-rabbit库中终止io.scalac.amqp.Connection

转载 作者:行者123 更新时间:2023-12-01 06:25:42 24 4
gpt4 key购买 nike

我将 akka 流与reactive-rabbit 库结合使用来构建一个脚本,该脚本将一些信息推送到我本地rabbitmq 服务器上的交换。

将信息推送到队列后,我希望程序自行关闭。然而Connection使程序保持事件状态,但我在 Connection 上找不到任何方法或其他如何杀死它的例子。不可避免地,我必须手动终止该进程。

我的代码看起来像这样:

package prototype

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import io.scalac.amqp.{Message, Connection}

object PopulateTodoQueue extends App {
val connection = Connection()

val message = Message(ByteString("message"))

val source = Source(List(message))
val sink = Sink(connection.publishDirectly(queue = "todo"))

implicit val actorSystem = ActorSystem()
implicit val materializer = FlowMaterializer()

(source to sink).run()

// Quick hack to wait long enough for the message to send
Thread.sleep(1000)
actorSystem.shutdown()
}

这是我的 build.sbt 库依赖项的片段:
"com.typesafe.akka"          %%  "akka-actor"               % "2.3.7",
"com.typesafe.akka" %% "akka-stream-experimental" % "0.11",
"io.scalac" %% "reactive-rabbit" % "0.2.1",

对于这些关闭任务,是否有更好的模式 - 例如您将回调传递给的临时连接?我见过的示例中的所有用例都是针对长时间运行的客户端,这些客户端会一直运行到用户明确杀死它们为止。

提前致谢!

最佳答案

连接特征中有关闭方法

/** Shutdowns underlying connection.
* Publishers and subscribers are terminated and notified via `onError`.
* This method waits for all close operations to complete. */
def shutdown(): Future[Unit]

你可以这样打电话
connection.shutdown()

关于scala - 如何以编程方式从reactive-rabbit库中终止io.scalac.amqp.Connection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29002677/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com