Stream input to external process in Scala
I have an Iterable[String] that I want to stream to an external process, returning an Iterable[String] as the output.
I feel like this should work, since it compiles:
import scala.sys.process._

object PipeUtils {
  implicit class IteratorStream(s: TraversableOnce[String]) {
    def pipe(cmd: String) = s.toStream.#>(cmd).lines
    def run(cmd: String) = s.toStream.#>(cmd).!
  }
}
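However, Scala tries to execute the contents of s instead of passing them to standard input. Can anyone tell me what I'm doing wrong?
Update:
I think my original problem was that s.toStream was being implicitly converted to a ProcessBuilder and then executed. That is incorrect, because it is meant to be the input to the process.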
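To make the conversion mentioned above concrete: scala.sys.process provides an implicit conversion from Seq[String] to ProcessBuilder that treats the first element as the command and the remaining elements as its arguments. The following is a minimal sketch, assuming a Unix-like system with echo on the PATH; the names SeqAsCommand and echoViaSeq are made up for illustration.

```scala
import scala.sys.process._

object SeqAsCommand {
  // A Seq[String] is implicitly converted to a ProcessBuilder:
  // the head is the command, the tail holds its arguments.
  // `!!` runs the command and returns its standard output as a String.
  def echoViaSeq(): String = Seq("echo", "hello").!!.trim
}
```

This is consistent with the behavior described in the question: once the elements of s are viewed as a command line, they are executed rather than fed to standard input.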
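I came up with the following solution. It feels hacky and wrong, but it seems to work for now. I'm not posting it as an answer because I feel the answer should be a one-liner rather than this gigantic thing.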
import java.io.OutputStream
import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.sys.process._

object PipeUtils {
  /**
   * This class feels wrong. I think that for the pipe command it actually loads all of the output
   * into memory. This could blow up the machine if used wrong, however, I cannot figure out how to get it to
   * work properly. Hopefully
   * will get some good responses.
   * @param s the strings to send to the process's standard input
   */
  implicit class IteratorStream(s: TraversableOnce[String]) {
    // Writes every element of s to the process's stdin, one per line, then closes it.
    val in = (in: OutputStream) => {
      s.foreach(x => in.write((x + "\n").getBytes))
      in.close()
    }

    def pipe(cmd: String) = {
      val output = ListBuffer[String]()
      val io = new ProcessIO(in,
        out => { Source.fromInputStream(out).getLines().foreach(output += _) },
        err => { Source.fromInputStream(err).getLines().foreach(println) })
      // Blocks until the process exits; all output is buffered in memory.
      cmd.run(io).exitValue()
      output.toIterable
    }

    def run(cmd: String) = {
      cmd.run(BasicIO.standard(in)).exitValue()
    }
  }
}
Edit:
The motivation for this comes from using Spark's .pipe function on an RDD. I want exactly the same functionality in my local code.
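Assuming Scala 2.11+, you should use lineStream as suggested by @edi. The reason is that you get a streaming response, with each line delivered as it becomes available, rather than a batched response. Say I have a shell script echo-sleep.sh: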
#!/usr/bin/env bash
# echo-sleep.sh
while read line; do echo "$line"; sleep 1; done
We want to call it from Scala using code like the following:
import scala.sys.process._
import scala.language.postfixOps
import java.io.ByteArrayInputStream

implicit class X(in: TraversableOnce[String]) {
  // Don't do the ByteArrayInputStream construction in real code. Just for illustration.
  // Every line, including the last, ends with "\n" so that `read` in the script sees it.
  def pipe(cmd: String) =
    cmd #< new ByteArrayInputStream(in.mkString("", "\n", "\n").getBytes) lineStream
}
Then, if we make the final call like this:
1 to 10 map (_.toString) pipe "echo-sleep.sh" foreach println
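the numbers in the sequence appear on STDOUT one per second. If you buffer the output and convert it to an Iterable as in your example, you lose this responsiveness.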
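Here is a solution that demonstrates how to write the process code so that it streams both the input and the output. The key is to produce a java.io.PipedInputStream and pass it to the process as its input. That stream is filled asynchronously from the iterator via a java.io.PipedOutputStream. Obviously, feel free to change the input type of the implicit class to an Iterable.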
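The piped-stream mechanism can be seen in isolation, without any external process. The sketch below is only an illustration under a few assumptions: the names PipedDemo and roundTrip are hypothetical, and it uses a plain Thread instead of a Future to keep the example small. One thread writes lines into a PipedOutputStream while the calling thread reads them back from the connected PipedInputStream.

```scala
import java.io.{BufferedReader, InputStreamReader, PipedInputStream, PipedOutputStream, PrintWriter}

object PipedDemo {
  // Writes each element on a background thread and reads the lines back
  // on the calling thread through the connected piped streams.
  def roundTrip(items: Iterator[String]): List[String] = {
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos) // whatever is written to pos can be read from pis

    val writer = new Thread(new Runnable {
      def run(): Unit = {
        val w = new PrintWriter(pos, true)
        // Closing the writer signals end-of-stream to the reading side.
        try items.foreach(line => w.println(line)) finally w.close()
      }
    })
    writer.start()

    val reader = new BufferedReader(new InputStreamReader(pis))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }
}
```

The writer and the reader must run on different threads: a PipedInputStream has a small internal buffer, so writing and reading from a single thread can block forever once that buffer fills up.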
Here is an iterator used to demonstrate that this works:
/**
 * An iterator with pauses used to illustrate data streaming to the process to be run.
 */
class PausingIterator[A](zero: A, until: A, pauseMs: Int)(subsequent: A => A)
    extends Iterator[A] {
  private[this] var current = zero
  def hasNext = current != until
  def next(): A = {
    if (!hasNext) throw new NoSuchElementException
    val r = current
    current = subsequent(current)
    Thread.sleep(pauseMs)
    r
  }
}
Here's the actual code you want:
import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.PrintWriter

// For process stuff
import scala.sys.process._
import scala.language.postfixOps

// For asynchronous stream writing.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * A streaming version of the original class. This does not block to wait for the entire
 * input or output to be constructed. This allows the process to get data ASAP and allows
 * the process to return information back to the scala environment ASAP.
 *
 * NOTE: Don't forget about error handling in the final production code.
 */
implicit class X(it: Iterator[String]) {
  def pipe(cmd: String) = cmd #< iter2is(it) lineStream

  /**
   * Convert an iterator to an InputStream for use in the pipe function.
   * @param it an iterator to convert
   */
  private[this] def iter2is[A](it: Iterator[A]): InputStream = {
    // What is written to the output stream will appear in the input stream.
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    val w = new PrintWriter(pos, true)

    // Scala 2.11 (scala 2.10, use 'future'). Executes asynchronously.
    // Fill the stream, then close.
    Future {
      it foreach w.println
      w.close()
    }

    // Return possibly before pis is fully written to.
    pis
  }
}
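The final call will display 0 through 9, with a pause of 3 seconds between each number (a 2-second pause on the Scala side plus a 1-second pause on the shell script side).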
// echo-sleep.sh is the same script as in my previous post
new PausingIterator(0, 10, 2000)(_ + 1)
  .map(_.toString)
  .pipe("echo-sleep.sh")
  .foreach(println)
Output:
0 [ pause 3 secs ]
1 [ pause 3 secs ]
...
8 [ pause 3 secs ]
9 [ pause 3 secs ]
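For readers who do not have echo-sleep.sh at hand, the same idea can be tried against cat. This is a sketch under stated assumptions, not a definitive implementation: it assumes a Unix-like system with cat on the PATH, the names PipeSketch, toInputStream, and viaCat are hypothetical, and it collects the output with toList only to make the result easy to inspect (which gives up the laziness of lineStream). Note that lineStream is deprecated in Scala 2.13 in favor of lazyLines.

```scala
import java.io.{InputStream, PipedInputStream, PipedOutputStream, PrintWriter}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.sys.process._

object PipeSketch {
  // Fills a piped stream from the iterator on a background thread and
  // returns the reading end, possibly before all input has been written.
  private def toInputStream(it: Iterator[String]): InputStream = {
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    Future {
      val w = new PrintWriter(pos, true)
      // Always close the writer so the process sees end-of-input.
      try it.foreach(line => w.println(line)) finally w.close()
    }
    pis
  }

  // Feeds the iterator to `cat` on stdin and collects what it prints.
  def viaCat(it: Iterator[String]): List[String] =
    ("cat" #< toInputStream(it)).lineStream.toList
}
```

Calling PipeSketch.viaCat(Iterator("1", "2", "3")) should hand back the same three lines, since cat simply copies its standard input to its standard output.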