akka-streams 有状态子流

akka-streams stateful substream flow

有状态流:

val stream = Flow[Event].statefulMapConcat {
  () =>

    val state = ...

    {
      element =>
        // change the state
        element :: Nil
    }
}

并且它是流程的一部分

Flow[Event]
  .groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
  .via(stream)
  .mergeSubstreams

有没有办法让每个子流在 stream 中有一个 state(在这个例子中,在 groupBy 之后的每个键)? 我认为它应该按子流具体化,但不知道该怎么做。

您在该设置中确实获得了每个子流的状态:

  val stream = Flow[Int].statefulMapConcat {
    () => {

      var state: List[Int] = Nil

      element => {
        state = element :: state
        List(state)
      }
    }
  }

  val groupByFlow =
  Flow[Int]
    .groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
    .via(stream)
    .mergeSubstreams

  Source(List(1,1,2,3,3,3))
    .via(groupByFlow)
    .runForeach(i => println(i))

将打印

List(1)
List(1, 1)
List(3)
List(2)
List(3, 3)
List(3, 3, 3)