Akka Streams 高效 fold/merge 子流(WebSocket 框架 -> 消息)
Akka Streams efficiently fold/merge substreams (WebSocket Frames -> Messages)
tldr。我如何有效地耗尽 Akka HTTP 中的 BinaryMessages 以创建一个 ByteString 流,其中每个 ByteString 匹配一个 WS 对象。
我想构建一个 Akka WebSocket 服务器,它将完整的 WebSocket 对象作为 ByteString 进行流式传输,即 assembles WebSocket 帧,直到我有一个完整的 WS 对象并将其发送到下游。或者更一般地说,我有一个源流,并希望在转发到下游之前将每个源合并到一个元素中
E1(S1(a,b,c)), E2(S2(d,e,f,g)), E3(S3(h,i)) -> E1(abc), E2(defg), E3(hi)
// E = one element in the parent stream
// S a inner source, not all child elements might be available directly
// a-i the actual data elements
然而,我对 API / 有效执行此操作的最佳方法有点挣扎。我想出了以下代码,它使用 Sink.fold 来耗尽资源:
def flattenSink[Mat](sink: Sink[ByteString, Mat], materializer: Materializer): Sink[BinaryMessage, Mat] = {
Flow[BinaryMessage]
.map(d => {
val graph = d.dataStream.toMat(Sink.fold(ByteString.empty)((a, b) => a ++ b))(Keep.right)
val future = graph.run()(materializer)
Source.fromFuture(future)
})
.flatMapConcat(identity)
.toMat(sink)(Keep.right)
}
// or similar with the WS API
Flow[BinaryMessage]
.map(d => d.toStrict(timeout, materializer))
...
但在我看来,添加的物化器可能会变得低效,可能会有上下文切换到不同的线程...
有更好的方法吗?首选以明显作为主流的一部分运行的方式,没有不必要的上下文切换到另一个线程?
(我不关心 WS 对象可能具有的大小,assemble 它们可能花费的时间,在我的情况下两者都很小,我不会流式传输技嘉大小的对象)
谢谢!
我找到了一个使用 flatMapConcat 内置功能的解决方案。由于 flatMapConcat 在内部具体化了一个 Source,它还允许将我的 WebSocket 帧源转换为单个 ByteString 的 Source,而无需外部具体化器
def flattenSink[Mat](sink: Sink[ByteString, Mat]): Sink[BinaryMessage, Mat] = {
Flow[BinaryMessage]
.flatMapConcat(msg => if (msg.isStrict) {
Source.single(msg.getStrictData)
} else {
msg.dataStream
.fold(new ByteStringBuilder())((b, e) => b.append(e))
.map(x => x.result())
})
.toMat(sink)(Keep.right)
}
- materializer:它应该和运行 Flow 的一样
- bytestring 连接:构建器应该尽可能高效
- 严格消息:将它们包装在 Source.single 中似乎没有必要,但我找不到解决方法。
tldr。我如何有效地耗尽 Akka HTTP 中的 BinaryMessages 以创建一个 ByteString 流,其中每个 ByteString 匹配一个 WS 对象。
我想构建一个 Akka WebSocket 服务器,它将完整的 WebSocket 对象作为 ByteString 进行流式传输,即 assembles WebSocket 帧,直到我有一个完整的 WS 对象并将其发送到下游。或者更一般地说,我有一个源流,并希望在转发到下游之前将每个源合并到一个元素中
E1(S1(a,b,c)), E2(S2(d,e,f,g)), E3(S3(h,i)) -> E1(abc), E2(defg), E3(hi)
// E = one element in the parent stream
// S a inner source, not all child elements might be available directly
// a-i the actual data elements
然而,我对 API / 有效执行此操作的最佳方法有点挣扎。我想出了以下代码,它使用 Sink.fold 来耗尽资源:
def flattenSink[Mat](sink: Sink[ByteString, Mat], materializer: Materializer): Sink[BinaryMessage, Mat] = {
Flow[BinaryMessage]
.map(d => {
val graph = d.dataStream.toMat(Sink.fold(ByteString.empty)((a, b) => a ++ b))(Keep.right)
val future = graph.run()(materializer)
Source.fromFuture(future)
})
.flatMapConcat(identity)
.toMat(sink)(Keep.right)
}
// or similar with the WS API
Flow[BinaryMessage]
.map(d => d.toStrict(timeout, materializer))
...
但在我看来,添加的物化器可能会变得低效,可能会有上下文切换到不同的线程...
有更好的方法吗?首选以明显作为主流的一部分运行的方式,没有不必要的上下文切换到另一个线程?
(我不关心 WS 对象可能具有的大小,assemble 它们可能花费的时间,在我的情况下两者都很小,我不会流式传输技嘉大小的对象)
谢谢!
我找到了一个使用 flatMapConcat 内置功能的解决方案。由于 flatMapConcat 在内部具体化了一个 Source,它还允许将我的 WebSocket 帧源转换为单个 ByteString 的 Source,而无需外部具体化器
def flattenSink[Mat](sink: Sink[ByteString, Mat]): Sink[BinaryMessage, Mat] = {
Flow[BinaryMessage]
.flatMapConcat(msg => if (msg.isStrict) {
Source.single(msg.getStrictData)
} else {
msg.dataStream
.fold(new ByteStringBuilder())((b, e) => b.append(e))
.map(x => x.result())
})
.toMat(sink)(Keep.right)
}
- materializer:它应该和运行 Flow 的一样
- bytestring 连接:构建器应该尽可能高效
- 严格消息:将它们包装在 Source.single 中似乎没有必要,但我找不到解决方法。