gpt4 book ai didi

scala - Scala Netty如何为基于字节数据的协议(protocol)创建简单的客户端?

转载 作者:行者123 更新时间:2023-12-04 17:53:33 24 4
gpt4 key购买 nike

本问题旨在介绍我在当前项目中遇到的问题。我将在下面介绍我的解决方案。

我正在做一个项目,要求我连接到具有专有协议以传输数据的数据馈送服务器,该服务器实质上以GZIP格式编码在TCP协议的数据部分中,并且需要提取。

来自数据提供者的数据协议的sample application使用Java中的一个简单套接字。我想使其适应scala / netty。另外,值得注意的是,提供的数据可能会分布在多个数据包中。

我一直在寻找有关如何使用Netty.io创建简单的客户端应用程序的简单明了的示例,但是所有示例似乎都过于复杂,并且缺乏足够的解释来简单地实现此目的。
更重要的是,许多netty / scala示例都是针对服务器应用程序的。

Netty教程“ Getting Started”也缺少足够的说明,以使其在实际入门时易于导航。

问题是,如何实现连接到服务器,接收数据并解析结果的简单netty应用程序?

下面是一些示例,旨在帮助您理解此概念:


Echo Client Handler
Scala By The Bay example

最佳答案

我在尝试使用套接字将Java应用程序复制到使用Netty的更复杂方法时遇到了这个问题。

我解决问题的方法是通过了解建立连接所需的netty库的各个元素:


NioEventLoopGroup
Bootstrap
Channel


这3个元素可确保创建并管理连接以进行进一步处理。

此外,使用Netty时还需要其他一些元素:


通道初始化器,通常是从ChannelInitializer子类化的自定义对象
解码器,可以是基于预期接收的消息类型的任何类型,通常是ChannelInboundHandlerAdapter的子类
一个编码器,类似于解码器,但用于传出消息,通常是ChannelOutboundHandlerAdapter的子类
处理程序,实际上告诉netty如何处理接收到的数据。


通道初始化程序负责准备Pipeline,该操作实际上将入站和出站数据通过一系列“过滤器”传递,以便在不同级别上处理数据,每个级别都接收由先前的编码器/解码器处理的数据。

如netty文档中所述,管道的工作方式如下:



I / O请求
通过渠道或
ChannelHandlerContext
|
+ ------------------------------------------------- -+ --------------- +
| ChannelPipeline | |
| \ | / |
| + --------------------- + + ----------- + ---------- + |
| |入站处理程序N | |出站处理程序1 | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \ | |
| | \ | / |
| + ---------- + ---------- + + ----------- + ---------- + |
| |入站处理程序N-1 | |出站处理程序2 | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \。 |
| 。 。 |
| ChannelHandlerContext.fireIN_EVT()ChannelHandlerContext.OUT_EVT()|
| [方法调用] [方法调用] |
| 。 。 |
| 。 \ | / |
| + ---------- + ---------- + + ----------- + ---------- + |
| |入站处理程序2 | |出库处理器M-1 | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \ | |
| | \ | / |
| + ---------- + ---------- + + ----------- + ---------- + |
| |入站处理程序1 | |出库处理程序M | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \ | |
+ --------------- + --------------------------------- -+ --------------- +
| \ | /
+ --------------- + --------------------------------- -+ --------------- +
| | | |
| [Socket.read()] [Socket.write()] |
| |
| Netty内部I / O线程(传输实现)|
+ ------------------------------------------------- ------------------ +



在问题的原始上下文中,没有预设的解码器允许解析具有预定字节的自定义数据。从本质上讲,这意味着必须创建用于入站数据的自定义解码器。

让我们从开始作为客户端应用程序启动的连接的基础开始:


导入io.netty.bootstrap.Bootstrap
导入io.netty.channel.nio.NioEventLoopGroup
导入io.netty.channel.socket.nio.NioSocketChannel
导入io.netty.channel.socket.SocketChannel

对象App {
def main(args:Array [String]){
connect()
}

def connect(){
val host =“ host.example.com”
val端口= 9999
val group = new NioEventLoopGroup()//启动事件循环组

尝试{
var b = new Bootstrap()//创建netty引导程序
.group(group)//将NioEventLoopGroup关联到引导程序
.channel(classOf [NioSocketChannel])//将通道关联到引导程序
.handler(MyChannelInitializer)//提供用于处理通道上传入/传出数据的处理程序


var ch = b.connect(host,port).sync()。channel()//启动与服务器的连接并将其链接到netty通道

ch.writeAndFlush(“ SERVICE_REQUEST”)//将请求发送到服务器

ch.closeFuture()。sync()//使连接保持活动状态,而不是在收到第一个数据包后关闭通道
}
赶上{
情况t:Throwable => t.printStackTrace(); group.shutdownGracefully()
}
最后{
group.shutdownGracefully()//关闭事件组
}
}
}



启动引导程序时调用的MyChannelInitializer是负责告诉程序如何处理传入和传出数据消息的部分:


导入io.netty.channel.ChannelInitializer
导入io.netty.channel.socket.SocketChannel
导入io.netty.handler.codec.string.StringEncoder

对象MyChannelInitializer扩展ChannelInitializer [SocketChannel] {

val STR_ENCODER = new StringEncoder //来自netty的通用StringEecoder仅允许准备一个字符串并将其发送到服务器

def initChannel(ch:SocketChannel){
val pipeline = ch.pipeline()//加载与通道关联的管道

//解码消息
pipeline.addLast(“ packet-decoder”,MyPacketDecoder)//第一个数据“过滤器”为第二个过滤器提取必要的字节
pipeline.addLast(“ gzip-inflater”,MyGZipDecoder)//第二个“过滤器”将内容解压缩

//编码要发送的字符串
pipeline.addLast(“ command-encoder”,STR_ENCODER)//传出数据的字符串编码器

//处理程序
pipeline.addLast(“ message-handler”,MyMessageHandler)//处理完所有“过滤器”后处理结束数据
}
}



在这种情况下,第一个管道项目MyPacketDecoder已创建为ReplayingDecoder的子类,它提供了一种简单的方法来执行数据包重构,以使消息具有所有必需的字节。 (简单地说,在继续操作之前,等待所有字节收集到ByteBuf变量中)

理解ByteBuf的工作方式对于这种类型的应用程序非常重要,尤其是read和get方法之间的区别,它们分别允许读取和移动读取索引或仅读取数据而不影响读取器索引。

下面提供了MyPacketDecoder的示例


导入io.netty.handler.codec.ReplayingDecoder
导入io.netty.channel.ChannelHandlerContext
导入io.netty.buffer.ByteBuf
导入java.util.List

对象MyPacketDecoder扩展了ReplayingDecoder [Int] {

值READ_HEADER = 0
值READ_CONTENT = 1

super.state(READ_HEADER)//通过调用超类构造函数来设置Decoder的初始状态

var blockSize:Int = 0 //预期的数据大小(由服务器接收的数据发布)会根据您的情况而有所不同,在处理实际数据之前可能会有其他标头字节

定义解码(ctx:ChannelHandlerContext,in:ByteBuf,out:List [AnyRef]):单位= {

var receive_size = in.izableBytes()

if(state()== READ_HEADER){
blockSize = in.readInt()//标头数据,如果已分段,则在当前数据包和后续数据包中具有预期数据的大小

checkpoint(READ_CONTENT)//更改对象的状态,以便继续获取使消息有效所需的所有必需字节
}
否则if(state()== READ_CONTENT){

var bytes = new Array [Byte](blockSize)
in.getBytes(0,bytes,0,blockSize)//将收集的字节添加到by数组中,以获得由blockSize变量定义的预期大小

var frame = in.readBytes(blockSize)//创建要传递给下一个“过滤器”的字节缓冲区

checkpoint(READ_HEADER)//更改状态以准备下一条消息
out.add(frame)//将数据传递到下一个“过滤器”
}
其他{
抛出新的错误(“案例不包括异常”)
}
}

}



前面的代码从所有数据包中接收字节,直到期望的字节大小,然后将其传递到下一个管道级别。

下一个管道级别处理接收到的数据的GZIP解压缩。这由MyGZipDecoder对象确保,该对象定义为ByteToMessageDecoder抽象对象的子类,以便将Byte信息作为接收的数据处理:


导入io.netty.handler.codec.ByteToMessageDecoder
导入io.netty.channel.ChannelHandlerContext
导入io.netty.buffer.ByteBuf
导入java.net._
导入java.io._
导入java.util._
导入java.util.zip._
导入java.text._

对象MyGZipDecoder扩展了ByteToMessageDecoder {

值MAX_DATA_SIZE = 100000

var inflater = new Inflater(true)
var compressionData = new Array [Byte](MAX_DATA_SIZE)
var uncompressedData =新的Array [Byte](MAX_DATA_SIZE)

定义解码(ctx:ChannelHandlerContext,in:ByteBuf,out:List [AnyRef]):单位= {

var receive_size = in.izableBytes()//读取可用字节数

in.readBytes(compressedData,0,receive_size)//将字节放入Byte数组

inflater.reset();
inflater.setInput(compressedData,0,receive_size)//准备将充气机解压缩数据
var resultLength = inflater.inflate(uncompressedData)//将数据解压缩为uncompressedData Byte数组

var message = new String(uncompressedData)//从未压缩的数据生成一个字符串

out.add(message)//将数据传递到下一个管道级别
}
}



该解码器对在分组中接收的压缩数据进行解压缩,并将该数据作为从在该级别接收的已解码字节中获得的字符串发送到下一个级别。

难题的最后一部分是MyMessageHandler对象,该对象实质上是为应用程序所需的目的对数据进行最终处理的。这是SimpleChannelInboundHandler的子类,带有String参数,可以作为通道数据的消息:


导入io.netty.channel。{ChannelHandlerContext,SimpleChannelInboundHandler}
导入io.netty.channel.ChannelHandler.Sharable

@分享
对象QMMessageHandler扩展了SimpleChannelInboundHandler [String] {

def channelRead0(ctx:ChannelHandlerContext,msg:String){

println(“ H​​andler =>收到消息:” + msg)
//在这里进行数据处理,但是出于应用目的需要

}
}



这实质上满足了此特定示例的要求,该示例连接到使用对基本数据包数据进行GZip压缩的专有数据协议中提供数据的服务器。

希望这可以为那些尝试实现类似方案的人提供良好的基础,但是主要思想是需要一些定制才能创建适用于专有协议的处理。

同样,重要的是要注意,这种类型的实现并非真正用于简单的客户端-服务器连接,而是用于需要数据分配/可伸缩性的应用程序,这些应用程序是由Netty库提供的(即,多个并发连接同时进行)并广播数据)。

对于在编写此答案时可能错过的任何错误,我深表歉意。

我希望这个简短的教程可以对其他人有所帮助,因为我个人不得不花费一些令人沮丧的时间,从网上的点点滴滴中找出答案。

关于scala - Scala Netty如何为基于字节数据的协议(protocol)创建简单的客户端?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33324139/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com