scalaz-stream:如何以与其余部分不同的方式处理 "header"(第一个块)?
scalaz-stream: how to handle the "header" (first chunks) in a different way to the rest?
Context: I'm trying to write a Process1[ByteVector, spray.http.HttpResponsePart]
with output ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd
. I haven't yet fully wrapped my head around scalaz-stream and its vocabulary.
如何编写能够以不同方式处理第一个 n
块的进程?
我想到了这个(以字符串为例):
val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _))
val headerChunkAndRest: Process1[String, String] =
headerChunk.take(1) ++ process1.id
io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt")))
.pipe(headerChunkAndRest)
.to(io.stdOutLines)
.run.run
什么是惯用的并且可能是通常可组合的书写方式headerChunkAndRest
?
一般注意事项
有多种方法可以做到这一点,具体取决于您的需求细节。您可以使用以下属于 scalaz-streams 的辅助方法:
foldWithIndex
这会以数字形式为您提供块的当前索引。您可以根据该索引进行区分
zipWithState
您可以将方法调用的一次状态添加到下一次调用,并使用此状态跟踪您是否仍在解析 headers 或是否已达到 body。在下一步中,您可以使用此状态来处理 header 和 body 不同的
repartition
使用它来将所有 header 和所有 body 元素组合在一起。然后您可以在下一步中处理它们。
zipWithNext
此函数始终向您显示与当前元素分组的前一个元素。当您从 header 切换到 body 并做出相应反应时,您可以使用它来检测。
可能你应该re-think,你真正需要的。对于您的问题,它将是 zipwithIndex
,然后是 map
。但是如果你 re-think 你的问题,你可能会以 repartition
或 zipWithState
.
结尾
示例代码
让我们举一个简单的例子:一个 HTTP 客户端,它将 HTTP header 元素与 body 元素分开(HTTP,而不是 HTML)。在 header 中,像 cookies 这样的东西,在 body 中是真实的 "content",像图像或 HTTP 源。
一个简单的 HTTP 客户端可能如下所示:
import scalaz.stream._
import scalaz.concurrent.Task
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup
implicit val AG = nio.DefaultAsynchronousChannelGroup
def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] =
Process(
s"GET $path HTTP/1.1",
s"Host: $hostname",
"Accept: */*",
"User-Agent: scalaz-stream"
).intersperse("\n").append(Process("\n\n"))
def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] =
nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines())
现在我们可以使用此代码将 header 行与其余行分开。在 HTTP 中,header 是按行构造的。它与 body 之间用空行隔开。所以首先,让我们计算 header:
中的行数
val demoHostName="scala-lang.org" // Hope they won't mind...
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8))
当我运行这个的时候,header里面有8行。我们先定义一个枚举,所以对响应的部分进行分类:
object HttpResponsePart {
sealed trait EnumVal
case object HeaderLine extends EnumVal
case object HeaderBodySeparator extends EnumVal
case object Body extends EnumVal
val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body)
}
然后让我们使用zipWithIndex
加上map
对响应的部分进行分类:
simpleHttpClient(demoHostName).zipWithIndex.map{
case (line, idx) if idx < 9 => (line, HeaderLine)
case (line, idx) if idx == 10 => (line, HeaderBodySeparator)
case (line, _) => (line, Body)
}.take(15).runLog.run
对我来说,这很好用。但当然,header 行的数量可以随时更改,恕不另行通知。使用考虑响应结构的非常简单的解析器要健壮得多。为此,我使用 zipWithState
:
simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){
case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator
case (_, HeaderLine) => HeaderLine
case (_, HeaderBodySeparator) => Body
case (line, Body) => Body
}.take(15).runLog.run
您可以看到,这两种方法都使用相似的结构,并且两种方法应该导致相同的结果。好的是,这两种方法都很容易重用。您可以只换出来源,例如使用文件,无需更改任何内容。与分类后的处理相同。 .take(15).runLog.run
在两种方法中完全相同。
Context: I'm trying to write a
Process1[ByteVector, spray.http.HttpResponsePart]
with outputChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd
. I haven't yet fully wrapped my head around scalaz-stream and its vocabulary.
如何编写能够以不同方式处理第一个 n
块的进程?
我想到了这个(以字符串为例):
val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _))
val headerChunkAndRest: Process1[String, String] =
headerChunk.take(1) ++ process1.id
io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt")))
.pipe(headerChunkAndRest)
.to(io.stdOutLines)
.run.run
什么是惯用的并且可能是通常可组合的书写方式headerChunkAndRest
?
一般注意事项
有多种方法可以做到这一点,具体取决于您的需求细节。您可以使用以下属于 scalaz-streams 的辅助方法:
foldWithIndex
这会以数字形式为您提供块的当前索引。您可以根据该索引进行区分zipWithState
您可以将方法调用的一次状态添加到下一次调用,并使用此状态跟踪您是否仍在解析 headers 或是否已达到 body。在下一步中,您可以使用此状态来处理 header 和 body 不同的repartition
使用它来将所有 header 和所有 body 元素组合在一起。然后您可以在下一步中处理它们。zipWithNext
此函数始终向您显示与当前元素分组的前一个元素。当您从 header 切换到 body 并做出相应反应时,您可以使用它来检测。
可能你应该re-think,你真正需要的。对于您的问题,它将是 zipwithIndex
,然后是 map
。但是如果你 re-think 你的问题,你可能会以 repartition
或 zipWithState
.
示例代码
让我们举一个简单的例子:一个 HTTP 客户端,它将 HTTP header 元素与 body 元素分开(HTTP,而不是 HTML)。在 header 中,像 cookies 这样的东西,在 body 中是真实的 "content",像图像或 HTTP 源。
一个简单的 HTTP 客户端可能如下所示:
import scalaz.stream._
import scalaz.concurrent.Task
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup
implicit val AG = nio.DefaultAsynchronousChannelGroup
def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] =
Process(
s"GET $path HTTP/1.1",
s"Host: $hostname",
"Accept: */*",
"User-Agent: scalaz-stream"
).intersperse("\n").append(Process("\n\n"))
def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] =
nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines())
现在我们可以使用此代码将 header 行与其余行分开。在 HTTP 中,header 是按行构造的。它与 body 之间用空行隔开。所以首先,让我们计算 header:
中的行数val demoHostName="scala-lang.org" // Hope they won't mind...
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8))
当我运行这个的时候,header里面有8行。我们先定义一个枚举,所以对响应的部分进行分类:
object HttpResponsePart {
sealed trait EnumVal
case object HeaderLine extends EnumVal
case object HeaderBodySeparator extends EnumVal
case object Body extends EnumVal
val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body)
}
然后让我们使用zipWithIndex
加上map
对响应的部分进行分类:
simpleHttpClient(demoHostName).zipWithIndex.map{
case (line, idx) if idx < 9 => (line, HeaderLine)
case (line, idx) if idx == 10 => (line, HeaderBodySeparator)
case (line, _) => (line, Body)
}.take(15).runLog.run
对我来说,这很好用。但当然,header 行的数量可以随时更改,恕不另行通知。使用考虑响应结构的非常简单的解析器要健壮得多。为此,我使用 zipWithState
:
simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){
case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator
case (_, HeaderLine) => HeaderLine
case (_, HeaderBodySeparator) => Body
case (line, Body) => Body
}.take(15).runLog.run
您可以看到,这两种方法都使用相似的结构,并且两种方法应该导致相同的结果。好的是,这两种方法都很容易重用。您可以只换出来源,例如使用文件,无需更改任何内容。与分类后的处理相同。 .take(15).runLog.run
在两种方法中完全相同。