- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
本问题旨在介绍我在当前项目中遇到的问题。我将在下面介绍我的解决方案。
我正在做一个项目,要求我连接到具有专有协议以传输数据的数据馈送服务器,该服务器实质上以GZIP格式编码在TCP协议的数据部分中,并且需要提取。
来自数据提供者的数据协议的sample application使用Java中的一个简单套接字。我想使其适应scala / netty。另外,值得注意的是,提供的数据可能会分布在多个数据包中。
我一直在寻找有关如何使用Netty.io创建简单的客户端应用程序的简单明了的示例,但是所有示例似乎都过于复杂,并且缺乏足够的解释来简单地实现此目的。
更重要的是,许多netty / scala示例都是针对服务器应用程序的。
Netty教程“ Getting Started”也缺少足够的说明,以使其在实际入门时易于导航。
问题是,如何实现连接到服务器,接收数据并解析结果的简单netty应用程序?
下面是一些示例,旨在帮助您理解此概念:
Echo Client Handler
Scala By The Bay example
最佳答案
我在尝试使用套接字将Java应用程序复制到使用Netty的更复杂方法时遇到了这个问题。
我解决问题的方法是通过了解建立连接所需的netty库的各个元素:
NioEventLoopGroup
Bootstrap
Channel
这3个元素可确保创建并管理连接以进行进一步处理。
此外,使用Netty时还需要其他一些元素:
通道初始化器,通常是从ChannelInitializer子类化的自定义对象
解码器,可以是基于预期接收的消息类型的任何类型,通常是ChannelInboundHandlerAdapter的子类
一个编码器,类似于解码器,但用于传出消息,通常是ChannelOutboundHandlerAdapter的子类
处理程序,实际上告诉netty如何处理接收到的数据。
通道初始化程序负责准备Pipeline,该操作实际上将入站和出站数据通过一系列“过滤器”传递,以便在不同级别上处理数据,每个级别都接收由先前的编码器/解码器处理的数据。
如netty文档中所述,管道的工作方式如下:
I / O请求
通过渠道或
ChannelHandlerContext
|
+ ------------------------------------------------- -+ --------------- +
| ChannelPipeline | |
| \ | / |
| + --------------------- + + ----------- + ---------- + |
| |入站处理程序N | |出站处理程序1 | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \ | |
| | \ | / |
| + ---------- + ---------- + + ----------- + ---------- + |
| |入站处理程序N-1 | |出站处理程序2 | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \。 |
| 。 。 |
| ChannelHandlerContext.fireIN_EVT()ChannelHandlerContext.OUT_EVT()|
| [方法调用] [方法调用] |
| 。 。 |
| 。 \ | / |
| + ---------- + ---------- + + ----------- + ---------- + |
| |入站处理程序2 | |出库处理器M-1 | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \ | |
| | \ | / |
| + ---------- + ---------- + + ----------- + ---------- + |
| |入站处理程序1 | |出库处理程序M | |
| + ---------- + ---------- + + ----------- + ---------- + |
| / | \ | |
+ --------------- + --------------------------------- -+ --------------- +
| \ | /
+ --------------- + --------------------------------- -+ --------------- +
| | | |
| [Socket.read()] [Socket.write()] |
| |
| Netty内部I / O线程(传输实现)|
+ ------------------------------------------------- ------------------ +
在问题的原始上下文中,没有预设的解码器允许解析具有预定字节的自定义数据。从本质上讲,这意味着必须创建用于入站数据的自定义解码器。
让我们从开始作为客户端应用程序启动的连接的基础开始:
导入io.netty.bootstrap.Bootstrap
导入io.netty.channel.nio.NioEventLoopGroup
导入io.netty.channel.socket.nio.NioSocketChannel
导入io.netty.channel.socket.SocketChannel
对象App {
def main(args:Array [String]){
connect()
}
def connect(){
val host =“ host.example.com”
val端口= 9999
val group = new NioEventLoopGroup()//启动事件循环组
尝试{
var b = new Bootstrap()//创建netty引导程序
.group(group)//将NioEventLoopGroup关联到引导程序
.channel(classOf [NioSocketChannel])//将通道关联到引导程序
.handler(MyChannelInitializer)//提供用于处理通道上传入/传出数据的处理程序
var ch = b.connect(host,port).sync()。channel()//启动与服务器的连接并将其链接到netty通道
ch.writeAndFlush(“ SERVICE_REQUEST”)//将请求发送到服务器
ch.closeFuture()。sync()//使连接保持活动状态,而不是在收到第一个数据包后关闭通道
}
赶上{
情况t:Throwable => t.printStackTrace(); group.shutdownGracefully()
}
最后{
group.shutdownGracefully()//关闭事件组
}
}
}
启动引导程序时调用的MyChannelInitializer
是负责告诉程序如何处理传入和传出数据消息的部分:
导入io.netty.channel.ChannelInitializer
导入io.netty.channel.socket.SocketChannel
导入io.netty.handler.codec.string.StringEncoder
对象MyChannelInitializer扩展ChannelInitializer [SocketChannel] {
val STR_ENCODER = new StringEncoder //来自netty的通用StringEecoder仅允许准备一个字符串并将其发送到服务器
def initChannel(ch:SocketChannel){
val pipeline = ch.pipeline()//加载与通道关联的管道
//解码消息
pipeline.addLast(“ packet-decoder”,MyPacketDecoder)//第一个数据“过滤器”为第二个过滤器提取必要的字节
pipeline.addLast(“ gzip-inflater”,MyGZipDecoder)//第二个“过滤器”将内容解压缩
//编码要发送的字符串
pipeline.addLast(“ command-encoder”,STR_ENCODER)//传出数据的字符串编码器
//处理程序
pipeline.addLast(“ message-handler”,MyMessageHandler)//处理完所有“过滤器”后处理结束数据
}
}
在这种情况下,第一个管道项目MyPacketDecoder
已创建为ReplayingDecoder的子类,它提供了一种简单的方法来执行数据包重构,以使消息具有所有必需的字节。 (简单地说,在继续操作之前,等待所有字节收集到ByteBuf变量中)
理解ByteBuf的工作方式对于这种类型的应用程序非常重要,尤其是read和get方法之间的区别,它们分别允许读取和移动读取索引或仅读取数据而不影响读取器索引。
下面提供了MyPacketDecoder
的示例
导入io.netty.handler.codec.ReplayingDecoder
导入io.netty.channel.ChannelHandlerContext
导入io.netty.buffer.ByteBuf
导入java.util.List
对象MyPacketDecoder扩展了ReplayingDecoder [Int] {
值READ_HEADER = 0
值READ_CONTENT = 1
super.state(READ_HEADER)//通过调用超类构造函数来设置Decoder的初始状态
var blockSize:Int = 0 //预期的数据大小(由服务器接收的数据发布)会根据您的情况而有所不同,在处理实际数据之前可能会有其他标头字节
定义解码(ctx:ChannelHandlerContext,in:ByteBuf,out:List [AnyRef]):单位= {
var receive_size = in.izableBytes()
if(state()== READ_HEADER){
blockSize = in.readInt()//标头数据,如果已分段,则在当前数据包和后续数据包中具有预期数据的大小
checkpoint(READ_CONTENT)//更改对象的状态,以便继续获取使消息有效所需的所有必需字节
}
否则if(state()== READ_CONTENT){
var bytes = new Array [Byte](blockSize)
in.getBytes(0,bytes,0,blockSize)//将收集的字节添加到by数组中,以获得由blockSize变量定义的预期大小
var frame = in.readBytes(blockSize)//创建要传递给下一个“过滤器”的字节缓冲区
checkpoint(READ_HEADER)//更改状态以准备下一条消息
out.add(frame)//将数据传递到下一个“过滤器”
}
其他{
抛出新的错误(“案例不包括异常”)
}
}
}
前面的代码从所有数据包中接收字节,直到期望的字节大小,然后将其传递到下一个管道级别。
下一个管道级别处理接收到的数据的GZIP解压缩。这由MyGZipDecoder
对象确保,该对象定义为ByteToMessageDecoder抽象对象的子类,以便将Byte信息作为接收的数据处理:
导入io.netty.handler.codec.ByteToMessageDecoder
导入io.netty.channel.ChannelHandlerContext
导入io.netty.buffer.ByteBuf
导入java.net._
导入java.io._
导入java.util._
导入java.util.zip._
导入java.text._
对象MyGZipDecoder扩展了ByteToMessageDecoder {
值MAX_DATA_SIZE = 100000
var inflater = new Inflater(true)
var compressionData = new Array [Byte](MAX_DATA_SIZE)
var uncompressedData =新的Array [Byte](MAX_DATA_SIZE)
定义解码(ctx:ChannelHandlerContext,in:ByteBuf,out:List [AnyRef]):单位= {
var receive_size = in.izableBytes()//读取可用字节数
in.readBytes(compressedData,0,receive_size)//将字节放入Byte数组
inflater.reset();
inflater.setInput(compressedData,0,receive_size)//准备将充气机解压缩数据
var resultLength = inflater.inflate(uncompressedData)//将数据解压缩为uncompressedData Byte数组
var message = new String(uncompressedData)//从未压缩的数据生成一个字符串
out.add(message)//将数据传递到下一个管道级别
}
}
该解码器对在分组中接收的压缩数据进行解压缩,并将该数据作为从在该级别接收的已解码字节中获得的字符串发送到下一个级别。
难题的最后一部分是MyMessageHandler
对象,该对象实质上是为应用程序所需的目的对数据进行最终处理的。这是SimpleChannelInboundHandler的子类,带有String参数,可以作为通道数据的消息:
导入io.netty.channel。{ChannelHandlerContext,SimpleChannelInboundHandler}
导入io.netty.channel.ChannelHandler.Sharable
@分享
对象QMMessageHandler扩展了SimpleChannelInboundHandler [String] {
def channelRead0(ctx:ChannelHandlerContext,msg:String){
println(“ Handler =>收到消息:” + msg)
//在这里进行数据处理,但是出于应用目的需要
}
}
这实质上满足了此特定示例的要求,该示例连接到使用对基本数据包数据进行GZip压缩的专有数据协议中提供数据的服务器。
希望这可以为那些尝试实现类似方案的人提供良好的基础,但是主要思想是需要一些定制才能创建适用于专有协议的处理。
同样,重要的是要注意,这种类型的实现并非真正用于简单的客户端-服务器连接,而是用于需要数据分配/可伸缩性的应用程序,这些应用程序是由Netty库提供的(即,多个并发连接同时进行)并广播数据)。
对于在编写此答案时可能错过的任何错误,我深表歉意。
我希望这个简短的教程可以对其他人有所帮助,因为我个人不得不花费一些令人沮丧的时间,从网上的点点滴滴中找出答案。
关于scala - Scala Netty如何为基于字节数据的协议(protocol)创建简单的客户端?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33324139/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!