- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
Netty 似乎只能通过单个 TCP 连接处理读取或写入操作,但不能同时处理这两种操作。我有一个连接到用 Netty 编写的回显服务器应用程序并发送大约 200k 消息的客户端。
echo 服务器简单地接受客户端连接并发回客户端发送的任何消息。
问题是我无法让 Netty 在全双工模式下使用 TCP 连接。我想在服务器端同时处理读写操作。在我的例子中,Netty 从客户端读取所有消息,然后将它们发回,这导致了高延迟。
客户端应用程序为每个连接触发两个线程。一个用于任何写操作,另一个用于读操作。是的,客户端是以普通的旧 Java IO 风格编写的。
也许问题与我在服务器端设置的 TCP 选项有关:
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, bufferWatermarkHigh)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, bufferWatermarkLow)
.childOption(ChannelOption.SO_RCVBUF, bufferInSize)
.childOption(ChannelOption.SO_SNDBUF, bufferOutSize)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
最佳答案
在您使用 github 存储库提供的示例中,有多个错误:
您直接从channelActive
方法编写
在netty中,有一个策略是每个handler同时只执行一个传入的方法,这是为了让开发更简单,同时为了保证类的方法以正确的顺序执行,它也是这样做的以确保方法的副作用在其他类中可见。
channelReadComplete
channelReadComplete
在当前消息缓冲区清空后调用,channelRead
在调用前可能会被多次调用。
构建消息或计算消息的大小是检测内部有多少字节的方法。如果没有这个成帧器,客户端的 2 次写入可能是服务器的 1 次读取,作为测试,我使用了 new io.netty.handler.codec.FixedLengthFrameDecoder(120)
,所以我可以使用 i++
到达服务器和客户端的消息数量。
**使用重量级打印实现轻量级操作。
根据我的分析器,大部分时间都花在调用 LOG.info()
上,这通常是记录器的情况,因为它们在幕后做很多事情,比如输出同步溪流。通过让记录器仅记录每 1000 条消息,我获得了巨大的速度提升(并且由于我在双核上运行,所以计算机非常慢......)
重发代码
发送代码每次都会重新创建ByteBuf
。通过重用 ByteBuf
,您可以进一步提高发送速度,您可以通过创建 ByteBuf 1 次,然后每次调用 .retain()
来实现通过它。
这很容易做到:
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.writeAndFlush(buf.retain());
}
减少刷新量
通过减少刷新量,您可以获得更高的 native 性能。每次调用 flush() 都是调用网络堆栈以发送未决消息。如果我们将该规则应用于上面的代码,它将给出以下代码:
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.write(buf.retain());
}
ctx.flush();
有时,您只想看到结果并亲自尝试一下:
App.java(未更改)
public class App {
public static void main( String[] args ) throws InterruptedException {
final int PORT = 8080;
runInSeparateThread(() -> new Server(PORT));
runInSeparateThread(() -> new Client(PORT));
}
private static void runInSeparateThread(Runnable runnable) {
new Thread(runnable).start();
}
}
客户端.java
public class Client {
public Client(int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = createBootstrap(group).connect("192.168.171.102", port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private Bootstrap createBootstrap(EventLoopGroup group) {
return new Bootstrap().group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
ch.pipeline().addLast(new ClientHandler());
}
}
);
}
}
ClientHandler.java
public class ClientHandler extends ChannelInboundHandlerAdapter {
private final Logger LOG = LoggerFactory.getLogger(ClientHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final int MESSAGE_SIZE = 200;
final int NUMBER_OF_MESSAGES = 200000;
new Thread(()->{
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.writeAndFlush(buf.retain());
}}).start();
}
int i;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(i++%10000==0)
LOG.info("Got a message back from the server "+(i));
((io.netty.util.ReferenceCounted)msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private ByteBuf createMessage(int size) {
ByteBuf message = Unpooled.buffer(size);
for (int i = 0; i < size; ++i) {
message.writeByte((byte) i);
}
return message;
}
}
服务器.java
public class Server {
public Server(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = createServerBootstrap(bossGroup, workerGroup).bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private ServerBootstrap createServerBootstrap(EventLoopGroup bossGroup,
EventLoopGroup workerGroup) {
return new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
ch.pipeline().addLast(new ServerHandler());
}
});
}
}
ServerHandler.java
public class ServerHandler extends ChannelInboundHandlerAdapter {
private final Logger LOG = LoggerFactory.getLogger(ServerHandler.class.getSimpleName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg).addListener(f->{if(f.cause()!=null)LOG.info(f.cause().toString());});
if(i++%10000==0)
LOG.info("Send the message back to the client "+(i));
;
}
int i;
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// LOG.info("Send the message back to the client "+(i++));
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
我决定测试如果我改变记录的 incmoing 消息的频率会发生什么,这些是测试结果:
What to print: max message latency time taken*
(always) > 20000 >10 min
i++ % 10 == 0 > 20000 >10 min
i++ % 100 == 0 16000 4 min
i++ % 1000 == 0 0-3000 51 sec
i++ % 10000 == 0 <10000 22 sec
* 时间应该有所保留,没有进行真正的基准测试,程序只有 1 次快速运行
这表明通过减少对日志的调用量(精度),我们可以获得更好的传输率(速度)。
关于java - 是否可以在全双工 TCP 通信中使用 Netty?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35171737/
我是 ZMQ 的新手。我发现 ZMQ 套接字实现比 winsock 简单得多。但我怀疑 “使用 ZMQ TCP 套接字创建的客户端可以与传统的 TCP 服务器通信吗?” 换句话说我的 ZMQ 客户端可
我想使用 TCP 协议(protocol) 将数据发送到 Logstash。为了发送数据,我正在使用 Node-RED。一个简单的配置如下所示: 在 Logstash 文件夹中,我创建了一个名为 no
当我尝试更改窗口缩放选项时,作为 root,我可以通过在 /proc/sys/net/中执行 net.ipv4.tcp_mem=16777000 来更改值。如果我必须更改这 100 个系统,那将需要大
明天做一些练习题,这道做不出来 TCP 服务器连接 TCP 客户端进行通信所需的最小套接字端口数是多少? 肯定只有两个吧?一个用于服务器,一个用于客户端,但这似乎是显而易见的。我的伙伴们认为 TCP
考虑一个存在一个服务器和多个客户端的场景。每个客户端创建 TCP 连接以与服务器交互。 TCP alive的三种用法: 服务器端保活:服务器发送 TCP 保活以确保客户端处于事件状态。如果客户端死了,
TCP TAHOE 和 TCP RENO 有什么区别。 我想知道的是关于 3-dup-ack 和超时的行为? SST 发生了什么变化? 谢谢! 最佳答案 TCP Tahoe 和 Reno 是处理 TC
大家早上好。我一直在阅读(其中大部分在堆栈溢出中)关于如何进行安全密码身份验证(散列 n 次,使用盐等)但我怀疑我将如何在我的 TCP 客户端中实际实现它-服务器架构。 我已经实现并测试了我需要的方法
在遍历 RFC793 时,我开始知道应该以这种方式选择初始序列号段重叠被阻止。 有人能解释一下如果发生重叠,重复段将如何影响 TCP? 最佳答案 不同的操作系统有不同的行为。参见 http://ins
你能举例说明一下tcp/ip中nagle算法的概念吗? 最佳答案 我认为Wikipedia在开头的段落中做得很好。 Nagle's document, Congestion Control in IP
似乎最大 TCP 接收窗口大小为 1GB(使用缩放时)。因此,仍然可以用一个连接填充 100Gb 管道的最大 RTT 是 40ms(因为 2 * 40E-3 * 100E9/8 = 1GB)。这会将这
考虑在两个 TCP 端点之间建立的 TCP 连接,其中一个调用: 关闭():此处,不允许进一步读取或写入。 关机(fd,SHUT_WR):这会将全双工连接转换为单工连接,其中调用 SHUT_WR 的端
我是在 Lua 中编写解析器的新手,我有两个简短的问题。我有一个包含 TCP 选项的数据包,如 MSS、TCP SACK、时间戳、NOP、窗口比例、未知。我基本上是在尝试剖析 TCP 选项字段中的未知
TCP 是否不负责通过在传输过程中发生丢失等情况时采取任何可能必要的措施来确保通过网络完整地发送流? 它做的不对吗? 为什么更高的应用层协议(protocol)及其应用程序仍然执行校验和? 最佳答案
考虑使用 10 Mbps 链路的单个 TCP (Reno) 连接。假设此链路不缓冲数据并且接收方的接收缓冲区比拥塞窗口大得多。设每个 TCP 段的大小为 1500 字节,发送方和接收方之间连接的双向传
考虑这样一个场景,有client-a和server-b。 server-b 禁用了 TCP keepalive。 server-b 没有任何应用程序逻辑来检查 TCP 连接是否打开。 client-a
我正在尝试用 Rust 编写回显服务器。 use std::net::{TcpStream, TcpListener}; use std::io::prelude::*; fn main() {
听说对于TCP连接,服务器会监听一个端口,并使用另一个端口发送数据。 例如,Web 服务器监听端口 80。每当客户端连接到它时,该服务器将使用另一个端口(比如 9999)向客户端发送数据(Web 内容
我试图了解带有标记 PSH 和标记 URG 的 TCP 段之间的区别。我阅读了 RFC,但仍然无法理解,其中一个在将数据发送到进程之前缓冲数据而另一个没有吗? 最佳答案 它们是两种截然不同的机制。 #
有第三方服务公开 TCP 服务器,我的 Node 服务器(TCP 客户端)应使用 tls Node 模块与其建立 TCP 连接。作为 TCP 客户端, Node 服务器同时也是 HTTP 服务器,它应
我正在发送一些 TCP SYN 数据包以获得 TCP RST 的返回。为了识别每个探测器,我在 TCP 序列字段中包含一个计数器。我注意到以下几点: 当SYN probe中的sequence numb
我是一名优秀的程序员,十分优秀!