gpt4 book ai didi

java - 关闭套接字唤醒选择器

转载 作者:行者123 更新时间:2023-11-29 04:41:59 26 4
gpt4 key购买 nike

我们编写了一个以这种方式工作的传入 react 器:

  1. 打开一个选择器
  2. 打开服务器套接字 channel
  3. 启动一个选择循环,其中:ServerSocketChannel 接受新的 SocketChannels 进入循环,每个 SocketChannel 读取数据并将其传输给一个 worker。

reactor 的关闭过程迭代 selector.keys() 并为它们中的每一个关闭相应的 channel 并取消 key 。

我们为关机程序编写了以下单元测试:

  1. 打开一个运行选择循环的 react 器线程。
  2. 打开几个发件人线程。每个人都打开一个连接到 react 器的套接字并读取。
  3. 读取阻塞,直到它得到 -1(意味着 react 器关闭套接字)。
  4. 读取返回-1后,发送方关闭套接字并完成。

测试导致 ConcurrentModificationException 指向循环遍历套接字并关闭它们(在主线程上下文中)。

我们的假设是,当 Sender 读取方法得到 -1 时,它关闭套接字并以某种方式唤醒选择器 select 方法,然后选择器访问其键集,该键集由关闭循环迭代,因此出现异常。

我们通过使用选择器的所有键创建一个新列表来解决这个问题。通过迭代此列表来取消这些键可防止两个对象修改同一键的集合。

我们的问题是:

  1. 我们的假设是否正确?当客户端套接字调用关闭方法时——它真的唤醒了选择器吗?
  2. 创建新列表是合适的解决方案还是只是一种变通方法?

编辑:添加了一些代码片段以进行说明(我们尽量缩小代码范围)

传入 react 器:

public boolean startAcceptingIncomingData() {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open());
serverSocketChannel.bind(new InetSocketAddress(incomingConnectionsPort));
serverSocketChannel.configureBlocking(false);
SelectionKey acceptorSelectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
acceptorSelectionKey.attach((Worker) this::acceptIncomingSocket);
startSelectionLoop(selector);
return true;
}

private boolean acceptIncomingSocket() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(new WorkerImpl() /*Responsible for reading data and tranferring it into a parsing thread*/);
return true;
} catch (IOException e) {
return false;
}
}

private void startSelectionLoop(Selector selector) {
shouldLoop = true;
while (shouldLoop) {
try {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (!shouldLoop) {
break;
}
selectedKeys.forEach((key) -> {
boolean workSuccess = ((Worker) key.attachment()).work();
if (!workSuccess) {
key.channel().close();
key.cancel();
}
});
selectedKeys.clear();
} catch (ClosedSelectorException ignore) {
}
}
}

public void shutDown() {
shouldLoop = false;
selector.keys().forEach(key -> { /***EXCEPTION - This is where the exception points to (this is line 129) ***/
key.channel().close();
key.cancel();
});
try {
selector.close();
} catch (IOException e) {
}
}

单元测试:

   @Test
public void testMaximumConnectionsWithMultipleThreads() {
final int PORT = 24785;
final int MAXINUM_CONNECTIONS = 10;

IncomingReactor incomingReactor = new IncomingReactor(PORT);
Callable<Boolean> acceptorThread = () -> {
incomingReactor.startAcceptingIncomingData();
return true;
};

ExecutorService threadPool = Executors.newFixedThreadPool(MAXIMUM_CONNECTIONS + 1);
Future<Boolean> acceptorFuture = threadPool.submit(acceptorThread);

List<Future<Boolean>> futureList = new ArrayList<>(MAXIMUM_CONNECTIONS);
for (int currentSenderThread = 0; currentSenderThread < MAXIMUM_CONNECTIONS; currentSenderThread++) {
Future<Boolean> senderFuture = threadPool.submit(() -> {
Socket socket = new Socket(LOCALHOST, PORT);
int bytesRead = socket.getInputStream().read();
if (bytesRead == -1) { //The server has closed us
socket.close();
return true;
} else {
throw new RuntimeException("Got real bytes from socket.");
}
});
futureList.add((senderFuture));
}

Thread.sleep(1000); //We should wait to ensure that the evil socket is indeed the last one that connects and the one that will be closed
Socket shouldCloseSocket = new Socket(LOCALHOST, PORT);
Assert.assertEquals(shouldCloseSocket.getInputStream().read(), -1);
shouldCloseSocket.close();
incomingReactor.shutDown();
for (Future<Boolean> senderFuture : futureList) {
senderFuture.get();
}
acceptorFuture.get();
threadPool.shutdown();
}

异常(exception):

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
at java.util.HashMap$KeyIterator.next(HashMap.java:1461)
at java.lang.Iterable.forEach(Iterable.java:74)
at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
at mypackage.IncomingReactor.shutDown(IncomingReactor.java:129)
at mypackage.tests.TestIncomingReactor.testMaximumConnectionsWithMultipleThreads(TestIncomingReactor.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:85)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:659)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:845)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1153)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:108)
at org.testng.TestRunner.privateRun(TestRunner.java:771)
at org.testng.TestRunner.run(TestRunner.java:621)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:357)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:352)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:310)
at org.testng.SuiteRunner.run(SuiteRunner.java:259)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1199)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1124)
at org.testng.TestNG.run(TestNG.java:1032)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:74)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

最佳答案

The shutting down procedure of the reactor is iterating over the selector.keys() and for each of them closing the corresponding channel and cancelling the key.

它应该从停止选择器循环开始。注意关闭 channel 会取消 key 。您不必自行取消。

We wrote the following unit test for the shutdown procedure:

Open a reactor thread running the selction loop.Open several Sender threads. Each opens a socket to the reactor and reads.The read blocks until it gets -1 (meaning the reactor closed the socket).

react 堆关闭了它的接受的套接字。您的客户端套接字保持打开状态。

After the read returns -1, the sender closes the socket and finishes.

我希望这意味着发送方关闭了它的客户端套接字。

The test causes ConcurrentModificationException pointing to the loop iterating over the sockets and closes them (which was in the main thread context).

真的吗?我在您的问题中没有看到任何堆栈跟踪。

Our assumption is that when a Sender read method got -1, it closed the socket and somehow it woke up the selector select method

不可能,除非 react 堆没有关闭 channel ,在这种情况下你不会从读取等中得到 -1。

The selector then accessed its keys set which was iterated by the shutdown loop and hence the exception.

异常是在迭代过程中修改了key集导致的。服务器代码中的错误。

We worked around this problem by creating a new list with all the keys of the selector. Canceling those keys by iterating this list prevent two objects from modifying the same key's set.

您需要解决实际问题,为此您需要发布实际代码。

Our question are:

Is our assumption correct? When the client socket calls the close method- does it really wake up the selector?

除非选择器端 channel 仍然打开。

Does the creation of a new list is the appropriate solution or is it just a work-around?

对于您尚未确定的问题,这只是一个糟糕的解决方法。

关于java - 关闭套接字唤醒选择器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38822191/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com