Akka Streams:流中的状态
Akka Streams: State in a flow
我想使用 Akka Streams 读取多个大文件来处理每一行。想象一下,每个键都由一个 (identifier -> value)
组成。如果找到新的标识符,我想将它和它的值保存在数据库中;否则,如果在处理行流时已经找到标识符,我只想保存该值。为此,我认为我需要某种递归状态流来保留已经在 Map
中找到的标识符。我想我会在这个流程中收到一对 (newLine, contextWithIdentifiers)
.
我刚刚开始研究 Akka Streams。我想我可以管理自己来做无状态处理的事情,但我不知道如何保留 contextWithIdentifiers
。如果能指出正确的方向,我将不胜感激。
也许statefulMapConcat
之类的东西可以帮到你:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))
val stateFlow = Flow[IdentValue].statefulMapConcat{ () =>
//state with already processed ids
var ids = Set.empty[Int]
identValue => if (ids.contains(identValue.id)) {
//save value to DB
println(identValue.value)
List(identValue)
} else {
//save both to database
println(identValue)
ids = ids + identValue.id
List(identValue)
}
}
Source(identValues)
.via(stateFlow)
.runWith(Sink.seq)
.onSuccess { case identValue => println(identValue) }
几年后,如果您只需要 1 对 1 映射(而不是 1-to-N),这里是我编写的一个实现:
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
object StatefulMap {
def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
}
class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
val in = Inlet[T]("StatefulMap.in")
val out = Outlet[O]("StatefulMap.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val f = converter
setHandler(in, () => push(out, f(grab(in))))
setHandler(out, () => pull(in))
}
}
测试(和演示):
behavior of "StatefulMap"
class Counter extends (Any => Int) {
var count = 0
override def apply(x: Any): Int = {
count += 1
count
}
}
it should "not share state among substreams" in {
val result = await {
Source(0 until 10)
.groupBy(2, _ % 2)
.via(StatefulMap(new Counter()))
.fold(Seq.empty[Int])(_ :+ _)
.mergeSubstreams
.runWith(Sink.seq)
}
result.foreach(_ should be(1 to 5))
}
我想使用 Akka Streams 读取多个大文件来处理每一行。想象一下,每个键都由一个 (identifier -> value)
组成。如果找到新的标识符,我想将它和它的值保存在数据库中;否则,如果在处理行流时已经找到标识符,我只想保存该值。为此,我认为我需要某种递归状态流来保留已经在 Map
中找到的标识符。我想我会在这个流程中收到一对 (newLine, contextWithIdentifiers)
.
我刚刚开始研究 Akka Streams。我想我可以管理自己来做无状态处理的事情,但我不知道如何保留 contextWithIdentifiers
。如果能指出正确的方向,我将不胜感激。
也许statefulMapConcat
之类的东西可以帮到你:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))
val stateFlow = Flow[IdentValue].statefulMapConcat{ () =>
//state with already processed ids
var ids = Set.empty[Int]
identValue => if (ids.contains(identValue.id)) {
//save value to DB
println(identValue.value)
List(identValue)
} else {
//save both to database
println(identValue)
ids = ids + identValue.id
List(identValue)
}
}
Source(identValues)
.via(stateFlow)
.runWith(Sink.seq)
.onSuccess { case identValue => println(identValue) }
几年后,如果您只需要 1 对 1 映射(而不是 1-to-N),这里是我编写的一个实现:
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
object StatefulMap {
def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
}
class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
val in = Inlet[T]("StatefulMap.in")
val out = Outlet[O]("StatefulMap.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val f = converter
setHandler(in, () => push(out, f(grab(in))))
setHandler(out, () => pull(in))
}
}
测试(和演示):
behavior of "StatefulMap"
class Counter extends (Any => Int) {
var count = 0
override def apply(x: Any): Int = {
count += 1
count
}
}
it should "not share state among substreams" in {
val result = await {
Source(0 until 10)
.groupBy(2, _ % 2)
.via(StatefulMap(new Counter()))
.fold(Seq.empty[Int])(_ :+ _)
.mergeSubstreams
.runWith(Sink.seq)
}
result.foreach(_ should be(1 to 5))
}