
mapreduce - Why does my modified (Real World Haskell) MapReduce implementation fail with "Too many open files"?


I am implementing a Haskell program that compares every line of a file with every other line in the file. To keep things simple, let's assume the data structure represented by one line is just an Int, and that my algorithm is the squared distance. I would implement it as follows:

--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
    fileContents <- readFile path
    return $ allDistances $ map read $ lines $ fileContents
    where
      allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs)
      allDistances _      = 0

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int -> Int -> [Int]
          infiniteList i j = (i + j) : infiniteList j (i + j)
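As a quick sanity check (the file name "fib.txt" is just an example), the two functions behave like this in GHCi: createTestFile 5 writes the lines 1, 2, 3, 5 and 8, and the sum of squared distances over all pairs of those lines is 154.

ghci> createTestFile 5 "fib.txt"
ghci> sumOfDistancesOnSmallFile "fib.txt"
154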

Unfortunately, the complete file is kept in memory. To avoid possible out-of-memory exceptions on very large files, I would like to seek the file cursor back to the start of the file at each recursion of 'allDistances'.

In the book "Real World Haskell" an implementation of mapreduce is given, together with a function to split a file into chunks (chapter 24, available here). I modified that chunking function so that, instead of dividing the complete file into chunks, it returns as many chunks as there are lines, each chunk representing one element of

tails . lines . readFile
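To illustrate the intended chunking on a tiny four-line file (containing 1, 2, 3 and 5), every chunk corresponds to one of the non-empty suffixes produced by tails:

ghci> :m + Data.List
ghci> tails (lines "1\n2\n3\n5\n")
[["1","2","3","5"],["2","3","5"],["3","5"],["5"],[]]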

The complete implementation is (together with the preceding code region):

import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO

--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails distancesUsingMapReduce path

distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
                                    rpar combineDistances
    where lexer :: Lazy.ByteString -> [Int]
          lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)

distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many

distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s =
    if not (null s)
        then distancesOneToMany (head s) (tail s)
        else 0

--The mapreduce algorithm
mapReduce :: Strategy b    -- evaluation strategy for mapping
          -> (a -> b)      -- map function
          -> Strategy c    -- evaluation strategy for reduction
          -> ([b] -> c)    -- reduce function
          -> [a]           -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
    mapResult `pseq` reduceResult
    where mapResult    = parMap mapStrat mapFunc input
          reduceResult = reduceFunc mapResult `using` reduceStrat


--Working with (file)chunks:
data ChunkSpec = CS {
      chunkOffset :: !Int64
    , chunkLength :: !Int64
    } deriving (Eq, Show)

chunkedFileOperation :: (NFData a) =>
       (FilePath -> IO [ChunkSpec])
    -> ([Lazy.ByteString] -> a)
    -> FilePath
    -> IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
    (chunks, handles) <- chunkedRead chunkCreator path
    let r = funcOnChunks chunks
    (rdeepseq r `seq` return r) `finally` mapM_ hClose handles

chunkedRead :: (FilePath -> IO [ChunkSpec])
    -> FilePath
    -> IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
        return (chunk, h)

-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO [ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h -> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Lazy.hGet h 4096
                                case Lazy.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize - offset) : nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
                findNewline newOffset
        findChunks 0

Unfortunately, on a bigger file (for example 2000 lines), the mapreduce version throws an exception:
*** Exception: getCurrentDirectory: resource exhausted (Too many open files)

I am a bit ashamed of not being able to debug the program myself, but I have only ever debugged Java/C# code, and I also don't know how to properly test the file chunking and reading. I expect the problem not to be part of the mapreduce function itself, since a similar version without mapreduce throws the same exception. In that attempt I had chunkedFileOperation take both the operation for one chunk and the "reduce" function, which it applied directly.

By the way, I am running
HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (Snow Leopard)
with the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
and I qualify as a self-taught beginner / fresh Haskell programmer.

Best Answer

You are using lazy IO, so those files opened with readFile are not being closed in a timely fashion. You need to think of a solution that explicitly closes the files regularly (e.g. via strict IO, or iteratee-based IO).
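One possible sketch of the strict-IO direction (this is an assumption about one way to follow that hint, not the answerer's code; it reuses the ChunkSpec type from the question, reads each chunk with the strict Data.ByteString API, and closes each handle before the next one is opened, so only one file handle is live at a time):

import qualified Data.ByteString.Char8 as Strict
import Control.Monad (forM)
import System.IO

-- Strict variant of chunkedRead: every chunk is read completely inside
-- withFile, so its handle is closed before the next chunk is opened.
-- (Sketch only; ChunkSpec, chunkOffset and chunkLength are the question's own definitions.)
chunkedReadStrict :: (FilePath -> IO [ChunkSpec]) -> FilePath -> IO [Strict.ByteString]
chunkedReadStrict chunkCreator path = do
    chunks <- chunkCreator path
    forM chunks $ \spec ->
        withFile path ReadMode $ \h -> do
            hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
            Strict.hGet h (fromIntegral (chunkLength spec))

The downstream code would then consume strict ByteStrings (or convert with Lazy.fromChunks), and chunkedFileOperation no longer has a list of handles to close. Note that with the tails-style chunks the first chunk is almost the whole file, so this mainly demonstrates the handle discipline rather than solving the memory concern.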

Regarding mapreduce - why does my modified (Real World Haskell) MapReduce implementation fail with "Too many open files": we found a similar question on Stack Overflow: https://stackoverflow.com/questions/5541784/
