gpt4 book ai didi

java - Echo Server并发1000个客户端(丢失消息+错误连接)

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:58:07 25 4
gpt4 key购买 nike

我正在阅读“Netty In Action V5”。在阅读第 2.3 和 2.4 章时,我尝试使用 EchoServer 和 EchoClient 示例,当我测试连接到服务器的一个客户端时,一切正常......然后我将示例修改为多个客户端可以连接到服务器。我的目的是运行压力测试:1000 个客户端将连接到服务器,每个客户端将向服务器回显 100 条消息,当所有客户端完成后,我将获得所有过程的总时间。服务器部署在linux机器(VPS)上,客户端部署在window机器上。

当运行压力测试时,我遇到了 2 个问题:

一些客户收到错误信息:

java.io.IOException: An existing connection was forcibly closed by the remote host 
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)\at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

但是有些客户端没有收到服务器的消息

工作环境:

  • Netty-all-4.0.30.Final

  • JDK1.8.0_25

  • Echo 客户端部署在 Window 7 Ultimate 上

  • Echo Server部署在Linux Centos 6上

NettyClient 类:

public class NettyClient {
private Bootstrap bootstrap;
private EventLoopGroup group;

public NettyClient(final ChannelInboundHandlerAdapter handler) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(handler);
}
});
}

public void start(String host, int port) throws Exception {
bootstrap.remoteAddress(new InetSocketAddress(host, port));
bootstrap.connect();
}

public void stop() {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

NettyServer 类:

public class NettyServer {
private EventLoopGroup parentGroup;
private EventLoopGroup childGroup;
private ServerBootstrap boopstrap;

public NettyServer(final ChannelInboundHandlerAdapter handler) {
parentGroup = new NioEventLoopGroup(300);
childGroup = new NioEventLoopGroup(300);
boopstrap = new ServerBootstrap();
boopstrap.group(parentGroup, childGroup);
boopstrap.channel(NioServerSocketChannel.class);
boopstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(handler);
}
});
}

public void start(int port) throws Exception {
boopstrap.localAddress(new InetSocketAddress(port));
ChannelFuture future = boopstrap.bind().sync();
System.err.println("Start Netty server on port " + port);
future.channel().closeFuture().sync();
}

public void stop() throws Exception {
parentGroup.shutdownGracefully().sync();
childGroup.shutdownGracefully().sync();
}
}

类 EchoClient

public class EchoClient {
private static final String HOST = "203.12.37.22";
private static final int PORT = 3344;
private static final int NUMBER_CONNECTION = 1000;
private static final int NUMBER_ECHO = 10;
private static CountDownLatch counter = new CountDownLatch(NUMBER_CONNECTION);

public static void main(String[] args) throws Exception {
List<NettyClient> listClients = Collections.synchronizedList(new ArrayList<NettyClient>());
for (int i = 0; i < NUMBER_CONNECTION; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
NettyClient client = new NettyClient(new EchoClientHandler(NUMBER_ECHO) {
@Override
protected void onFinishEcho() {
counter.countDown();
System.err.println((NUMBER_CONNECTION - counter.getCount()) + "/" + NUMBER_CONNECTION);
}
});
client.start(HOST, PORT);
listClients.add(client);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}).start();
}

long t1 = System.currentTimeMillis();
counter.await();
long t2 = System.currentTimeMillis();
System.err.println("Totla time: " + (t2 - t1));

for (NettyClient client : listClients) {
client.stop();
}
}

private static class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

private static final String ECHO_MSG = "Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo";
private int numberEcho;
private int curNumberEcho = 0;

public EchoClientHandler(int numberEcho) {
this.numberEcho = numberEcho;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
curNumberEcho++;
if (curNumberEcho >= numberEcho) {
onFinishEcho();
} else {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
}
}

protected void onFinishEcho() {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}

EchoServer 类:

public class EchoServer {
private static final int PORT = 3344;

public static void main(String[] args) throws Exception {
NettyServer server = new NettyServer(new EchoServerHandler());
server.start(PORT);
System.err.println("Start server on port " + PORT);
}

@Sharable
private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}

最佳答案

你可能会改变两件事:

  1. 只创建一个客户端 Bootstrap 并为所有客户端重复使用,而不是为每个客户端创建一个。因此,从客户端部分中提取您的 Bootstrap 构建并仅保留连接,就像您在开始时所做的那样。这将在内部限制线程数。

  2. 当达到乒乓数时,在客户端关闭连接。目前,您只调用空方法 onFinishEcho,这会导致客户端根本没有关闭,因此没有客户端停止...因此也没有 channel 关闭...

您可能已经达到了客户端线程数量的一些限制。

另外一个因素也可能是一个问题:您没有指定任何编解码器(字符串编解码器或其他),这可能导致客户端或服务器的部分发送被视为完整响应。

例如,您可能有“Echo Echo Echo”的第一个 block 发送一个包含缓冲区开头的数据包,而其他部分(更多“Echo”)将通过后面的数据包发送。

为防止这种情况,您应该使用一个编解码器来确保您的最终处理程序获得真正的完整消息,而不是部分消息。如果没有,您可能会遇到其他问题,例如服务器端尝试发送额外数据包时出错,而客户端会按预期更快地关闭 channel ......

关于java - Echo Server并发1000个客户端(丢失消息+错误连接),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31881696/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com