gpt4 book ai didi

java - Netty ByteToMessageCodec 两次解码消息(部分)

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:46:27 26 4
gpt4 key购买 nike

我正在使用 EmbeddedChannel测试我的 handlerscodecs以下列格式处理消息:

 +------------+------------------+----------------+      
| Header | Payload Length | Payload |
| 16 bytes | 2 bytes | "Some data" |
+------------+------------------+----------------+

首先,我要实现的目标:

  1. 通过创建一个对象来处理前 16 个字节以存储 header 详细信息并将解码后的 header 对象添加到 AttributeMapChannelHandlerContext供以后使用;
  2. 等待/检索整个负载数据;
  3. 将 Header 对象和整个负载设为 ByteBuf在最终处理程序上可用以路由消息。

我使用以下处理程序:

  1. ByteToMessageCodec<ByteBuf>提取 header 信息并将其添加到属性列表。
  2. LengthFieldBasedFrameDecoder读取有效负载长度并等待/检索整个帧。
  3. SimpleChannelInboundHandler它将使用从属性列表中检索到的 header 对象来相应地路由有效负载。

当消息传递到 decodeByteToMessageCodec 的方法,标题被正确处理和提取。然后我继续将 Header 对象添加到 AttributeMap并添加 ByteBuf (它有一个 readableBytes = 2 个字节(有效载荷长度指示符)+ 有效载荷长度)。

假设负载长度为 1020 字节。该消息最初由 codec 接收会有readableBytes = 16 bytes + 2 bytes + 1020 bytes . header 由 decode 读取方法和剩余的可用字节 (1022) 然后添加到 List<Object> out .

如果我的理解是正确的,剩余的字节现在将传递给下一个处理程序,即 LengthFieldBasedFrameDecoder。这将读取长度指示器并将有效负载(1020 字节)传递给 SimpleChannelHanlder ,但我一定是弄错了。

decode方法被再次调用,添加到 List<Object> out 的相同 1022 字节.

在 decode 方法的 JavaDoc 中有以下内容:

Decode the from one ByteBuf to an other. This method will be called till either the input ByteBuf
has nothing to read when return from this method or till nothing was read from the input ByteBuf.

这是否意味着 decode将被调用直到 readableBytes == 0

将消息的其余部分传递给 LengthFieldBasedFrameDecoder 的最有效方法是什么? ?

我假设 LengthFieldBasedFrameDecoder需要 ByteBuf作为输入,这是否意味着我需要设置 readerIndex = 0并将 ByteBuf 的副本添加到 List<Object> out

任何帮助/建议/批评都将不胜感激,我想以最干净的方式做到这一点。

这是我的 decode方法:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte [] headerBytes = new byte[HEADER_LENGTH];
in.readBytes(headerBytes, 0, HEADER_LENGTH);

Header header = new Header(headerBytes);
System.out.println("Decoded Header: \n" + header);

//Set the header attribute so it can be used by routing handlers
ctx.attr(ChannelAttributes.HEADER).getAndSet(header);
//pass to next handler
out.add(in);
}

注意:我正在阅读 Netty in Action MEAP v8

最佳答案

Does this mean decode will be called until readableBytes == 0?

基本上,是的。 ByteToMessageDecoder 的简化 View 如下所示

while (in.isReadable()) {
int outputSizeBefore = out.size();
int readableBytesBefore = in.readableBytes();

callYourDecodeImpl(ctx, in, out);

int outputSizeAfter = out.size();
int readableBytesAfter = in.readableBytes();

boolean didNotDecodeAnything = outputSizeBefore == outputSizeAfter;
boolean didNotReadAnything = readableBytesBefore == readableBytesAfter;

if(didNotDecodeAnything && didNotReadAnything) {
break;
}

// next iteration, continue with decoding
}

因此,您的解码器将持续读取 header ,直到输入缓冲区耗尽。

要获得您想要的行为,您必须将 isSingleDecode 标志设置为 true:

class MyDecoder extends ByteToMessageDecoder {

MyDecoder() {
setSingleDecode(true);
}

// your decode impl as before
}

MyDecoder decoder = new MyDecoder();
decoder.setSingleDecode(true);

这将在您的解码实现解码某些内容后停止循环。现在,您的 LengthFieldBasedFrameDecoder 将使用您添加到 out 列表的 ByteBuf 进行调用。帧解码按照您的描述工作,无需向列表添加副本。您的 SimpleChannelInboundHandler 将使用负载帧作为 msg 调用。

但是,您将无法从 SimpleChannelInboundHandler 中的 AttributeMap 中读取 header 由于 ChannelHandlerContext 对于每个 channel 处理程序都是不同的,因此不共享属性。

解决此问题的一种方法是为此使用一个事件。在您的解码器中,不要将Header 添加到AttributeMap,而是将其作为事件发送:

// instead of
// ctx.attr(Header.ATTRIBUTE_KEY).getAndSet(header);
// do this
ctx.fireUserEventTriggered(ChannelAttributes.HEADER);

然后,像这样编写您的SimpleChannelInboundHandler

class MyMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {

private Header header = null;

MyMessageHandler() {
super(true);
}

@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
if (evt instanceof Header) {
header = (Header) evt;
} else {
super.userEventTriggered(ctx, evt);
}
}

@Override
protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
if (header != null) {
System.out.println("header = " + header);
// continue with header, such as routing...
}
header = null;
}
}

另一种方法是将两个对象都发送到管道中并使用ChannelInboundHandlerAdapter 而不是 SimpleChannelInboundHandler。在您的解码器中,不是将Header 添加到AttributeMap,而是将它添加到out:

// ...
out.add(header);
out.add(in);

然后,像这样编写你的ChannelInboundHandler

class MyMessageHandler extends ChannelInboundHandlerAdapter {
private Header header = null;

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof Header) {
header = (Header) msg;
System.out.println("got the header " + header);
} else if (msg instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("got the message " + msg);
try {
// continue with header, such as routing...
} finally {
ReferenceCountUtil.release(msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}

LengthFieldBasedFrameDecoder 简单地忽略不是 ByteBuf 的消息,所以你的 Header 只会传递它(假设它没有实现 ByteBuf)并且到达您的 ChannelInboundHandler。然后,消息将被解码为有效负载帧并传递给您的 ChannelInboundHandler

关于java - Netty ByteToMessageCodec<ByteBuf> 两次解码消息(部分),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26604579/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com