从 PushPullStage 发出多个对象
emit multiple objects from PushPullStage
我一直在玩 Akka-Streams,我正在尝试通过实现我自己的 PushPullStage
来定制 Flow
。我希望 Flow
将从上游接收到的对象累积到一个列表中,并根据某些功能对它们进行分组,然后在上游完成时向下游发出组。
这似乎是一件很容易实现的事情,但我不知道该怎么做!似乎没有办法从 PushPullStage
.
发出多个对象
到目前为止,这是我的实现:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
for(group <- groups)
ctx.push(group) // this doesn't work
ctx.finish()
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
编辑
我更改了代码以说明背压,现在一切正常。基本上我只需要让下游 Flow
做他们想做的事情并继续拉动元素:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case head :: tail =>
groups = tail
ctx.push(head)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
你不能超过要求,因为那样会违反背压。
另外,值得注意的是,我不会推荐您尝试执行的操作,因为对于大型或无界流,这会引发 OutOfMemoryError。
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case group :: rest =>
groups = rest
ctx.push(group)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
我一直在玩 Akka-Streams,我正在尝试通过实现我自己的 PushPullStage
来定制 Flow
。我希望 Flow
将从上游接收到的对象累积到一个列表中,并根据某些功能对它们进行分组,然后在上游完成时向下游发出组。
这似乎是一件很容易实现的事情,但我不知道该怎么做!似乎没有办法从 PushPullStage
.
到目前为止,这是我的实现:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
for(group <- groups)
ctx.push(group) // this doesn't work
ctx.finish()
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
编辑
我更改了代码以说明背压,现在一切正常。基本上我只需要让下游 Flow
做他们想做的事情并继续拉动元素:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case head :: tail =>
groups = tail
ctx.push(head)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
你不能超过要求,因为那样会违反背压。 另外,值得注意的是,我不会推荐您尝试执行的操作,因为对于大型或无界流,这会引发 OutOfMemoryError。
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case group :: rest =>
groups = rest
ctx.push(group)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}