gpt4 book ai didi

asynchronous - 如何组合 Lwt 过滤器?

转载 作者:行者123 更新时间:2023-12-04 04:59:44 24 4
gpt4 key购买 nike

我目前正在学习Lwt .我对使用异步进程用 OCaml 例程替换一些 shell 例程很感兴趣。

让我们看一下简化的第一次尝试,其中通过组合运行 cat 的两个线程来创建过滤器:

let filter_cat ()=
Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
Lwt_io.stdin
|> Lwt_io.read_lines
|> filter_cat ()
|> filter_cat ()
|> Lwt_io.write_lines Lwt_io.stdout

let () =
filter_t ()
|> Lwt_main.run

此过滤器以某种方式工作,但在其标准输入关闭而不是退出时挂起。如果我删除其中一个 filter_cat,它会按预期工作。

我猜我没有适本地组合这些过滤器,因此无法加入我开始的两个线程。编写这些过滤器的正确方法是什么,以便程序在读取 stdin 上的 EOF 后终止?


您可以找到此程序以及 BSD Owl Github gist 中的 Makefile .

最佳答案

这个问题的答案是 Lwt 中有一个小错误。有一个内部函数,monitor执行管道的那个:

(* Monitor the thread [sender] in the stream [st] so write errors are
reported. *)
let monitor sender st =
let sender = sender >|= fun () -> None in
let state = ref Init in
Lwt_stream.from
(fun () ->
match !state with
| Init ->
let getter = Lwt.apply Lwt_stream.get st in
let result _ =
match Lwt.state sender with
| Lwt.Sleep ->
(* The sender is still sleeping, behave as the
getter. *)
getter
| Lwt.Return _ ->
(* The sender terminated successfully, we are
done monitoring it. *)
state := Done;
getter
| Lwt.Fail _ ->
(* The sender failed, behave as the sender for
this element and save current getter. *)
state := Save getter;
sender
in
Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
| Save t ->
state := Done;
t
| Done ->
Lwt_stream.get st)

问题出在定义上

let getter = Lwt.apply Lwt_stream.get st

getter 进程遇到流的结尾时,然后它被保存,但是sender 丢失了,这似乎阻止了完成。这可以通过改进 getter 的定义来解决,方法是告诉它在到达流末尾时充当 sender

关于asynchronous - 如何组合 Lwt 过滤器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29049860/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com