gpt4 book ai didi

java - 当我添加 ExecutionHandler 时,netty 服务器似乎被阻止了?

转载 作者:行者123 更新时间:2023-12-02 07:55:36 24 4
gpt4 key购买 nike

场景:

我正在编写一个 echo 客户端和服务器。正在传输的数据是一个字符串:

客户端编码一个字符串,并将其发送到服务器。服务器接收数据,解码字符串,然后将接收到的字符串编码,发送回客户端。

以上过程将重复100000次。(注意:连接是持久的)。

不同条件:

当我同时运行一台服务器和两个客户端时,一切正常,每个客户端收到 100000 条消息并正常终止。

但是当我在服务器上添加一个ExecutionHandler,然后同时运行一台服务器和两个客户端时,一个客户端将永远不会终止,并且网络流量是零。

我暂时找不到这个问题的关键点,您能给我一些建议吗?

我的代码:

字符串编码器、字符串解码器、客户端处理程序、服务器处理程序、客户端主程序、服务器主程序。

//解码器============================================== ===========

import java.nio.charset.Charset;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class Dcd extends FrameDecoder {
public static final Charset cs = Charset.forName("utf8");

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {

if (buffer.readableBytes() < 4) {
return null;
}

int headlen = 4;
int length = buffer.getInt(0);
if (buffer.readableBytes() < length + headlen) {
return null;
}

String ret = buffer.toString(headlen, length, cs);
buffer.skipBytes(length + headlen);

return ret;
}
}

//编码器============================================== ===========

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

public class Ecd extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof String)) {
return msg;
}

byte[] data = ((String) msg).getBytes();

ChannelBuffer buf = ChannelBuffers.dynamicBuffer(data.length + 4, ctx
.getChannel().getConfig().getBufferFactory());
buf.writeInt(data.length);
buf.writeBytes(data);

return buf;
}
}

//客户端处理程序 ============================================= ===========

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger
.getLogger(EchoClientHandler.class.getName());

private final AtomicLong transferredBytes = new AtomicLong();
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicLong startTime = new AtomicLong(0);

private String dd;

/**
* Creates a client-side handler.
*/
public EchoClientHandler(String data) {
dd = data;
}

public long getTransferredBytes() {
return transferredBytes.get();
}

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
// Send the first message. Server will not send anything here
// because the firstMessage's capacity is 0.
startTime.set(System.currentTimeMillis());

Channels.write(ctx.getChannel(), dd);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((String) e.getMessage()).length());
int i = counter.incrementAndGet();
int N = 100000;
if (i < N) {
e.getChannel().write(e.getMessage());
} else {
ctx.getChannel().close();
System.out.println(N * 1.0
/ (System.currentTimeMillis() - startTime.get()) * 1000);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}

//客户端主 ============================================= ===========

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
* Sends one message when a connection is open and echoes back any received data
* to the server. Simply put, the echo client initiates the ping-pong traffic
* between the echo client and server by sending the first message to the
* server.
*/
public class EchoClient {

private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public void run() {
// Configure the client.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Dcd(), new Ecd(),
new EchoClientHandler("abcdd"));
}
});

bootstrap.setOption("sendBufferSize", 1048576);
bootstrap.setOption("receiveBufferSize", 1048576);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024);
bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024);

List<ChannelFuture> list = new ArrayList<ChannelFuture>();
for (int i = 0; i < 1; i++) {
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(
host, port));
// Wait until the connection is closed or the connection
// attempt
// fails.
list.add(future);
}

for (ChannelFuture f : list) {
f.getChannel().getCloseFuture().awaitUninterruptibly();
}

// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}

private static void testOne() {
final String host = "192.168.0.102";
final int port = 8000;

new EchoClient(host, port).run();
}

public static void main(String[] args) throws Exception {
testOne();
}
}

//服务器处理程序 ============================================= ===========

import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/**
* Handler implementation for the echo server.
*/
public class EchoServerHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger
.getLogger(EchoServerHandler.class.getName());

private final AtomicLong transferredBytes = new AtomicLong();

public long getTransferredBytes() {
return transferredBytes.get();
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((String) e.getMessage()).length());
Channels.write(ctx.getChannel(), e.getMessage());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}

//服务器主============================================= ===========

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

/**
* Echoes back any received data from a client.
*/
public class EchoServer {

private final int port;

public EchoServer(int port) {
this.port = port;
}

public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

System.out.println(Runtime.getRuntime().availableProcessors() * 2);

final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
System.out.println("new pipe");
return Channels.pipeline(new Dcd(), new Ecd(),
executionHandler, new EchoServerHandler());
}
});

bootstrap.setOption("child.sendBufferSize", 1048576);
bootstrap.setOption("child.receiveBufferSize", 1048576);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.writeBufferLowWaterMark", 32 * 1024);
bootstrap.setOption("child.writeBufferHighWaterMark", 64 * 1024);

// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
}

public static void main(String[] args) throws Exception {
int port = 8000;
new EchoServer(port).run();
}
}

最佳答案

我现在找到了原因,虽然辛苦,但充满乐趣。

当添加 ExecutionHandler 时,消息将被包装到 Runnable 任务中,并在 ChildExecutor 中执行。关键点在于:当执行器快要退出时,可能会向 ChildExecutor 添加一个任务,然后该任务将被 ChildExecutor 忽略。

我添加了三行代码和一些注释,最终代码如下所示,现在可以运行了,我应该邮寄给作者吗? :

private final class ChildExecutor implements Executor, Runnable {
private final Queue<Runnable> tasks = QueueFactory
.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean();

public void execute(Runnable command) {
// TODO: What todo if the add return false ?
tasks.add(command);

if (!isRunning.get()) {
doUnorderedExecute(this);
} else {
}
}

public void run() {
// check if its already running by using CAS. If so just return
// here. So in the worst case the thread
// is executed and do nothing
boolean acquired = false;
if (isRunning.compareAndSet(false, true)) {
acquired = true;
try {
Thread thread = Thread.currentThread();
for (;;) {
final Runnable task = tasks.poll();
// if the task is null we should exit the loop
if (task == null) {
break;
}

boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
onAfterExecute(task, null);
} catch (RuntimeException e) {
if (!ran) {
onAfterExecute(task, e);
}
throw e;
}
}
//TODO NOTE (I added): between here and "isRunning.set(false)",some new tasks maybe added.
} finally {
// set it back to not running
isRunning.set(false);
}
}

//TODO NOTE (I added): Do the remaining works.
if (acquired && !isRunning.get() && tasks.peek() != null) {
doUnorderedExecute(this);
}
}
}

关于java - 当我添加 ExecutionHandler 时,netty 服务器似乎被阻止了?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9697371/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com