gpt4 book ai didi

scala - 为什么异常不会停止 Akka Stream 流

转载 作者:行者123 更新时间:2023-12-05 00:23:43 24 4
gpt4 key购买 nike

我有一个依赖 API 响应的流程。当响应不符合我的预期时,将引发异常。此策略非常适用于 Spray 和使用 specs2 的直接方法测试。

但是,当我尝试使用带有异常抛出模块的流时,流只会停止。

这是我的流程:

 Source(() => file)
.via(csvToSeq)
.via(getFromElastic)
.via(futureExtrtactor)
.via(findLocaionOfId)
.foreach(v => v.map(v => println("foreached", v)))
.onComplete(_ => system.shutdown())

我的策略是使用 map为 future 。

像这样:
 val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
js.asJsObject("Couldn't convert").getFields("externalId").map({
case JsString(str) => {
(i + 1, i == 0, js)
}
else (i, false, js)
}
case _ => (i, false, x)
})
})
}
}))

这是一个位于完全不同位置的潜在异常引发器:
val encoded_url = URLEncoder.encode(url, "UTF-8")

好像我错过了一些东西,但看不到什么。感谢您的任何指示。

最佳答案

这听起来像是一个问题,将在一次 Supervision for Akka Streams 中得到解决。被实现。 Akka Streams 仍处于“预实验阶段”,因此该功能尚未实现,但肯定会很快包含在内。

//在撰写此评论时,当前版本为 1.0-M2(预览里程碑)。

关于scala - 为什么异常不会停止 Akka Stream 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27844124/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com