gpt4 book ai didi

asynchronous - 使用 OCaml Async 并发写入

转载 作者:行者123 更新时间:2023-12-02 01:11:18 27 4
gpt4 key购买 nike

我正在从网络中读取数据,并且我想在获取数据时将其写入文件。写入是并发且非顺序的(想想 P2P 文件共享)。在 C 中,我会获取文件的文件描述符(在程序运行期间),然后使用 lseek,然后是 write,最后关闭 fd。这些操作可以通过多线程设置中的互斥锁来保护(特别是,lseek 和 write 应该是原子的)。

我真的不知道如何在异步中获得这种行为。我最初的想法是有这样的东西。

 let write fd s pos = 
let posl = Int64.of_int pos in
Async_unix.Unix_syscalls.lseek fd ~mode:`Set posl
>>| fun _ ->
let wr = Writer.create t.fd in
let len = String.length s in
Writer.write wr s ~pos:0 ~len

然后,在接收到数据时异步安排写入。

我的解决方案不正确。一方面,这个 write 任务需要是原子的,但事实并非如此,因为两个 lseek 可以在第一个 Writer.write 之前执行>。即使我可以按顺序安排 write 也无济于事,因为 Writer.write 不返回 Deferred.t。有什么想法吗?

顺便说一句,这是对先前回答 question 的跟进.

最佳答案

基本方法是拥有一个工作队列,每个工作队列执行一个原子seek/write1 操作。不变的是一次只有一个 worker 在运行。一个更复杂的策略将采用优先级队列,其中写入由一些最大化吞吐量的标准排序,例如,写入后续位置。如果您观察到大量小写入,您也可以实现复杂的缓冲策略,然后将它们合并成更大的 block 是个好主意。

但让我们从一个简单的非优先队列开始,它通过 Async.Pipe.t 实现。对于位置写入,我们不能使用 Writer 接口(interface),因为它是为缓冲顺序写入而设计的。因此,我们将使用 Async_unix.Std 中的 Unix.lseek 和 Bigstring.really_write 函数。 really_write 是一个常规的非异步函数,因此我们需要使用 Fd.syscall_in_thread` 函数将其提升到 Async 接口(interface)中,例如,

let really_pwrite fd offset bytes = 
Unix.lseek fd offset ~mode:`Set >>= fun (_ : int64) ->
Fd.syscall_in_thread fd (fun desc ->
Bigstring.really_write desc bytes)

注意:此函数将写入系统决定的字节数,但不会超过bytes 的长度。因此,您可能有兴趣实现一个将写入所有字节的 really_pwrite 函数。

整个方案将包括一个主线程,它将拥有一个文件描述符并通过 Async.Pipe 接受来自多个客户端的写入请求。假设每个写请求都是一个如下类型的消息:

 type chunk = {
offset : int;
bytes : Bigstring.t;
}

那么您的主线程将如下所示:

let process_requests fd = 
Async.Pipe.iter ~f:(fun {offset; bytes} ->
really_pwrite fd offset bytes)

其中 really_pwrite 是一个真正写入所有字节并处理所有错误的函数。您还可以使用 Async.Pipe.iter' 函数并在实际执行 pwrite 系统调用之前对写入进行预排序和合并。

再做一个优化说明。分配一个 bigstring 是一项相当昂贵的操作,因此您可以考虑预先分配一个 big bigstring 并从中提供小块。这将创建一个有限的资源,因此您的客户端将等待其他客户端完成写入并释放它们的 block 。因此,您将拥有一个内存占用有限的受限系统。


1)理想情况下我们应该使用 pwrite 虽然 Janestreet 只提供了 pwrite_assume_fd_is_nonblocking 函数,当调用system pwrite 已完成,实际上会阻塞整个系统。所以我们需要结合使用查找和写入。后者将释放 OCaml 运行时,以便程序的其余部分可以继续。 (此外,鉴于他们对非阻塞 fd 的定义,这个函数并没有多大意义,因为只有套接字和 FIFO 被认为是非阻塞的,据我所知,它们不支持查找操作。我将提交一个关于他们的错误跟踪器的问题。

关于asynchronous - 使用 OCaml Async 并发写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45160445/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com