How to complete a stream of numbers as CSV values in Akka HTTP?
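I have an Akka source implemented as a stream of numbers:
Source(Stream(1, 2, 3, 4, 5))
I am trying to use the source streaming support in Akka HTTP to return it as a streaming response of comma-separated values.
I followed the Akka docs on simple CSV source streaming and arrived at the following implementation: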
implicit val csvFormat = Marshaller.strict[Int, ByteString] { res =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
    ByteString(List(res).mkString(","))
  })
}
implicit val streamingSupport: CsvEntityStreamingSupport = EntityStreamingSupport.csv()
complete(Source(Stream(1, 2, 3, 4, 5)))
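But evidently this is not the right use of CSV entity streaming support for my purpose: each number ends up being streamed on its own line.
That is not what I want. I would like the response to be a single comma-separated list, for example 1,2,3,4,5.
How can this be achieved using the streaming support in Akka HTTP?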
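You are missing a higher-level class. With CSV entity streaming each stream element becomes one row, so an element cannot be a single Int and at the same time be a row made up of several columns. Creating a class such as MyBO below will do.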
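import akka.http.scaladsl.common.{CsvEntityStreamingSupport, EntityStreamingSupport}
import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString

// One MyBO instance is rendered as one CSV row.
case class MyBO(a: Int, b: Int, c: Int, d: Int, e: Int)

implicit val myBOAsCsv: Marshaller[MyBO, ByteString] = Marshaller.strict[MyBO, ByteString] { t =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
    ByteString(List(t.a, t.b, t.c, t.d, t.e).mkString(","))
  })
}

implicit val csvStreaming: CsvEntityStreamingSupport = EntityStreamingSupport.csv()

val route: Route =
  path("ping") {
    get {
      complete(Source(Stream(MyBO(1, 3, 5, 7, 11))))
    }
  }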
curl localhost:8080/ping -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /ping HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 07:10:18 GMT
< Transfer-Encoding: chunked
< Content-Type: text/csv; charset=UTF-8
<
1,3,5,7,11
* Connection #0 to host localhost left intact
* Closing connection 0
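The same idea can be sketched without fixing the number of columns in the class. The snippet below is only an illustration: `IntRow`, `renderRow`, and `RowMarshalling` are hypothetical names that do not appear in the answer, and the sketch assumes a whole row is available as a single element, so it does not help when the numbers arrive one by one.

```scala
import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
import akka.http.scaladsl.model.ContentTypes
import akka.util.ByteString

object RowMarshalling {
  // Hypothetical row type: one CSV line holding any number of Int columns.
  final case class IntRow(values: Seq[Int])

  // Pure helper, so the rendering can be checked without starting a server.
  def renderRow(row: IntRow): ByteString =
    ByteString(row.values.mkString(","))

  // Same marshaller shape as myBOAsCsv above, but for a variable-width row.
  implicit val intRowAsCsv: Marshaller[IntRow, ByteString] =
    Marshaller.strict[IntRow, ByteString] { row =>
      Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => renderRow(row))
    }
}
```

With this in scope, `RowMarshalling.renderRow(RowMarshalling.IntRow(Seq(1, 3, 5, 7, 11))).utf8String` gives `1,3,5,7,11`, the same row as in the curl output above.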
That newline is added by the framing renderer defined in EntityStreamingSupport.csv().
To get the desired output we need to define our own custom EntityStreamingSupport.
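import akka.NotUsed
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model.{ContentTypeRange, ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

val route =
  path("test") {
    val responseSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Stream(1, 2, 3, 4, 5).iterator)
    val byteStringSource: Source[ByteString, NotUsed] =
      responseSource.map(i => ByteString(i.toString))
    val streamingSource =
      byteStringSource.map(bs => HttpEntity(ContentTypes.`text/plain(UTF-8)`, bs))
    // Reuse the CSV support, but append "," instead of a newline after every element.
    implicit val streamingSupport =
      EntityStreamingSupport.csv(maxLineLength = 16 * 1024)
        .withSupported(ContentTypeRange(ContentTypes.`text/plain(UTF-8)`))
        .withContentType(ContentTypes.`text/plain(UTF-8)`)
        .withFramingRenderer(Flow[ByteString].map(bs => bs ++ ByteString(",")))
    complete(streamingSource)
  }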
curl localhost:8080/test -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 07:50:46 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
<
* Connection #0 to host localhost left intact
1,2,3,4,5,* Closing connection 0
Edit: to get rid of the trailing comma, we can use a sliding-window hack.
val route =
  path("test") {
    val responseSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Stream(1, 2, 3, 4, 5).iterator)
    val startByteString = ByteString("$start$")
    val byteStringSource: Source[ByteString, NotUsed] =
      responseSource.map(i => ByteString(i.toString)).prepend(Source.single(startByteString))
    val streamingSource =
      byteStringSource.map(bs => HttpEntity(ContentTypes.`text/plain(UTF-8)`, bs))
    implicit val streamingSupport =
      EntityStreamingSupport.csv(maxLineLength = 16 * 1024)
        .withSupported(ContentTypeRange(ContentTypes.`text/plain(UTF-8)`))
        .withContentType(ContentTypes.`text/plain(UTF-8)`)
        .withFramingRenderer(
          Flow[ByteString].sliding(2, 1)
            .map { bsSeq =>
              if (startByteString.equals(bsSeq(0))) {
                // first int; no need for comma
                bsSeq(1)
              } else {
                // not first int; add comma
                ByteString(",") ++ bsSeq(1)
              }
            }
        )
    complete(streamingSource)
  }
curl localhost:8080/test -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 08:28:05 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
<
* Connection #0 to host localhost left intact
1,2,3,4,5* Closing connection 0
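A possibly simpler alternative to the sliding-window hack is the `intersperse` operator from Akka Streams, which injects a separator only between elements, so no sentinel element is needed. This is a minimal sketch rather than part of the original answer: `CommaFraming` and `render` are hypothetical names, and the `render` helper assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object CommaFraming {
  // Emits "," only between elements: no trailing comma and no "$start$" sentinel.
  val renderer: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].intersperse(ByteString(","))

  // Illustrative helper to check the renderer outside of a route.
  // Assumes Akka 2.6+, where the implicit ActorSystem provides a Materializer.
  def render(numbers: List[Int])(implicit system: ActorSystem): String = {
    val rendered = Source(numbers)
      .map(i => ByteString(i.toString))
      .via(renderer)
      .runFold(ByteString.empty)(_ ++ _)
    Await.result(rendered, 3.seconds).utf8String
  }
}
```

In the route, this flow would be passed as `.withFramingRenderer(CommaFraming.renderer)` and the source would no longer need the prepended start element. `CommaFraming.render(List(1, 2, 3, 4, 5))` gives `1,2,3,4,5`, and an empty list gives an empty string.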