akka-streams 有状态子流
akka-streams stateful substream flow
有状态流:
val stream = Flow[Event].statefulMapConcat {
() =>
val state = ...
{
element =>
// change the state
element :: Nil
}
}
并且它是流程的一部分
Flow[Event]
.groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
有没有办法让每个子流在 stream
中有一个 state
(在这个例子中,在 groupBy 之后的每个键)?
我认为它应该按子流具体化,但不知道该怎么做。
您在该设置中确实获得了每个子流的状态:
val stream = Flow[Int].statefulMapConcat {
() => {
var state: List[Int] = Nil
element => {
state = element :: state
List(state)
}
}
}
val groupByFlow =
Flow[Int]
.groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
Source(List(1,1,2,3,3,3))
.via(groupByFlow)
.runForeach(i => println(i))
将打印
List(1)
List(1, 1)
List(3)
List(2)
List(3, 3)
List(3, 3, 3)
有状态流:
val stream = Flow[Event].statefulMapConcat {
() =>
val state = ...
{
element =>
// change the state
element :: Nil
}
}
并且它是流程的一部分
Flow[Event]
.groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
有没有办法让每个子流在 stream
中有一个 state
(在这个例子中,在 groupBy 之后的每个键)?
我认为它应该按子流具体化,但不知道该怎么做。
您在该设置中确实获得了每个子流的状态:
val stream = Flow[Int].statefulMapConcat {
() => {
var state: List[Int] = Nil
element => {
state = element :: state
List(state)
}
}
}
val groupByFlow =
Flow[Int]
.groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
Source(List(1,1,2,3,3,3))
.via(groupByFlow)
.runForeach(i => println(i))
将打印
List(1)
List(1, 1)
List(3)
List(2)
List(3, 3)
List(3, 3, 3)