gpt4 book ai didi

haskell - 如何在 Hedis haskell pubSub 中发布

转载 作者:行者123 更新时间:2023-12-05 03:41:49 28 4
gpt4 key购买 nike

我目前正在学习 Haskell。现在我目前不太擅长函数式编程。我想制作一段代码,从订阅 Redis 中的某个主题获取数据,对其进行一些计算并将其发布到另一个主题。我猜有些语言特定的功能有问题。

我当前的代码:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import Database.Redis
import System.IO

main = do
conn <- connect defaultConnectInfo
runRedis conn $ do
pubSub (subscribe ["commands"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
publish "results" "Result of a very interesting calculation"
return mempty

现在我收到错误: • 没有因使用“发布”而产生的 (RedisCtx IO f0) 实例

将发布放在 pubsub 之外将使它工作。但是我想发布一个结果!我无法从文档中得到任何智慧。我缺少什么?

最佳答案

您对 putStrLn 的使用使类型检查器(正确!)推断您的 do block 旨在位于 IO 上下文中,然后对 publish 的调用要求上下文是 RedisCtx 的实例,而 IO 不是。

通常在 Redis 上下文中,解决方案是使用 liftIO 将 IO 操作提升到 Redis 上下文中: IO a -> m a 来自 MonadIO类,如文档中的示例所示:

runRedis conn $ do
set "hello" "hello"
set "world" "world"
helloworld <- multiExec $ do
hello <- get "hello"
world <- get "world"
return $ (,) <$> hello <*> world
<strong>liftIO (print helloworld)</strong>

MonadIO 是一组类型,您可以在其上下文中执行 IO 操作。

但是,在这种情况下,情况正好相反:pubSub 的功能参数返回一个 IO 操作,但是 publish 期望一个RedisCtx 单体。

我不清楚 pubSub 是否允许您在回调中调用 runRedis,就像这样,尽管我认为它应该进行类型检查:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import Database.Redis
import System.IO

main = do
conn <- connect defaultConnectInfo
runRedis conn $ do
pubSub (subscribe ["commands"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
<strong>runRedis conn $</strong> publish "results" "Result of a very interesting calculation"
return mempty

根据浏览文档,每个 runRedis 调用都从连接池中获取一个连接,连接池的默认大小为 50;但是,如果没有可用的连接,它会阻塞,所以我担心的是,因为 pubSub 的文档说它是“单线程”,这可能会在等待不会释放的连接时出现死锁因为您处于“嵌套”runRedis 调用中。

我认为我接下来要尝试的是使用更灵活的 pubSubForever API;在hedis test suite有一个将 pubSubForever 与单独的线程一起用于发布和处理订阅事件的示例。

main = do
ctrl <- newPubSubController [("foo", msgHandler)] []
conn <- connect defaultConnectInfo

withAsync (publishThread conn) $ \_pubT -> do
withAsync (handlerThread conn ctrl) $ \_handlerT -> do

void $ hPutStrLn stderr "Press enter to subscribe to bar" >> getLine
void $ addChannels ctrl [("bar", msgHandler)] []
-- …
-- (Add/remove various subscriptions.)
-- …

publishThread 使用runRedis 并调用publish:

publishThread :: Connection -> IO ()
publishThread c = <strong>runRedis</strong> c $ loop (0 :: Int)
where
loop i = do
let msg = encodeUtf8 $ pack $ "Publish iteration " ++ show i
void $ <strong>publish</strong> "foo" ("foo" <> msg)
-- …
liftIO $ threadDelay $ 2*1000*1000
loop (i+1)

handlerThread 使用 pubSubForever:

handlerThread :: Connection -> PubSubController -> IO ()
handlerThread conn ctrl = forever $
<strong>pubSubForever</strong> conn ctrl onInitialComplete
`catch` (\(e :: SomeException) -> do
hPutStrLn stderr $ "Got error: " ++ show e
threadDelay $ 50*1000)

这包含在对 forever 的调用中如果连接丢失,根据 docs for pubSubForever 重新订阅:

[…] if the network connection to Redis dies, pubSubForever will throw a ConnectionLost. When such an exception is thrown, you can recall pubSubForever with the same PubSubController which will open a new connection and resubscribe to all the channels which are tracked in the PubSubController.

此测试使用来自 asyncControl.Concurrent.Async用于管理任务的包,在我看来这是个好主意。如果你想避免这种依赖,你可以使用 forkIO 代替(例如 Chan 或 STM TChan 从处理程序发送事件),唯一的问题是,如果 fork 线程由于异常而终止,这不会自动通知其他线程,而 Async 提供了一些很好的异常安全保证。

关于haskell - 如何在 Hedis haskell pubSub 中发布,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67589410/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com