gpt4 book ai didi

java - Java NIO 2 中的多线程模型 - (前摄器模式)是什么?

转载 作者:行者123 更新时间:2023-12-02 06:17:47 26 4
gpt4 key购买 nike

我正在尝试使用 Java NIO 2(基于 Proactor 模式)构建一个简单的 Echo 服务。

在最简单的实现中,我们有 4 个主要组件:ProactorInitiator、AcceptCompletionHandler、ReadCompletionHandler 和 WriteCompletionHandler。

以下是我的示例代码。

ProactorInitiator.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;

/**
 * Entry point of the asynchronous (NIO.2) echo server.
 *
 * <p>Opens the listening channel, arms the first asynchronous accept and then
 * parks the main thread so the process stays alive while completion handlers
 * run on other threads.
 */
public class ProactorInitiator {
    /** Port used by {@link #main(String[])}. */
    static int ASYNC_SERVER_PORT = 4333;

    /**
     * Opens an asynchronous server socket bound to {@code port} and registers
     * an {@link AcceptCompletionHandler} for the first incoming connection.
     *
     * <p>The method returns as soon as the accept has been initiated; it does
     * not wait for a client to connect.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel cannot be opened or bound
     */
    public void initiateProactiveServer(int port)
            throws IOException {

        final AsynchronousServerSocketChannel listener =
                AsynchronousServerSocketChannel.open();
        try {
            listener.bind(new InetSocketAddress(port));
        } catch (IOException | RuntimeException e) {
            // Binding failed: release the already-opened channel instead of leaking it.
            try {
                listener.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }

        AcceptCompletionHandler acceptCompletionHandler =
                new AcceptCompletionHandler(listener);

        SessionState state = new SessionState();
        listener.accept(state, acceptCompletionHandler);
        System.out.println("Proactor Initiator Running on "+Thread.currentThread().getName());
    }

    /**
     * Starts the server on {@link #ASYNC_SERVER_PORT} and blocks the main
     * thread until it is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            System.out.println("Async server listening on port : " +
                    ASYNC_SERVER_PORT);
            new ProactorInitiator().initiateProactiveServer(
                    ASYNC_SERVER_PORT);
        } catch (IOException e) {
            e.printStackTrace();
            // Startup failed: there is nothing to serve, so do not park forever.
            return;
        }

        // Sleep indefinitely since otherwise the JVM would terminate
        while (true) {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Treat interruption as a shutdown request: keep the flag set and leave.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

AcceptCompletionHandler.java

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Completion handler for asynchronous accepts on the server channel.
 *
 * <p>Each successful accept immediately re-arms the listener for the next
 * connection and then starts an asynchronous read on the newly accepted
 * socket.
 */
public class AcceptCompletionHandler
        implements
        CompletionHandler<AsynchronousSocketChannel, SessionState> {

    private final AsynchronousServerSocketChannel listener;

    /**
     * @param listener server channel on which follow-up accepts are issued
     */
    public AcceptCompletionHandler(
            AsynchronousServerSocketChannel listener) {
        this.listener = listener;
    }

    /**
     * Invoked when a client connection has been accepted.
     *
     * @param socketChannel the accepted connection
     * @param sessionState  attachment supplied to the accept call; it is
     *                      forwarded to the read on this connection
     */
    @Override
    public void completed(AsynchronousSocketChannel socketChannel,
            SessionState sessionState) {

        System.out.println("Accept Handler running on "+Thread.currentThread().getName());
        // accept the next connection
        SessionState newSessionState = new SessionState();
        listener.accept(newSessionState, this);

        // handle this connection
        ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
        ReadCompletionHandler readCompletionHandler =
                new ReadCompletionHandler(socketChannel, inputBuffer);
        socketChannel.read(
                inputBuffer, sessionState, readCompletionHandler);
    }

    /**
     * Invoked when an accept fails. The failure is reported instead of being
     * silently dropped.
     *
     * <p>NOTE(review): the accept is only re-armed from {@link #completed}, so
     * after a failure no further connections are accepted. Confirm whether a
     * retry policy is wanted here.
     *
     * @param exc          cause of the failure
     * @param sessionState attachment supplied to the failed accept call
     */
    @Override
    public void failed(Throwable exc, SessionState sessionState) {
        System.err.println("Accept failed on "
                + Thread.currentThread().getName() + ": " + exc);
    }

}

ReadCompletionHandler.java

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Completion handler for the asynchronous read on an accepted connection.
 *
 * <p>Echoes the bytes that were read back to the client by starting an
 * asynchronous write handled by a {@link WriteCompletionHandler}.
 */
public class ReadCompletionHandler implements
        CompletionHandler<Integer, SessionState> {

    private final AsynchronousSocketChannel socketChannel;
    private final ByteBuffer inputBuffer;

    /**
     * @param socketChannel connection the read was issued on
     * @param inputBuffer   buffer the read fills; it is read from position 0
     */
    public ReadCompletionHandler(
            AsynchronousSocketChannel socketChannel,
            ByteBuffer inputBuffer) {
        this.socketChannel = socketChannel;
        this.inputBuffer = inputBuffer;
    }

    /**
     * Invoked when the read finishes.
     *
     * @param bytesRead    number of bytes read, or {@code -1} when the peer
     *                     closed the connection before sending anything
     * @param sessionState attachment supplied to the read; forwarded to the write
     */
    @Override
    public void completed(
            Integer bytesRead, SessionState sessionState) {

        System.out.println("Read Handler running on "+Thread.currentThread().getName());

        // End of stream is reported as -1; sizing an array with it would throw
        // NegativeArraySizeException, so just release the connection.
        if (bytesRead < 0) {
            closeChannel();
            return;
        }

        // Rewind the input buffer to read from the beginning
        byte[] payload = new byte[bytesRead];
        inputBuffer.rewind();
        inputBuffer.get(payload);

        // Echo the raw bytes back to the client. They are deliberately not
        // decoded into a String and re-encoded: that round trip depends on the
        // platform charset and can alter bytes that are not valid text.
        WriteCompletionHandler writeCompletionHandler =
                new WriteCompletionHandler(socketChannel);

        ByteBuffer outputBuffer = ByteBuffer.wrap(payload);

        socketChannel.write(
                outputBuffer, sessionState, writeCompletionHandler);
    }

    /**
     * Invoked when the read fails. Reports the failure and closes the
     * connection so the socket is not leaked.
     *
     * @param exc        cause of the failure
     * @param attachment attachment supplied to the failed read
     */
    @Override
    public void failed(Throwable exc, SessionState attachment) {
        System.err.println("Read failed on "
                + Thread.currentThread().getName() + ": " + exc);
        closeChannel();
    }

    /** Closes the connection, reporting (not propagating) any close error. */
    private void closeChannel() {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

WriteCompletionHandler.java

import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Completion handler for the asynchronous write that echoes data back to the
 * client. The connection is closed once the write completes or fails.
 */
public class WriteCompletionHandler implements
        CompletionHandler<Integer, SessionState> {

    private final AsynchronousSocketChannel socketChannel;

    /**
     * @param socketChannel connection the write was issued on
     */
    public WriteCompletionHandler(
            AsynchronousSocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    /**
     * Invoked when the write finishes; logs the handling thread and closes the
     * connection.
     *
     * <p>NOTE(review): the connection is closed after the first completion
     * regardless of {@code bytesWritten}; this handler holds no reference to
     * the output buffer, so it cannot resume a partial write.
     *
     * @param bytesWritten number of bytes written by the completed operation
     * @param attachment   attachment supplied to the write
     */
    @Override
    public void completed(
            Integer bytesWritten, SessionState attachment) {
        System.out.println("Write Handler running on "+Thread.currentThread().getName());
        System.out.println("\n");
        closeChannel();
    }

    /**
     * Invoked when the write fails. Reports the failure and closes the
     * connection so the socket is not leaked.
     *
     * @param exc        cause of the failure
     * @param attachment attachment supplied to the failed write
     */
    @Override
    public void failed(Throwable exc, SessionState attachment) {
        System.err.println("Write failed on "
                + Thread.currentThread().getName() + ": " + exc);
        closeChannel();
    }

    /** Closes the connection, reporting (not propagating) any close error. */
    private void closeChannel() {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

SessionState.java

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * String-keyed property bag carried as the attachment of the asynchronous
 * accept, read and write operations.
 */
public class SessionState {

    /** Backing store for the properties, held in a {@link ConcurrentHashMap}. */
    private final Map<String, String> properties = new ConcurrentHashMap<>();

    /**
     * Stores {@code value} under {@code key}, replacing any earlier value.
     *
     * @param key   property name
     * @param value property value
     */
    public void setProperty(String key, String value) {
        this.properties.put(key, value);
    }

    /**
     * Looks up a property.
     *
     * @param key property name
     * @return the stored value, or {@code null} when nothing is stored under {@code key}
     */
    public String getProperty(String key) {
        final String stored = this.properties.get(key);
        return stored;
    }

}

为了检查线程行为,我将每个处理程序运行时所在的线程名打印到 System.out。

以下是我得到的不同结果,对于许多相继发送到服务器的请求。

请求1

Accept Handler running on Thread-4
Read Handler running on Thread-4
Write Handler running on Thread-4

请求2

Accept Handler running on Thread-4
Read Handler running on Thread-2
Write Handler running on Thread-2

请求3

Accept Handler running on Thread-5
Read Handler running on Thread-3
Write Handler running on Thread-3

根据上面的结果,看来,对于不同的请求,服务器使用不同的线程。此外,对于给定的请求,Read Handler 和 Write Handler 都在同一线程上运行。

有人能解释一下这个结果吗?至于处理程序如何在不同线程上调度?

最佳答案

如每个完成处理程序中 Thread.currentThread().getName() 的输出所示,在 NIO 2(前摄器模式)中,各个完成处理程序的线程分配是未指定的,看起来是随机的。因此,最佳实践是不要对线程行为做任何假设。

为了完整起见,我在下面添加了 NIO 的线程行为。

在 NIO 中,每个操作(无论是套接字接受、读取还是写入)都在同一个线程中运行,即运行选择器循环的那个线程。

关于java - Java NIO 2 中的多线程模型 - (前摄器模式)是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55856321/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com