与 Akka 流一起发送元数据

Send metadata along with Akka stream

这是我之前的问题:

我已经成功地通过 Akka 流发送了压缩和加密的文件。现在,我正在寻找将元数据与数据一起传输的方法,主要是文件名和哈希值(校验和)。

我目前的想法是使用 Flow.prepend 函数并以这种方式在数据之前插入元数据:

  1. 文件名,大小可以变化但总是以空字节结尾

  2. 固定大小哈希(校验和)

  3. 数据

然后,在接收端我将不得不使用 Flow.takeWhile 两次 - 一次读取文件名,第二次读取哈希,然后只读取数据。它看起来并不像优雅的解决方案,而且如果将来我想添加更多元数据,它会变得更糟。

我注意到方法 Flow.named,但是文档只说:

Add a ``name`` attribute to this Flow.

我不知道如何使用它(以及是否可以通过它传输文件名)。

问题是:通过 Akka 流传输元数据和数据是否有比上述更好的想法?

编辑:附上我的想法。

我认为将元数据放在前面很有意义。一种简单的方法是使用您用于发送数据的相同框架在元数据前添加。

接收端需要知道有多少个元数据块,并使用此信息进行拆分。请参阅下面的示例。

// client end
filenameSrc
  .concat(hashSrc)
  .concat(dataSrc)
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue, allowTruncation = true))
  .via(Tcp().outgoingConnection(???, ???))
  .runForeach{ ??? }

// server end
val printMetadata =
  Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val metadataSink = Sink.foreach(println)
    val bcast = builder.add(Broadcast[ByteString](2))

    bcast.out(0).take(2) ~> metadataSink

    FlowShape(bcast.in, bcast.out(1).drop(2).outlet)
  })

val handler =
  Framing.delimiter(ByteString("\n"), Int.MaxValue)
  .via(printMetadata)
  .via(???)

这只是解决此问题的众多可能方法之一。但是无论您选择哪种解决方案,接收方都需要了解如何从通过 TCP 读取的原始字节流中提取元数据。