将监督策略与 GraphStage 一起使用不起作用

Using supervision strategy with GraphStage doesn't work

我正在尝试使用 Supervision 策略恢复,当我使用 map 编写 Flow stages 时它有效,但如果我使用 graph stage,它永远不会被捕获,并且整个管道失败

    object  test extends App{

      val stageSupervisionDecider: Supervision.Decider = {
        case cEx: IllegalArgumentException =>
          println("Supervision Catch")
          Supervision.Resume
        case _ => Supervision.Stop
      }

      implicit val system = ActorSystem("system")

      implicit val materializer = ActorMaterializer(
        ActorMaterializerSettings(system)
          .withSupervisionStrategy(stageSupervisionDecider)
      )

      Source(Vector(1,2,3,4,5,6,7))
        .via(new FailFlow)
        .runWith(Sink.foreach(println))
    }


    class FailFlow extends GraphStage[FlowShape[Int, Int]] {

      val in = Inlet[Int]("FailFlow.In")
      val out = Outlet[Int]("FailFlow.Out")

      override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
        new GraphStageLogic(shape) {
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              val m = grab(in)
              if(m % 2 == 0)
                throw new IllegalArgumentException("illegal value")
              else
              push(out,m)
            }
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
                pull(in)
            }
          })
        }
      }
    }

知道这里出了什么问题吗?

根据documentation(大红框):

ZipWith, GraphStage junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet