gpt4 book ai didi

haskell - 管道:多个流消费者

转载 作者:行者123 更新时间:2023-12-03 04:39:36 27 4
gpt4 key购买 nike

我编写了一个程序来计算语料库中 NGram 的频率。我已经有一个函数,它使用 token 流并生成一个订单的 NGram:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

目前我只能将一个流消费者连接到流源:

tokens --- trigrams --- countFreq

如何将多个流消费者连接到同一个流源?我想要这样的东西:

           .--- unigrams --- countFreq
|--- bigrams --- countFreq
tokens ----|--- trigrams --- countFreq
'--- ... --- countFreq

一个优点是并行运行每个消费者

编辑:感谢 Petr,我想出了这个解决方案

spawnMultiple orders = do
chan <- atomically newBroadcastTMChan

results <- forM orders $ \_ -> newEmptyMVar
threads <- forM (zip results orders) $
forkIO . uncurry (sink chan)

forkIO . runResourceT $ sourceFile "test.txt"
$$ javascriptTokenizer
=$ sinkTMChan chan

forM results readMVar

where
sink chan result n = do
chan' <- atomically $ dupTMChan chan
freqs <- runResourceT $ sourceTMChan chan'
$$ ngram n
=$ frequencies
putMVar result freqs

最佳答案

我假设您希望所有接收器接收所有值。

我建议:

  1. 使用newBroadcastTMChan创建一个新 channel Control.Concurrent.STM.TMChan (stm-chans)。
  2. 使用此 channel 通过 sinkTBMChan 构建接收器来自您的主要生产者的 Data.Conduit.TMChan (stm-conduit)。
  3. 对于每个客户端,使用dupTMChan 创建自己的副本以供读取。启动一个新线程,使用 sourceTBMChan 读取此副本。
  4. 从您的帖子中收集结果。
  5. 确保您的客户端能够以与生成数据一样快的速度读取数据,否则可能会出现堆溢出。

(我还没有尝试过,请让我们知道它是如何工作的。)

<小时/>

更新:收集结果的一种方法是创建 MVar对于每个消费者线程。他们每个人都会在完成后 putMVar 其结果。您的主线程将对所有这些 MVar 进行 takeMVar,从而等待每个线程完成。例如,如果 varsMVar 的列表,主线程将发出 mapM takeMVar vars 来收集所有结果。

关于haskell - 管道:多个流消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17931053/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com