为什么这段基于 Lwt 且看似并发的代码如此不一致

Why is this Lwt based and seemingly concurrent code so inconsistent

我正在尝试创建 Lwt 的并发示例并想出了这个小示例

let () =
  Lwt_main.run (
      let start = Unix.time () in
      Lwt_io.open_file Lwt_io.Input "/dev/urandom" >>= fun data_source ->
      Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
      Lwt_list.iter_p
        (fun count ->
         let count = string_of_int count in
         Lwt_io.open_file
           ~flags:[Unix.O_RDWR; Unix.O_CREAT]
           ~perm:0o777
           ~mode:Lwt_io.Output ("serial/file"^ count ^ ".txt") >>= fun h ->
         Lwt_io.read ~count:52428800
                     data_source >>= Lwt_io.write_line h)
        [0;1;2;3;4;5;6;7;8;9] >>= fun () ->
      let finished = Unix.time () in
      Lwt_io.printlf "Execution time took %f seconds" (finished -. start))

编辑:要求 50GB 是: “然而,这非常慢,而且基本上没有用。 是否需要以某种方式强制执行内部绑定?

编辑:我最初写过要求 50 GB 但它从未完成,现在我有一个不同的问题要求 50MB,执行几乎是即时的并且 du -sh 仅报告 80k 的目录大小。

编辑:我也尝试过明确关闭文件句柄的代码,但结果相同。

我使用的是 OS X 最新版本并使用

编译

ocamlfind ocamlopt -package lwt.unix main.ml -linkpkg -o Test

(我也试过/dev/random,是的,我用的是挂钟时间。)

如果您查看您的文件,您会发现它们每个都是 4097K – 这是从 /dev/urandom 读取的 4096K,加上换行符的一个字节。你达到了 Lwt_io.read 的最大缓冲区,所以即使你说 ~count:awholelot,它也只会给你 ~count:4096.

我不知道规范的 Lwt 方法是什么,但这里有一个替代方法:

open Lwt

let stream_a_little source n = 
    let left = ref n in
    Lwt_stream.from (fun () -> 
        if !left <= 0 then return None
        else Lwt_io.read ~count:!left source >>= (fun s -> 
            left:=!left - (Bytes.length s);
            return (Some s)
        ))

let main () =
    Lwt_io.open_file ~buffer_size:(4096*8) ~mode:Lwt_io.Input "/dev/urandom" >>= fun data_source ->
        Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
            Lwt_list.iter_p
        (fun count ->
            let count = string_of_int count in
            Lwt_io.open_file
           ~flags:[Unix.O_RDWR; Unix.O_CREAT]
           ~perm:0o777
           ~mode:Lwt_io.Output ("serial/file"^ count ^ ".txt") >>= (fun h ->
               Lwt_stream.iter_s (Lwt_io.write h)
               (stream_a_little data_source 52428800)))
        [0;1;2;3;4;5;6;7;8;9]

let timeit f =
        let start = Unix.time () in
        f () >>= fun () ->
            let finished = Unix.time () in
            Lwt_io.printlf "Execution time took %f seconds" (finished -. start)

let () =
    Lwt_main.run (timeit main)

编辑:请注意 lwt 是一个 cooperative 线程库;当你有两个线程 "at the same time" 时,它们实际上不会同时在你的 OCaml 进程中做一些事情。 OCaml(到目前为止)是单核的,所以当一个线程移动时,其他线程会很好地等待,直到该线程说 "OK, I've done some work, you others go"。因此,当您尝试同时流式传输到 8 个文件时,您基本上是在向文件 1 分配一点随机性,然后向文件 2 分配一点随机性,……向文件 8 分配一点随机性,然后(如果还有工作要做)一点到 file1,然后一点到 file2 等。如果您无论如何都在等待大量输入(假设您的输入来自网络),那么这很有意义,并且您的主进程有很多时间来遍历每个线程并检查"is there any input?",但是当你的所有线程都只是从 /dev/random 读取时,先填充一个文件,然后再填充第二个,等等会快得多。假设有几个 [=29] =]'s 并行读取 /dev/(u)random(并且您的驱动器可以跟上),同时加载 ncpu 读取当然要快得多,但是您需要多核(或者只是做这在 shell 脚本中)。

EDIT2:展示了如何在 reader 上增加缓冲区大小,稍微提高速度 ;) 请注意,您也可以简单地在旧示例中​​将 buffer_size 设置得尽可能高,它会一次性读完所有内容,但除非你多读几遍,否则你无法获得超过 buffer_size 的内容。

所以,您的代码有一些问题。

第 1 期

主要问题是您对 Lwt_io.read 功能的理解不正确(没有人可以责怪您!)。

val read : ?count : int -> input_channel -> string Lwt.t
  (** [read ?count ic] reads at most [len] characters from [ic]. It
      returns [""] if the end of input is reached. If [count] is not
      specified, it reads all bytes until the end of input. *)

当指定 ~count:len 时,它将读取最多 len 个字符。最多,意味着它可以读取更少。但是如果省略 count 选项,那么它将读取所有数据。我个人认为这种行为不符合直觉,即使不是很奇怪。因此,这个 at most 意味着最多 len 或更少,即不保证它会准确读取 len 字节。事实上,如果您在程序中添加检查:

 Lwt_io.read ~count:52428800 data_source >>= fun data ->
 Lwt_io.printlf "Read %d bytes" (String.length data) >>= fun () ->
 Lwt_io.write h data >>= fun () ->

您会看到,每次尝试它只会读取 4096 个字节:

Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes

为什么 4096?因为这是默认的缓冲区大小。但其实没关系。

第 2 期

Lwt_io 模块实现缓冲 IO。这意味着您所有的写入和读取都不会直接进入文件,而是缓冲在内存中。这意味着,您应该记住 flushclose。您的代码不会在完成时关闭描述符,因此您可能会遇到程序终止后某些缓冲区未被刷新的情况。 Lwt_io 特别是在程序退出前刷新所有缓冲区。但是你不应该依赖这个未记录的特性(当你尝试任何其他缓冲 io,比如标准 C 库中的 fstreams 时,它可能会在未来打击你)。所以,总是关闭你的文件(另一个问题是今天文件描述符是最宝贵的资源,它们的泄漏很难找到)。

第 3 期

不要使用 /dev/urandom/dev/random 来测量 io。对于前者,您将测量随机数生成器的性能,对于后者,您将测量机器中的熵流。两者都很慢。根据你CPU的速度,你很少会得到超过16个Mb/s,而且要少得多,那么Lwt就可以吞吐了。从 /dev/zero 读取并写入 /dev/null 将实际执行内存中的所有传输并显示实际速度,这可以通过您的程序实现。 well-written 程序仍将受内核速度的限制。在下面提供的示例程序中,这将显示平均速度为 700 MB/s.

第 4 期

如果您真的追求性能,请不要使用缓冲 io。你永远不会得到最大值。例如,Lwt_io.read 将首先读取缓冲区,然后创建一个 string 并将数据复制到该字符串。如果您确实需要一些性能,那么您应该提供自己的缓冲。在大多数情况下,不需要这样做,因为 Lwt_io 的性能非常好。但是如果你需要每秒处理几十兆字节,或者需要一些特殊的缓冲策略(non-linear),你可能需要考虑提供你自己的缓冲。好消息是 Lwt_io 允许您这样做。您可以看一下 example 程序,它将衡量 Lwt input/output 的性能。它模仿 well-known pv 程序。

第 5 期

您希望通过 运行 线程并行获得一些性能。问题是在您的测试中没有并发的地方。 /dev/random(以及 /dev/zero)是一种仅受 CPU 限制的设备。这与调用 random 函数是一样的。它总是 available,所以系统调用不会阻塞它。写入常规文件也不是并发的好地方。首先,通常只有一个hard-drive,里面有一个书写头。即使系统调用将阻塞并将控制权交给另一个线程,这也会导致性能偏离,因为两个线程现在将竞争 header 位置。如果你有 SSD,header 不会有任何竞争,但性能会更差,因为你会破坏你的缓存。但幸运的是,通常在常规文件上写入不会阻塞。因此,您的线程将 运行 因此,即它们将被序列化。