gpt4 book ai didi

NettyProtobuf处理粘包分析

转载 作者:我是一只小鸟 更新时间:2023-02-10 14:31:31 24 4
gpt4 key购买 nike

背景

最近消息中间件项目进行联调,我负责Server端,使用Java的Netty框架。同事负责Client端,使用Go的net包,消息使用Protobuf序列化。联调时Client发送的消息Server端解析出错,经过分析发现是Server与Client粘包处理方式不一致导致,Server使用的是Protobuf提供的粘包处理方式,Client使用的是消息头定义长度的处理方式,探索一下Protobuf粘包处理方式有何不同.

编码类

                        
                          public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {

    @Override
    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = computeRawVarint32Size(bodyLen);
        out.ensureWritable(headerLen + bodyLen);
        writeRawVarint32(out, bodyLen);
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

    /**
     * Writes protobuf varint32 to (@link ByteBuf).
     * @param out to be written to
     * @param value to be written
     */
    static void writeRawVarint32(ByteBuf out, int value) {
        while (true) {
            if ((value & ~0x7F) == 0) {
                out.writeByte(value);
                return;
            } else {
                out.writeByte((value & 0x7F) | 0x80);
                value >>>= 7;
            }
        }
    }

    /**
     * Computes size of protobuf varint32 after encoding.
     * @param value which is to be encoded.
     * @return size of value encoded as protobuf varint32.
     */
    static int computeRawVarint32Size(final int value) {
        if ((value & (0xffffffff <<  7)) == 0) {
            return 1;
        }
        if ((value & (0xffffffff << 14)) == 0) {
            return 2;
        }
        if ((value & (0xffffffff << 21)) == 0) {
            return 3;
        }
        if ((value & (0xffffffff << 28)) == 0) {
            return 4;
        }
        return 5;
    }
}

                        
                      

encode()方法

                        
                          protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        // 获取消息长度
	int bodyLen = msg.readableBytes(); 
	// 计算表示消息体长度所需的字节数量
        int headerLen = computeRawVarint32Size(bodyLen);
	// 拿到所有需要写入的数据长度,对缓冲区进行扩容
        out.ensureWritable(headerLen + bodyLen);
	// 将表示消息体长度的字节写入缓冲区
        writeRawVarint32(out, bodyLen);
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

                        
                      

writeRawVarint32()方法

先看 value & ~0x7F 、 (value & 0x7F) | 0x80 、 value >>>= 7 这几个看不懂的地方, & 、 | 、 ~ 、 >>>= 这些符号为计算机的位运算符号,分别代表与、或、非、忽略符号位右移( a>>>=n 相当于 a = a>>>n ) 。

计算 value & ~0x7F

分别假设value值为 100 、 200 。

100 转二进制为 01100100 , 200 转二进制为 11001000 。

计算 100 & ~0x7F 。

十进制 十六进制 运算符


100 0x64 0 1 1 0 0 1 0 0
-128 ~0x7f & 1 0 0 0 0 0 0 0
0 0x00
0 0 0 0 0 0 0 0

计算 200 & ~0x7F 。

十进制 十六进制 运算符


200 0xc8 1 1 0 0 1 0 0 0
-128 ~0x7f & 1 0 0 0 0 0 0 0
128 0x80
1 0 0 0 0 0 0 0

这里运算结果使用十进制表示二进制是不准确的,仅作参考,需要根据数据类型进行转换,比如: 10000000 转换为byte类型是 -128 ,转换为int是 128 。

通过以上计算可以看出:

可以使用小于7个位表示的数字即可满足条件,7个位可以表示$2^7=128$个数字,取值范围是 0~127 ,也就是说 0~127 可以满足条件,这一步的目的是保证写入 表示消息体长度 的最后一位字节是正数,后面会说到.

value=100 满足条件,所以向 bytebuf 中写入字节 01100100 ,然后 return 方法结束.

value=200 不满足条件,那么看 (value & 0x7F) | 0x80 这一步运算.

计算 (value & 0x7F) | 0x80

十进制 十六进制 运算符


200 0xc8 1 1 0 0 1 0 0 0
127 0x7f & 0 1 1 1 1 1 1 1
- - - - - - - - - - -
72 0x48
0 1 0 0 1 0 0 0
128 0x80 | 1 0 0 0 0 0 0 0
- - - - - - - - - - -
200 0xc8 1 1 0 0 1 0 0 0

计算结果还是 200 ,我们分析一下步骤:

value & 0x7f :取出最后七个位, |0x80 :将首位转为1 。

即取出最后7个位,高位补1,正好一个字节的长度,将 11001000 写入 bytebuf ,再看 value >>>= 7 .

计算 value >>>= 7

十进制 运算符


200 1 1 0 0 1 0 0 0

>>>






1 1 0 0 1 0 0 0
1
0 0 0 0 0 0 0 1

忽略符号右移其实就是将后7位挤出去,在前边补7个0.

计算机中一般首位是符号位,0表示正数,1表示负数.

这里需要注意是 >> 表示右移,不改变符号,最高位与原来符号保持一致。 >>> 是忽略符号位右移,最高位补0.

200 >>>= 7 的结果为 00000001 ,继续走 if 判断,满足条件,将 00000001 写入 bytebuf .

最终 value=200 写入 bytebuf 的字节是 11001000 、 00000001 .

至此,三个看不懂的位运算都理解了,那么我们连起来看一下:

如果 value 可以用7个字节表示(或者说是value在 0~127 范围内),将value转换为字节写入 bytebuf ,跳出循环,方法结束.

如果 value 不能用7个字节表示(或者说是value不在 0~127 范围内),取最后7个位,高位补1,写入 bytebuf 中,右移7位(将刚才取出的7位删掉),再次判断是否满足 if 条件,不满足就继续上面的操作,直到满足条件为止.

总结一下 writeRawVarint32 方法,其实是把一个整数拆分成多个字节,倒序写入 bytebuf 中,如果将每个字节转换为 byte 类型,最后一个字节总是正数,前面的字节都是负数。我们可以猜测,接收消息时以第一个正数为分割,将表示消息体长度的字节与消息体字节拆分开,再通过位运算将前者组合起来就得到了消息体的长度.

computeRawVarint32Size()方法

我们在 writeRawVarint32 方法分析中了解了位运算,再看 computeRawVarint32Size 方法就很简单了.

计算机表示负数

0xffffffff 转换为二进制是 11111111 1111111 11111111 11111111 转换为有 有符号int 类型是 -1 。为什么是 -1 ?

因为计算机使用二进制可以做加法运算,但是没办法做减法运算,加上一个负数就相当于做了减法运算,现在问题是如何表示负数?

曾经有原码表示法、反码表示法,这里不做赘述,现在使用的是补码表示法.

补码表示法是将正数的二进制取反,然后在最后一位+1.

通过例子看一下:

有符号int 类型的1用二进制可以表示为 00000000 0000000 00000000 0000001 取反得到 11111111 11111111 11111111 11111110 +1得到 11111111 11111111 11111111 11111111 转换为十六进制是 0xffffffff .

计算 value & (0xffffffff << 7)

<< 表示左移,从左边挤出去7个位,在右边补7个0.

这里仍然假设 value 分为为100,200.

                        
                          // 计算(100 & (0xffffffff <<  7))
  00000000 0000000 00000000 01100100 // 100
& 11111111 1111111 11111111 10000000 // 0xffffffff <<  7 
  00000000 0000000 00000000 00000000 // 结果:0

// 计算(200 & (0xffffffff <<  7))
  00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11111111 10000000 // 0xffffffff <<  7
  00000000 0000000 00000000 10000000 // 结果:128

// 计算(200 & (0xffffffff <<  14))
  00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11000000 00000000 // 0xffffffff <<  14
  00000000 0000000 00000000 00000000 // 结果:0

                        
                      

从以上计算可以看出,如果value可以用小于7个位来表示,则左移7个位可以满足,如果value可以用8~14个位来表示,左移14个位可以满足.

100、200计算结果分别为1、2,与 writeRawVarint32 方法写入的字节数量一致.

writeRawVarint32 是方法7个7个的取出位,这里按7个位来计算所需字节数量,最终返回表示消息体长度的字节数量.

解码类

                        
                          public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {

    // TODO maxFrameLength + safe skip + fail-fast option
    //      (just like LengthFieldBasedFrameDecoder)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        in.markReaderIndex();
        int preIndex = in.readerIndex();
        int length = readRawVarint32(in);
        if (preIndex == in.readerIndex()) {
            return;
        }
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }

        if (in.readableBytes() < length) {
            in.resetReaderIndex();
        } else {
            out.add(in.readRetainedSlice(length));
        }
    }

    /**
     * Reads variable length 32bit int from buffer
     *
     * @return decoded int if buffers readerIndex has been forwarded else nonsense value
     */
    private static int readRawVarint32(ByteBuf buffer) {
        if (!buffer.isReadable()) {
            return 0;
        }
        buffer.markReaderIndex();
        byte tmp = buffer.readByte();
        if (tmp >= 0) {
            return tmp;
        } else {
            int result = tmp & 127;
            if (!buffer.isReadable()) {
                buffer.resetReaderIndex();
                return 0;
            }
            if ((tmp = buffer.readByte()) >= 0) {
                result |= tmp << 7;
            } else {
                result |= (tmp & 127) << 7;
                if (!buffer.isReadable()) {
                    buffer.resetReaderIndex();
                    return 0;
                }
                if ((tmp = buffer.readByte()) >= 0) {
                    result |= tmp << 14;
                } else {
                    result |= (tmp & 127) << 14;
                    if (!buffer.isReadable()) {
                        buffer.resetReaderIndex();
                        return 0;
                    }
                    if ((tmp = buffer.readByte()) >= 0) {
                        result |= tmp << 21;
                    } else {
                        result |= (tmp & 127) << 21;
                        if (!buffer.isReadable()) {
                            buffer.resetReaderIndex();
                            return 0;
                        }
                        result |= (tmp = buffer.readByte()) << 28;
                        if (tmp < 0) {
                            throw new CorruptedFrameException("malformed varint.");
                        }
                    }
                }
            }
            return result;
        }
    }
}

                        
                      

readRawVarint32()方法

方法就不细看了,验证一下我们之前的猜测。还是使用 value=200 写入的 11001000 、 00000001 字节举例看一下.

                        
                          private static int readRawVarint32(ByteBuf buffer) {
        if (!buffer.isReadable()) {
            return 0;
        }
        buffer.markReaderIndex();
	// 读取第一个字节
        byte tmp = buffer.readByte(); // tmp = 11001000 首位是1,是个负数,小于0
	// 判断是否大于等于0,
	// 大于等于0说明是最后一个表示消息体长度的字节,直接return
        if (tmp >= 0) {
            return tmp;
        } else {
	    // 小于0 tmp & 127 取出后七位
            int result = tmp & 127; // result = 11001000 & 01111111 = 01001000
            if (!buffer.isReadable()) {
                buffer.resetReaderIndex();
                return 0;
            }
	    // 再取第二个字节,判断是否大于等于0 
            if ((tmp = buffer.readByte()) >= 0) { //tmp = 00000001  
		// 这一步操作相当于是把上一步取出的7个字节拿出来拼在tmp<<7的后面
                result |= tmp << 7;
		// result = 01001000 | tmp << 7
		// tmp << 7 = 10000000
                // 01001000 | 10000000 = 11001000 转换为int类型是200
            }
	    // 后面的代码与以上步骤大同小异,不再赘述了
            return result;
        }
    }

                        
                      

总结

涉及到的基础知识:计算机表示整数、位运算、进制转换 。

一般处理粘包的方式有三种:

  1. 定长消息:每次发送消息的长度固定,比如,总是发送100个字节.

  2. 特殊符号分割:以特殊字符作为分隔符,读到特殊字符时,认为上一条消息结束.

  3. 消息头定义长度:在消息体前增加消息体的长度,一般使用四个字节,读取消息时先读取四个字节,得到消息体长度,再根据长度读取消息.

Netty Protobuf提供的处理粘包处理方式是在消息体前加正负数,并且以第一个正数作为分割。可以说是消息头定义长度方式+特殊符号分割方式的结合版。巧妙利用二进制的位运算和计算机表示整数的特点实现动态消息长度,发送较短消息时可以比消息头定义长度的方式节省1-3个字节.

博客小白的第一篇文档,如有错误,还望指正.

最后此篇关于NettyProtobuf处理粘包分析的文章就讲到这里了,如果你想了解更多关于NettyProtobuf处理粘包分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com