Why does the stream never get triggered?

I have the following stream, which never reaches the map after flatMapConcat:

  private def stream[A](ref: ActorRef[ServerHealthStreamer])(implicit system: ActorSystem[A])
  : KillSwitch = {

    implicit val materializer = ActorMaterializer()
    implicit val dispatcher = materializer.executionContext

    system.log.info("=============> Start KafkaDetectorStream <=============")

    val addr = system
      .settings
      .config
      .getConfig("kafka")
      .getString("servers")

    val sink: Sink[ServerHealthEvent, NotUsed] =
      ActorSink.actorRefWithAck[ServerHealthEvent, ServerHealthStreamer, Ack](
        ref = ref,
        onCompleteMessage = Complete,
        onFailureMessage = Fail.apply,
        messageAdapter = Message.apply,
        onInitMessage = Init.apply,
        ackMessage = Ack)

    Source.tick(1.seconds, 5.seconds, NotUsed)
      .flatMapConcat(_ => Source.fromFuture(health(addr)))
      .map {
        case true =>
          KafkaActiveConfirmed
        case false =>
          KafkaInactiveConfirmed
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .to(sink)
      .run()
  }

  private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
    props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")

    Future {
      AdminClient
        .create(props)
        .listTopics()
        .names()
        .get()
    }
      .map(_ => true)
      .recover {
        case _: Throwable => false
      }
  }

Specifically, this part:

.map {
  case true =>
    KafkaActiveConfirmed
  case false =>
    KafkaInactiveConfirmed
} 

is never executed, and I don't know why. The health method runs as expected.

Try adding .log between flatMapConcat and map to see which elements are emitted. log also records errors and stream cancellation. https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/log.html

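Note that .log uses an implicit logger (a LoggingAdapter), and that elements are logged at debug level by default, so you may need to raise the log level to see them.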

Also, your .flatMapConcat(_ => Source.fromFuture(health(addr))) looks tricky; try .mapAsyncUnordered(1)(_ => health(addr)) instead.
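
Putting both suggestions together, a minimal sketch might look like the following. It is not the asker's actual setup: it assumes Akka 2.6+ (where an implicit classic ActorSystem supplies the materializer), replaces the Kafka check with a hypothetical stub health that always returns true, uses a made-up address, and prints to the console instead of sending to the actor sink.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

object HealthStreamSketch extends App {
  // Assumption: Akka 2.6+, so the implicit ActorSystem also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("health-sketch")
  import system.dispatcher

  // Hypothetical stand-in for the question's health(addr); always reports healthy.
  def health(addr: String): Future[Boolean] = Future(true)

  Source.tick(1.second, 5.seconds, NotUsed)
    // One health check in flight at a time, instead of flatMapConcat + Source.fromFuture.
    .mapAsyncUnordered(1)(_ => health("localhost:9092"))
    // Log every element (plus failures and completion) that passes this point.
    .log("health-result")
    // Elements are logged at debug level by default; raise them to info so they show up.
    .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
    .map(ok => if (ok) "KafkaActiveConfirmed" else "KafkaInactiveConfirmed")
    // Stop after three ticks so the sketch terminates on its own.
    .take(3)
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}
```

If the "health-result" log lines appear but the downstream map still never fires in the real stream, the problem is more likely on the sink side (for example, the actor never replying with Ack) than in the health call itself.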