与 Akka 流一起发送元数据
Send metadata along with Akka stream
这是我之前的问题:
我已经成功地通过 Akka 流发送了压缩和加密的文件。现在,我正在寻找将元数据与数据一起传输的方法,主要是文件名和哈希值(校验和)。
我目前的想法是使用 Flow.prepend 函数并以这种方式在数据之前插入元数据:
文件名,大小可以变化但总是以空字节结尾
固定大小哈希(校验和)
数据
然后,在接收端我将不得不使用 Flow.takeWhile 两次 - 一次读取文件名,第二次读取哈希,然后只读取数据。它看起来并不像优雅的解决方案,而且如果将来我想添加更多元数据,它会变得更糟。
我注意到方法 Flow.named,但是文档只说:
Add a ``name`` attribute to this Flow.
我不知道如何使用它(以及是否可以通过它传输文件名)。
问题是:通过 Akka 流传输元数据和数据是否有比上述更好的想法?
编辑:附上我的想法。
我认为将元数据放在前面很有意义。一种简单的方法是使用您用于发送数据的相同框架在元数据前添加。
接收端需要知道有多少个元数据块,并使用此信息进行拆分。请参阅下面的示例。
// client end
filenameSrc
.concat(hashSrc)
.concat(dataSrc)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue, allowTruncation = true))
.via(Tcp().outgoingConnection(???, ???))
.runForeach{ ??? }
// server end
val printMetadata =
Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val metadataSink = Sink.foreach(println)
val bcast = builder.add(Broadcast[ByteString](2))
bcast.out(0).take(2) ~> metadataSink
FlowShape(bcast.in, bcast.out(1).drop(2).outlet)
})
val handler =
Framing.delimiter(ByteString("\n"), Int.MaxValue)
.via(printMetadata)
.via(???)
这只是解决此问题的众多可能方法之一。但是无论您选择哪种解决方案,接收方都需要了解如何从通过 TCP 读取的原始字节流中提取元数据。
这是我之前的问题:
我已经成功地通过 Akka 流发送了压缩和加密的文件。现在,我正在寻找将元数据与数据一起传输的方法,主要是文件名和哈希值(校验和)。
我目前的想法是使用 Flow.prepend 函数并以这种方式在数据之前插入元数据:
文件名,大小可以变化但总是以空字节结尾
固定大小哈希(校验和)
数据
然后,在接收端我将不得不使用 Flow.takeWhile 两次 - 一次读取文件名,第二次读取哈希,然后只读取数据。它看起来并不像优雅的解决方案,而且如果将来我想添加更多元数据,它会变得更糟。
我注意到方法 Flow.named,但是文档只说:
Add a ``name`` attribute to this Flow.
我不知道如何使用它(以及是否可以通过它传输文件名)。
问题是:通过 Akka 流传输元数据和数据是否有比上述更好的想法?
编辑:附上我的想法。
我认为将元数据放在前面很有意义。一种简单的方法是使用您用于发送数据的相同框架在元数据前添加。
接收端需要知道有多少个元数据块,并使用此信息进行拆分。请参阅下面的示例。
// client end
filenameSrc
.concat(hashSrc)
.concat(dataSrc)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue, allowTruncation = true))
.via(Tcp().outgoingConnection(???, ???))
.runForeach{ ??? }
// server end
val printMetadata =
Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val metadataSink = Sink.foreach(println)
val bcast = builder.add(Broadcast[ByteString](2))
bcast.out(0).take(2) ~> metadataSink
FlowShape(bcast.in, bcast.out(1).drop(2).outlet)
})
val handler =
Framing.delimiter(ByteString("\n"), Int.MaxValue)
.via(printMetadata)
.via(???)
这只是解决此问题的众多可能方法之一。但是无论您选择哪种解决方案,接收方都需要了解如何从通过 TCP 读取的原始字节流中提取元数据。