gpt4 book ai didi

haskell - 并行 haskell 。对生产者进行速率限制

转载 作者:行者123 更新时间:2023-12-02 15:19:22 26 4
gpt4 key购买 nike

在 Haskell 中的并行和并发编程中,Simon Marlow 提供了 Stream a基于以下数据以及一些生产者和消费者:

data IList a
= Nil
| Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
var <- new
fork $ loop xs var
return var
where
loop [] var = put var Nil
loop (x:xs) var = do
tail <- new
put var (Cons x tail)
loop xs tail

Later,他提到了这种方法的缺点并提出了解决方案:

In our previous example, the consumer was faster than the producer. If, instead, the producer had been faster than the consumer, then there would be nothing to stop the producer from getting a long way ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to avoid it getting too far ahead. There’s a trick that adds some automatic rate-limiting to the stream API. It entails adding another constructor to the IList type:

data IList a
= Nil
| Cons a (IVar (IList a))
| Fork (Par ()) (IList a)

但是,他并没有完成这个方法:

I’ll leave the rest of the implementation of this idea as an exercise for you to try on your own. See if you can modify streamFromList, streamFold, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList and streamMap).

同样的问题has been asked on the mailing list ,但没有人给出答案。

那么如何限制生产者的速率呢?

最佳答案

重要的部分在于loop函数:

  loop [] var = put var Nil
loop (x:xs) var = do
tail <- new
put var (Cons x tail)
loop xs tail

我们需要添加 fork 距离f和 block 大小c作为参数:

  loop _ _ [] var = put var Nil
loop 0 c (x:xs) var = -- see below
loop f c (x:xs) var = do
tail <- new
put var (Cons x tail)
loop (f-1) c xs tail

fork 距离在每次迭代中都会减少。当货叉距离为零时我们需要做什么?我们提供了一个Fork op t,其中op继续生成列表:

  loop 0 c (x:xs) var = do
tail <- new
let op = loop c xs tail
put var (Fork op (Cons x tail))

请注意,如果列表为空,我们不会使用Fork。这是可能的,但有点愚蠢,毕竟已经没有什么可生产的了。更改 streamFromList 现在很简单:

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
var <- new
fork $ loop f c xs var
return var

现在,为了使用它,我们需要更改 streamFold 中的 case:

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
ilst <- get instrm
case ilst of
Cons h t -> streamFold fn (fn acc h) t
Fork p (Cons h t) -> -- see below
_ -> return acc

请记住,我们的 streamFromList 中的 Fork 不允许有空列表,但以防万一我们匹配它(并且 Nil) 通过通配符。

如果遇到带有数据的Fork,我们需要做什么?首先,我们需要使用fork来运行Par()操作,以便传播t,然后我们就可以开始使用它。所以我们的最后一个案例是

    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t

streamMap 是类似的。仅在这种情况下,您才可以像 streamFromList 中那样再次在循环中使用其他参数。

关于haskell - 并行 haskell 。对生产者进行速率限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24773130/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com