gpt4 book ai didi

multithreading - 使用带超时的 Tchan

转载 作者:行者123 更新时间:2023-12-04 11:24:49 24 4
gpt4 key购买 nike

我有一个 TChan 作为线程的输入,它的行为应该是这样的:

如果 sombody 在特定时间内写入 TChan,则应检索内容。如果在指定时间内没有写入任何内容,则应解除阻塞并继续 Nothing .

我对此的尝试是使用 System.Timeout 中的超时功能。像这样:

timeout 1000000 $ atomically $ readTChan pktChannel

这似乎有效,但现在我发现,我有时会丢失数据包(它们被写入 channel ,但在另一侧没有读取。在日志中我得到这个:
2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439

其中“Pushing Recorded Packet”是来自一个线程的写入,“Sending Recorded Packet”是从发送者线程中的 TChan 读取的。带有 Sending Recorded Packet 2 1439 的行丢失,这表明从 TChan 读取成功。

似乎如果在错误的时间点收到超时, channel 就会丢失数据包。我怀疑 threadKill timeout 内部使用的函数和STM一起玩不好。

这个对吗?有人有另一种不会丢失数据包的解决方案吗?

最佳答案

使用registerDelay , 一个 STM 函数, 来表示 TVar达到超时时。然后您可以使用 orElse函数或 Alternative接线员 <|>在下一个 TChan 之间进行选择值或超时。

import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
forM_ xs $ \ x -> do
threadDelay x
atomically $ writeTChan chan x

-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar

-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
delay <- registerDelay timeoutAfter
atomically $
Just <$> readTChan pktChannel
<|> Nothing <$ fini delay

-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
res <- readTChanTimeout timeoutAfter pktChannel
case res of
Nothing -> putStrLn "timeout"
Just val -> do
putStrLn $ "packet: " ++ show val
readLoop timeoutAfter pktChannel

main :: IO ()
main = do
let timeoutAfter = 1000000

-- spin up a packet writer simulation
pktChannel <- newTChanIO
tid <- forkIO $ packetWriter timeoutAfter pktChannel

readLoop timeoutAfter pktChannel

killThread tid

关于multithreading - 使用带超时的 Tchan,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22171895/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com