gpt4 book ai didi

scala - Akka Stream + Akka Http - 获取错误请求

转载 作者:行者123 更新时间:2023-12-01 09:57:51 25 4
gpt4 key购买 nike

我有以下运行良好的流:

source
.map(x => HttpRequest(uri = x.rawRequest))
.via(Http().outgoingConnection(host, port))
.to(Sink.actorRef(myActor, IsDone))
.run()

和一个简单的 actor 来处理流完成时的响应状态和最终消息:

/**
* A simple actor to count how many rows have been processed
* in the complete process given a http status
*
* It also finish the main thread upon a message of type [[IsDone]] is received
*/
class MyActor extends Actor with ActorLogging {

var totalProcessed = 0

def receive = LoggingReceive {

case response: HttpResponse =>

if(response.status.isSuccess()) {
totalProcessed = totalProcessed + 1
} else if(response.status.isFailure()) {
log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
} else {
log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
}

case IsDone =>
println(s"total processed: $totalProcessed")
sys.exit()
}
}

case object IsDone

我不知道这是否是计算事物和处理响应状态的最佳方法,但它目前有效。

问题是如何以一种我可以知道哪个请求导致了特定错误的方式将原始请求传递给参与者。

我的 Actor 可以期待以下内容:

case (request: String, response: HttpResponse) =>

但是如何传递我在管道开始时拥有的信息呢?

我想像这样映射:

source
.map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

但我不知道如何触发 Http 流。

有什么建议吗?

最佳答案

在@cmbaxter 的帮助下,我可以使用以下代码解决我的问题:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
.map(url => HttpRequest(uri = url) -> url)
.via(poolClientFlow)
.to(Sink.actorRef(myActor, IsDone))
.run()

现在我的 Actor 能够收到这个:

case (respTry: Try[HttpResponse], request: String) =>

关于scala - Akka Stream + Akka Http - 获取错误请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36776762/

25 4 0