gpt4 book ai didi

scala - 使用 scala Actor 时我应该如何处理阻塞操作?

转载 作者:行者123 更新时间:2023-12-04 12:38:03 28 4
gpt4 key购买 nike

大约两天前,我开始学习 scala actor 框架。为了使这些想法在我的脑海中具体化,我决定实现一个基于 TCP 的回显服务器,它可以处理多个同时连接。

这是回显服务器的代码(不包括错误处理):

class EchoServer extends Actor {
private var connections = 0

def act() {
val serverSocket = new ServerSocket(6789)

val echoServer = self
actor { while (true) echoServer ! ("Connected", serverSocket.accept) }

while (true) {
receive {
case ("Connected", connectionSocket: Socket) =>
connections += 1
(new ConnectionHandler(this, connectionSocket)).start
case "Disconnected" =>
connections -= 1
}
}
}
}

基本上,服务器是处理“已连接”和“已断开”消息的 Actor。它将连接委托(delegate)给调用 的匿名参与者。接受() 上的方法(阻塞操作)服务器套接字 .当连接到达时,它通过“已连接”消息通知服务器,并将套接字传递给它以用于与新连接的客户端通信。 的一个实例连接处理程序 类处理与客户端的实际通信。

这是连接处理程序的代码(包括一些错误处理):
class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
extends Actor {

def act() {
for (input <- getInputStream; output <- getOutputStream) {
val handler = self
actor {
var continue = true
while (continue) {
try {
val req = input.readLine
if (req != null) handler ! ("Request", req)
else continue = false
} catch {
case e: IOException => continue = false
}
}

handler ! "Disconnected"
}

var connected = true
while (connected) {
receive {
case ("Request", req: String) =>
try {
output.writeBytes(req + "\n")
} catch {
case e: IOException => connected = false
}
case "Disconnected" =>
connected = false
}
}
}

close()
server ! "Disconnected"
}

// code for getInputStream(), getOutputStream() and close() methods
}

连接处理程序使用匿名参与者,通过调用 等待将请求发送到套接字。读行()套接字输入流上的方法(阻塞操作)。当收到请求时,会向处理程序发送“请求”消息,然后处理程序将请求回显给客户端。如果处理程序或匿名参与者遇到了底层套接字的问题,则套接字将关闭,并向回显服务器发送“断开连接”消息,指示客户端已与服务器断开连接。

所以,我可以启动 echo 服务器并让它等待连接。然后我可以打开一个新终端并通过 telnet 连接到服务器。我可以向它发送请求,它会正确响应。现在,如果我打开另一个终端并连接到服务器,服务器会注册连接,但无法为这个新连接启动连接处理程序。当我通过任何现有连接向它发送消息时,我没有立即得到响应。这是有趣的部分。当我终止除一个现有客户端连接之外的所有客户端连接并使客户端 X 保持打开状态时,将返回对我通过客户端 X 发送的请求的所有响应。我做了一些测试,得出的结论是 行为() 即使我调用了 ,后续客户端连接上也不会调用该方法。开始() 创建连接处理程序的方法。

我想我在连接处理程序中错误地处理了阻塞操作。由于先前的连接是由一个连接处理程序处理的,该连接处理程序阻止了一个匿名参与者等待请求,我认为这个被阻止的参与者正在阻止其他参与者(连接处理程序)启动。

使用 scala Actor 时我应该如何处理阻塞操作?

任何帮助将不胜感激。

最佳答案

来自 the scaladoc for scala.actors.Actor :

Note: care must be taken when invoking thread-blocking methods other than those provided by the Actor trait or its companion object (such as receive). Blocking the underlying thread inside an actor may lead to starvation of other actors. This also applies to actors hogging their thread for a long time between invoking receive/react.

If actors use blocking operations (for example, methods for blocking I/O), there are several options:

  • The run-time system can be configured to use a larger thread pool size (for example, by setting the actors.corePoolSize JVM property).
  • The scheduler method of the Actor trait can be overridden to return a ResizableThreadPoolScheduler, which resizes its thread pool to avoid starvation caused by actors that invoke arbitrary blocking methods.
  • The actors.enableForkJoin JVM property can be set to false, in which case a ResizableThreadPoolScheduler is used by default to execute actors.

关于scala - 使用 scala Actor 时我应该如何处理阻塞操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3346176/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com