gpt4 book ai didi

Haskell 快速并发队列

转载 作者:行者123 更新时间:2023-12-03 10:42:31 25 4
gpt4 key购买 nike

问题

你好!我正在编写一个日志库,我很想创建一个记录器,它会在单独的线程中运行,而所有应用程序线程只会向它发送消息。我想为这个问题找到最高效的解决方案。我在这里需要简单的未绑定(bind)队列。

联系方式

我已经创建了一些测试来查看可用解决方案的执行情况,我在这里得到了非常奇怪的结果。我基于以下内容测试了 4 个实现(源代码如下):

  • pipes-concurrency
  • Control.Concurrent.Chan
  • Control.Concurrent.Chan.Unagi
  • MVar based as described in the book "Parallel and Concurrent Programming in Haskell"请注意,此技术为我们提供了容量为 1 的有界队列 - 它仅用于测试

  • 测试

    这是用于测试的源代码:
    {-# LANGUAGE NoMonomorphismRestriction #-}

    import Control.Concurrent (threadDelay)
    import Control.Monad (forever)
    import Pipes
    import qualified Pipes.Concurrent as Pipes
    import Control.Applicative
    import Control.Monad (replicateM_)
    import System.Environment (getArgs)

    import Control.Concurrent.Chan
    import Control.Concurrent (forkIO)
    import qualified Control.Concurrent.Chan.Unagi as U
    import Control.Concurrent.MVar
    import Criterion.Main

    data Event = Msg String | Status | Quit deriving (Show)

    ----------------------------------------------------------------------
    -- Pipes
    ----------------------------------------------------------------------

    pipesLogMsg = yield (Msg "hello")
    pipesManyLogs num = replicateM_ num pipesLogMsg

    pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
    Pipes.performGC

    pipesHandler max = loop 0
    where
    loop mnum = do
    if mnum == max
    then lift $ pure ()
    else do event <- await
    case event of
    Msg _ -> loop (mnum + 1)
    Status -> (lift $ putStrLn (show mnum)) *> loop mnum
    Quit -> return ()

    ----------------------------------------------------------------------
    -- Chan
    ----------------------------------------------------------------------

    chanAddProducer num ch = forkIO $ chanManyLogs num ch
    chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
    chanHandler ch max = handlerIO (readChan ch) max

    ----------------------------------------------------------------------
    -- Unagi-Chan
    ----------------------------------------------------------------------

    uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
    uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
    uchanHandler ch max = handlerIO (U.readChan ch) max

    ----------------------------------------------------------------------
    -- MVars
    ----------------------------------------------------------------------

    mvarAddProducer num m = forkIO $ mvarManyLogs num m
    mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
    mvarHandler m max = handlerIO (takeMVar m) max

    ----------------------------------------------------------------------
    -- Utils
    ----------------------------------------------------------------------

    handlerIO f max = loop 0 where
    loop mnum = do
    if mnum == max
    then pure ()
    else do event <- f
    case event of
    Msg _ -> loop (mnum + 1)
    Status -> putStrLn (show mnum) *> loop mnum
    Quit -> return ()

    ----------------------------------------------------------------------
    -- Main
    ----------------------------------------------------------------------

    main = defaultMain [
    bench "pipes" $ nfIO $ do
    (output, input) <- Pipes.spawn Pipes.Unbounded
    replicateM_ prodNum (pipesAddProducer msgNum output)
    runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
    ch <- newChan
    replicateM_ prodNum (chanAddProducer msgNum ch)
    chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
    (inCh, outCh) <- U.newChan
    replicateM_ prodNum (uchanAddProducer msgNum inCh)
    uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
    m <- newEmptyMVar
    replicateM_ prodNum (mvarAddProducer msgNum m)
    mvarHandler m totalMsg
    ]
    where
    prodNum = 20
    msgNum = 1000
    totalMsg = msgNum * prodNum

    你可以用 ghc -O2 Main.hs 编译它并运行它。
    测试创建了 20 个消息生产者,每个生产者生产 1000000 条消息。

    结果
    benchmarking pipes
    time 46.68 ms (46.19 ms .. 47.31 ms)
    0.999 R² (0.999 R² .. 1.000 R²)
    mean 47.59 ms (47.20 ms .. 47.95 ms)
    std dev 708.3 μs (558.4 μs .. 906.1 μs)

    benchmarking Chan
    time 4.252 ms (4.171 ms .. 4.351 ms)
    0.995 R² (0.991 R² .. 0.998 R²)
    mean 4.233 ms (4.154 ms .. 4.314 ms)
    std dev 244.8 μs (186.3 μs .. 333.5 μs)
    variance introduced by outliers: 35% (moderately inflated)

    benchmarking Unagi-Chan
    time 1.209 ms (1.198 ms .. 1.224 ms)
    0.996 R² (0.993 R² .. 0.999 R²)
    mean 1.267 ms (1.244 ms .. 1.308 ms)
    std dev 102.4 μs (61.70 μs .. 169.3 μs)
    variance introduced by outliers: 62% (severely inflated)

    benchmarking MVar
    time 1.746 ms (1.714 ms .. 1.774 ms)
    0.997 R² (0.995 R² .. 0.998 R²)
    mean 1.716 ms (1.694 ms .. 1.739 ms)
    std dev 73.99 μs (65.32 μs .. 85.48 μs)
    variance introduced by outliers: 29% (moderately inflated)

    问题

    我很想问问你为什么管道并发版本执行得这么慢,为什么它甚至比基于 chan 的版本慢得多。我很惊讶 MVar 是所有版本中最快的一个 - 谁能告诉更多,为什么我们会得到这个结果,我们是否可以在任何情况下做得更好?

    最佳答案

    所以我可以给大家简单介绍一下Chan的一些分析。和 TQueue (这里 pipes-concurrency 正在内部使用)激发了一些进入 unagi-chan 的设计决策。 .我不确定它是否会回答你的问题。我建议在进行基准测试时 fork 不同的队列并进行变化,以真正了解正在发生的事情。


    Chan好像:

    data Chan a
    = Chan (MVar (Stream a)) -- pointer to "head", where we read from
    (MVar (Stream a)) -- pointer to "tail", where values written to

    type Stream a = MVar (ChItem a)
    data ChItem a = ChItem a (Stream a)

    这是一个 MVar 的链表s。两人 MVar s 在 Chan type 分别充当指向列表当前头部和尾部的指针。这是写的样子:
    writeChan :: Chan a -> a -> IO () 
    writeChan (Chan _ writeVar) val = do
    new_hole <- newEmptyMVar mask_ $ do
    old_hole <- takeMVar writeVar -- [1]
    putMVar old_hole (ChItem val new_hole) -- [2]
    putMVar writeVar new_hole -- [3]

    在 1 时,作者在写端锁定,在 2 时我们的项目 a可供读者使用,并且在 3 处为其他作者解锁写端。

    这实际上在单消费者/单生产者场景中表现得很好(参见 the graph here ),因为读取和写入不竞争。但是一旦你有多个并发作者,你就会开始遇到麻烦:
  • 一个写入 1 而另一个写入 2 的写入将被阻塞并被取消调度(我能够测量上下文切换的最快速度是 ~150ns(非常快);在某些情况下它可能会慢得多)。所以当你有很多作家在竞争时
    你基本上是通过调度程序进行一次大的往返,进入 MVar 的等待队列。然后终于可以完成写入。
  • 当写入器在 2 时被取消调度(因为它超时)时,它会持有一个锁,并且在可以再次重新调度之前不允许写入完成;当我们超额订阅时,即当我们的线程/核心比率很高时,这会成为一个更大的问题。

  • 最后,使用 MVar -per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。

    队列
    TQueue很棒,因为 STM使得推断其正确性变得非常简单。这是一个功能性出队风格的队列,还有一个 write包括简单地读取 writer 堆栈,consing 我们的元素,然后将其写回:
    data TQueue a = TQueue (TVar [a])
    (TVar [a])

    writeTQueue :: TQueue a -> a -> STM ()
    writeTQueue (TQueue _ write) a = do
    listend <- readTVar write -- a transaction with a consistent
    writeTVar write (a:listend) -- view of memory

    如果在 writeTQueue 之后写回它的新堆栈,另一个交错写入也做同样的事情,其中​​一个写入将被重试。更多 writeTQueue s 是交错的,争用的效果会恶化。然而,性能下降比 Chan 慢得多。因为只有一个 writeTVar可以取消竞争的操作 writeTQueue s,并且交易非常小(只是一个读取和一个 (:) )。

    读取的工作原理是从写入端“出列”堆栈,反转它,并将反转的堆栈存储在它自己的变量中以便于“弹出”(总之,这给了我们摊销的 O(1) 推送和弹出)
    readTQueue :: TQueue a -> STM a
    readTQueue (TQueue read write) = do
    xs <- readTVar read
    case xs of
    (x:xs') -> do writeTVar read xs'
    return x
    [] -> do ys <- readTVar write
    case ys of
    [] -> retry
    _ -> case reverse ys of
    [] -> error "readTQueue"
    (z:zs) -> do writeTVar write []
    writeTVar read zs
    return z

    读者对作者有一个对称的温和争论问题。在一般情况下,读者和作者不会竞争,但是当读者堆栈耗尽时,读者会与其他读者和作者竞争。我怀疑你是否预装了 TQueue有足够的值,然后启动 4 个读取器和 4 个写入器,您可能会引发活锁,因为在下一次写入之前反向难以完成。值得注意的是,与 MVar 不同的是, 写信给 TVar许多读者正在等待,同时唤醒他们(这可能或多或少效率高,取决于场景)。

    我怀疑你没有看到 TQueue 的很多弱点在你的测试中;主要是您看到了写入争用的适度影响以及大量分配和 GC 大量可变对象的开销。

    鳗鱼酱
    unagi-chan最初旨在很好地处理争用。它在概念上非常简单,但实现有一些复杂性
    data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

    data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

    data Cell a = Empty | Written a | Blocking (MVar a)

    队列的读写端共享 Stream它们协调传递值(从写入器到读取器)和阻塞指示(从读取器到写入器),读取和写入端每个都有一个独立的原子计数器。写操作如下:
  • 作者调用原子 incrCounter在写计数器上接收其唯一索引以与其(单个)读取器协调
  • 编写器找到它的单元格并执行 Written a 的 CAS
  • 如果成功它退出,否则它看到一个读者已经击败它并且正在阻塞(或继续阻塞),所以它执行 (\Blocking v)-> putMVar v a)并退出。

  • 读取以类似且明显的方式工作。

    第一个创新是使争用点成为原子操作,在争用时不会降级(如 CAS/重试循环或类似 Chan 的锁)。基于简单的基准测试和实验, fetch-and-add primop, exposed by the atomic-primops library效果最好。

    然后在 2 中,读取器和写入器都只需要执行一次比较和交换(读取器的快速路径是简单的非原子读取)来完成协调。

    所以要尝试制作 unagi-chan很好,我们
  • 使用 fetch-and-add 处理争用点
  • 使用无锁技术,这样当我们超额订阅一个在不合适的时间被取消调度的线程时,不会阻塞其他线程的进程(被阻塞的写入器最多可以阻塞计数器“分配”给它的读取器;读取警告re. async exceptions在 unagi-chan 文档中,并注意 Chan 在这里有更好的语义)
  • 使用数组来存储我们的元素,它具有更好的局部性(但见下文)每个元素的开销更低,并且对 GC 的压力更小

  • 最后一个注意事项。使用数组:并发写入数组对于扩展来说通常是一个坏主意,因为您会导致大量缓存一致性流量,因为缓存行在编写器线程之间来回无效。通用术语是“虚假共享”。但是我能想到的替代设计也有缓存方面的优点和缺点,比如 strip 写入或其他东西;我一直在对此进行一些试验,但目前没有任何结论。

    我们合理关注虚假共享的一个地方是在我们的计数器中,我们将其对齐并填充为 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。

    关于Haskell 快速并发队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27933941/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com