gpt4 book ai didi

scala - 如何从 Async[IO] 创建 Async[Future]

转载 作者:行者123 更新时间:2023-12-04 10:13:56 27 4
gpt4 key购买 nike

我正在尝试在我的 doobie 存储库代码中隐式添加异步和同步。 Sync and Async[F] 工作正常 IO。我想将它们转换为 Future 并面临问题

我试图从 IO 创建我自己的 Aync

def futureAsync(implicit F: MonadError[Future, Throwable]): Async[Future] = new Async[Future] {
override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = IO.async(k).unsafeToFuture()

override def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] =
throw new Exception("Not implemented Future.asyncF")

override def suspend[A](thunk: => Future[A]): Future[A] = thunk

override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(release: (A, ExitCase[Throwable]) => Future[Unit]): Future[B] =
throw new Exception("Not implemented Future.bracketCase")

override def raiseError[A](e: Throwable): Future[A] = F.raiseError(e)

override def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = F.handleErrorWith(fa)(_ => f(new Exception("")))

override def pure[A](x: A): Future[A] = F.pure(x)

override def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = F.flatMap(fa)(f)

override def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = F.tailRecM(a)(f)
}

我对在 asyncF 和 bracketCase 中实现两个函数感到震惊
有人可以帮忙吗?

最佳答案

正如 Reactormonk 在上面的评论中所说,不可能编写 Async 的实例。对于 Future具有正确的语义,因为 Async扩展 Sync , 和 Sync需要一个可以重复运行的计算表示,而 Scala 的 future 在定义时开始运行,并且不能重新运行。

不合法的事例

不过,亲眼看看这个很有启发性,我鼓励你尝试编写自己的可编译但(必然)非法的 Async[Future]实例而不查看下一个代码块。不过,为了这个例子,这是我脑海中的一个快速草图:

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}

def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
IO.async(k).unsafeToFuture()

def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
val p = Promise[A]()
val f = k {
case Right(a) => p.success(a)
case Left(e) => p.failure(e)
}
f.flatMap(_ => p.future)
}

def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten

def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
release: (A, ExitCase[Throwable]) => Future[Unit]
): Future[B] = acquire.flatMap { a =>
use(a).transformWith {
case Success(b) => release(a, ExitCase.Completed).map(_ => b)
case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
}
}

def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
fa.recoverWith { case t => f(t) }

def pure[A](x: A): Future[A] = Future.successful(x)
def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
case Right(b) => Future.successful(b)
case Left(a) => tailRecM(a)(f)
}
}

这将编译得很好,并且可能适用于某些情况(但请不要实际使用它!)。但是,我们已经说过它不能具有正确的语义,我们可以通过使用猫效应的法律模块来证明这一点。

检查法律

首先,我们需要一些你不需要担心的样板文件:
import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary

implicit val throwableEq: Eq[Throwable] = Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
Arbitrary(Arbitrary.arbitrary[Exception].map(identity))

implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
new Eq[Future[A]] {
private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
f.map(Right(_)).recover { case e => Left(e) }

def eqv(fx: Future[A], fy: Future[A]): Boolean =
scala.concurrent.Await.result(
liftToEither(fx).zip(liftToEither(fy)).map {
case (rx, ry) => rx === ry
},
scala.concurrent.duration.Duration(1, "second")
)
}

然后我们可以定义一个测试来检查 Async我们的例子的法律:
import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

object FutureAsyncSuite extends FunSuite with Discipline {
implicit val ec: ExecutionContext = ExecutionContext.global

implicit val params: Parameters =
Parameters.default.copy(allowNonTerminationLaws = false)

checkAll(
"Async",
AsyncTests[Future](futureAsync).async[String, String, String]
)
}

然后我们可以运行法律测试:
scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...

您会看到大多数测试都是绿色的。这个实例做对了很多事情。

哪里违法

但是,它确实显示了三个失败的测试,包括以下内容:
- Async.async.repeated sync evaluation not memoized *** FAILED ***
GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
(Discipline.scala:14)
Falsified after 1 successful property evaluations.
Location: (Discipline.scala:14)
Occurred when passed generated values (
arg0 = "淳칇멀",
arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
)
Label of failing property:
Expected: Future(Success(驅ṇ숆㽝珅뢈矉))
Received: Future(Success(淳칇멀))

如果您查看 laws definitions ,您会看到这是一个定义 Future 的测试。 delay 的值然后对其进行多次排序,如下所示:
val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)

change *> change *> read

其他两个失败是类似的“未内存”违规。这些测试应该看到副作用发生了两次,但在我们的例子中,不可能写 delaysuspend对于 Future以这样的方式发生(不过,值得尝试说服自己就是这种情况)。

你应该做什么

总结一下:你可以写一个 Async[Future]实例将通过 78 个中的 75 个 Async法律测试,但是不可能编写一个可以通过所有这些测试的实例,并且使用非法实例是一个非常糟糕的主意:代码的潜在用户和像 Doobie 这样的库都会假设你的实例是合法的,如果你不要辜负这个假设,你正在为复杂和烦人的错误打开大门。

值得注意的是,为 Future 编写一个最小包装器并不难。具有合法 Async例如(例如,我的 catbird 库中有一个名为 Rerunnable 的 Twitter future 包装器)。你真的应该坚持 cats.effect.IO , 但是,并使用提供的转换在您使用传统 Future 的代码的任何部分中转换为 future 和从 future 转换。 - 基于 API。

关于scala - 如何从 Async[IO] 创建 Async[Future],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55968102/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com