从 akka.stream.scaladsl.Source 读取第一个字节

Reading first bytes from akka.stream.scaladsl.Source

我正在尝试从 akka.stream.scaladsl.Source[ByteString, Any] 中读取前 16 个字节并返回 [Array[Byte], Source[ByteString, Any]]

读取前 16 个字节后,我想像往常一样传输剩余的 Source

用例:

Source[ByteString, Any]是一个加密流,流中的前16个字节是初始化向量。我需要获取初始化向量才能解密流的其余部分。

这是我试过的:

Source.single(ByteString("This is my test string"))
      .prefixAndTail(16).runWith(Sink.head)

我想要这样的东西,但是 prefixAndTail 将元素数作为输入。 元素数不是字节数。

如果您有任何建议,请告诉我。谢谢!

以下示例对您的用例做出了一些假设:

  • Source 中的第一个 ByteString 元素始终包含 16 字节的初始化向量(我将在此称为 "key")。第一个元素中的剩余字节(即超过前 16 字节的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为密钥。)
  • 解密后的值为String
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")

val byteStringSource = Source(Vector(b1, b2, b3, b4))

// The first value in the tuple argument is the ByteString key, the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString, ByteString)): (ByteString, Option[String]) = {
  // do fancy decryption stuff with the key
  (keyAndEncrypted._2, Option(keyAndEncrypted._2.utf8String.toUpperCase))
}

val decryptionFlow = Flow.fromFunction(decrypt)

val decryptedSource: Source[(ByteString, Option[String]), NotUsed] =
  byteStringSource
    .prefixAndTail(1)
    .map {
      case (prefix, tail) =>
        val (key, rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
        (key, Source(Vector(rest)).concat(tail))
    }
    .collect { case (key, bSource) => bSource.map(b => (key, b)) }
    .flatMapConcat(identity)
    .via(decryptionFlow)

decryptedSource.runForeach {
  case (encrypted, decrypted) =>
    println((encrypted.utf8String, decrypted))
}

运行 以上打印如下:

(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))

在本例中,我使用 Source 中第一个 ByteString 的前三个字节并将其用作密钥。初始 ByteString 中剩余的三个字节作为 Source 的其余部分(尾部)的前缀,然后对生成的 Source 进行转换,使得密钥与每个 [=12] 耦合=] 元素。然后 Source 被扁平化并通过 Flow 解密。 Flow returns 原始加密 ByteString 和包含解密值的 Option[String]

希望这至少能为您的用例提供一些灵感和想法。

有几点需要注意:

  1. 由于输入源来自网络,部分ByteString可能为空
  2. 我们需要前 16 个字节才能正确解密流的其余部分

我会在代码中留下一些注释作为解释。

source
    .via(Flow[ByteString].map(d => {
      // Converts Source[ByteString] to Source[List[Byte]]
      d.toByteBuffer.array().toList
    }))
    // Source[List[Byte]] to Source[Byte]
    .mapConcat(identity)
    // Get the first 16 bytes from Source[Byte] and a stream of the remaining bytes Source[(Seq[byte], Source[Byte])
    .prefixAndTail(16)
    // Source[(Seq[byte], Source[Byte]) to Source[Source[(Seq[Byte], Array[Byte])]]
    .collect { case (key, source) =>      
      source.map(b => (key, Array(b)))
    }
    // Source[Source[(Seq[Byte], Array[Byte])]] to Source[(Seq[Byte], Array[Byte])]
    .flatMapConcat(identity)
    .runForeach {
      case (key, rest) =>
        println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
    }

包含空 ByteString 的测试示例:

val source = Source(Iterable[ByteString](
    ByteString(""), // empty ByteString to imitate empty element from database stream
    ByteString("abcdefghijklmnop <- first 16 bytes"))
  )

结果预计 abcdefghijklmnop 作为前 16 个字节

abcdefghijklmnop :  
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :  
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :  
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :  
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s

好老Java来拯救:

val ivBytesBuffer = new Array[Byte](16)
val is = new FileInputStream(fileName)
is.read(ivBytesBuffer)

val source = StreamConverters.fromInputStream(() => is)
decryptAes(source, keySpec, ivBytesBuffer)

Read First 4 Bytes of File

所述