gpt4 book ai didi

.net - 如果其中一个作业出现异常,Async.Parallel 是否会取消所有作业?

转载 作者:行者123 更新时间:2023-12-02 09:06:27 25 4
gpt4 key购买 nike

我有兴趣查找 Task.WhenAny 的 F# 等效项(处理 Async<'T> 而不是 Task<'T> ),我发现的最接近的是 Async.Choice但是,此实现需要作业返回 Option<'T>而不仅仅是'T所以我写了自己的Async.WhenAny (根据我在互联网上找到的一些片段进行调整):

type internal ResultWrapper<'T>(value : 'T) =
inherit Exception()

member self.Value = value

module AsyncExtensions =

let private RaiseResult (e: ResultWrapper<'T>) =
Async.FromContinuations(fun (_, econt, _) -> econt e)

// like Async.Choice, but with no need for Option<T> types
let WhenAny<'T>(jobs: seq<Async<'T>>): Async<'T> =
let wrap job =
async {
let! res = job
return! RaiseResult <| ResultWrapper res
}

async {
try
do!
jobs
|> Seq.map wrap
|> Async.Parallel
|> Async.Ignore

// unreachable
return failwith "No successful result?"
with
| :? ResultWrapper<'T> as ex ->
return ex.Value
}

这是一个非常简单的实现,根本不处理取消,正如您所看到的(至少与 Async.Choice 相比)。

但是,我发现它仍然以某种方式取消较慢的作业,我不明白为什么。是因为怎样Async.Parallel在幕后工作?

注意:为了查明较慢的作业是否被 AsyncChoice 或 AsyncWhenAny 取消,我编写了此单元测试:

[<Test>]
member __.``AsyncExtensions-WhenAny job cancellation``() =
let shortJobRes = 1
let shortTime = TimeSpan.FromSeconds 2.
let shortJob = async {
do! Async.Sleep (int shortTime.TotalMilliseconds)
return shortJobRes
}

let longJobRes = 2
let mutable longJobFinished = false
let longTime = TimeSpan.FromSeconds 3.
let longJob = async {
do! Async.Sleep (int longTime.TotalMilliseconds)
longJobFinished <- true
return longJobRes
}

let result =
AsyncExtensions.WhenAny [longJob; shortJob]
|> Async.RunSynchronously

Assert.That(result, Is.EqualTo shortJobRes)
Assert.That(longJobFinished, Is.EqualTo false, "#before")
Threading.Thread.Sleep(TimeSpan.FromSeconds 7.0)
Assert.That(longJobFinished, Is.EqualTo false, "#after")

使用 Async.Parallel:

[<Test>]
member __.``AsyncParallel cancels all jobs if there's an exception in one?``() =
let shortJobRes = 1
let shortTime = TimeSpan.FromSeconds 2.
let shortJob = async {
do! Async.Sleep (int shortTime.TotalMilliseconds)
return failwith "foo"
}

let longJobRes = 2
let mutable longJobFinished = false
let longTime = TimeSpan.FromSeconds 3.
let longJob = async {
do! Async.Sleep (int longTime.TotalMilliseconds)
longJobFinished <- true
return longJobRes
}

let result =
try
Async.Parallel [longJob; shortJob]
|> Async.RunSynchronously |> Some
with
| _ -> None

Assert.That(result, Is.EqualTo None)
Assert.That(longJobFinished, Is.EqualTo false, "#before")
Threading.Thread.Sleep(TimeSpan.FromSeconds 7.0)
Assert.That(longJobFinished, Is.EqualTo false, "#after")

最佳答案

让我们look at the source对于Async.Parallel。实际上阅读并理解它可能需要熟悉 F# Async 在底层的工作原理,但幸运的是,第 1220 行有一个方便的注释:

// Attempt to cancel the individual operations if an exception happens on any of the other threads

下面的代码并不难以阅读:然后它创建一个取消 token 并将该相同的 token 传递给它并行启动的所有异步操作。如果其中任何一个抛出异常,则该 token 将被取消,这意味着所有并行操作都将被取消。因此,注释准确地反射(reflect)了代码的其余部分,并且这个答案基本上是对您在问题标题中提出的问题说"is"的冗长方式。 :-)

关于.net - 如果其中一个作业出现异常,Async.Parallel 是否会取消所有作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57799841/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com