Stream input to external process in Scala

I have an Iterable[String] that I want to stream to an external process, getting an Iterable[String] back as the output.

I feel like this should work, since it compiles:

import scala.sys.process._

object PipeUtils {
  implicit class IteratorStream(s: TraversableOnce[String]) {
    def pipe(cmd: String) = s.toStream.#>(cmd).lines
    def run(cmd: String) = s.toStream.#>(cmd).!
  }
}

However, Scala tries to execute the contents of s instead of passing them to standard input. Can anyone tell me what I'm doing wrong?

UPDATE:

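I think my original problem was that s.toStream was being implicitly converted to a ProcessBuilder and then executed. That is not what I want, because it is supposed to be the input to the process.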
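A minimal sketch of that distinction, assuming a Unix-like system with `echo` and `cat` on the PATH (the object and method names are hypothetical, for illustration only). A `Seq[String]` used where a `ProcessBuilder` is expected goes through the implicit `stringSeqToProcess` conversion from `scala.sys.process`, so its elements become a command and its arguments; data meant for standard input has to be redirected with `#<` instead.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.sys.process._

// Hypothetical demo object; assumes `echo` and `cat` are available on the PATH.
object ConversionDemo {
  // The Seq is implicitly converted (stringSeqToProcess) into a ProcessBuilder:
  // "echo" is the program, "hello" and "world" are its arguments.
  def runSeqAsCommand(): String = {
    val asCommand: ProcessBuilder = Seq("echo", "hello", "world")
    asCommand.!! // runs `echo hello world` and returns its stdout
  }

  // To send the strings to the process's standard input instead, redirect an
  // InputStream into it with #<. Every line, including the last, ends in "\n".
  def feedToStdin(lines: Seq[String]): String = {
    val bytes = lines.mkString("", "\n", "\n").getBytes(UTF_8)
    ("cat" #< new ByteArrayInputStream(bytes)).!!
  }

  def main(args: Array[String]): Unit = {
    print(runSeqAsCommand())
    print(feedToStdin(Seq("a", "b", "c")))
  }
}
```

The `ByteArrayInputStream` here builds the whole input in memory, so it only illustrates the redirection and does not stream anything.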

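I have come up with the following solution. It feels very hacky and wrong, but it seems to work for now. I'm not posting it as an answer because I feel the answer should be one line rather than this giant thing.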

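import java.io.OutputStream
import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.sys.process._

object PipeUtils {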

  /**
   * This class feels wrong.  I think that for the pipe command it actually loads all of the output
   * into memory.  This could blow up the machine if used wrong, however, I cannot figure out how to get it to
   * work properly.  Hopefully this question will get some good responses.
   * @param s
   */
  implicit class IteratorStream(s: TraversableOnce[String]) {

    val in = (in: OutputStream) => {
      s.foreach(x => in.write((x + "\n").getBytes))
      in.close
    }

    def pipe(cmd: String) = {
      val output = ListBuffer[String]()
      val io = new ProcessIO(in,
      out => {Source.fromInputStream(out).getLines.foreach(output += _)},
      err => {Source.fromInputStream(err).getLines.foreach(println)})

      cmd.run(io).exitValue
      output.toIterable
    }

    def run(cmd: String) = {
      cmd.run(BasicIO.standard(in)).exitValue
    }
  }
}

EDIT

The motivation for this comes from using Spark's .pipe function on RDDs. I want exactly the same functionality in my local code.

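Assuming Scala 2.11+, you should use lineStream as @edi suggested. The reason is that you get a streaming response as output becomes available, rather than a batched one. Say I have a shell script echo-sleep.sh: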

#!/usr/bin/env bash
# echo-sleep.sh
while IFS= read -r line; do echo "$line"; sleep 1; done

We want to call it from Scala with code like this:

import scala.sys.process._
import scala.language.postfixOps
import java.io.ByteArrayInputStream

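// Works as-is in the REPL; in a source file, put the implicit class inside an object.
implicit class X(in: TraversableOnce[String]) {
  // Don't build a ByteArrayInputStream like this in real code.  Just for illustration.
  // End every line (including the last) with "\n": `while read` skips an unterminated last line.
  def pipe(cmd: String) =
    cmd #< new ByteArrayInputStream(in.mkString("", "\n", "\n").getBytes) lineStream
}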

Then, if we make the final call like this:

1 to 10 map (_.toString) pipe "echo-sleep.sh" foreach println

The numbers in the sequence appear on STDOUT one per second. If you buffer and convert to an Iterable as in your example, you lose this responsiveness.
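To make the streaming-versus-batched difference concrete, here is a small timing sketch. It assumes a POSIX `sh` on the PATH and Scala 2.11/2.12 (`lineStream` still compiles on 2.13 but is deprecated there); the object name is hypothetical. It measures how long it takes to get the first line from `lineStream` versus the whole output from `!!`.

```scala
import scala.sys.process._

// Hypothetical timing demo; assumes a POSIX `sh` on the PATH.
object StreamingVsBatched {
  // A command that prints one line, sleeps 2 s, then prints another.
  private val cmd = Seq("sh", "-c", "echo first; sleep 2; echo second")

  private def millisSince(start: Long): Long = (System.nanoTime() - start) / 1000000

  // lineStream hands back the first line as soon as the process writes it.
  def firstLineStreamed(): (String, Long) = {
    val start = System.nanoTime()
    val first = cmd.lineStream.head
    (first, millisSince(start))
  }

  // !! returns only after the process has exited, so it waits out the sleep.
  def allOutputBatched(): (String, Long) = {
    val start = System.nanoTime()
    val all = cmd.!!
    (all, millisSince(start))
  }

  def main(args: Array[String]): Unit = {
    println(firstLineStreamed())
    println(allOutputBatched())
  }
}
```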

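Here is a solution that shows how to write the process code so that it streams both the input and the output. The key is to create a java.io.PipedInputStream and pass it as the process's input. That stream is filled asynchronously from the iterator through a java.io.PipedOutputStream. Feel free to change the input type of the implicit class to Iterable.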
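The piped-stream mechanism can be seen on its own, with no external process involved. This is a minimal sketch (the object and method names are hypothetical): one thread writes lines into a `PipedOutputStream`, and the caller reads them back from the connected `PipedInputStream` until the writer closes its end.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PipedInputStream, PipedOutputStream, PrintWriter}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical demo: the PipedOutputStream/PipedInputStream pair by itself.
object PipeDemo {
  def roundTrip(items: Iterator[String]): List[String] = {
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos) // bytes written to pos become readable from pis

    // Writer side runs on its own thread: writes block once the pipe's internal
    // buffer (1024 bytes by default) is full and nobody is reading.
    val writer = new Thread(new Runnable {
      def run(): Unit = {
        val w = new PrintWriter(new OutputStreamWriter(pos, UTF_8), true)
        try items.foreach(line => w.println(line))
        finally w.close() // closing the writer signals end-of-stream to the reader
      }
    })
    writer.start()

    // Reader side: consume lines until readLine returns null (writer closed).
    val reader = new BufferedReader(new InputStreamReader(pis, UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }

  def main(args: Array[String]): Unit =
    println(roundTrip(Iterator("a", "b", "c")))
}
```

In the process version, the reading is done by the thread that `scala.sys.process` spawns to copy the `#<` stream into the process's stdin.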

Here is an iterator used to show that this works.

/**
 * An iterator with pauses used to illustrate data streaming to the process to be run.
 */
class PausingIterator[A](zero: A, until: A, pauseMs: Int)(subsequent: A => A) 
extends Iterator[A] {
  private[this] var current = zero
  def hasNext = current != until
  def next(): A = {
    if (!hasNext) throw new NoSuchElementException
    val r = current
    current = subsequent(current)
    Thread.sleep(pauseMs)
    r
  }
}

Here is the actual code you want:

import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.PrintWriter

// For process stuff
import scala.sys.process._
import scala.language.postfixOps

// For asynchronous stream writing.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * A streaming version of the original class.  This does not block to wait for the entire 
 * input or output to be constructed.  This allows the process to get data ASAP and allows 
 * the process to return information back to the scala environment ASAP.  
 *
 * NOTE: Don't forget about error handling in the final production code.
 */
// Top-level implicit classes only work in the REPL; in a source file, put this inside an object.
implicit class X(it: Iterator[String]) {
  def pipe(cmd: String) = cmd #< iter2is(it) lineStream

  /**
   * Convert an iterator to an InputStream for use in the pipe function.
   * @param it an iterator to convert
   */
  private[this] def iter2is[A](it: Iterator[A]): InputStream = {
    // What is written to the output stream will appear in the input stream.
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    val w = new PrintWriter(pos, true)

    // Scala 2.11 (for Scala 2.10, use 'future').  Executes asynchronously.
    // Fill the stream, then close it (even on failure) so the process sees end-of-input.
    Future {
      try it foreach w.println
      finally w.close()
    }

    // Return possibly before pis is fully written to.
    pis
  }
}

The final call displays 0 through 9, pausing 3 seconds between each number (a 2-second pause on the Scala side and a 1-second pause on the shell script side).

// echo-sleep.sh is the same script as in my previous post
new PausingIterator(0, 10, 2000)(_ + 1)
  .map(_.toString)
  .pipe("echo-sleep.sh")
  .foreach(println)

Output:

0          [ pause 3 secs ]
1          [ pause 3 secs ]
...
8          [ pause 3 secs ]
9          [ pause 3 secs ]
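If you are on Scala 2.13, `lineStream` still compiles but is deprecated in favor of `lazyLines`, which returns a `LazyList`. Below is a minimal sketch of the same piped technique under that assumption. It wraps the implicit class in an object, since Scala 2 only allows top-level implicit classes in the REPL. It also writes on a dedicated daemon thread rather than the global `ExecutionContext`, because the writer blocks whenever the pipe buffer is full. The names `StreamingPipe` and `PipeOps` are hypothetical.

```scala
import java.io.{InputStream, OutputStreamWriter, PipedInputStream, PipedOutputStream, PrintWriter}
import java.nio.charset.StandardCharsets.UTF_8
import scala.sys.process._

// Hypothetical helper object; assumes Scala 2.13 (lazyLines) and a Unix-like OS.
object StreamingPipe {
  implicit class PipeOps(it: Iterator[String]) {
    // Output lines come back lazily, as the process writes them.
    def pipe(cmd: String): LazyList[String] = (cmd #< iter2is(it)).lazyLines
  }

  // Feed the iterator into a pipe from a dedicated thread; return the reading end.
  private def iter2is(it: Iterator[String]): InputStream = {
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    val writer = new Thread(new Runnable {
      def run(): Unit = {
        val w = new PrintWriter(new OutputStreamWriter(pos, UTF_8), true)
        try it.foreach(line => w.println(line))
        finally w.close() // end-of-input for the process
      }
    })
    writer.setDaemon(true) // don't keep the JVM alive if the process stops reading
    writer.start()
    pis
  }
}
```

Usage mirrors the answer above: with `import StreamingPipe._` in scope, `new PausingIterator(0, 10, 2000)(_ + 1).map(_.toString).pipe("echo-sleep.sh").foreach(println)`.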