gpt4 book ai didi

haskell - 使用管道的顺序二进制数据解码

转载 作者:行者123 更新时间:2023-12-04 15:24:57 25 4
gpt4 key购买 nike

目标是拥有一个具有以下类型签名的管道

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
管道应重复解析通过 TCP/IP(使用 ByteString -> a 包)接收的 Protocol Buffer (使用 network-conduit 函数)。
有线消息格式为
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(花括号不是协议(protocol)的一部分,仅在此处用于分隔实体)。
第一个想法是使用 sequenceSink重复申请 Sink能够解析一个 ProtoBuf:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
CU.sequenceSink () $ \() ->
do lenBytes <- CB.take 4 -- read protobuf length
let len :: Word32
len = B.decode lengthBytes -- decode ProtoBuf length
intLen = fromIntegral len
protobufBytes <- CB.take intLen -- read the ProtoBuf bytes
return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
它不起作用(仅适用于第一个 Protocol Buffer ),因为似乎有许多“剩余”字节已从源读取但未通过 CB.take 使用被丢弃。
而且我发现没有办法将“其余部分推回源头”。
我完全错误地理解了这个概念吗?
PS:即使我在这里使用 Protocol Buffer ,问题也与 Protocol Buffer 无关。为了调试问题,我总是使用 {length}{UTF8 encoded string}{length}{UTF8 encoded string}...以及与上述类似的管道( utf8StringConduit :: MonadResource m => Conduit ByteString m Text )。
更新:
我只是尝试用剩余的字节替换状态(上面示例中没有状态 ())并替换了 CB.take通过调用一个函数来调用,该函数首先消耗已经读取的字节(来自状态)并调用 await仅在需要时(当状态不够大时)。不幸的是,这也不起作用,因为一旦 Source 没有剩余字节, sequenceSink不执行代码,但状态仍然包含剩余的字节:-(。
如果您应该对代码感兴趣(没有优化或非常好,但应该足以测试):
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
CU.sequenceSink [] $ \st ->
do (lengthBytes, st') <- takeWithState BS.empty st 4
let len :: Word32
len = B.decode $ BSL.fromChunks [lengthBytes]
intLength = fromIntegral len
(textBytes, st'') <- takeWithState BS.empty st' intLength
return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
=> ByteString
-> [ByteString]
-> Int
-> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
let stateLenSum = foldl' (+) 0 $ map BS.length state
in if stateLenSum >= neededLen
then do let (firstChunk:state') = state
(neededChunk, pushBack) = BS.splitAt neededLen firstChunk
acc' = acc `BS.append` neededChunk
neededLen' = neededLen - BS.length neededChunk
state'' = if BS.null pushBack
then state'
else pushBack:state'
takeWithState acc' state'' neededLen'
else do aM <- await
case aM of
Just a -> takeWithState acc (state ++ [a]) neededLen
Nothing -> error "to be fixed later"

最佳答案

对于 Protocol Buffer 解析和序列化,我们使用 messageWithLengthPutMmessageWithLengthGetM (见下文)但我认为它对长度使用 varint 编码,这不是您所需要的。我可能会尝试通过替换 messageWithLength 来调整下面的实现。获取/放置类似的东西

myMessageWithLengthGetM = 
do size <- getWord32be
getMessageWithSize size

但我不知道如何实现 getMessageWithSize使用 Protocol Buffer 包中的可用功能。另一方面,您可以 getByteString然后“重新解析”字节串。

关于管道:您是否尝试过在没有 Data.Conduit.Util 的情况下实现管道? ?就像是
protobufConduit protobufDecode = loop
where
loop =
do len <- liftM convertLen (CB.take 4)
bs <- CB.take len
yield (protobufDecode bs)
loop

这是我们使用的代码:
pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
where f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
where
new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
read parse =
do mbs <- await
case mbs of
Just bs -> checkResult (parse bs)
Nothing -> return ()
checkResult result =
case result of
Failed _ errmsg -> fail errmsg
Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
Finished rest _ msg ->
do yield msg
checkResult (runGet messageWithLengthGetM rest)

关于haskell - 使用管道的顺序二进制数据解码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12568900/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com