gpt4 book ai didi

haskell - 流媒体库中惯用的预取

转载 作者:行者123 更新时间:2023-12-02 16:45:53 25 4
gpt4 key购买 nike

我正在与 streaming 合作图书馆,但会接受使用管道或导管的答案。

说我有

import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
flip fix 0 $ \go thingID ->
unless (thingID > lastID) $ do
thing <- highLatencyGet thingID
S.yield thing
go (thingID+1)

为了减少延迟,我想 fork highLatencyGet 来检索下一个 Thing,同时在消费者中处理前一个 Thing

显然,我可以在调用 yield 等之前转换我的函数,创建一个新的 MVar 并 fork 下一批。

但是我想知道是否有一种惯用的(可组合的)方法来做到这一点,这样它就可以打包在一个库中,并且可以在任意 IO Stream 上使用。理想情况下,我们也可以配置预取值,例如:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()

最佳答案

此解决方案使用管道,但可以轻松地进行修改以使用。准确地说,它需要pipespipes-concurrencyasync包。

它不能以“直接”方式工作。它不是简单地转换 Producer,而是采用一个消耗 Producer 的“折叠函数”。这种连续传递风格对于设置和拆除并发机制是必要的。

import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
(outbox,inbox,seal) <- spawn' (bounded bufsize)
let cutcord effect = effect `finally` atomically seal
runConcurrently $
Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
*>
Concurrently (cutcord (foldfunc (fromInput inbox)))

原始生产者的输出被重定向到有界队列。同时,我们将生产者折叠功能应用于从队列中读取的生产者。

每当每个并发操作完成时,我们都会注意及时关闭 channel ,以避免让另一端挂起。

关于haskell - 流媒体库中惯用的预取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44145974/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com