具有记录乘数的图上的至少一次传递语义
At Least Once Delivery Semantics on a Graph with a record multiplier
我正在针对 Kafka 使用 Akka Streams 2.4.8,并尝试构建一个具有至少一次(at-least-once)交付语义的流图(graph)。我的一些流图以如下内容开头:
committableSource ~> processing ~> mapOneRecordToNOtherRecords ~> ...
其中 mapOneRecordToNOtherRecords
接受一条消息并将其转换为任意数量 "N" 的另一种类型的消息以供使用。这意味着当我完成这个阶段时,每条传入消息都被乘以 N 条消息,每条消息都具有相同的 CommittableOffset
。问题是,为了支持 ALOS,我需要等到最后一个完成其处理步骤后才提交。
现在我一直在使用 Flow#mapConcat
来处理 mapOneRecordToNOtherRecords
的输出并继续下游。据我了解,mapConcat
的输出保留了顺序,所以我需要的是一个图形元素,它收集具有给定相关标识符(例如 CommittableOffset
)的 N 条消息,并在全部收齐之后把它们一并向下游发出。
方法 Flow#groupBy
看起来很有趣,但它没有关闭机制,而且我没有看到它依据什么标准释放某个键,也没有看到它依据什么标准决定为给定的键发出结果。其他 SO 帖子让我相信,使用它确实可能造成内存泄漏。
我一直在仔细阅读文档寻找其他解决方案,但一无所获。有人能给我指个方向吗?这应该是一个很常见的需求吧?还是说我必须构建自定义 GraphStage
?
为了防止在 Akka 的文档中找不到答案(正如我怀疑的那样),我写了一个自定义 GraphStage
来做到这一点。代码如下:
/**
 * Groups incoming records by a key and emits each group as a single `Seq` once `bufferFull` accepts it.
 *
 * Groups are released strictly in the order in which their keys were first seen: if the group for key P2
 * becomes full before the group for an earlier key P1, P2 is held back until P1 has been emitted. The first
 * time an element arrives while a key is at the head of that order, a clock is started for that key; if the
 * key is still at the head once `timeout` has elapsed, the next incoming element makes the stage fail with a
 * `TimeoutException`. The check only runs when an element arrives, so an idle stream never times out.
 *
 * On upstream completion the leading run of full groups is flushed in order. Flushing stops at the first
 * group that is not full; that group and everything queued behind it is dropped.
 *
 * @param propertyExtractor extracts the key by which an incoming record is grouped
 * @param bufferFull decides whether the records accumulated so far for one key form a complete group
 * @param timeout (optional, default 10 seconds) how long a key may stay at the head of the emission order
 *                without completing; an infinite duration disables the check
 * @tparam E the type of the record
 * @tparam P the type of the property acting as a key for the record
 */
final class AccumulateWhileNotFull[E, P](propertyExtractor: E => P, bufferFull: (Seq[E]) => Boolean, timeout: Duration = 10.seconds) extends GraphStage[FlowShape[E, Seq[E]]] {
  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[Seq[E]]("AccumulateWhileUnchanged.out")
  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    // Records collected so far per key. Immutable vectors are used so a snapshot handed to `bufferFull`
    // or pushed downstream can never be affected by later appends.
    private val buffers = TrieMap[P, Vector[E]]()
    // Keys in first-seen order; the head is the only key whose group may be emitted next.
    private val ordering = mutable.LinkedHashSet[P]()
    // Instant at which each key was first observed at the head of `ordering`.
    private val timing = TrieMap[P, DateTime]()

    /**
     * Emits the head group if it is complete, otherwise asks upstream for another element.
     *
     * Exactly one of `push(out)` / `pull(in)` happens per call. Because every callback funnels through
     * here, `in` is never pulled twice and `out` is never pushed without demand.
     */
    private def pushHeadIfFullOrPull(): Unit = {
      val ready = for {
        state <- ordering.headOption
        records <- buffers.get(state).filter(bufferFull)
      } yield (state, records)

      ready match {
        case Some((state, records)) =>
          buffers -= state
          ordering -= state
          timing -= state
          push(out, records)
        case None =>
          pull(in)
      }
    }

    setHandlers(in, out, new InHandler with OutHandler {
      override def onPush(): Unit = {
        // add next element to the buffer it belongs to
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)
        buffers += nextState -> (buffers.getOrElse(nextState, Vector.empty[E]) :+ nextElement)

        // track the next state allowed to be pushed; `ordering` is non-empty after the insert above
        ordering += nextState
        val allowedState = ordering.head

        val firstAllowedAt = timing.getOrElseUpdate(allowedState, DateTime.now)
        // `toMillis` is undefined for infinite durations, so those simply switch the check off
        if (timeout.isFinite && firstAllowedAt.isBefore(DateTime.now().minus(timeout.toMillis))) {
          //reset timer in case exception is to be ignored
          timing += allowedState -> DateTime.now
          throw new TimeoutException(s"Failed to process expected number of messages for property ${allowedState} within ${timeout}")
        }

        pushHeadIfFullOrPull()
      }

      override def onPull(): Unit = {
        // A group further back may have filled up while an earlier key was blocking it; release it now
        // instead of waiting for yet another element to arrive.
        pushHeadIfFullOrPull()
      }

      override def onUpstreamFinish(): Unit = {
        // go in order, first one not full shuts it all down
        ordering.iterator
          .map(state => buffers(state))
          .takeWhile(bufferFull)
          .foreach(records => emit(out, records))
        // pending emits are delivered before the stage actually completes
        completeStage()
      }
    })

    override def postStop(): Unit = {
      buffers.clear()
      ordering.clear()
      timing.clear()
    }
  }
}
用来验证它的集成测试如下:
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import org.scalatest.{ FunSuiteLike, Matchers }
import scala.concurrent.duration.Duration
import scala.util.Try
import scala.concurrent.duration._
/**
 * Integration tests for `AccumulateWhileNotFull`, run against finite in-memory sources and observed
 * through a `TestSink` probe.
 *
 * Each `Thing` carries the key (`prop`) it is grouped by and the number of records (`count`) that make
 * its group complete.
 */
class AccumulateWhileNotFullTest extends TestKit(ActorSystem("AccumulateWhileNotFullTest")) with FunSuiteLike with Matchers with org.scalatest.BeforeAndAfterAll {
  implicit lazy val materializer: Materializer = ActorMaterializer()

  // Without this the actor system (and its threads) outlives the suite.
  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  test("test works singly") {
    val result = runBasic(List(Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test works with never complete") {
    val result = runBasic(List(Thing("a", 1, 2)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test resets keys") {
    val result = runBasic(List(Thing("a", 1, 1), Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test accumulates multiple in a row") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test accumulates multiple in order") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3), Thing("b", 5, 2), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("b", 5, 2), Thing("b", 10, 2)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test handles out of order correctly") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("b", 5, 2), Thing("a", 5, 3), Thing("c", 5, 1), Thing("a", 10, 3), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("b", 5, 2), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("c", 5, 1)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test timeout throws exception") {
    val result = runBasic(List(Thing("a", 1, 2)), 1.second)
    val initialTry = Try(result.requestNext())
    // Asserted on the Try itself: a matcher placed inside `recover` is captured by the resulting Try
    // and can never fail the test.
    initialTry.failed.toOption.map(_.getMessage) should be(Some("expected OnNext, found OnComplete")) //test probe stock exception
    Thread.sleep(1000)
    // NOTE(review): the outcome of this `recover` is discarded, so the message check below is never
    // enforced. The source is also already exhausted at this point, so the stage's TimeoutException
    // does not appear to be reachable from this test - confirm and rework with a controllable source.
    Try(result.requestNext()).recover { case ex: TimeoutException => ex.getMessage should be(s"Failed to process expected number of messages for property a within 1 seconds") }
  }

  /**
   * Runs `things` through a fresh `AccumulateWhileNotFull` keyed on `Thing.prop` and returns the
   * `TestSink` probe attached to its output.
   *
   * A group counts as full when it holds exactly as many records as the `count` of its first record.
   *
   * @param things the records to feed through the stage, in order
   * @param timeout the timeout handed to the stage (default 10 seconds)
   */
  def runBasic(things: List[Thing], timeout: Duration = 10.seconds)(implicit actorSystem: ActorSystem, materializer: Materializer) = {
    val f: Flow[Thing, Seq[Thing], _] = Flow.fromGraph(new AccumulateWhileNotFull[Thing, String](
      in => in.prop,
      vals => vals.headOption.exists(v => vals.size == v.count),
      timeout
    ))
    Source.fromIterator(() => things.iterator).via(f).runWith(TestSink.probe(actorSystem))
  }

  case class Thing(prop: String, value: Int, count: Int)
}
我正在针对 Kafka 使用 Akka Streams 2.4.8,并尝试构建一个具有至少一次(at-least-once)交付语义的流图(graph)。我的一些流图以如下内容开头:
committableSource ~> processing ~> mapOneRecordToNOtherRecords ~> ...
其中 mapOneRecordToNOtherRecords
接受一条消息并将其转换为任意数量 "N" 的另一种类型的消息以供使用。这意味着当我完成这个阶段时,每条传入消息都被乘以 N 条消息,每条消息都具有相同的 CommittableOffset
。问题是,为了支持 ALOS,我需要等到最后一个完成其处理步骤后才提交。
现在我一直在使用 Flow#mapConcat
来处理 mapOneRecordToNOtherRecords
的输出并继续下游。据我了解,mapConcat
的输出保留了顺序,所以我需要的是一个图形元素,它收集具有给定相关标识符(例如 CommittableOffset
)的 N 条消息,并在全部收齐之后把它们一并向下游发出。
方法 Flow#groupBy
看起来很有趣,但它没有关闭机制,而且我没有看到它依据什么标准释放某个键,也没有看到它依据什么标准决定为给定的键发出结果。其他 SO 帖子让我相信,使用它确实可能造成内存泄漏。
我一直在仔细阅读文档寻找其他解决方案,但一无所获。有人能给我指个方向吗?这应该是一个很常见的需求吧?还是说我必须构建自定义 GraphStage
?
为了防止在 Akka 的文档中找不到答案(正如我怀疑的那样),我写了一个自定义 GraphStage
来做到这一点。代码如下:
/**
 * Groups incoming records by a key and emits each group as a single `Seq` once `bufferFull` accepts it.
 *
 * Groups are released strictly in the order in which their keys were first seen: if the group for key P2
 * becomes full before the group for an earlier key P1, P2 is held back until P1 has been emitted. The first
 * time an element arrives while a key is at the head of that order, a clock is started for that key; if the
 * key is still at the head once `timeout` has elapsed, the next incoming element makes the stage fail with a
 * `TimeoutException`. The check only runs when an element arrives, so an idle stream never times out.
 *
 * On upstream completion the leading run of full groups is flushed in order. Flushing stops at the first
 * group that is not full; that group and everything queued behind it is dropped.
 *
 * @param propertyExtractor extracts the key by which an incoming record is grouped
 * @param bufferFull decides whether the records accumulated so far for one key form a complete group
 * @param timeout (optional, default 10 seconds) how long a key may stay at the head of the emission order
 *                without completing; an infinite duration disables the check
 * @tparam E the type of the record
 * @tparam P the type of the property acting as a key for the record
 */
final class AccumulateWhileNotFull[E, P](propertyExtractor: E => P, bufferFull: (Seq[E]) => Boolean, timeout: Duration = 10.seconds) extends GraphStage[FlowShape[E, Seq[E]]] {
  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[Seq[E]]("AccumulateWhileUnchanged.out")
  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    // Records collected so far per key. Immutable vectors are used so a snapshot handed to `bufferFull`
    // or pushed downstream can never be affected by later appends.
    private val buffers = TrieMap[P, Vector[E]]()
    // Keys in first-seen order; the head is the only key whose group may be emitted next.
    private val ordering = mutable.LinkedHashSet[P]()
    // Instant at which each key was first observed at the head of `ordering`.
    private val timing = TrieMap[P, DateTime]()

    /**
     * Emits the head group if it is complete, otherwise asks upstream for another element.
     *
     * Exactly one of `push(out)` / `pull(in)` happens per call. Because every callback funnels through
     * here, `in` is never pulled twice and `out` is never pushed without demand.
     */
    private def pushHeadIfFullOrPull(): Unit = {
      val ready = for {
        state <- ordering.headOption
        records <- buffers.get(state).filter(bufferFull)
      } yield (state, records)

      ready match {
        case Some((state, records)) =>
          buffers -= state
          ordering -= state
          timing -= state
          push(out, records)
        case None =>
          pull(in)
      }
    }

    setHandlers(in, out, new InHandler with OutHandler {
      override def onPush(): Unit = {
        // add next element to the buffer it belongs to
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)
        buffers += nextState -> (buffers.getOrElse(nextState, Vector.empty[E]) :+ nextElement)

        // track the next state allowed to be pushed; `ordering` is non-empty after the insert above
        ordering += nextState
        val allowedState = ordering.head

        val firstAllowedAt = timing.getOrElseUpdate(allowedState, DateTime.now)
        // `toMillis` is undefined for infinite durations, so those simply switch the check off
        if (timeout.isFinite && firstAllowedAt.isBefore(DateTime.now().minus(timeout.toMillis))) {
          //reset timer in case exception is to be ignored
          timing += allowedState -> DateTime.now
          throw new TimeoutException(s"Failed to process expected number of messages for property ${allowedState} within ${timeout}")
        }

        pushHeadIfFullOrPull()
      }

      override def onPull(): Unit = {
        // A group further back may have filled up while an earlier key was blocking it; release it now
        // instead of waiting for yet another element to arrive.
        pushHeadIfFullOrPull()
      }

      override def onUpstreamFinish(): Unit = {
        // go in order, first one not full shuts it all down
        ordering.iterator
          .map(state => buffers(state))
          .takeWhile(bufferFull)
          .foreach(records => emit(out, records))
        // pending emits are delivered before the stage actually completes
        completeStage()
      }
    })

    override def postStop(): Unit = {
      buffers.clear()
      ordering.clear()
      timing.clear()
    }
  }
}
用来验证它的集成测试如下:
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import org.scalatest.{ FunSuiteLike, Matchers }
import scala.concurrent.duration.Duration
import scala.util.Try
import scala.concurrent.duration._
/**
 * Integration tests for `AccumulateWhileNotFull`, run against finite in-memory sources and observed
 * through a `TestSink` probe.
 *
 * Each `Thing` carries the key (`prop`) it is grouped by and the number of records (`count`) that make
 * its group complete.
 */
class AccumulateWhileNotFullTest extends TestKit(ActorSystem("AccumulateWhileNotFullTest")) with FunSuiteLike with Matchers with org.scalatest.BeforeAndAfterAll {
  implicit lazy val materializer: Materializer = ActorMaterializer()

  // Without this the actor system (and its threads) outlives the suite.
  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  test("test works singly") {
    val result = runBasic(List(Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test works with never complete") {
    val result = runBasic(List(Thing("a", 1, 2)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test resets keys") {
    val result = runBasic(List(Thing("a", 1, 1), Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test accumulates multiple in a row") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test accumulates multiple in order") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3), Thing("b", 5, 2), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("b", 5, 2), Thing("b", 10, 2)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test handles out of order correctly") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("b", 5, 2), Thing("a", 5, 3), Thing("c", 5, 1), Thing("a", 10, 3), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("b", 5, 2), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("c", 5, 1)))
    Try(result.requestNext()).isFailure should be(true)
  }

  test("test timeout throws exception") {
    val result = runBasic(List(Thing("a", 1, 2)), 1.second)
    val initialTry = Try(result.requestNext())
    // Asserted on the Try itself: a matcher placed inside `recover` is captured by the resulting Try
    // and can never fail the test.
    initialTry.failed.toOption.map(_.getMessage) should be(Some("expected OnNext, found OnComplete")) //test probe stock exception
    Thread.sleep(1000)
    // NOTE(review): the outcome of this `recover` is discarded, so the message check below is never
    // enforced. The source is also already exhausted at this point, so the stage's TimeoutException
    // does not appear to be reachable from this test - confirm and rework with a controllable source.
    Try(result.requestNext()).recover { case ex: TimeoutException => ex.getMessage should be(s"Failed to process expected number of messages for property a within 1 seconds") }
  }

  /**
   * Runs `things` through a fresh `AccumulateWhileNotFull` keyed on `Thing.prop` and returns the
   * `TestSink` probe attached to its output.
   *
   * A group counts as full when it holds exactly as many records as the `count` of its first record.
   *
   * @param things the records to feed through the stage, in order
   * @param timeout the timeout handed to the stage (default 10 seconds)
   */
  def runBasic(things: List[Thing], timeout: Duration = 10.seconds)(implicit actorSystem: ActorSystem, materializer: Materializer) = {
    val f: Flow[Thing, Seq[Thing], _] = Flow.fromGraph(new AccumulateWhileNotFull[Thing, String](
      in => in.prop,
      vals => vals.headOption.exists(v => vals.size == v.count),
      timeout
    ))
    Source.fromIterator(() => things.iterator).via(f).runWith(TestSink.probe(actorSystem))
  }

  case class Thing(prop: String, value: Int, count: Int)
}