How to complete a stream of numbers as CSV values in Akka HTTP?

I have an Akka Source implemented as a stream of numbers:

Source(Stream(1, 2, 3, 4, 5))

I am trying to use the Akka Streams support in Akka HTTP to return a streaming response as comma-separated values.

I followed the Akka docs on simple CSV source streaming and came up with the following implementation:

implicit val csvFormat = Marshaller.strict[Int, ByteString] { res =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
    ByteString(List(res).mkString(","))
  })
}

implicit val streamingSupport: CsvEntityStreamingSupport = EntityStreamingSupport.csv()

complete(Source(Stream(1, 2, 3, 4, 5)))

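But apparently this is not the right use case for the CSV entity streaming support: each number ends up streamed on its own line.

That is not what I want. I want to respond with a single comma-separated list, e.g. 1,2,3,4,5.

How can I achieve this with the streaming support in Akka HTTP?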

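You are missing a higher-level class. The CSV streaming support renders each stream element as one line, so the element type cannot be Int if a row is supposed to consist of several columns. Creating a class like MyBO below will do.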

    case class MyBO(a: Int, b: Int, c: Int, d: Int, e: Int)
    
    implicit val myBOAsCsv = Marshaller.strict[MyBO, ByteString] { t =>
      Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
        ByteString(List(t.a, t.b, t.c, t.d, t.e).mkString(","))
      })
    }

    implicit val csvStreaming = EntityStreamingSupport.csv()

    val route: Route =
      path("ping") {
        get {
          complete(Source(Stream(MyBO(1,3,5,7,11))))
        }
      }
curl localhost:8080/ping -v
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /ping HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 07:10:18 GMT
< Transfer-Encoding: chunked
< Content-Type: text/csv; charset=UTF-8
<
1,3,5,7,11
* Connection #0 to host localhost left intact
* Closing connection 0

The newline is added by the framing renderer defined in EntityStreamingSupport.csv().

We need to define our own custom EntityStreamingSupport to make this work.

val route =
  path("test") {
    val responseSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Stream(1, 2, 3, 4, 5).iterator)

    val byteStringSource: Source[ByteString, NotUsed] =
      responseSource.map(i => ByteString(i.toString))

    val streamingSource =
      byteStringSource.map(bs => HttpEntity(ContentTypes.`text/plain(UTF-8)`, bs))

    implicit val streamingSupport =
      EntityStreamingSupport.csv(maxLineLength = 16 * 1024)
        .withSupported(ContentTypeRange(ContentTypes.`text/plain(UTF-8)`))
        .withContentType(ContentTypes.`text/plain(UTF-8)`)
        .withFramingRenderer(Flow[ByteString].map(bs => bs ++ ByteString(",")))

    complete(streamingSource)
  }
curl localhost:8080/test -v 
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 07:50:46 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
< 
* Connection #0 to host localhost left intact
1,2,3,4,5,* Closing connection 0
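
The trailing comma in the output above follows from how a per-element framing renderer works: the same suffix is appended to every element, the last one included. The sketch below models that behaviour with plain Scala collections rather than Akka Streams. `FramingModel` and `render` are hypothetical names used only for illustration and are not part of Akka HTTP.

```scala
object FramingModel {
  // Hypothetical model of a per-element framing renderer: the suffix is
  // appended to every element, including the last one.
  def render(elements: Seq[String], suffix: String): String =
    elements.map(_ + suffix).mkString

  val numbers: Seq[String] = Seq(1, 2, 3, 4, 5).map(_.toString)

  // Newline suffix: every element ends up on its own line.
  val newlineFramed: String = render(numbers, "\n")

  // Comma suffix: a single line, but with a comma after the last element.
  val commaFramed: String = render(numbers, ",")
}
```

With a comma suffix the model yields `1,2,3,4,5,`, which matches the curl output. Removing the last comma requires a renderer that knows whether an element is the first (or last) one.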

Edit: to get rid of the trailing comma, we can use a sliding-window hack.

val route =
  path("test") {
    val responseSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Stream(1, 2, 3, 4, 5).iterator)

    val startByteString = ByteString("$start$")

    val byteStringSource: Source[ByteString, NotUsed] =
        responseSource.map(i => ByteString(i.toString)).prepend(Source.single(startByteString))

    val streamingSource =
      byteStringSource.map(bs => HttpEntity(ContentTypes.`text/plain(UTF-8)`, bs))

    implicit val streamingSupport =
      EntityStreamingSupport.csv(maxLineLength = 16 * 1024)
        .withSupported(ContentTypeRange(ContentTypes.`text/plain(UTF-8)`))
        .withContentType(ContentTypes.`text/plain(UTF-8)`)
        .withFramingRenderer(
          Flow[ByteString].sliding(2, 1)
            .map { bsSeq =>
              if (startByteString.equals(bsSeq(0))) {
                // first int; no need for comma
                bsSeq(1)
              } else {
                // not first int; add comma
                ByteString(",") ++ bsSeq(1)
              }

            }
        )

    complete(streamingSource)
  }
curl localhost:8080/test -v 
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 08:28:05 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
< 
* Connection #0 to host localhost left intact
1,2,3,4,5* Closing connection 0
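
A possible alternative to the sliding-window hack, not taken from the answer above, is the `intersperse` operator from Akka Streams, which injects a separator between elements only and therefore needs no start marker. The following is a minimal sketch assuming akka-http 10.2.x on Akka 2.6. It bypasses EntityStreamingSupport and completes the request with a chunked `HttpEntity` built from a `Source[ByteString, _]`. The names `IntersperseCsv` and `commaSeparated` are hypothetical.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString

object IntersperseCsv {
  // Hypothetical helper: renders each number as text and places a comma
  // between elements only, so no leading or trailing separator is emitted.
  def commaSeparated(numbers: Source[Int, NotUsed]): Source[ByteString, NotUsed] =
    numbers
      .map(n => ByteString(n.toString))
      .intersperse(ByteString(","))

  val route: Route =
    path("test") {
      get {
        // An entity built from a Source[ByteString, _] is sent as a chunked response.
        complete(
          HttpEntity(
            ContentTypes.`text/csv(UTF-8)`,
            commaSeparated(Source(List(1, 2, 3, 4, 5)))
          )
        )
      }
    }
}
```

Because the separator is handled inside the stream itself, an empty source simply produces an empty body. It may be worth checking how the sliding version behaves in that case, since a window that contains only the start marker would make `bsSeq(1)` fail.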