multithreading - Haskell - 基于 Actor 的可变性

我正在开发一个haskell 网络应用程序,我使用actor 模式来管理多线程。我遇到的一件事是如何存储例如一组客户端套接字/句柄。当然,所有线程都必须可以访问,并且可以在客户端登录/注销时更改。


import Control.Concurrent
import Control.Monad
import Network
import System.IO
import Data.List
import Data.Maybe
import System.Environment
import Control.Exception

newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a))
newStorage = do
q <- newChan
forkIO $ storage [] q
return q

newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle))
newHandleStorage = newStorage

storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO ()
storage s q = do
let loop = (`storage` q)
(req, reply, d) <- readChan q
print ("processing " ++ show(d))
case req of
"add" -> loop ((fromJust d) : s)
"remove" -> loop (delete (fromJust d) s)
"get" -> do
writeChan (fromJust reply) s
loop s

store s d = writeChan s ("add", Nothing, Just d)
unstore s d = writeChan s ("remove", Nothing, Just d)
request s = do
chan <- newChan
writeChan s ("get", Just chan, Nothing)
readChan chan


  • 这是管理共享可变变量的“好”方式(在 Actor 世界中)吗?
  • 是否已经有这种模式的库? (我已经搜索过,但没有找到)

  • 问候,


    这是一个使用 stm 的快速而肮脏的示例和 pipes-network .这将设置一个简单的服务器,允许客户端连接并增加或减少计数器。它将显示一个非常简单的状态栏,显示所有已连接客户端的当前计数,并在断开连接时从栏中删除客户端计数。


    import Control.Concurrent.STM (STM, atomically)
    import Control.Concurrent.STM.TVar
    import qualified Data.HashMap.Strict as H
    import Data.Foldable (forM_)

    import Control.Concurrent (forkIO, threadDelay)
    import Control.Monad (unless)
    import Control.Monad.Trans.State.Strict
    import qualified Data.ByteString.Char8 as B
    import Control.Proxy
    import Control.Proxy.TCP
    import System.IO

    main = do
    hSetBuffering stdout NoBuffering

    {- These are the internal data structures. They should be an implementation
    detail and you should never expose these references to the
    "business logic" part of the application. -}
    -- I use nRef to keep track of creating fresh Ints (which identify users)
    nRef <- newTVarIO 0 :: IO (TVar Int)
    {- hMap associates every user (i.e. Int) with a counter

    Notice how I've "striped" the hash map by storing STM references to the
    values instead of storing the values directly. This means that I only
    actually write the hashmap when adding or removing users, which reduces
    contention for the hash map.

    Since each user gets their own unique STM reference for their counter,
    modifying counters does not cause contention with other counters or
    contention with the hash map. -}
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))

    {- The following code makes heavy use of Haskell's pure closures. Each
    'let' binding closes over its current environment, which is safe since
    Haskell is pure. -}

    let {- 'getCounters' is the only server-facing command in our STM API. The
    only permitted operation is retrieving the current set of user

    'getCounters' closes over the 'hMap' reference currently in scope so
    that the server never needs to be aware about our internal
    implementation. -}
    getCounters :: STM [Int]
    getCounters = do
    refs <- fmap H.elems (readTVar hMap)
    mapM readTVar refs

    {- 'init' is the only client-facing command in our STM API. It
    initializes the client's entry in the hash map and returns two
    commands: the first command is what the client calls to 'increment'
    their counter and the second command is what the client calls to log
    off and delete
    'delete' command.

    Notice that those two returned commands each close over the client's
    unique STM reference so the client never needs to be aware of how
    exactly 'init' is implemented under the hood. -}
    init :: STM (STM (), STM ())
    init = do
    n <- readTVar nRef
    writeTVar nRef $! n + 1

    ref <- newTVar 0
    modifyTVar' hMap (H.insert n ref)

    let incrementRef :: STM ()
    incrementRef = do
    mRef <- fmap (H.lookup n) (readTVar hMap)
    forM_ mRef $ \ref -> modifyTVar' ref (+ 1)

    deleteRef :: STM ()
    deleteRef = modifyTVar' hMap (H.delete n)

    return (incrementRef, deleteRef)

    {- Now for the actual program logic. Everything past this point only uses
    the approved STM API (i.e. 'getCounters' and 'init'). If I wanted I
    could factor the above approved STM API into a separate module to enforce
    the encapsulation boundary, but I am lazy. -}

    {- Fork a thread which polls the current state of the counters and displays
    it to the console. There is a way to implement this without polling but
    this gets the job done for now.

    Most of what it is doing is just some simple tricks to reuse the same
    console line instead of outputting a stream of lines. Otherwise it
    would be just:

    forkIO $ forever $ do
    ns <- atomically getCounters
    print ns
    forkIO $ (`evalStateT` 0) $ forever $ do
    del <- get
    lift $ do
    putStr (replicate del '\b')
    putStr (replicate del ' ' )
    putStr (replicate del '\b')
    ns <- lift $ atomically getCounters
    let str = show ns
    lift $ putStr str
    put $! length str
    lift $ threadDelay 10000

    {- Fork a thread for each incoming connection, which listens to the client's
    commands and translates them into 'STM' actions -}
    serve HostAny "8080" $ \(socket, _) -> do
    (increment, delete) <- atomically init

    {- Right now, just do the dumb thing and convert all keypresses into
    increment commands, with the exception of the 'q' key, which will
    quit -}
    let handler :: (Proxy p) => () -> Consumer p Char IO ()
    handler () = runIdentityP loop
    loop = do
    c <- request ()
    unless (c == 'q') $ do
    lift $ atomically increment

    {- This uses my 'pipes' library. It basically is a high-level way to

    * Read binary packets from the socket no bigger than 4096 bytes

    * Get the first character from each packet and discard the rest

    * Handle the character using the above 'handler' function -}
    runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler

    {- The above pipeline finishes either when the socket closes or
    'handler' stops looping because it received a 'q'. Either case means
    that the client is done so we log them out using 'delete'. -}
    atomically delete

    import Control.Monad
    import Control.Proxy
    import Control.Proxy.Safe
    import Control.Proxy.TCP.Safe
    import Data.ByteString.Char8 (pack)
    import System.IO

    main = do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    {- Again, this uses my 'pipes' library. It basically says:

    * Read characters from the console using 'commands'

    * Pack them into a binary format

    * send them to a server running at

    This finishes looping when the user types a 'q' or the connection is
    closed for whatever reason.
    runSafeIO $ runProxy $ runEitherK $
    try . commands
    >-> mapD (\c -> pack [c])
    >-> connectWriteD Nothing "" "8080"

    commands :: (Proxy p) => () -> Producer p Char IO ()
    commands () = runIdentityP loop
    loop = do
    c <- lift getChar
    respond c
    unless (c == 'q') loop

    很简单: commands生成 Char 的流s,然后转换为 ByteString s 然后作为数据包发送到服务器。



    请注意 pipes这些示例的组成部分将在即将发布的 pipes-4.0.0 中大大简化。发布,但当前 pipes生态系统仍然按原样完成工作。

