gpt4 book ai didi

haskell - 泛化一个函数以合并一组 Haskell 管道生产者

转载 作者:行者123 更新时间:2023-12-02 04:34:11 25 4
gpt4 key购买 nike

我正在使用 Haskell pipes package .

我正在尝试使用 pipes-concurrency将生产者列表合并在一起。

我想要达到的是:

merge :: MonadIO m => [Producer a m ()] -> Producer a m ()

所以给定一个生产者 s1 和另一个生产者 s2:r = merge [s1, s2]这将给出行为:

s1 --1--1--1--|
s2 ---2---2---2|
r --12-1-21--2|

按照我想出的教程页面中的代码:

mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
(output, input) <- liftIO $ spawn Unbounded
_ <- liftIO $ mapM (fork output) producers
fromInput input
where
fork :: Output a -> Producer a IO () -> IO ()
fork output producer = void $ forkIO $ do runEffect $ producer >-> toOutput output
performGC

按预期工作。

但是我很难概括事物。

我的尝试:

merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
(output, input) <- liftIO $ spawn Unbounded
_ <- liftIO $ mapM (fork output) producers
fromInput input
where
runEffectIO :: Monad m => Effect m r -> IO (m r)
runEffectIO e = do
x <- evaluate $ runEffect e
return x
fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
performGC

不幸的是,这可以编译,但不会做太多其他事情。我猜我把 runEffectIO 搞得一团糟.我当前的其他方法 runEffectIO没有产生更好的结果。

程序:

main = do
let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
_ <- runEffect $ producer >-> taker 20
where repeater :: Int -> Int -> Producer Int IO r
repeater val delay = forever $ do
lift $ threadDelay delay
yield val
taker :: Int -> Consumer Int IO ()
taker 0 = return ()
taker n = do
val <- await
liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
taker $ n - 1

点击 val <- await但没有到达 liftIO $ putStrLn因此它不产生任何输出。但是它没有挂起就可以正常退出。

当我替换为 mergeIO 时对于 merge然后程序运行我希望输出 20 行。

最佳答案

虽然 MonadIO 不足以完成此操作,但 MonadBaseControl(来自 monad-control)旨在允许在基本 monad 中嵌入任意转换器堆栈。配套套餐lifted-base提供一个适用于转换器堆栈的 fork 版本。我整理了一个使用它来解决您的问题的示例 in the following Gist , 虽然主要的魔法是:

import qualified Control.Concurrent.Lifted as L
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer = L.fork $ do
runEffect $ producer >-> toOutput output
liftIO performGC

请注意,当以这种方式处理时,您应该了解单子(monad)状态会发生什么:对子线程中执行的任何可变状态的修改将被隔离到那些子线程。换句话说,如果您使用的是 StateT,则每个子线程将以其 fork 时上下文中的相同状态值开始,但随后您将拥有许多不更新的不同状态彼此。

有一个 appendix in the Yesod book在 monad-control 上,尽管坦率地说它有点过时了。我只是不知道有任何更新的教程。

关于haskell - 泛化一个函数以合并一组 Haskell 管道生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22333846/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com