从 akka.stream.scaladsl.Source 读取第一个字节
Reading first bytes from akka.stream.scaladsl.Source
我正在尝试从 akka.stream.scaladsl.Source[ByteString, Any]
中读取前 16 个字节并返回 [Array[Byte], Source[ByteString, Any]]
。
读取前 16 个字节后,我想像往常一样传输剩余的 Source
。
用例:
Source[ByteString, Any]
是一个加密流,流中的前16个字节是初始化向量。我需要获取初始化向量才能解密流的其余部分。
这是我试过的:
Source.single(ByteString("This is my test string"))
.prefixAndTail(16).runWith(Sink.head)
我想要这样的东西,但是 prefixAndTail
将元素数作为输入。 元素数不是字节数。
如果您有任何建议,请告诉我。谢谢!
以下示例对您的用例做出了一些假设:
Source
中的第一个 ByteString
元素始终包含 16 字节的初始化向量(我将在此称为 "key")。第一个元素中的剩余字节(即超过前 16 字节的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为密钥。)
- 解密后的值为
String
。
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")
val byteStringSource = Source(Vector(b1, b2, b3, b4))
// The first value in the tuple argument is the ByteString key, the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString, ByteString)): (ByteString, Option[String]) = {
// do fancy decryption stuff with the key
(keyAndEncrypted._2, Option(keyAndEncrypted._2.utf8String.toUpperCase))
}
val decryptionFlow = Flow.fromFunction(decrypt)
val decryptedSource: Source[(ByteString, Option[String]), NotUsed] =
byteStringSource
.prefixAndTail(1)
.map {
case (prefix, tail) =>
val (key, rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
(key, Source(Vector(rest)).concat(tail))
}
.collect { case (key, bSource) => bSource.map(b => (key, b)) }
.flatMapConcat(identity)
.via(decryptionFlow)
decryptedSource.runForeach {
case (encrypted, decrypted) =>
println((encrypted.utf8String, decrypted))
}
运行 以上打印如下:
(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))
在本例中,我使用 Source
中第一个 ByteString
的前三个字节并将其用作密钥。初始 ByteString
中剩余的三个字节作为 Source
的其余部分(尾部)的前缀,然后对生成的 Source
进行转换,使得密钥与每个 [=12] 耦合=] 元素。然后 Source
被扁平化并通过 Flow
解密。 Flow
returns 原始加密 ByteString
和包含解密值的 Option[String]
。
希望这至少能为您的用例提供一些灵感和想法。
有几点需要注意:
- 由于输入源来自网络,部分ByteString可能为空
- 我们需要前 16 个字节才能正确解密流的其余部分
我会在代码中留下一些注释作为解释。
source
.via(Flow[ByteString].map(d => {
// Converts Source[ByteString] to Source[List[Byte]]
d.toByteBuffer.array().toList
}))
// Source[List[Byte]] to Source[Byte]
.mapConcat(identity)
// Get the first 16 bytes from Source[Byte] and a stream of the remaining bytes Source[(Seq[byte], Source[Byte])
.prefixAndTail(16)
// Source[(Seq[byte], Source[Byte]) to Source[Source[(Seq[Byte], Array[Byte])]]
.collect { case (key, source) =>
source.map(b => (key, Array(b)))
}
// Source[Source[(Seq[Byte], Array[Byte])]] to Source[(Seq[Byte], Array[Byte])]
.flatMapConcat(identity)
.runForeach {
case (key, rest) =>
println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
}
包含空 ByteString 的测试示例:
val source = Source(Iterable[ByteString](
ByteString(""), // empty ByteString to imitate empty element from database stream
ByteString("abcdefghijklmnop <- first 16 bytes"))
)
结果预计 abcdefghijklmnop
作为前 16 个字节
abcdefghijklmnop :
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s
好老Java来拯救:
val ivBytesBuffer = new Array[Byte](16)
val is = new FileInputStream(fileName)
is.read(ivBytesBuffer)
val source = StreamConverters.fromInputStream(() => is)
decryptAes(source, keySpec, ivBytesBuffer)
如Read First 4 Bytes of File
所述
我正在尝试从 akka.stream.scaladsl.Source[ByteString, Any]
中读取前 16 个字节并返回 [Array[Byte], Source[ByteString, Any]]
。
读取前 16 个字节后,我想像往常一样传输剩余的 Source
。
用例:
Source[ByteString, Any]
是一个加密流,流中的前16个字节是初始化向量。我需要获取初始化向量才能解密流的其余部分。
这是我试过的:
Source.single(ByteString("This is my test string"))
.prefixAndTail(16).runWith(Sink.head)
我想要这样的东西,但是 prefixAndTail
将元素数作为输入。 元素数不是字节数。
如果您有任何建议,请告诉我。谢谢!
以下示例对您的用例做出了一些假设:
Source
中的第一个ByteString
元素始终包含 16 字节的初始化向量(我将在此称为 "key")。第一个元素中的剩余字节(即超过前 16 字节的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为密钥。)- 解密后的值为
String
。
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")
val byteStringSource = Source(Vector(b1, b2, b3, b4))
// The first value in the tuple argument is the ByteString key, the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString, ByteString)): (ByteString, Option[String]) = {
// do fancy decryption stuff with the key
(keyAndEncrypted._2, Option(keyAndEncrypted._2.utf8String.toUpperCase))
}
val decryptionFlow = Flow.fromFunction(decrypt)
val decryptedSource: Source[(ByteString, Option[String]), NotUsed] =
byteStringSource
.prefixAndTail(1)
.map {
case (prefix, tail) =>
val (key, rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
(key, Source(Vector(rest)).concat(tail))
}
.collect { case (key, bSource) => bSource.map(b => (key, b)) }
.flatMapConcat(identity)
.via(decryptionFlow)
decryptedSource.runForeach {
case (encrypted, decrypted) =>
println((encrypted.utf8String, decrypted))
}
运行 以上打印如下:
(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))
在本例中,我使用 Source
中第一个 ByteString
的前三个字节并将其用作密钥。初始 ByteString
中剩余的三个字节作为 Source
的其余部分(尾部)的前缀,然后对生成的 Source
进行转换,使得密钥与每个 [=12] 耦合=] 元素。然后 Source
被扁平化并通过 Flow
解密。 Flow
returns 原始加密 ByteString
和包含解密值的 Option[String]
。
希望这至少能为您的用例提供一些灵感和想法。
有几点需要注意:
- 由于输入源来自网络,部分ByteString可能为空
- 我们需要前 16 个字节才能正确解密流的其余部分
我会在代码中留下一些注释作为解释。
source
.via(Flow[ByteString].map(d => {
// Converts Source[ByteString] to Source[List[Byte]]
d.toByteBuffer.array().toList
}))
// Source[List[Byte]] to Source[Byte]
.mapConcat(identity)
// Get the first 16 bytes from Source[Byte] and a stream of the remaining bytes Source[(Seq[byte], Source[Byte])
.prefixAndTail(16)
// Source[(Seq[byte], Source[Byte]) to Source[Source[(Seq[Byte], Array[Byte])]]
.collect { case (key, source) =>
source.map(b => (key, Array(b)))
}
// Source[Source[(Seq[Byte], Array[Byte])]] to Source[(Seq[Byte], Array[Byte])]
.flatMapConcat(identity)
.runForeach {
case (key, rest) =>
println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
}
包含空 ByteString 的测试示例:
val source = Source(Iterable[ByteString](
ByteString(""), // empty ByteString to imitate empty element from database stream
ByteString("abcdefghijklmnop <- first 16 bytes"))
)
结果预计 abcdefghijklmnop
作为前 16 个字节
abcdefghijklmnop :
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s
好老Java来拯救:
val ivBytesBuffer = new Array[Byte](16)
val is = new FileInputStream(fileName)
is.read(ivBytesBuffer)
val source = StreamConverters.fromInputStream(() => is)
decryptAes(source, keySpec, ivBytesBuffer)
如Read First 4 Bytes of File
所述