akka-http client blocks after max-open-requests

Reposted. Author: 行者123. Updated: 2023-12-01 13:32:11
I took this simple example from the akka-http docs:
http://doc.akka.io/docs/akka-http/current/scala/http/client-side/request-level.html

I modified it slightly to send one hundred requests. The application blocks after 32 requests (the default max-open-requests setting).
Why?

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString

import scala.io.StdIn

object AkkaClientExample extends App {
  val system: ActorSystem = ActorSystem("BatchAkka")
  try {
    val unformattedAddresses = (1 to 100).map(i => s"Rue de la Gracieuse $i, Préverenges, Switzerland")

    val googleGeocoder = system.actorOf(GoogleGeocoder.props, "GoogleGeocoder")

    unformattedAddresses.foreach(e => googleGeocoder ! GoogleGeocoder.GeoCode(e))

    println(">>> Press ENTER to exit <<<")
    StdIn.readLine()
  } finally {
    system.terminate()
  }
}

object GoogleGeocoder {
  def props: Props = Props[GoogleGeocoder]

  final case class GeoCode(unformattedAddress: String)
}

class GoogleGeocoder extends Actor with ActorLogging {
  import GoogleGeocoder._
  import akka.pattern.pipe
  import context.dispatcher

  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)

  def receive = {
    case GeoCode(unformattedAddress) =>
      log.info(s"GeoCode $unformattedAddress")
      http
        .singleRequest(HttpRequest(uri = url(unformattedAddress)))
        .map(r => (unformattedAddress, r))
        .pipeTo(self)

    case (unformattedAddress: String, resp @ HttpResponse(StatusCodes.OK, headers, entity, _)) =>
      log.info(s"Success response comming for $unformattedAddress")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        val response = body.utf8String.replaceAll("\\s+", " ").take(50)
        log.info(s"Success response for $unformattedAddress: $response")
      }

    case (unformattedAddress: String, resp @ HttpResponse(code, _, _, _)) =>
      log.info(s"Request failed, response code: $code for $unformattedAddress")
      resp.discardEntityBytes()
  }

  def url(unformattedAddress: String): String =
    //s"https://maps.googleapis.com/maps/api/geocode/json?address=${URLEncoder.encode(unformattedAddress, "UTF-8")}&key=${URLEncoder.encode(googleApiKey, "UTF-8")}"
    s"https://www.epfl.ch/"
}

Output:
[INFO] [07/28/2017 20:08:26.977] [BatchAkka-akka.actor.default-dispatcher-4] [akka://BatchAkka/user/GoogleGeocoder] GeoCode Rue de la Gracieuse 1, Préverenges, Switzerland
[INFO] [07/28/2017 20:08:27.080] [BatchAkka-akka.actor.default-dispatcher-4] [akka://BatchAkka/user/GoogleGeocoder] GeoCode Rue de la Gracieuse 2, Préverenges, Switzerland
...
[INFO] [07/28/2017 20:08:27.098] [BatchAkka-akka.actor.default-dispatcher-13] [akka://BatchAkka/user/GoogleGeocoder] GeoCode Rue de la Gracieuse 99, Préverenges, Switzerland
[INFO] [07/28/2017 20:08:27.098] [BatchAkka-akka.actor.default-dispatcher-13] [akka://BatchAkka/user/GoogleGeocoder] GeoCode Rue de la Gracieuse 100, Préverenges, Switzerland

[INFO] [07/28/2017 20:08:27.615] [BatchAkka-akka.actor.default-dispatcher-11] [akka://BatchAkka/user/GoogleGeocoder] Success response comming for Rue de la Gracieuse 1, Préverenges, Switzerland
[INFO] [07/28/2017 20:08:27.620] [BatchAkka-akka.actor.default-dispatcher-11] [akka://BatchAkka/user/GoogleGeocoder] Success response comming for Rue de la Gracieuse 4, Préverenges, Switzerland
[INFO] [07/28/2017 20:08:27.668] [BatchAkka-akka.actor.default-dispatcher-17] [akka://BatchAkka/user/GoogleGeocoder] Success response for Rue de la Gracieuse 4, Préverenges, Switzerland: <!doctype html><html lang="fr" class="no-js"><head
[INFO] [07/28/2017 20:08:27.668] [BatchAkka-akka.actor.default-dispatcher-21] [akka://BatchAkka/user/GoogleGeocoder] Success response for Rue de la Gracieuse 1, Préverenges, Switzerland: <!doctype html><html lang="fr" class="no-js"><head
...
[INFO] [07/28/2017 20:08:27.787] [BatchAkka-akka.actor.default-dispatcher-5] [akka://BatchAkka/user/GoogleGeocoder] Success response comming for Rue de la Gracieuse 31, Préverenges, Switzerland
[INFO] [07/28/2017 20:08:27.795] [BatchAkka-akka.actor.default-dispatcher-15] [akka://BatchAkka/user/GoogleGeocoder] Success response comming for Rue de la Gracieuse 32, Préverenges, Switzerland
[INFO] [07/28/2017 20:08:27.802] [BatchAkka-akka.actor.default-dispatcher-16] [akka://BatchAkka/user/GoogleGeocoder] Success response for Rue de la Gracieuse 31, Préverenges, Switzerland: <!doctype html><html lang="fr" class="no-js"><head
[INFO] [07/28/2017 20:08:27.806] [BatchAkka-akka.actor.default-dispatcher-17] [akka://BatchAkka/user/GoogleGeocoder] Success response for Rue de la Gracieuse 32, Préverenges, Switzerland: <!doctype html><html lang="fr" class="no-js"><head

It blocks after the first 32 requests.

Update, taking @shutty's answer into account:

I modified the program as follows, and it now works:
class GoogleGeocoder extends Actor with ActorLogging {
  import GoogleGeocoder._
  import akka.actor.Status
  import akka.pattern.pipe
  import context.dispatcher

  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)

  // Addresses waiting for a free request slot.
  val queue = new scala.collection.mutable.Queue[String]
  var currentRequests = 0
  val MaxCurrentRequest = 10

  def receive = {
    case GeoCode(unformattedAddress) =>
      if (currentRequests < MaxCurrentRequest)
        query(unformattedAddress)
      else
        queue += unformattedAddress

    case (unformattedAddress: String, resp @ HttpResponse(StatusCodes.OK, headers, entity, _)) =>
      log.info(s"Success response comming for $unformattedAddress")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        // Caution: this callback runs on the dispatcher, outside the actor's
        // message processing, so mutating actor state here is not thread-safe.
        currentRequests = currentRequests - 1
        queryNext()
        val response = body.utf8String.replaceAll("\\s+", " ").take(50)
        log.info(s"Success response for $unformattedAddress: $response")
      }

    case (unformattedAddress: String, resp @ HttpResponse(code, _, _, _)) =>
      log.info(s"Request failed, response code: $code for $unformattedAddress")
      resp.discardEntityBytes()
      currentRequests = currentRequests - 1
      queryNext()

    // textSample is a helper that is not shown in the question.
    case f: Status.Failure =>
      log.info("failure" + textSample(f))

    case m =>
      log.info("unexpected message: " + textSample(m))
  }

  def query(unformattedAddress: String): Unit = {
    log.info(s"GeoCode $unformattedAddress")
    currentRequests += 1 // one more request in flight
    http
      .singleRequest(HttpRequest(uri = url(unformattedAddress)))
      .map(r => (unformattedAddress, r))
      .pipeTo(self)
  }

  def queryNext(): Unit = {
    if (queue.nonEmpty) {
      query(queue.dequeue())
    }
  }

  def url(unformattedAddress: String): String =
    //s"https://maps.googleapis.com/maps/api/geocode/json?address=${URLEncoder.encode(unformattedAddress, "UTF-8")}&key=${URLEncoder.encode(googleApiKey, "UTF-8")}"
    s"https://www.epfl.ch/"
}

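So, basically, I added a queue.

But is there a better way to achieve this?

I can imagine cases where this implementation fails: for example, if http.singleRequest produces a failed Future, currentRequests is never decremented. I could handle that in case f: Status.Failure, but this solution looks error-prone.

Maybe akka already provides some mechanism for handling a queue?

Is there a way to apply backpressure to the client, so that in AkkaClientExample the line unformattedAddresses.foreach(e => googleGeocoder ! GoogleGeocoder.GeoCode(e)) blocks once MaxCurrentRequest is reached?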

Best answer

If you run your example with akka.loglevel = DEBUG, you will notice the following output:
InputBuffer (max-open-requests = 32) now filled with 31 request after enqueuing GET / Empty
InputBuffer (max-open-requests = 32) now filled with 32 request after enqueuing GET / Empty
InputBuffer (max-open-requests = 32) exhausted when trying to enqueue GET / Empty
InputBuffer (max-open-requests = 32) exhausted when trying to enqueue GET / Empty
InputBuffer (max-open-requests = 32) exhausted when trying to enqueue GET / Empty
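
One way to switch on this debug output is sketched below. It assumes you prefer to set the log level in code rather than in application.conf; the object name DebugLoggingExample is made up for illustration, and akka.loglevel is the standard Akka setting for the log level.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical helper object, not part of the original question.
object DebugLoggingExample {
  // Override only the log level; every other setting falls back to the
  // regular configuration (reference.conf / application.conf).
  val config = ConfigFactory
    .parseString("akka.loglevel = DEBUG")
    .withFallback(ConfigFactory.load())

  // Start the actor system from the question with DEBUG logging enabled.
  def start(): ActorSystem = ActorSystem("BatchAkka", config)
}
```

Putting the same akka.loglevel = DEBUG line into application.conf should have the same effect without any code change.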

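There is a fairly comprehensive description of how akka-http pools client requests, but in short, if you overload the pool with more than max-open-requests, it starts dropping requests:

http
  .singleRequest(HttpRequest(uri = url(unformattedAddress)))
  .map(r => (unformattedAddress, r)) // <- HERE
  .pipeTo(self)

When you map over a Future in Scala, your callback runs only if the Future completes successfully, which is not the case in your code. The failed Future is piped to the actor as a Status.Failure message, which your receive block does not match, so nothing is logged. If you rewrite the code differently, for example: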
// requires: import scala.util.{Failure, Success}
http
  .singleRequest(HttpRequest(uri = url(unformattedAddress)))
  .onComplete {
    case Success(r) =>
      self ! (unformattedAddress, r)
    case Failure(ex) =>
      log.error(ex, "pool overflow")
  }

you will see a bunch of exceptions indicating that the Future failed.
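
The behaviour of map on a failed Future can be checked in isolation with the standard library alone. The following is a minimal sketch, not akka-http code: the RuntimeException merely stands in for a request rejected by the pool, and the name FutureMapDemo is hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Failure

object FutureMapDemo {
  // Returns (did the map callback run?, did the Future end in a Failure?).
  def run(): (Boolean, Boolean) = {
    val mapCallbackRan = new AtomicBoolean(false)

    // Hypothetical stand-in for a request rejected by the connection pool.
    val rejected: Future[String] =
      Future.failed(new RuntimeException("simulated pool overflow"))

    // map only transforms a successful result, so this body is skipped
    // and the failure is passed through unchanged.
    val mapped: Future[Int] = rejected.map { body =>
      mapCallbackRan.set(true)
      body.length
    }

    // Await.ready waits for completion without rethrowing the failure.
    Await.ready(mapped, 1.second)

    val endedInFailure = mapped.value match {
      case Some(Failure(_)) => true
      case _                => false
    }
    (mapCallbackRan.get, endedInFailure)
  }
}
```

Calling FutureMapDemo.run() returns (false, true): the callback never ran, and the failure is only visible to code that inspects it, such as onComplete or recover.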

Update:

In my own opinion, actors and streams do not fit together well when you need backpressure. As an option, you can rewrite your code entirely without actors:
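import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

def url(addr: String) = "http://httpbin.org/headers"
implicit val system: ActorSystem = ActorSystem("BatchAkka")
implicit val mat: ActorMaterializer = ActorMaterializer()
import system.dispatcher
val http = Http()
val addresses = (1 to 100).map(i => s"Rue de la Gracieuse $i, Préverenges, Switzerland")
Source(addresses)
  .mapAsync(4)(addr => http.singleRequest(HttpRequest(uri = url(addr)))) // at most 4 requests in flight
  .map { response =>
    response.discardEntityBytes() // drain the unused body so the connection is released
    println(response.status)
  }
  .runWith(Sink.seq)
  .map(_ => println("done"))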

With this solution you have at most 4 parallel requests to the server, with backpressure and all the bells and whistles.
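
On the question of whether akka already offers a queue: one commonly used pattern, modelled on the host-level client example in the akka-http documentation, puts a bounded Source.queue in front of a host connection pool. The sketch below is illustrative only. The class name QueuedClient, the host, and the queue size are assumptions, and it targets the same ActorMaterializer-era API as the rest of this page.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical wrapper: a bounded queue feeding one HTTPS host pool.
final class QueuedClient(host: String, val queueSize: Int)(implicit system: ActorSystem) {
  private implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Each request travels with a Promise that is completed with its response.
  private val pool =
    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)

  private val queue =
    Source
      .queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.dropNew)
      .via(pool)
      .to(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      }))
      .run()

  // The returned Future fails explicitly when the queue is full or closed.
  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue.offer(request -> responsePromise).flatMap {
      case QueueOfferResult.Enqueued    => responsePromise.future
      case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed while running the request."))
    }
  }
}
```

A caller would create something like new QueuedClient("www.epfl.ch", 128) and send host-relative requests such as queueRequest(HttpRequest(uri = "/")). As with singleRequest, every response entity still has to be consumed or discarded, otherwise the pool's connections stay occupied.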

Source: this page is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/45379930/
