如何使用 scalaz 流写入和读取外部进程

How do you write to and read from an external process using scalaz streams

我希望能够将数据从 scalaz 流发送到外部程序,然后在大约 100 毫秒后返回该项目的结果。尽管我能够通过将输出流 Sink 与输入流 Process 压缩然后丢弃 Sink 副作用来使用下面的代码做到这一点,但我觉得这个解决方案可能是很脆。

如果外部程序对其中一个输入项有错误,一切都将不同步。我觉得最好的办法是将某种增量 ID 发送到外部程序中,它可以在将来回显出来,这样如果发生错误我们可以重新同步。

我遇到的主要问题是将发送数据到外部程序 Process[Task, Unit] 的结果与程序 Process[Task, String] 的输出结合在一起。我觉得我应该使用 wyn 中的东西,但不确定。

import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(1000)
     System.currentTimeMillis().toString
  })

  val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
  val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))

  val in: Process[Task, Unit] = source to printLines

  val zip: Process[Task, (Unit, String)] = in.zip(linesR)
  val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
  out.run.run
}

在更深入地研究更高级的类型之后。看起来 Exchange 完全符合我的要求。

import java.io.PrintStream

import scalaz._
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.io._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val program: java.lang.Process = Runtime.getRuntime.exec("./echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(100)
     System.currentTimeMillis().toString
  })

  val read: stream.Process[Task, String] = linesR(program.getInputStream)
  val write: Sink[Task, String] = printLines(new PrintStream(program.getOutputStream))
  val exchange: Exchange[String, String] = Exchange(read, write)
  println(exchange.run(source).take(10).runLog.run)
}