gpt4 book ai didi

multithreading - 简单的多线程Haskell占用大量内存

转载 作者:行者123 更新时间:2023-12-04 15:44:05 28 4
gpt4 key购买 nike

我有一个相对简单的“复制”程序,该程序仅将一个文件的所有行复制到另一个文件。我在使用Haskell的TMQueueSTM并发支持,所以我想我可以这样尝试:

{-# LANGUAGE BangPatterns #-}

module Main where

import Control.Applicative
import Control.Concurrent.Async -- from async
import Control.Concurrent.Chan
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue -- from stm-chans
import Control.Monad (replicateM, forM_, forever, unless)
import qualified Data.ByteString.Char8 as B
import Data.Function (fix)
import Data.Maybe (catMaybes, maybe)
import System.IO (withFile, IOMode(..), hPutStrLn, hGetLine)
import System.IO.Error (catchIOError)

input = "data.dat"
output = "out.dat"
batch = 100 :: Int

consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
!items <- catMaybes <$> replicateM batch readitem
forM_ items $ B.hPutStrLn fh
unless (length items < batch) loop
where
readitem = do
!item <- atomically $ readTMQueue q
return item

producer :: TMQueue B.ByteString -> IO ()
producer q = withFile input ReadMode $ \fh ->
(forever (B.hGetLine fh >>= atomically . writeTMQueue q))
`catchIOError` const (atomically (closeTMQueue q) >> putStrLn "Done")

main :: IO ()
main = do
q <- atomically newTMQueue
thread <- async $ consumer q
producer q
wait thread

我可以像这样制作一些测试输入文件
ghc -e 'writeFile "data.dat" (unlines (map show [1..5000000]))'

并像这样构建它
ghc --make QueueTest.hs -O2 -prof -auto-all -caf-all -threaded -rtsopts -o q

当我像 ./q +RTS -s -prof -hc -L60 -N2这样运行它时,它说“正在使用的总内存为2117 MB”!但是输入文件只有38 MB!

我是剖析的新手,但是我已经生成了一个又一个的图,并且无法查明我的错误。

最佳答案

正如OP指出的那样,到目前为止,我还是可以写一个真实的答案。让我们从内存消耗开始。

有用的两个引用是Memory footprint of Haskell data typeshttp://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html。我们还需要查看一些结构的定义。

-- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html

data TMQueue a = TMQueue
{-# UNPACK #-} !(TVar Bool)
{-# UNPACK #-} !(TQueue a)
deriving Typeable


-- from http://hackage.haskell.org/package/stm-2.4.3/docs/src/Control-Concurrent-STM-TQueue.html

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
{-# UNPACK #-} !(TVar [a])
TQueue实现使用具有读取端和写入端的标准功能队列。

让我们为内存使用量设置一个上限,并假设我们在使用者执行任何操作之前将整个文件读入 TMQueue。在这种情况下,我们的TQueue的写端将包含一个列表,每输入行包含一个元素(存储为字节串)。每个列表节点看起来像
(:) bytestring tail

这需要3个字(每个字段1个+构造函数1个)。每个字节串是9个字,因此将这两个字加在一起,每行就有12个字的开销 ,不包括实际数据。您的测试数据为500万行,因此整个文件的开销为6000万个字(加上一些常量),在64位系统上约为460MB(假设我做得对,总是很可疑)。添加40MB的实际数据,我们得到的值与我在系统上看到的值非常接近。

那么,为什么我们的内存使用量接近这个上限?我有一个理论(作为练习剩下的调查工作!)。首先,生产者的运行速度可能比消费者更快,这仅仅是因为读取通常比写入快(我正在使用旋转磁盘,也许SSD会有所不同)。这是readTQueue的定义:
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z

首先,我们尝试从读取端进行读取,如果该列为空,则在反转该列表之后,尝试从写入端进行读取。

我认为正在发生的事情是:当使用者需要从写端进行读取时,它需要遍历STM事务中的输入列表。这需要一些时间,这将导致它与生产者竞争。随着生产者进一步前进,此列表将更长,从而导致读取花费更多时间,在此期间生产者能够写入更多值,从而导致读取失败。重复此过程,直到生产者完成为止,然后消费者才有机会处理大量数据。这不仅会破坏并发性,还会增加更多的CPU开销,因为使用者事务不断重试和失败。

那么,菜呢?有几个主要区别。首先,unagi-chan在内部使用数组而不是列表。这样可以稍微减少开销。大部分开销来自ByteString指针,所以不是很多,而是很少。其次,木保留数组的大块。即使我们悲观地认为生产者总是赢得竞争,但在数组被填充后,它会被推离 channel 的生产者一侧。现在,生产者正在写一个新的数组,而消费者则从旧的数组中读取。这种情况是近乎理想的。没有共享资源的争用,使用者具有良好的引用位置,并且由于使用者正在处理不同的内存块,因此缓存一致性没有问题。与我对TMQueue的理论描述不同,现在您可以进行并发操作,从而使生产者可以清除一些内存使用量,因此它永远不会达到上限。

顺便说一句,我认为消费者分批处理是没有好处的。句柄已经由IO子系统缓冲,因此我认为这样做没有任何好处。对我来说,当我改变消费者以逐行方式进行操作时,性能会有所提高。

现在,您可以如何解决这个问题?从我的工作假设(即TMQueue遇到争用问题以及您的指定要求)出发,您只需要使用另一种类型的队列即可。显然菜效果很好。我还尝试了TMChan,它比unagi慢25%,但使用的内存少45%,因此这也是一个不错的选择。 (这并不奇怪,TMChanTMQueue具有不同的结构,因此它具有不同的性能特征)

您也可以尝试更改算法,以便生产者发送多行块。这将降低所有ByteStrings的内存开销。

那么,什么时候可以使用TMQueue呢?如果生产者和消费者的速度大致相同,或者消费者的速度更快,那应该没问题。另外,如果处理时间不一致,或者生产者突发运行,则可能会获得良好的摊销性能。这几乎是最坏的情况,也许应该将其报告为针对stm的错误?我认为如果将读取功能更改为
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> do writeTVar write []
let (z:zs) = reverse ys
writeTVar read zs
return z

这样可以避免这个问题。现在,应该同时评估zzs绑定(bind),因此列表遍历将在此事务之外进行,从而使读取操作有时在争用下也可以成功。当然,假设我首先对这个问题是正确的(并且这个定义已经足够懒惰了)。但是,可能还有其他意外的缺点。

关于multithreading - 简单的多线程Haskell占用大量内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25536604/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com