
haskell - Generalizing the merge function of the Haskell "Streaming" library

Reposted. Author: 行者123. Updated: 2023-12-02 18:37:10

The goal is to generalize the Streaming.merge function,

merge :: (Monad m, Ord a) => Stream (Of a) m r -> Stream (Of a) m s -> Stream (Of a) m (r, s) 

to an arbitrary number of source streams. The strategy is to use a Data.Heap.Heap of Stream (Of a) m r values ordered by a. That is, bigMerge would have the signature

bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]

(The list could just as well be replaced by a Heap.)

What I have is a rather unholy concoction that is not quite correct. Here it is:

For completeness, the imports first:

import qualified Data.Heap as H
import Data.Heap (Heap)
import Data.List (sortBy)
import Data.Function (on)
import Streaming
import qualified Streaming.Prelude as S
import Streaming.Internal (Stream(..)) -- shouldn't!

To use a Heap, an element type with an Ord instance is needed:

data Elt a m r = Elt Int (Maybe a) (Stream(Of a) m r)

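The extra Int carries the index of the stream in the input list, so that the returned [r] can be built with its elements in the correct order. The Maybe a carries the current value of the stream.

The Eq and Ord instances are: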

instance Eq a => Eq (Elt a m r) where
  (Elt i ma _) == (Elt i' ma' _) =
    if i == i' then error "Internal error: Index clash in =="
    else ma == ma'

instance Ord a => Ord (Elt a m r) where
  (Elt i ma s) <= (Elt i' ma' s')
    | i == i'   = error "Internal error: Index clash in <="
    | otherwise = cmp (i, ma, s) (i', ma', s')
    where
      cmp _ (_, Nothing, Return _) = True
      cmp (_, Nothing, Return _) _ = False
      cmp (i, Just a, _) (i', Just a', _) = if a == a' then i <= i' else a <= a'
      cmp (i, _, _) (i', _, _) = i <= i'

Basically, anything is <= a Return, and all other cases use a and/or i to order the Elts. (The errors are there for debugging purposes.)
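One way to sidestep the error calls in == and <= is to order heap elements by an explicit key rather than by the stream itself. The following is only a minimal sketch, assuming Data.Heap comes from the heaps package, whose Entry type compares on its priority field alone. The key (current value, stream index) and the name popOrder are made up for illustration; the String payload stands in for the rest of a stream.

```haskell
import qualified Data.Heap as H
import Data.Heap (Entry(..))
import Data.List (unfoldr)

-- Hypothetical helper: build a heap keyed by (current value, stream index)
-- and pop the payloads in ascending key order.
-- Entry only compares the key, so the payload needs no Ord instance.
popOrder :: Ord a => [(a, Int, String)] -> [String]
popOrder xs = unfoldr step heap
  where
    heap = H.fromList [Entry (a, i) label | (a, i, label) <- xs]
    step h = fmap (\(Entry _ label, h') -> (label, h')) (H.uncons h)
```

Ties on the value fall back to the index, which mirrors the cmp function above without ever comparing two elements that share an index.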

Some helper functions make an Elt from a Stream and a Heap from a list of Streams.

eltFromStream :: (Monad m, Ord a) => Int -> Stream (Of a) m r -> m (Elt a m r)
eltFromStream i (Return r) = return $ Elt i Nothing (Return r)
eltFromStream i (Effect m) = do
  stream' <- m
  return $ Elt i Nothing stream'
eltFromStream i (Step (a :> rest)) = return $ Elt i (Just a) rest

heapFromStreams :: (Monad m, Ord a) => [Stream (Of a) m r] -> m (Heap (Elt a m r))
heapFromStreams strs = H.fromList <$> (sequence $ fmap (uncurry eltFromStream) (zip [0..] strs))
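Note that eltFromStream runs a single Effect layer and then records Nothing, even if the resulting stream still has elements. For comparison, S.next from Streaming.Prelude keeps running effects until it reaches either the first element or the end of the stream. A small sketch of that behaviour (peekFirst and delayed are hypothetical names used only here):

```haskell
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical helper: the first element of a stream, if any.
-- S.next runs through every leading effect layer before answering.
peekFirst :: Monad m => Stream (Of a) m r -> m (Maybe a)
peekFirst s = either (const Nothing) (Just . fst) <$> S.next s

-- A stream whose first element sits behind two effect layers.
delayed :: Stream (Of Int) IO ()
delayed = do
  lift (return ())
  lift (return ())
  S.each [7, 8]
```

Here peekFirst delayed yields Just 7, whereas pattern matching on one Effect constructor would only peel off the first of the two layers.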

The core piece is the loop function

loop :: (Monad m, Ord a) => Heap (Elt a m r) -> m (Heap (Elt a m r))
loop h = do
  let (Elt i ma s, h') = unsafeUncons h
  elt <- case s of
    Return r -> return $ Elt i Nothing (Return r)
    Effect m -> Elt i Nothing <$> m
    Step (a :> rest) -> return $ Elt i (Just a) rest
  return $ H.insert elt h'

The cheeky unsafeUncons is

unsafeUncons :: Heap a -> (a, Heap a)
unsafeUncons h = case H.uncons h of
  Nothing -> error "Internal error"
  Just x -> x

The loop function is used in heapMerge, which turns the Heap into a Stream

heapMerge :: (Monad m, Ord a) => Heap (Elt a m r) -> Stream (Of a) m [r]
heapMerge h = case (ma,s) of
    (Nothing, Return _) -> Return $ getRs h
    (_, Effect m) -> error "TODO"
    (Just a, _) -> do
      h' <- lift $ loop h
      Step (a :> heapMerge h')
  where
    Elt i ma s = H.minimum h

getRs simply assembles the Return values into a list

getRs :: (Monad m, Ord a) => Heap (Elt a m r) -> [r]
getRs h = snd <$> sortBy (compare `on` fst) (map f (H.toUnsortedList h))
  where
    f :: Monad m => Elt a m r -> (Int, r)
    f (Elt i _ (Return r)) = (i,r)
    f _ = error "Internal error: Call getR only after stream has finished!"

And then, finally,

bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMerge streams =
  if null streams then Return []
  else do
    h <- lift $ heapFromStreams streams
    heapMerge h

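This is convoluted, Effect is not treated correctly, and it relies on Return, Step and Effect instead of inspect and next. It does produce correct results on simple inputs, such as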

s1 = S.each [2,4,5::Int]
s2 = S.each [1,2,4,5::Int]
s3 = S.each [3::Int]
S.print $ bigMerge [s1,s2,s3]

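I am certain there is a way to do this correctly and more idiomatically. For one, the Maybe a in Elt is probably redundant, and I could make (Stream (Of a) m r) an instance of Ord directly; if the Effects are only pattern-matched and not executed, that should be fine. But Stream (Of (Heap (Stream (Of a) m r, Int))) (Heap (Int,r)) looks weird. An "indexed stream" IStream a m r = IStream Int ((Heap (Stream (Of a) m r) deriving Functor is a functor in r, so, with appropriate == and <=, I would be looking at Stream (IStream a m) m (Heap (Int, r))?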

This functorial aspect of the streaming library is still a bit confusing to me, so any help would be appreciated.

Best answer

The signature of bigMerge looks very much like the signature of sequenceA from Data.Traversable:

sequenceA :: Applicative f => [f r] -> f [r]

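The problem, of course, is that we cannot use the standard Applicative instance for Stream, because it concatenates rather than merges. But we can try to create our own instance through a newtype: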

{-# LANGUAGE DeriveFunctor #-}
import Streaming
import qualified Streaming.Prelude as S

newtype MergeStream a m r =
  MergeStream { getMergeStream :: Stream (Of a) m r } deriving Functor

-- BEWARE! Only valid for ORDERED streams!
instance (Monad m, Ord a) => Applicative (MergeStream a m) where
  pure x = MergeStream (pure x)
  MergeStream f <*> MergeStream x = MergeStream (uncurry ($) <$> S.merge f x)

Now, using s1, s2 and s3 from the example, together with the standard Traversable functions:

ghci> S.toList_ $ getMergeStream . traverse MergeStream $ [s1,s2,s3]
[1,2,2,3,4,4,5,5]
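To make the contrast with the standard instance concrete, here is a small self-contained sketch. It repeats the newtype so the block compiles on its own; bigMergeA, left, right, merged and concatenated are names invented for this illustration. The streams carry distinct return values so that the order of the resulting [r] is visible as well.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Streaming
import qualified Streaming.Prelude as S

newtype MergeStream a m r =
  MergeStream { getMergeStream :: Stream (Of a) m r } deriving Functor

-- Only meaningful for streams that are already ordered.
instance (Monad m, Ord a) => Applicative (MergeStream a m) where
  pure x = MergeStream (pure x)
  MergeStream f <*> MergeStream x = MergeStream (uncurry ($) <$> S.merge f x)

-- Hypothetical wrapper with the signature asked for in the question.
bigMergeA :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMergeA = getMergeStream . traverse MergeStream

-- Two ordered streams with distinct return values.
left, right :: Monad m => Stream (Of Int) m String
left  = S.each [1, 3] >> return "left"
right = S.each [2, 4] >> return "right"

-- Merging interleaves the elements: [1,2,3,4] :> ["left","right"]
merged :: IO (Of [Int] [String])
merged = S.toList (bigMergeA [left, right])

-- The standard Applicative runs one stream after the other:
-- [1,3,2,4] :> ["left","right"]
concatenated :: IO (Of [Int] [String])
concatenated = S.toList (sequenceA [left, right])
```

In both cases traverse collects the return values in input order, so no explicit stream index is needed for that part.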

This seems to work. That said, your attempt to implement bigMerge with Stream internals and a heap may still be worthwhile for efficiency reasons.
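For what it is worth, a heap-based version does not necessarily need Streaming.Internal. The following is a rough sketch rather than a tuned implementation: it assumes Data.Heap is the one from the heaps package (with its Entry type), it uses S.next so that effects are run until an element or the end is reached, and the name bigMergeNext is made up. Like S.merge, it only makes sense for inputs that are already ordered.

```haskell
import qualified Data.Heap as H
import Data.Heap (Entry(..))
import Data.List (sortOn)
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical heap-based k-way merge built on the public API only.
bigMergeNext :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMergeNext streams = do
  -- Run every stream up to its first element (or its end).
  firsts <- lift (mapM S.next streams)
  let (done, live) = foldr classify ([], []) (zip [0 :: Int ..] firsts)
  go done (H.fromList live)
  where
    -- Finished streams contribute (index, result); live ones enter the heap
    -- keyed by (current value, index), with the remaining stream as payload.
    classify (i, Left r) (ds, ls) = ((i, r) : ds, ls)
    classify (i, Right (a, rest)) (ds, ls) = (ds, Entry (a, i) rest : ls)

    go done heap = case H.uncons heap of
      -- All streams are exhausted: return the results in input order.
      Nothing -> return (map snd (sortOn fst done))
      Just (Entry (a, i) rest, heap') -> do
        S.yield a
        step <- lift (S.next rest)
        case step of
          Left r -> go ((i, r) : done) heap'
          Right (a', rest') -> go done (H.insert (Entry (a', i) rest') heap')
```

With s1, s2 and s3 from the question, S.toList_ (bigMergeNext [s1,s2,s3]) gives [1,2,2,3,4,4,5,5]. Only streams that still have a current element live in the heap, so no Maybe field and no special ordering for finished streams is required.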

Regarding "haskell - Generalizing the merge function of the Haskell "Streaming" library", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59170314/
