Akka Streams 高效 fold/merge 子流(WebSocket 框架 -> 消息)

Akka Streams efficiently fold/merge substreams (WebSocket Frames -> Messages)

tldr。我如何有效地耗尽 Akka HTTP 中的 BinaryMessages 以创建一个 ByteString 流,其中每个 ByteString 匹配一个 WS 对象。

我想构建一个 Akka WebSocket 服务器,它将完整的 WebSocket 对象作为 ByteString 进行流式传输,即 assembles WebSocket 帧,直到我有一个完整的 WS 对象并将其发送到下游。或者更一般地说,我有一个源流,并希望在转发到下游之前将每个源合并到一个元素中

E1(S1(a,b,c)), E2(S2(d,e,f,g)), E3(S3(h,i)) -> E1(abc), E2(defg), E3(hi)
// E = one element in the parent stream
// S a inner source, not all child elements might be available directly
// a-i the actual data elements

然而,我对 API / 有效执行此操作的最佳方法有点挣扎。我想出了以下代码,它使用 Sink.fold 来耗尽资源:

def flattenSink[Mat](sink: Sink[ByteString, Mat], materializer: Materializer): Sink[BinaryMessage, Mat] = {
  Flow[BinaryMessage]
    .map(d => {
      val graph = d.dataStream.toMat(Sink.fold(ByteString.empty)((a, b) => a ++ b))(Keep.right)
      val future = graph.run()(materializer)
      Source.fromFuture(future)
    })
    .flatMapConcat(identity)
    .toMat(sink)(Keep.right)
}

// or similar with the WS API 

Flow[BinaryMessage]
  .map(d => d.toStrict(timeout, materializer))
  ...

但在我看来,添加的物化器可能会变得低效,可能会有上下文切换到不同的线程...

有更好的方法吗?首选以明显作为主流的一部分运行的方式,没有不必要的上下文切换到另一个线程?

(我不关心 WS 对象可能具有的大小,assemble 它们可能花费的时间,在我的情况下两者都很小,我不会流式传输技嘉大小的对象)

谢谢!

我找到了一个使用 flatMapConcat 内置功能的解决方案。由于 flatMapConcat 在内部具体化了一个 Source,它还允许将我的 WebSocket 帧源转换为单个 ByteString 的 Source,而无需外部具体化器

  def flattenSink[Mat](sink: Sink[ByteString, Mat]): Sink[BinaryMessage, Mat] = {
    Flow[BinaryMessage]
      .flatMapConcat(msg => if (msg.isStrict) {
        Source.single(msg.getStrictData)
      } else {
        msg.dataStream
          .fold(new ByteStringBuilder())((b, e) => b.append(e))
          .map(x => x.result())
      })
      .toMat(sink)(Keep.right)
  }
  • materializer:它应该和运行 Flow 的一样
  • bytestring 连接:构建器应该尽可能高效
  • 严格消息:将它们包装在 Source.single 中似乎没有必要,但我找不到解决方法。