java - Handling receivedMessage on multiple threads with ExecutionHandler

Reposted. Author: 行者123. Updated: 2023-12-02 05:06:31

My pipeline factory:

private final ExecutionHandler exeHandler = new ExecutionHandler(
        new MemoryAwareThreadPoolExecutor(1, 0, 0, 20, TimeUnit.SECONDS, Executors.defaultThreadFactory()));

public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    pipeline.addLast("FixedLengthStringDecoder", new FixedLengthStringDecoder(4, CharsetUtil.UTF_8));
    pipeline.addLast("decode", new StringDecoder(CharsetUtil.UTF_8));
    pipeline.addLast("execution", exeHandler);
    pipeline.addLast("process", this.getHandler());
    return pipeline;
}

My handler:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    // Write the constructed message back to the client.
    e.getChannel().write(constructMessage(e.getMessage().toString()));
}

The problem is in NioWorker:

private boolean read(SelectionKey k) {
    ...
    if (readBytes > 0) {
        bb.flip();

        final ChannelBufferFactory bufferFactory =
                channel.getConfig().getBufferFactory();
        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        recvBufferPool.release(bb);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        fireMessageReceived(channel, buffer);
    } else {
        recvBufferPool.release(bb);
    }

    if (ret < 0 || failure) {
        k.cancel();
        close(channel, succeededFuture(channel)); // the channel is closed here
        return false;
    }

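Because of the ExecutionHandler, the handling triggered by fireMessageReceived(channel, buffer) runs on a new thread, but the main NioWorker thread goes on to close the channel. So when I write a message to the client in messageReceived(), I get: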

java.nio.channels.ClosedChannelException
at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:645)
at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:372)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:137)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:76)
at org.jboss.netty.channel.Channels.write(Channels.java:632)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:167)

What is going wrong?
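
The ordering described above can be reproduced without Netty. The following is only a minimal sketch of the race, not Netty's actual code: FakeChannel, ClosedChannelRaceSketch and writeAfterClose are hypothetical names made up for illustration, and a CountDownLatch is used to force the unlucky ordering (close first, write second) so the result is deterministic.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class ClosedChannelRaceSketch {

    // Hypothetical stand-in for a channel; not part of any real library.
    static class FakeChannel {
        private final AtomicBoolean open = new AtomicBoolean(true);

        void close() {
            open.set(false);
        }

        void write(String message) throws ClosedChannelException {
            if (!open.get()) {
                throw new ClosedChannelException();
            }
            // A real channel would send the bytes here.
        }
    }

    // Returns "written" if the write succeeded, otherwise the exception's simple name.
    static String writeAfterClose() {
        FakeChannel channel = new FakeChannel();
        ExecutorService handlerPool = Executors.newSingleThreadExecutor();
        CountDownLatch closed = new CountDownLatch(1);
        try {
            // The "I/O thread" (this thread) hands the message to the pool...
            Future<String> outcome = handlerPool.submit(() -> {
                closed.await(); // wait until the I/O thread has closed the channel
                try {
                    channel.write("reply");
                    return "written";
                } catch (ClosedChannelException ex) {
                    return ex.getClass().getSimpleName();
                }
            });
            // ...and then closes the channel, as read() does when ret < 0.
            channel.close();
            closed.countDown();
            return outcome.get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            handlerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAfterClose()); // prints "ClosedChannelException"
    }
}
```

In a real server the ordering is not forced, so the write may sometimes succeed and sometimes fail, depending on which thread gets there first.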

Best answer

Could you try changing your code so that it works like this?

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    // Send the constructed message downstream from this handler's context
    // instead of calling Channel.write() directly.
    Channel channel = e.getChannel();
    ctx.sendDownstream(new DownstreamMessageEvent(
            channel,
            Channels.future(channel),
            constructMessage(e.getMessage().toString()),
            channel.getRemoteAddress()));
}

Regarding "java - Handling receivedMessage on multiple threads with ExecutionHandler", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/10927225/
