是否可以提取 akkastreams 中的子流密钥?
Is it possible to extract the substream key in akkastreams?
我似乎找不到任何关于此的文档,但我知道 AkkaStreams 在内存中调用 groupBy 时存储用于将流分组为子流的键。是否可以从子流中提取这些密钥?假设我从我的主流创建了一堆子流,将它们传递给一个对每个子流中的对象进行计数的折叠,然后将计数存储在 class 中。我可以让子流的密钥也传递给那个 class 吗?或者有更好的方法吗?我需要计算每个子流的每个元素,但我还需要存储计数属于哪个组。
中显示了一个很好的示例
val counts: Source[(String, Int), NotUsed] = words
// split the words into separate streams first
.groupBy(MaximumDistinctWords, identity)
//transform each element to pair with number of words in it
.map(_ -> 1)
// add counting logic to the streams
.reduce((l, r) => (l._1, l._2 + r._2))
// get a stream of word counts
.mergeSubstreams
然后是:
val words = Source(List("Hello", "world", "let's", "say", "again", "Hello", "world"))
counts.runWith(Sink.foreach(println))
将打印:
(world,2)
(Hello,2)
(let's,1)
(again,1)
(say,1)
我想到的另一个例子是用余数计算数字。所以下面,例如:
Source(0 to 101)
.groupBy(10, x => x % 4)
.map(e => e % 4 -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams.to(Sink.foreach(println)).run()
将打印:
(0,26)
(1,26)
(2,25)
(3,25)
我似乎找不到任何关于此的文档,但我知道 AkkaStreams 在内存中调用 groupBy 时存储用于将流分组为子流的键。是否可以从子流中提取这些密钥?假设我从我的主流创建了一堆子流,将它们传递给一个对每个子流中的对象进行计数的折叠,然后将计数存储在 class 中。我可以让子流的密钥也传递给那个 class 吗?或者有更好的方法吗?我需要计算每个子流的每个元素,但我还需要存储计数属于哪个组。
val counts: Source[(String, Int), NotUsed] = words
// split the words into separate streams first
.groupBy(MaximumDistinctWords, identity)
//transform each element to pair with number of words in it
.map(_ -> 1)
// add counting logic to the streams
.reduce((l, r) => (l._1, l._2 + r._2))
// get a stream of word counts
.mergeSubstreams
然后是:
val words = Source(List("Hello", "world", "let's", "say", "again", "Hello", "world"))
counts.runWith(Sink.foreach(println))
将打印:
(world,2)
(Hello,2)
(let's,1)
(again,1)
(say,1)
我想到的另一个例子是用余数计算数字。所以下面,例如:
Source(0 to 101)
.groupBy(10, x => x % 4)
.map(e => e % 4 -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams.to(Sink.foreach(println)).run()
将打印:
(0,26)
(1,26)
(2,25)
(3,25)