使用 OCaml Async 并发写入
Concurrent write with OCaml Async
我正在从网络中读取数据,我想在获取数据时将其写入文件。写入是并发且非顺序的(想想 P2P 文件共享)。在 C 中,我会获取文件的文件描述符(在程序运行期间),然后使用 lseek
,然后是 write
,最后关闭 fd
。这些操作可以通过多线程设置中的互斥锁来保护(特别是,lseek 和 write 应该是原子的)。
我真的不知道如何在异步中获得这种行为。我最初的想法是有这样的东西。
let write fd s pos =
let posl = Int64.of_int pos in
Async_unix.Unix_syscalls.lseek fd ~mode:`Set posl
>>| fun _ ->
let wr = Writer.create t.fd in
let len = String.length s in
Writer.write wr s ~pos:0 ~len
然后,在接收到数据时异步安排写入。
我的解决方案不正确。一方面,这个 write
任务需要是原子的,但事实并非如此,因为两个 lseek
可以在第一个 Writer.write
之前执行。即使我可以按顺序安排 write
也无济于事,因为 Writer.write
不是 return 和 Deferred.t
。有什么想法吗?
顺便说一句,这是对之前回答 的跟进。
基本的方法是有一个工人队列,其中每个工人执行一个原子操作 seek/write
1。不变的是一次只有一个工人运行。一个更复杂的策略将采用优先级队列,其中写入由一些最大化吞吐量的标准排序,例如,写入后续位置。如果您观察到大量小写入,您也可以实施复杂的缓冲策略,然后将它们合并成更大的块是个好主意。
但让我们从一个简单的非优先队列开始,通过 Async.Pipe.t
实现。对于位置写入,我们不能使用 Writer 接口,因为它是为缓冲顺序写入而设计的。因此,我们将使用 Async_unix.Std
和 Bigstring.really_writefunction. The really_write is a regular non-asynchronous function, so we need to lift it into the Async interface using the
Fd.syscall_in_thread` 函数中的 Unix.lseek
,例如
let really_pwrite fd offset bytes =
Unix.lseek fd offset ~mode:`Set >>= fun (_ : int64) ->
Fd.syscall_in_thread fd (fun desc ->
Bigstring.really_write desc bytes)
注意:此函数将写入系统决定的字节数,但不会超过 bytes
的长度。因此,您可能有兴趣实现一个将写入所有字节的 really_pwrite
函数。
整个方案将包括一个主线程,它将拥有一个文件描述符并通过 Async.Pipe 接受来自多个客户端的写请求。假设每个写请求都是如下类型的消息:
type chunk = {
offset : int;
bytes : Bigstring.t;
}
那么您的主线程将如下所示:
let process_requests fd =
Async.Pipe.iter ~f:(fun {offset; bytes} ->
really_pwrite fd offset bytes)
其中really_pwrite
是一个真正写入所有字节并处理所有错误的函数。您还可以使用 Async.Pipe.iter'
函数并在实际执行 pwrite
系统调用之前对写入进行预排序和合并。
再做一个优化说明。分配一个 bigstring 是一项相当昂贵的操作,因此您可以考虑预先分配一个 big bigstring 并从中提供小块。这将创建一个有限的资源,因此您的客户端将等待其他客户端完成写入并释放它们的块。因此,您将拥有一个内存占用有限的受限系统。
1)理想情况下我们应该使用pwrite
虽然Janestreet只提供pwrite_assume_fd_is_nonblocking
函数,当调用系统时不会释放OCaml运行时pwrite
完成了,实际上会阻塞整个系统。所以我们需要结合使用查找和写入。后者将释放 OCaml 运行时,以便程序的其余部分可以继续。 (此外,鉴于他们对非阻塞 fd 的定义,这个函数并没有多大意义,因为只有套接字和 FIFO 被认为是非阻塞的,据我所知,它们不支持查找操作。我将提交一个他们的错误跟踪器上的问题。
我正在从网络中读取数据,我想在获取数据时将其写入文件。写入是并发且非顺序的(想想 P2P 文件共享)。在 C 中,我会获取文件的文件描述符(在程序运行期间),然后使用 lseek
,然后是 write
,最后关闭 fd
。这些操作可以通过多线程设置中的互斥锁来保护(特别是,lseek 和 write 应该是原子的)。
我真的不知道如何在异步中获得这种行为。我最初的想法是有这样的东西。
let write fd s pos =
let posl = Int64.of_int pos in
Async_unix.Unix_syscalls.lseek fd ~mode:`Set posl
>>| fun _ ->
let wr = Writer.create t.fd in
let len = String.length s in
Writer.write wr s ~pos:0 ~len
然后,在接收到数据时异步安排写入。
我的解决方案不正确。一方面,这个 write
任务需要是原子的,但事实并非如此,因为两个 lseek
可以在第一个 Writer.write
之前执行。即使我可以按顺序安排 write
也无济于事,因为 Writer.write
不是 return 和 Deferred.t
。有什么想法吗?
顺便说一句,这是对之前回答
基本的方法是有一个工人队列,其中每个工人执行一个原子操作 seek/write
1。不变的是一次只有一个工人运行。一个更复杂的策略将采用优先级队列,其中写入由一些最大化吞吐量的标准排序,例如,写入后续位置。如果您观察到大量小写入,您也可以实施复杂的缓冲策略,然后将它们合并成更大的块是个好主意。
但让我们从一个简单的非优先队列开始,通过 Async.Pipe.t
实现。对于位置写入,我们不能使用 Writer 接口,因为它是为缓冲顺序写入而设计的。因此,我们将使用 Async_unix.Std
和 Bigstring.really_writefunction. The really_write is a regular non-asynchronous function, so we need to lift it into the Async interface using the
Fd.syscall_in_thread` 函数中的 Unix.lseek
,例如
let really_pwrite fd offset bytes =
Unix.lseek fd offset ~mode:`Set >>= fun (_ : int64) ->
Fd.syscall_in_thread fd (fun desc ->
Bigstring.really_write desc bytes)
注意:此函数将写入系统决定的字节数,但不会超过 bytes
的长度。因此,您可能有兴趣实现一个将写入所有字节的 really_pwrite
函数。
整个方案将包括一个主线程,它将拥有一个文件描述符并通过 Async.Pipe 接受来自多个客户端的写请求。假设每个写请求都是如下类型的消息:
type chunk = {
offset : int;
bytes : Bigstring.t;
}
那么您的主线程将如下所示:
let process_requests fd =
Async.Pipe.iter ~f:(fun {offset; bytes} ->
really_pwrite fd offset bytes)
其中really_pwrite
是一个真正写入所有字节并处理所有错误的函数。您还可以使用 Async.Pipe.iter'
函数并在实际执行 pwrite
系统调用之前对写入进行预排序和合并。
再做一个优化说明。分配一个 bigstring 是一项相当昂贵的操作,因此您可以考虑预先分配一个 big bigstring 并从中提供小块。这将创建一个有限的资源,因此您的客户端将等待其他客户端完成写入并释放它们的块。因此,您将拥有一个内存占用有限的受限系统。
1)理想情况下我们应该使用pwrite
虽然Janestreet只提供pwrite_assume_fd_is_nonblocking
函数,当调用系统时不会释放OCaml运行时pwrite
完成了,实际上会阻塞整个系统。所以我们需要结合使用查找和写入。后者将释放 OCaml 运行时,以便程序的其余部分可以继续。 (此外,鉴于他们对非阻塞 fd 的定义,这个函数并没有多大意义,因为只有套接字和 FIFO 被认为是非阻塞的,据我所知,它们不支持查找操作。我将提交一个他们的错误跟踪器上的问题。