multithreading - The thread count keeps increasing in Akka. What could be the problem?

Reposted. Author: 行者123. Updated: 2023-12-02 22:39:38

Why does the thread count keep increasing?

See the bottom of this image:

[Image: Thread count keeps increasing]

The overall flow is as follows:

Akka HTTP Server API
  -> on http request, sendMessageTo DataProcessingActor
      -> sendMessageTo StorageActor
          -> sendMessageTo DataBaseActor
          -> sendMessageTo IndexActor

Here is the definition of the Akka HTTP API (in pseudocode):
Main {
  path("input/") {
    post {
      dataProcessingActor forward message
    }
  }
}

Here are the actor definitions (in pseudocode):
DataProcessingActor {
  case message =>
    message = parse message
    storageActor ! message
}


StorageActor {
  case message =>
    indexActor ! message
    databaseActor ! message
}


DataBaseActor {
  case message =>
    val c = get mongoCollection
    c.store(message)
}

IndexActor {
  case message =>
    elasticSearch.index(message)
}

After running this setup and sending many HTTP requests to the "input/" HTTP endpoint, I get an error:
for (i <- 0 until 1000000) {
  post("input/", someMessage + i)
}

Error:
[ERROR] [04/22/2016 13:20:54.016] [Main-akka.actor.default-dispatcher-15] [akka.tcp://Main@127.0.0.1:2558/system/IO-TCP/selectors/$a/0] Accept error: could not accept new connection
java.io.IOException: Too many open files
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
    at akka.io.TcpListener.acceptAllPending(TcpListener.scala:107)
    at akka.io.TcpListener$$anonfun$bound$1.applyOrElse(TcpListener.scala:82)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at akka.io.TcpListener.aroundReceive(TcpListener.scala:32)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Edit 1

Here is the application.conf file in use:
akka {
  loglevel = "INFO"
  stdout-loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    default-dispatcher {
      throughput = 10
    }
  }

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2558
    }
  }
}

Best answer

I found that Elasticsearch was the problem. I am using the Java API for Elasticsearch, and the way I was using it from the Java API caused a socket leak. The fix is described below.
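A leak like this can usually be confirmed by watching the JVM's thread count and open file descriptor count while the load test runs. The following is only a minimal sketch, not part of the original answer: the object name ResourceProbe and its methods are hypothetical, and the file descriptor count relies on the com.sun.management.UnixOperatingSystemMXBean extension, which is assumed to be available (it is not on Windows, where the sketch returns None).

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

// Hypothetical helper for observing resource growth from inside the JVM.
object ResourceProbe {

  /** Number of live threads in this JVM (daemon and non-daemon). */
  def threadCount: Int =
    ManagementFactory.getThreadMXBean.getThreadCount

  /** Open file descriptors (files and sockets) of this JVM, if the platform exposes them. */
  def openFileDescriptors: Option[Long] =
    ManagementFactory.getOperatingSystemMXBean match {
      case unix: UnixOperatingSystemMXBean => Some(unix.getOpenFileDescriptorCount)
      case _                               => None // assumption: non-Unix platform
    }

  /** One-line summary suitable for periodic logging. */
  def snapshot: String = {
    val fds = openFileDescriptors.map(_.toString).getOrElse("n/a")
    s"threads=$threadCount openFds=$fds"
  }
}
```

Logging ResourceProbe.snapshot periodically during the load test should show whether both numbers climb together, which would point at leaked connections rather than at the dispatcher configuration.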

Here is the Elasticsearch client service, which uses the Java API:

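// Imports assume the Elasticsearch 1.x Java API, where TransportClient has a public constructor.
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.InetSocketTransportAddress

trait ESClient { def getClient(): Client }

case class ElasticSearchService() extends ESClient {
  // Every call builds a brand-new client, and with it new connections.
  def getClient(): Client = {
    val client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress(Config.ES_HOST, Config.ES_PORT)
    )
    client
  }
}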

Here is the actor that caused the leak:
import akka.actor.Actor

class IndexerActor() extends Actor {

  // Each actor instance owns its own service and therefore its own client.
  val elasticSearchSvc = new ElasticSearchService()
  lazy val client = elasticSearchSvc.getClient()

  override def preStart(): Unit = {
    // initialize index, and mappings etc.
  }

  def receive: Receive = {
    case message =>
      // do indexing here, using this instance's own client
      indexMessage(client, message)
  }
}

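Note: a new connection is established every time an actor instance is created.

Each call to new ElasticSearchService() was creating a new connection to Elasticsearch. I moved it into a separate object, as shown below, and the actor now uses that object instead: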
// A single client shared by all actor instances in the JVM.
object ES {
  val elasticSearchSvc = new ElasticSearchService()
  lazy val client = elasticSearchSvc.getClient()
}


class IndexerActor() extends Actor {

  override def preStart(): Unit = {
    // initialize index, and mappings etc.
  }

  def receive: Receive = {
    case message =>
      // do indexing here, using the shared client
      indexMessage(ES.client, message)
  }
}
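The difference between the two versions can be reproduced without Akka or Elasticsearch. The sketch below is only an illustration under stated assumptions: FakeClient, PerInstanceWorker, SharedWorker, Shared and LeakDemo are hypothetical names, and FakeClient merely counts how many times it is constructed as a stand-in for opening a real connection.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real client: constructing one represents opening a connection.
final class FakeClient {
  FakeClient.opened.incrementAndGet()
  def index(message: String): Unit = () // no-op placeholder for real indexing
}

object FakeClient {
  val opened = new AtomicInteger(0)
}

// Leaky pattern: every worker lazily creates its own client on first use.
class PerInstanceWorker {
  lazy val client = new FakeClient
  def handle(message: String): Unit = client.index(message)
}

// Shared pattern: one client per JVM, created on first use.
object Shared {
  lazy val client = new FakeClient
}

class SharedWorker {
  def handle(message: String): Unit = Shared.client.index(message)
}

object LeakDemo {
  /** Runs `run` for 1..n and returns how many clients were constructed meanwhile. */
  def clientsOpened(n: Int)(run: Int => Unit): Int = {
    val before = FakeClient.opened.get()
    (1 to n).foreach(run)
    FakeClient.opened.get() - before
  }

  def main(args: Array[String]): Unit = {
    val perInstance = clientsOpened(100)(i => new PerInstanceWorker().handle(s"message-$i"))
    val shared      = clientsOpened(100)(i => new SharedWorker().handle(s"message-$i"))
    println(s"per-instance clients: $perInstance, shared clients: $shared")
  }
}
```

Run once in a fresh JVM, this prints "per-instance clients: 100, shared clients: 1": the per-instance version opens one client per worker, while the shared object opens a single client no matter how many workers exist.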

Regarding "multithreading - The thread count keeps increasing in Akka. What could be the problem?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/36789460/
