gpt4 book ai didi

asynchronous - F# 事件在异步工作流中不起作用

转载 作者:行者123 更新时间:2023-12-01 11:16:49 25 4
gpt4 key购买 nike

我想对代理进行火灾后回复。基本上,代理会触发一个事件,然后回复调用者。但是,我要么一直收到超时错误,要么事件没有正确触发。我尝试执行 Post-Fire,它停止了超时错误,但事件没有触发。

let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int

let agent = Agent.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
let (Fire i) = msg
evt.Trigger i }
loop())

let on i fn =
stream
|> Observable.filter (fun x -> x = i)
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x)

let rec collatz n =
printfn "%d" n
on n (fun i ->
if (i % 2 = 0) then collatz (i/2)
else collatz (3*n + 1)) |> ignore

agent.Post (Fire n) // this does not work
// evt.Trigger n // this does works


collatz 13

这是一个简单的实验,重复创建一个函数来查找 Collat​​z 系列中的下一个数字,然后调用自身返回该值,直到它达到 1。

似乎发生的是触发器只触发一次。我尝试尝试我能想到但没有任何进展的 Async.RunSynchronously/Async.Start/StartChild/SynchronizationContext 的每种组合。我找到了一个 blog与我正在做的类似,但这对我也没有帮助

编辑感谢 Fyodor Soikin 指出我的疏忽。最初的问题仍然存在,我希望同时触发事件并回复结果,但超时。

let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Command =
| Fire of int
| Get of int * AsyncReplyChannel<int>

let agent = Agent.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Fire i -> evt.Trigger i
| Get (i,ch) ->
evt.Trigger i
ch.Reply(i)
return! loop() }
loop())

let on i fn =
stream
|> Observable.filter (fun x -> x = i)
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x)

let rec collatz n =
printfn "%d" n
on n (fun i ->
if (i % 2 = 0) then collatz (i/2)
else collatz (3*n + 1)) |> ignore

agent.PostAndReply (fun ch -> (Get (n, ch))) |> ignore // timeout
agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.Ignore |> Async.Start // works but I need the result
agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.RunSynchronously |> ignore // timeout

collatz 13

最佳答案

您的loop 函数不会循环。它收到第一条消息,触发事件,然后就……退出。从不尝试接收第二条消息。

您需要让该功能连续工作:处理第一条消息,然后立即返回接收下一条消息,然后继续接收下一条消息,依此类推。像这样:

let agent = Agent.Start(fun inbox -> 
let rec loop() = async {
let! msg = inbox.Receive()
let (Fire i) = msg
evt.Trigger i
return! loop() }
loop())

编辑

由于您已达到问题的上限,我将在此处回答您的修改。

您在第二个代码段中出现超时的原因是您的代码出现了死锁。让我们跟踪执行以了解这一点。

  1. 线程 1:代理已启动。
  2. 线程 2:第一个 collat​​z 调用。
  3. 线程 2:第一个 collat​​z 调用向代理发送一条消息。
  4. 线程 1:代理接收消息。
  5. 线程 1:代理触发事件。
  6. 线程 1:作为事件的结果,第二次 collat​​z 调用发生。
  7. 线程 1:第二个 collat​​z 调用向代理发送一条消息。
  8. 线程 1:第二个 collat​​z 调用开始等待代理响应。

这是执行结束的地方。此时代理无法响应(事实上,它甚至无法接收下一条消息!),因为它的指令指针仍在 evt.Trigger 中。 evt.Trigger 调用还没有返回,所以 loop 函数还没有递归,所以 inbox.Receive 函数还没有t 尚未被调用,因此第二条消息仍在代理队列中等待。

因此,您陷入了一个典型的死锁:collat​​z 正在等待代理接收其消息,但代理正在等待 collat​​z 完成对事件的处理。

最简单、最愚蠢的解决方案就是异步触发事件:

    async { evt.Trigger i } |> Async.Start

这将确保事件处理程序不是“就地”执行,而是异步执行,可能在不同的线程上执行。反过来,这将允许代理在继续其自己的执行循环之前不必等待事件被处理。

但一般来说,在处理多线程和异步时,永远不要直接调用未知代码。代理不应该直接调用 evt.Trigger 或它无法控制的任何其他内容,因为该代码可能正在等待代理本身(这就是您的情况),因此引入了死锁。

关于asynchronous - F# 事件在异步工作流中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49775690/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com