将监督策略与 GraphStage 一起使用不起作用
Using supervision strategy with GraphStage doesn't work
我正在尝试使用 Supervision 策略恢复,当我使用 map 编写 Flow stages 时它有效,但如果我使用 graph stage,它永远不会被捕获,并且整个管道失败
object test extends App{
val stageSupervisionDecider: Supervision.Decider = {
case cEx: IllegalArgumentException =>
println("Supervision Catch")
Supervision.Resume
case _ => Supervision.Stop
}
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withSupervisionStrategy(stageSupervisionDecider)
)
Source(Vector(1,2,3,4,5,6,7))
.via(new FailFlow)
.runWith(Sink.foreach(println))
}
class FailFlow extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("FailFlow.In")
val out = Outlet[Int]("FailFlow.Out")
override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val m = grab(in)
if(m % 2 == 0)
throw new IllegalArgumentException("illegal value")
else
push(out,m)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
}
知道这里出了什么问题吗?
根据documentation(大红框):
ZipWith, GraphStage junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet
我正在尝试使用 Supervision 策略恢复,当我使用 map 编写 Flow stages 时它有效,但如果我使用 graph stage,它永远不会被捕获,并且整个管道失败
object test extends App{
val stageSupervisionDecider: Supervision.Decider = {
case cEx: IllegalArgumentException =>
println("Supervision Catch")
Supervision.Resume
case _ => Supervision.Stop
}
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withSupervisionStrategy(stageSupervisionDecider)
)
Source(Vector(1,2,3,4,5,6,7))
.via(new FailFlow)
.runWith(Sink.foreach(println))
}
class FailFlow extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("FailFlow.In")
val out = Outlet[Int]("FailFlow.Out")
override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val m = grab(in)
if(m % 2 == 0)
throw new IllegalArgumentException("illegal value")
else
push(out,m)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
}
知道这里出了什么问题吗?
根据documentation(大红框):
ZipWith, GraphStage junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet