gpt4 book ai didi

scala - http4s - 如何创建线程数有限的 blaze 客户端?

转载 作者:行者123 更新时间:2023-12-03 19:09:22 25 4
gpt4 key购买 nike

我试图用有限数量的线程创建 blaze 客户端,如下所示:

object ReactiveCats extends IOApp {
private val PORT = 8083
private val DELAY_SERVICE_URL = "http://localhost:8080"

// trying create client with limited number of threads
val clientPool: ExecutorService = Executors.newFixedThreadPool(64)
val clientExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(clientPool)

private val httpClient = BlazeClientBuilder[IO](clientExecutor).resource

private val httpApp = HttpRoutes.of[IO] {
case GET -> Root / delayMillis =>
httpClient.use { client =>
client
.expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
.flatMap(response => Ok(s"ReactiveCats: $response"))
}
}.orNotFound

// trying to create server on fixed thread pool
val serverPool: ExecutorService = Executors.newFixedThreadPool(64)
val serverExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(serverPool)

// start server
override def run(args: List[String]): IO[ExitCode] =
BlazeServerBuilder[IO](serverExecutor)
.bindHttp(port = PORT, host = "localhost")
.withHttpApp(httpApp)
.serve
.compile
.drain
.as(ExitCode.Success)
}
full code and load-tests

但是负载测试结果看起来像一个请求一个线程:
enter image description here
我如何限制我的 blaze 客户端的线程数?

最佳答案

您的代码有两个明显的错误:

  • 您正在创建一个 Executor 而不在完成后将其关闭。
  • 您正在使用 use httpClient的方法HTTP 路由内部的资源,意味着每次调用该路由时,都会创建、使用和销毁 http 客户端。您应该在启动期间创建一次。

  • Executors,像任何其他资源(例如文件句柄等)应该总是使用 Resource.make 分配像这样:
      val clientPool: Resource[IO, ExecutorService] = Resource.make(IO(Executors.newFixedThreadPool(64)))(ex => IO(ex.shutdown()))
    val clientExecutor: Resource[IO, ExecutionContextExecutor] = clientPool.map(ExecutionContext.fromExecutor)

    private val httpClient = clientExecutor.flatMap(ex => BlazeClientBuilder[IO](ex).resource)
    通过在构建 HTTP 应用程序之前分配 httpClient 可以轻松解决第二个问题:
      private def httpApp(client: Client[IO]): Kleisli[IO, Request[IO], Response[IO]] = HttpRoutes.of[IO] {
    case GET -> Root / delayMillis =>
    client
    .expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
    .flatMap(response => Ok(s"ReactiveCats: $response"))
    }.orNotFound



    override def run(args: List[String]): IO[ExitCode] =
    httpClient.use { client =>
    BlazeServerBuilder[IO](serverExecutor)
    .bindHttp(port = PORT, host = "localhost")
    .withHttpApp(httpApp(client))
    .serve
    .compile
    .drain
    .as(ExitCode.Success)
    }

    另一个潜在的问题是您正在使用 IOApp ,并且它带有自己的线程池。解决这个问题的最好方法可能是混合在 IOApp.WithContext 中。 trait 并实现此方法:
      override protected def executionContextResource: Resource[SyncIO, ExecutionContext] = ???

    关于scala - http4s - 如何创建线程数有限的 blaze 客户端?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62656115/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com