gpt4 book ai didi

haskell - Haskell 中并发 channel 的严格评估技术

转载 作者:行者123 更新时间:2023-12-02 16:42:00 24 4
gpt4 key购买 nike

我正在摆弄 Haskell 线程,并且遇到了跨 channel 通信延迟评估值的问题。例如,对于 N 个工作线程和 1 个输出线程,工作线程传达未评估的工作,而输出线程最终为它们完成工作。

我在各种文档中阅读了有关此问题的信息,并看到了各种解决方案,但我只找到了一种有效的解决方案,而其余的则无效。下面是一些代码,其中工作线程启动一些可能需要很长时间的计算。我按降序启动线程,以便第一个线程应该花费最长的时间,而后面的线程应该更早完成。

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan -- .Strict
import Control.Concurrent.MVar
import Control.Exception (finally, evaluate)
import Control.Monad (forM_)
import Control.Parallel.Strategies (using, rdeepseq)

main = (>>=) newChan $ (>>=) (newMVar []) . run

run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
run logCh statVars = do
logV <- spawn1 readWriteLoop
say "START"
forM_ [18,17..10] $ spawn . busyWork
await
writeChan logCh Nothing -- poison the logger
takeMVar logV
putStrLn "DONE"
where
say mesg = force mesg >>= writeChan logCh . Just

force s = mapM evaluate s -- works
-- force s = return $ s `using` rdeepseq -- no difference
-- force s = return s -- no-op; try this with strict channel

busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
embiggen i = i*i*i*i*i

readWriteLoop = readChan logCh >>= writeReadLoop
writeReadLoop Nothing = return ()
writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

spawn1 action = do
v <- newEmptyMVar
forkIO $ action `finally` putMVar v ()
return v

spawn action = do
v <- spawn1 action
modifyMVar statVars $ \vs -> return (v:vs, ())

await = do
vs <- modifyMVar statVars $ \vs -> return ([], vs)
mapM_ takeMVar vs

使用大多数技术,结果按照产生的顺序报告;也就是说,首先运行时间最长的计算。我将此解释为输出线程正在完成所有工作:

-- results in order spawned (longest-running first = broken)
START
892616806655
503999185040
274877906943
144162977343
72313663743
34464808608
15479341055
6484436675
2499999999
DONE

我认为这个问题的答案是严格 channel ,但它们不起作用。我知道字符串的 WHNF 是不够的,因为这只会强制最外面的构造函数(字符串的第一个字符为 nil 或 cons)。 rdeepseq 应该完全评估,但这没有什么区别。我发现唯一有效的方法是将 Control.Exception.evaluate::a -> IO a 映射到字符串中的所有字符。 (请参阅代码中的 force 函数注释,了解几种不同的替代方案。)以下是 Control.Exception.evaluate 的结果:

-- results in order finished (shortest-running first = correct)
START
2499999999
6484436675
15479341055
34464808608
72313663743
144162977343
274877906943
503999185040
892616806655
DONE

那么为什么严格 channel 或rdeepseq不会产生这个结果呢?还有其他技术吗?我是否误解了为什么第一个结果被破坏了?

最佳答案

这里有两个问题。

第一次尝试(使用显式 rnf)不起作用的原因是,通过使用 return,您创建了一个 thunk,当它已被评估,但thunk本身尚未被评估。请注意,evaluate 的类型是 a -> IO a:它返回 IO 中的值这一事实意味着 evaluate 可以强制排序:

return (error "foo")   >> return 1 == return 1
evaluate (error "foo") >> return 1 == error "foo"

结果是这段代码:

force s = evaluate $ s `using` rdeepseq

可以工作(与 mapM_evaluate s 具有相同的行为)。

<小时/>

使用严格 channel 的情况有点棘手,但我相信这是由于严格并发中的错误造成的。昂贵的计算实际上是在工作线程上运行的,但这对您没有多大好处(您可以通过在字符串中隐藏一些异步异常并查看异常出现在哪个线程上来显式检查这一点)。

有什么错误吗?我们看一下严格writeChan的代码:

writeChan :: NFData a => Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
new_hole <- newEmptyMVar
modifyMVar_ write $ \old_hole -> do
putMVar old_hole $! ChItem val new_hole
return new_hole

我们看到在评估 thunk 之前,在 write 上调用了 modifyMVar_。那么操作顺序是:

  1. 已输入writeChan
  2. 我们takeMVar write(阻止任何其他想要写入 channel 的人)
  3. 我们评估昂贵的重击
  4. 我们将昂贵的重击放到 channel 上
  5. 我们putMVar write,解除所有其他线程的阻塞

您不会在 evaluate 变体中看到此行为,因为它们在获取锁之前执行评估。

我会向​​ Don 发送有关此问题的邮件,看看他是否同意这种行为不太理想。

Don 同意这种行为不是最理想的。我们正在开发补丁。

关于haskell - Haskell 中并发 channel 的严格评估技术,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5211827/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com