Why does the stream never get triggered?
I have the following stream, which never reaches the map stage after flatMapConcat.
private def stream[A](ref: ActorRef[ServerHealthStreamer])(implicit system: ActorSystem[A])
  : KillSwitch = {

  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = materializer.executionContext

  system.log.info("=============> Start KafkaDetectorStream <=============")

  val addr = system
    .settings
    .config
    .getConfig("kafka")
    .getString("servers")

  val sink: Sink[ServerHealthEvent, NotUsed] =
    ActorSink.actorRefWithAck[ServerHealthEvent, ServerHealthStreamer, Ack](
      ref = ref,
      onCompleteMessage = Complete,
      onFailureMessage = Fail.apply,
      messageAdapter = Message.apply,
      onInitMessage = Init.apply,
      ackMessage = Ack)

  Source.tick(1.seconds, 5.seconds, NotUsed)
    .flatMapConcat(_ => Source.fromFuture(health(addr)))
    .map {
      case true =>
        KafkaActiveConfirmed
      case false =>
        KafkaInactiveConfirmed
    }
    .viaMat(KillSwitches.single)(Keep.right)
    .to(sink)
    .run()
}

private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
  val props = new Properties
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
  props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
  props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")

  Future {
    AdminClient
      .create(props)
      .listTopics()
      .names()
      .get()
  }
    .map(_ => true)
    .recover {
      case _: Throwable => false
    }
}
I mean this part:
    .map {
      case true =>
        KafkaActiveConfirmed
      case false =>
        KafkaInactiveConfirmed
    }
It is never executed, and I don't know why. The health method itself runs as expected.
Try adding .log between flatMapConcat and map to see which elements are emitted. log also records errors and cancellation of the stream:
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/log.html
Note that .log uses an implicit logger.
Also, your .flatMapConcat(_ => Source.fromFuture(health(addr))) looks tricky. Try .mapAsyncUnordered(1)(_ => health(addr)) instead.
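
Both suggestions above can be combined in a small, self-contained sketch. It is not the asker's code: it assumes Akka 2.6 or later with the classic ActorSystem (which supplies the materializer implicitly), and fakeHealth is a hypothetical stand-in for the Kafka check so the example runs without a broker. Because .log emits elements at debug level by default, the sketch raises the element level to info so the output shows up with a default logging configuration.

```scala
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object HealthStreamSketch {

  // Hypothetical stand-in for health(addr) from the question; always succeeds
  // for a non-empty address so the sketch needs no Kafka broker.
  def fakeHealth(addr: String): Future[Boolean] =
    Future.successful(addr.nonEmpty)

  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("health-sketch")
    try {
      val result = Source
        .tick(100.millis, 100.millis, ())
        // Replaces flatMapConcat(_ => Source.fromFuture(...)): at most one check in flight.
        .mapAsyncUnordered(1)(_ => fakeHealth("localhost:9092"))
        // Logs every element, failure and completion that passes this point.
        .log("health-result")
        .addAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
        .map {
          case true  => "KafkaActiveConfirmed"
          case false => "KafkaInactiveConfirmed"
        }
        .take(3) // only so that the sketch terminates
        .runWith(Sink.seq)

      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

If the health-result lines show up in the log but nothing reaches the actor, the elements are leaving mapAsyncUnordered and the place to look is downstream of the log stage, for example the sink. If they never show up, the problem is upstream of it.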