使用 OCaml Async 并发写入

Concurrent write with OCaml Async

我正在从网络中读取数据,我想在获取数据时将其写入文件。写入是并发且非顺序的(想想 P2P 文件共享)。在 C 中,我会获取文件的文件描述符(在程序运行期间),然后使用 lseek,然后是 write,最后关闭 fd。这些操作可以通过多线程设置中的互斥锁来保护(特别是,lseek 和 write 应该是原子的)。

我真的不知道如何在异步中获得这种行为。我最初的想法是有这样的东西。

 let write fd s pos = 
     let posl = Int64.of_int pos in
     Async_unix.Unix_syscalls.lseek fd ~mode:`Set posl
     >>| fun _ -> 
     let wr = Writer.create t.fd in
     let len = String.length s in
     Writer.write wr s ~pos:0 ~len

然后,在接收到数据时异步安排写入。

我的解决方案不正确。一方面,这个 write 任务需要是原子的,但事实并非如此,因为两个 lseek 可以在第一个 Writer.write 之前执行。即使我可以按顺序安排 write 也无济于事,因为 Writer.write 不是 return 和 Deferred.t。有什么想法吗?

顺便说一句,这是对之前回答 的跟进。

基本的方法是有一个工人队列,其中每个工人执行一个原子操作 seek/write1。不变的是一次只有一个工人运行。一个更复杂的策略将采用优先级队列,其中写入由一些最大化吞吐量的标准排序,例如,写入后续位置。如果您观察到大量小写入,您也可以实施复杂的缓冲策略,然后将它们合并成更大的块是个好主意。

但让我们从一个简单的非优先队列开始,通过 Async.Pipe.t 实现。对于位置写入,我们不能使用 Writer 接口,因为它是为缓冲顺序写入而设计的。因此,我们将使用 Async_unix.Std 和 Bigstring.really_writefunction. The really_write is a regular non-asynchronous function, so we need to lift it into the Async interface using theFd.syscall_in_thread` 函数中的 Unix.lseek,例如

let really_pwrite fd offset bytes = 
  Unix.lseek fd offset ~mode:`Set >>= fun (_ : int64) ->
  Fd.syscall_in_thread fd (fun desc -> 
    Bigstring.really_write desc bytes)

注意:此函数将写入系统决定的字节数,但不会超过 bytes 的长度。因此,您可能有兴趣实现一个将写入所有字节的 really_pwrite 函数。

整个方案将包括一个主线程,它将拥有一个文件描述符并通过 Async.Pipe 接受来自多个客户端的写请求。假设每个写请求都是如下类型的消息:

 type chunk = {
    offset : int;
    bytes : Bigstring.t;
 }

那么您的主线程将如下所示:

let process_requests fd = 
  Async.Pipe.iter ~f:(fun {offset; bytes} -> 
    really_pwrite fd offset bytes)

其中really_pwrite是一个真正写入所有字节并处理所有错误的函数。您还可以使用 Async.Pipe.iter' 函数并在实际执行 pwrite 系统调用之前对写入进行预排序和合并。

再做一个优化说明。分配一个 bigstring 是一项相当昂贵的操作,因此您可以考虑预先分配一个 big bigstring 并从中提供小块。这将创建一个有限的资源,因此您的客户端将等待其他客户端完成写入并释放它们的块。因此,您将拥有一个内存占用有限的受限系统。


1)理想情况下我们应该使用pwrite虽然Janestreet只提供pwrite_assume_fd_is_nonblocking函数,当调用系统时不会释放OCaml运行时pwrite 完成了,实际上会阻塞整个系统。所以我们需要结合使用查找和写入。后者将释放 OCaml 运行时,以便程序的其余部分可以继续。 (此外,鉴于他们对非阻塞 fd 的定义,这个函数并没有多大意义,因为只有套接字和 FIFO 被认为是非阻塞的,据我所知,它们不支持查找操作。我将提交一个他们的错误跟踪器上的问题。