重定向 Lwt 频道

Redirecting Lwt channels

我在 Unix 套接字上有一个 ssh 服务 运行 并且我有一个本地 TCP 服务器,我希望将其定向到 unix 套接字的通道。

基本上当我这样做时:

$ ssh root@localhost -p 2000

然后我的本地 TCP 服务器获取请求并将其通过管道传输到 Unix 套接字,TCP 客户端(在本例中为 ssh)从 Unix 套接字获取回复。 相关代码:

  let running_tunnel debug (tcp_ic, tcp_oc) () =
    Lwt_io.with_connection a_unix_addr begin fun (mux_ic, mux_oc) ->
      let%lwt _ = some_call with_an_arg
      and _ =

        (* Some setup code *)


      let rec forever () =
        Lwt_io.read_line tcp_ic >>= fun opening_message ->
        Lwt_io.write_from_string_exactly
        mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
        Lwt_io.read_line mux_ic >>= fun reply ->
        Lwt_io.printl reply >>= fun () ->
        Lwt_io.write_line tcp_oc reply >>= fun () ->
        forever ()
      in
      forever ()
      in
      Lwt.return_unit
    end

还有这类作品。当我在命令行上调用 ssh 时它得到 "stuck",但我知道我正在获取一些数据,因为另一方的 ssh header 是正确的,SSH-2.0-OpenSSH_6.7。我还让我打印出初始 ssh 握手的更多部分,即我看到打印的是:

??^?W\zJ?~??curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group14-sha1ssh-rsa,ssh-dss>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1none,zlib@openssh.comnone,zlib@openssh.co 

等,这似乎是正确的。我认为挂起的原因是因为我正在使用 Lwt_io.read_line 所以我尝试了这个:

            let rec forever () =
              Lwt_io.read tcp_ic >>= fun opening_message ->
              Lwt_io.write_from_string_exactly
                mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
              Lwt_io.read mux_ic >>= fun reply ->
              Lwt_io.printl reply >>= fun () ->
              Lwt_io.write tcp_oc reply >>= fun () ->
              forever ()
            in
            forever ()

实际上效果更差,它甚至没有打印出初始握手。我还尝试了专用的 {write,read}_into... 功能,但收效甚微。 运行 在 strace/dtruce 下,我看到最终结果如下:

read(0x6, "SSH-2.0-OpenSSH_6.9\r\n[=14=]", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.9\n[=14=]", 0x14)      = 20 0
read(0x7, "[=14=]", 0x1000)      = -1 Err#35
write(0x7, "SSH-2.0-OpenSSH_6.9[=14=]", 0x13)        = 19 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x7, "SSH-2.0-OpenSSH_6.7\r\n[=14=]", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.7\n[=14=]", 0x14)      = 20 0
read(0x6, "[=14=]", 0x1000)      = -1 Err#35
write(0x6, "SSH-2.0-OpenSSH_6.7\n[=14=]", 0x14)      = 20 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x6, "[=14=]", 0x1000)      = 1968 0
read(0x6, "[=14=]", 0x1000)      = -1 Err#35
^C

其中 6.9 是我本地机器的 ssh,6.7 是 Unix 套接字后面的远程机器。对我来说确实奇怪的一件事是 \r 是如何被删除的,这将 read/write 计数更改为 1。我不确定这是否是关键的区别。

理想情况下,我想要某种来自 Lwt 的抽象,它会说每当此可读通道(TCP 套接字)上有可用数据时,将其直接写入可写通道(Unix 套接字),反之亦然。

带有 readline 的变体不起作用,因为数据流是二进制的,而 readline 用于基于文本行的输入。带有 Lwt_io.read 函数的第二个变体不起作用,因为此函数将读取所有输入直到最后,除非您指定了可选的 count 参数。这意味着,只有在 reader 端的 EOF 之后,该控制才会传递给 write。使用 Lwt_io.read 和一些计数,例如 Lwt_io.read ~count:1024 mux_ic 并不是一个非常糟糕的主意。另外,如果您希望您的流是有限的,您不应该忘记检查 return 值。 read_into 应谨慎使用,因为与 read 函数不同,它不保证它会读取您请求的确切数据量。换句话说,会有短读。 write_into 函数也是如此。此函数的 _exactly 版本不会遇到此问题,因此最好改用它们。

还有一件事您应该考虑。 Lwt_io 为缓冲输入和输出提供了一个接口。这意味着,该模块中的所有函数都在写入或读取某个内部缓冲区,而不是通过设备描述符直接与操作系统交互。这意味着,当您将数据从一个缓冲源传输到另一个缓冲源时,两端都会有一些意外的延迟。所以你应该预料到他们会使用同花顺。否则,当您进行双向交互时,可能会引入竞争条件。

此外,虽然缓冲 io 简化了很多事情,但它也有代价。事实上,你有几个不必要的缓冲区层,当你使用 Lwt_io 时,你也分配了很多不必要的数据,用垃圾来破坏你的内存。问题是 Lwt_io 有它自己的内部缓冲区,它不会向临时用户显示,并且 returns 数据或消耗数据的所有函数都需要执行额外的复制操作到或从内部功能。例如,使用 Lwt_io.{read,write},将执行以下操作:

  1. 将数据从内核复制到内部缓冲区
  2. 分配一个字符串
  3. 将数据从内部缓冲区复制到分配的字符串
  4. (现在是 write 部分) 将字符串中的数据复制到内部缓冲区
  5. 将数据从内部缓冲区复制到内核。
  6. (在某个地方,有时在 GC 内部) 复制分配的字符串 从小堆到大堆(如果字符串足够小以适合小堆)或将它从一个位置复制到另一个位置(如果压缩算法决定移动它,并且字符串仍然存在,这是很有可能的,如果生产者out运行s 读取数据的消费者和生命周期 变得很长)。

看来我们可以去掉2、3、4、6中的拷贝了。我们可以使用我们自己的buffer,把kernel中的数据copy进去,再从这个kernel中copy回kernel。我们甚至可以通过使用 splice and tee 系统调用来摆脱 1 和 5 中的副本,这些系统调用直接在内核缓冲区之间复制数据,而不涉及用户 space。但那样的话,我们将失去检查数据的能力,通常这就是我们想要的。

所以,让我们尝试从内核中删除除副本之外的所有副本space。 我们可以在 Lwt_io 中使用内部缓冲区的低级接口,如 direct_access 和新添加的 block 函数,但这需要了解 Lwt_io 的内部知识不是很琐碎,但仍然 doable。相反,我们将使用一种使用 Lwt_unix 库的更简单的方法。这个库直接与内核交互,没有任何中间缓冲区,让我们自己缓冲。

open Lwt.Infix

let bufsiz = 32768

let echo ic oc =
  let buf = Lwt_bytes.create bufsiz in
  let rec loop p =
    let p = p mod bufsiz in
    Lwt_bytes.read ic buf p (bufsiz - p) >>= function
    | 0 -> Lwt.return ()
    | n -> Lwt_bytes.write oc buf p n >>= loop in
  loop 0

这将实现简单快速的数据复制,复制数据的速度与 cat 程序相同。尽管如此,仍有一些改进的空间。例如,应该添加错误处理以提高鲁棒性(特别是 EINTR 信号)。此外,此功能实现 同步复制,其中输入和输出被紧紧锁定。有时这不是一种选择。考虑以下示例,输入是一个 UDP 套接字,它可能很容易超过 运行 消费者,并且数据将被丢弃,即使生产者平均比消费者慢。要处理这个问题,您需要将 readers 和编写器分成两个单独的线程,通过一些弹性队列进行通信。

Lwt 是一个非常低级的库,它不会也不应该为你解决这个问题。它提供了机制,可用于为每种情况构建解决方案。有一些库确实为一些常见模式提供了解决方案,0MQ and nanomessages 就是很好的例子。

更新

我可能是太低级了,也可能是我挖的太深了。如果您真的在寻找高级方法,那么您应该使用 Lwt_stream,在这种情况下,您可以将 foo.pipe(bar).pipe(foo) 的节点等值编码为

let echo ic oc = Lwt_io.(write_chars oc (read_chars ic))

当然这会慢很多,但这取决于你的任务。

是的,要执行双向重定向,您应该 运行 两个线程,如下所示:echo ic oc <&> echo ic oc 对于带有文件描述符的版本,它们都是可写的。如果您使用的是 Lwt_io 通道,它们像管道一样是单向的,那么您将获得每个部分的两个端点。让我们将它们分别命名为 fifo 用于前端输入和输出,并为后端部分命名为 bibo。然后你需要像这样连接它:echo fo bi <&> echo bo fi,使用带有流的 echo 的第二个版本。

性能成本

通常高级抽象会带来性能成本。在我们的特殊情况下,使用第一个版本的 echo,每秒的吞吐量超过 1Gb。带有流的版本具有平均吞吐量 5MB/s。根据您的设置,它可能会或可能不会工作。这对于常规 ssh 会话来说绰绰有余,但可能会对本地网络中的 scp 产生影响。