是否可以提取 akkastreams 中的子流密钥?

Is it possible to extract the substream key in akkastreams?

我似乎找不到任何关于此的文档,但我知道 AkkaStreams 在内存中调用 groupBy 时存储用于将流分组为子流的键。是否可以从子流中提取这些密钥?假设我从我的主流创建了一堆子流,将它们传递给一个对每个子流中的对象进行计数的折叠,然后将计数存储在 class 中。我可以让子流的密钥也传递给那个 class 吗?或者有更好的方法吗?我需要计算每个子流的每个元素,但我还需要存储计数属于哪个组。

stream-cookbook:

中显示了一个很好的示例
val counts: Source[(String, Int), NotUsed] = words
// split the words into separate streams first
  .groupBy(MaximumDistinctWords, identity)
  //transform each element to pair with number of words in it
  .map(_ -> 1)
  // add counting logic to the streams
  .reduce((l, r) => (l._1, l._2 + r._2))
  // get a stream of word counts
  .mergeSubstreams

然后是:

val words = Source(List("Hello", "world", "let's", "say", "again", "Hello", "world"))
counts.runWith(Sink.foreach(println))

将打印:

(world,2)
(Hello,2)
(let's,1)
(again,1)
(say,1)

我想到的另一个例子是用余数计算数字。所以下面,例如:

Source(0 to 101)
  .groupBy(10, x => x % 4)
  .map(e => e % 4 -> 1)
  .reduce((l, r) => (l._1, l._2 + r._2))
  .mergeSubstreams.to(Sink.foreach(println)).run()

将打印:

(0,26)
(1,26)
(2,25)
(3,25)