gpt4 book ai didi

multithreading - 有没有办法让管道从多个来源获取数据而不阻塞其中任何一个?

转载 作者:行者123 更新时间:2023-12-02 03:23:06 24 4
gpt4 key购买 nike

我正在编写一个服务器,其中一个要求是它需要能够向客户端推送数据,而无需客户端直接请求数据。我正在使用管道,但感觉这超出了管道的能力。我遇到的问题是,似乎没有办法判断套接字是否有可用数据,等待将阻止执行,直到有可用数据。假设我有以下功能

getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket

然后我将管道与来自 Conduit.Network 库的源和接收器连接在一起

appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData

现在,我从管道外部引入一个数据源,我想将该数据合并到管道中。例如,如果这是一个聊天服务器,则外部数据将是其他客户端发送的消息。问题是无论我尝试在何处引入这些外部数据,它都会被等待调用阻塞。本质上,我最终会得到如下所示的代码。

yield processOutsideData --deal with the outside data
data <- await --await data from upstream

处理更多外部数据的唯一方法是上游组件产生某些东西,但上游只有在从客户端获取数据时才会产生,这正是我试图避免的。我试过使用多线程和 TChan 来解决这个问题,但似乎 appSource 和 appSink 必须在同一个线程中使用,否则我会从 recv 中得到无效的文件描述符异常(这是有道理的)。

但是,如果套接字源和接收器在同一个线程中运行,我将再次遇到等待阻塞的问题,并且我无法检查套接字中是否有数据可用。在这一点上,我似乎遇到了管道问题。

但我真的很喜欢使用导管,并且愿意继续使用它们。所以我的问题是:有没有办法通过管道实现我想要实现的目标?

最佳答案

Michael Snoyman 的 conduit network examples use concurrency . telnet 客户端示例运行一个线程用于发送输入,另一个线程用于显示接收到的内容。我已经调整它来发送和接收整行

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad (liftM, void)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (unpack)
import Data.Conduit.Network
import Data.String (IsString, fromString)
import Network (withSocketsDo)

getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine

putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack

main :: IO ()
main = withSocketsDo $
runTCPClient (clientSettings 4000 "localhost") $ \server ->
void $ concurrently
(getLines $$ appSink server)
(appSource server $$ putLines)

我们可以在服务器上做同样的事情。创建一个STM channel ,将接收到的数据写入 channel ,并将 channel 中的数据发送给客户端。这使用 stm-conduit包对 STM channel 的简单包装,sourceTBMChansinkTBMChan

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan (sourceTBMChan, sinkTBMChan)
import Network (withSocketsDo)

main :: IO ()
main = withSocketsDo $ do
channel <- atomically $ newTBMChan 10
runTCPServer (serverSettings 4000 "*") $ \server ->
void $ concurrently
(appSource server $$ sinkTBMChan channel False)
(sourceTBMChan channel $$ appSink server)

如果我们在只连接一个客户端的情况下运行服务器,它会回显客户端发送的内容。

----------
| a | (sent)
| a | (received)
| b | (sent)
| b | (received)
| c | (sent)
| c | (received)
----------

如果我们在连接多个客户端的情况下运行服务器,则消息会在客户端之间分发,一个客户端获取每条消息。

----------             ----------
| 1 | (sent) | 1 | (received)
| 2 | (sent) | 3 | (received)
| 2 | (received) | |
| 3 | (sent) | |
| | | |
| | | |
---------- ----------

此示例不处理客户端关闭连接时要执行的操作。

关于multithreading - 有没有办法让管道从多个来源获取数据而不阻塞其中任何一个?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31932931/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com