gpt4 book ai didi

scala - 发布/订阅中的 Redis 插件阻塞

转载 作者:IT王子 更新时间:2023-10-29 06:04:07 27 4
gpt4 key购买 nike

我正在尝试使用 Typesafe Redis Play plugin 连接到 Redis 以进行发布-订阅.

我有以下测试场景,由一个每秒生成消息的 actor 组成:

  // Initialization happens in Application.scala,
private lazy val fakeStreamActor = Akka.system.actorOf(Props[FakeStreamActor])

val actorPut = Akka.system.scheduler.schedule(
Duration(1000, MILLISECONDS),
Duration(1000, MILLISECONDS),
fakeStreamActor,
Put("This is a sample message"))

Actor 来源:

class FakeStreamActor extends Actor {
implicit val timeout = Timeout(1, SECONDS)

val CHANNEL = "channel1"
val plugin = Play.application.plugin(classOf[RedisPlugin]).get
val listener = new MyListener()

val pool = plugin.sedisPool

pool.withJedisClient{ client =>
client.subscribe(listener, CHANNEL)
}

def receive = {

case Put(msg: String) => {
//send data to Redis
Logger.info("Push %s".format(msg))
pool.withJedisClient { client =>
client.publish(CHANNEL, msg)
}

}
}
}

/** Messages */
case class Put(msg: String)

还有一个订阅监听器:

case class MyListener() extends JedisPubSub {
def onMessage(channel: String, message: String): Unit = {
Logger.info("onMessage[%s, %s]".format(channel, message))
}

def onSubscribe(channel: String, subscribedChannels: Int): Unit = {
Logger.info("onSubscribe[%s, %d]".format(channel, subscribedChannels))
}

def onUnsubscribe(channel: String, subscribedChannels: Int): Unit = {
Logger.info("onUnsubscribe[%s, %d]".format(channel, subscribedChannels))
}

def onPSubscribe(pattern: String, subscribedChannels: Int): Unit = {
Logger.info("onPSubscribe[%s, %d]".format(pattern, subscribedChannels))
}

def onPUnsubscribe(pattern: String, subscribedChannels: Int): Unit = {
Logger.info("onPUnsubscribe[%s, %d]".format(pattern, subscribedChannels))
}

def onPMessage(pattern: String, channel: String, message: String): Unit = {
Logger.info("onPMessage[%s, %s, %s]".format(pattern, channel, message))
}
}

现在,理想情况下我应该订阅定义的 channel 并在日志中查看Listener 每秒如何处理接收到的消息。但这并没有发生,因为订阅操作会锁定线程。

我的问题是:

有什么方法可以利用 Play 的异步特性来实现非阻塞订阅?

最佳答案

是的。这就是我在 Global.scala 中的做法:

Akka.future { 
val j = new RedisPlugin(app).jedisPool.getResource
j.subscribe(PubSub, "*")
}

我在实例化插件时遇到了问题,但您基本上将 withJedisClient 位放在了 future block 中。

感谢您向我展示如何在 Scala 中实例化插件!

关于scala - 发布/订阅中的 Redis 插件阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14968660/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com