gpt4 book ai didi

haskell - 在 Haskell 中使用工作池运行并行 URL 下载

转载 作者:行者123 更新时间:2023-12-02 12:05:09 27 4
gpt4 key购买 nike

我想使用 Control.Concurrent.Async mapConcurrentlyhttp-conduit 执行并行下载。解决方案here对于我的情况来说还不够,因为我想处理 n 个任务,但将并发工作线程的数量限制为 m,其中 m << em>n。

传递给 mapConcurrently 多个 m block 是不够的,因为这样的话,活跃工作线程的数量往往会小于 m> 由于某些任务将比其他任务更早完成,从而留下利用率差距。

有没有一种简单的方法——几乎和我希望的使用mapConcurrently一样简单——实现一个工作池同时执行任务队列,直到所有任务完成?

或者只是保持 Haskell 简单并使用 xargs -P 进行进程级并行性更容易吗?

最佳答案

也许最简单的解决方案是使用 semaphore 来限制 IO 操作在将它们包裹在 Concurrently 中之前,使用像这样的辅助函数:

withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b)
withConc sem f = \a -> Concurrently
(bracket_ (waitQSem sem) (signalQSem sem) (f a))

我们可以将 withConctraverse 结合使用对任何 Traversable 任务容器执行受限制的并发遍历:

traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b]
traverseThrottled concLevel action tasks = do
sem <- newQSem concLevel
runConcurrently (traverse (withConc sem action) tasks)

这种方法的一个缺点是,使用Concurrently将创建与任务一样多的线程,并且在任何给定时间只有其中的一个子集会执行实际工作,这要归功于信号。

另一方面,Haskell 中的线程很便宜,因此我认为在任务数量不是很大的情况下,这是一个可以接受的解决方案。

编辑:traverseThrottled一个更通用的签名:

import Data.Traversable 
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception

traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
sem <- newQSem concLevel
let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
runConcurrently (traverse (Concurrently . throttledAction) taskContainer)

关于haskell - 在 Haskell 中使用工作池运行并行 URL 下载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29155068/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com