gpt4 book ai didi

haskell - 管道流中的并行处理

转载 作者:行者123 更新时间:2023-12-04 18:06:16 25 4
gpt4 key购买 nike

我真的很喜欢用于将操作应用于流式 IO 源的管道/管道的概念。我对构建适用于非常大的日志文件的工具感兴趣。从 Python/Ruby 迁移到 Haskell 的吸引力之一是编写并行代码的更简单方法,但我找不到任何相关文档。我如何设置一个从文件中读取行并并行处理它们的管道流(即,使用 8 个内核,它应该读取 8 行,并将它们交给八个不同的线程进行处理,然后再次收集等),理想情况下,尽可能少的“仪式”......

或者,可以注意是否需要按顺序重新连接线路,如果这会影响过程的速度?

我确信可以使用 Parallel Haskell 书中的想法自己拼凑一些东西,但在我看来,在 Conduit 工作流程中间并行运行纯函数(parmap 等)应该很容易?

最佳答案

作为 Petr Pudlák 在他的评论中提到的“内部并行性”的一个例子,考虑这个函数(我正在使用 pipes ,但可以使用 conduit 来实现同样容易):

import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L

concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer =
L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
>->
forever (await >>= liftIO . mapConcurrently action >>= mapM G.yield)

该函数将组大小作为参数,我们希望为 a 类型的每个值运行一个操作, 和 Producera值(value)观。

它返回一个新的 Producer .在内部,生产者读取 a groupsize 批处理中的值,同时处理它们,并一一产生结果。

代码使用 Pipes.Group将原始生产者“划分”为大小为 groupsize 的子生产者,然后是 Control.Foldl将每个子生产者“折叠”成一个列表。

对于更复杂的任务,您可以使用 pipes-concurrency 提供的异步 channel 。或 stm-conduit .但是这些让你有点脱离了 Vanilla 管道/导管的“单一管道”世界观。

关于haskell - 管道流中的并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26742276/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com