gpt4 book ai didi

networking - 1个处理 channel ,2个同类型IO源

转载 作者:行者123 更新时间:2023-12-03 05:39:52 24 4
gpt4 key购买 nike

在我的使用 stm、网络管道和管道的 GHC Haskell 应用程序中,我为每个套接字都有一个链,它使用 runTCPServer 自动 fork 。链可以通过使用广播 TChan 与其他链进行通信。

这展示了我希望如何设置管道“链”:

enter image description here

因此,我们这里有两个源(每个都绑定(bind)到辅助管道),它们生成一个 Packet 对象,encoder 将接受该对象并将其转换为 ByteString ,然后发送套接字。我在有效(性能是一个问题)融合两个输入方面遇到了很大的困难。

如果有人能指出我正确的方向,我将不胜感激。

<小时/>

由于我在没有尝试的情况下发布这个问题是不礼貌的,所以我将把我之前尝试过的内容放在这里;

我已经编写/精心挑选了一个函数,该函数(阻塞)从 TMChan(可关闭 channel )生成源;

-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)

同样,将 Chan 转变为水槽的功能;

-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink

那么 mergeSources 就很简单了; fork 2 个线程(我真的不想这样做,但到底是什么)可以将它们的新项目放入一个列表中,然后我生成一个列表;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan

虽然我成功地对这些函数进行了类型检查,但我未能成功地利用这些函数进行类型检查;

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

无论如何,我认为这种方法是有缺陷的——有很多中间列表和转换。这对性能不利。寻求指导。

<小时/>PS。据我所知,这不是重复的; Fusing conduits with multiple inputs ,因为在我的情况下,两个源都生成相同的类型,并且我不关心从哪个源生成 Packet 对象,只要我不等待一个源,而另一个源已准备好对象已消耗。

PPS。对于示例代码中 Lens 的使用(以及因此对知识的要求),我深表歉意。

最佳答案

我不知道这是否有任何帮助,但我尝试实现 Iain 的建议,并制作了 mergeSources' 的变体,一旦任何 channel 停止,它就会停止:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
=> [Source (ResourceT m) a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
-> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
c <- liftSTM $ newTBMChan bound
mapM_ (\s -> resourceForkIO $
s $$ chanSink c writeTBMChan closeTBMChan) sx
return $ sourceTBMChan c

(这个简单的添加可用 here )。

对您的 mergeSources 版本的一些评论(持保留态度,可能是我不太理解某些内容):

  • 使用 ...TMChan 而不是 ...TBMChan 似乎很危险。如果编写者比读者快,你的堆就会崩溃。从你的图表来看,如果你的 TCP 对等方读取数据的速度不够快,这似乎很容易发生。因此,我肯定会使用 ...TBMChan ,其范围可能很大但有限。
  • 您不需要 MonadSTM m 约束。所有 STM 内容都被包装到 IO

    liftSTM = liftIO . atomically

    也许这会对您在 serverApp 中使用 mergeSources' 时有所帮助。

  • 我发现这只是一个外观问题

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn

    由于在 (->) r monad 上使用了 liftA2,因此非常难以阅读。我想说

    do
    c <- liftSTM newTMChan
    fsrc sx c
    retn c

    会更长,但更容易阅读。

您能否创建一个可以使用 serverApp 的独立项目?

关于networking - 1个处理 channel ,2个同类型IO源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16757060/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com