gpt4 book ai didi

haskell - 共享可变状态: when to use IORefs

转载 作者:行者123 更新时间:2023-12-02 15:58:30 27 4
gpt4 key购买 nike

我有一个写入 Map 和 PSQ 的主线程。在 Map 和 PSQ 中,我使用相同的键,以便通过查看 PSQ,可以以 O(1) 复杂度找到具有最小优先级的条目,并将其映射到 Map 中的值。

现在,虽然我的主线程在需要时添加/修改了 Map 和 PSQ,但我还有第二个线程不断(forever $ do)查看 PSQ 以确定最旧的 key 何时出现是 N 毫秒前,然后应该刷新它。

要实现这一点,两个线程都需要查看相同的可变数据。维持状态的最佳方法是什么? IOREfs 会遇到这种情况吗?还有什么可能的方法来解决这个问题?

“一些”预 Alpha 代码在这里:

import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B

--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages

key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
time <- getPOSIXTime
let q = PSQ.singleton key time
let m = Map.singleton key messages
return (q, m)

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
let Just messages' = Map.lookup (host,port) m
l = length . concat $ messages'
l' = l + length newmsgs
in
if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
time <- getPOSIXTime
let q1 = PSQ.insert (a,b) time q
let m1 = Map.insert (a,b) c m
return (q1, m1)

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
hostAddr <- inet_addr host
sendAllTo s datastring (SockAddrInet port hostAddr)
return ()

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
where
m' = Map.delete (host, port) m
q' = PSQ.delete (host, port) q

loopMyQ q m1 done = forever $ do
let Just m = PSQ.findMin q
let time = (PSQ.prio m) + 0.200 --adds 200ms
now <- getPOSIXTime
if now < time
then print (m1)
--here eventually I would call the send function to flush the queue
else putMVar done ()

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
sendrecv s q1 m1 msg = do
let m2 = appendMsg msg key m1
(q3, m3) = case m2 of
val | m2 == m1 -> deleteRec key q1 m1
| otherwise -> (q1, m2)
(q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
return (q4, m4)) else return (q1, m2)
when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
return (q5, m5)

--main :: IO()
main = withSocketsDo $ do
s <- socket AF_INET Datagram defaultProtocol
(q1, m1) <- newRq
done <- newEmptyMVar
forkIO $ loopMyQ q1 m1 done
(q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
takeMVar done
--print ("longer than 200ms ago")

最佳答案

您很可能想使用MVarsTVars保持线程间的一致状态。 IORef 不是线程安全的。

我建议使用 STM(和 TVar)来解决此问题。您正在处理对多个数据结构的并发访问,并且 STM 的可组合性比必须考虑 MVar 的锁定顺序更容易处理。

查看您的代码后,TVar 似乎是您最好的选择。将您的 PSQ 和 Map 包装在两个不同的 TVar 中。将需要两者一致 View 的所有代码包装在原子事务中。在大多数情况下,您的代码将“正常工作”。但是,如果存在锁争用,原子 block 将被重试,直到它起作用为止。

关于haskell - 共享可变状态: when to use IORefs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9369205/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com